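#!/usr/bin/env python3
"""Scrape company names and websites from a saved Dealfront prospect search.

The script logs in at LOGIN_URL with the credentials stored in
CREDENTIALS_FILE, opens the saved search named TARGET_SEARCH_NAME, walks all
result pages and writes the collected entries to OUTPUT_DIR/results.json.
"""
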
import json
import logging
import os
import sys
import time

from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# ───────────────────────────────────────────────────────────────────────────────
# Constants
LOGIN_URL = "https://app.dealfront.com/login"  # page the browser opens first
TARGET_SEARCH_NAME = "Facility Management"  # link text of the saved search
CREDENTIALS_FILE = "dealfront_credentials.json"  # JSON with username/password
OUTPUT_DIR = "output"  # directory that receives results.json
CHROMEDRIVER_PATH = "/usr/bin/chromedriver"  # chromedriver binary to launch
LOG_FORMAT = "%(asctime)s - %(levelname)-8s - %(message)s"
# ───────────────────────────────────────────────────────────────────────────────

# Configure logging: force=True replaces any handlers already installed on the
# root logger; Selenium's own loggers are limited to WARNING and above.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__)
logging.getLogger("selenium").setLevel(logging.WARNING)


def load_credentials(path):
    """Read the login credentials from a JSON file.

    The file must contain a JSON object with non-empty ``"username"`` and
    ``"password"`` entries.

    Args:
        path: Location of the credentials file.

    Returns:
        A ``(username, password)`` tuple.

    Raises:
        SystemExit: With exit code 1 (after logging an error) when the file
            is missing, cannot be read or parsed, or lacks one of the two
            required entries.
    """
    if not os.path.isfile(path):
        logger.error(f"Credentials-Datei nicht gefunden: {path}")
        sys.exit(1)

    try:
        with open(path, encoding="utf-8") as f:
            creds = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError, so a
        # broken file ends the script the same way a missing one does instead
        # of surfacing as a raw traceback.
        logger.error(f"Credentials-Datei konnte nicht gelesen werden: {path} ({exc})")
        sys.exit(1)

    # A JSON root that is not an object (list, string, number, ...) has no
    # .get(); treat it like an object without the required keys.
    if not isinstance(creds, dict):
        creds = {}

    user = creds.get("username")
    pwd = creds.get("password")
    if not user or not pwd:
        logger.error("Credentials-Datei enthält keinen username/password.")
        sys.exit(1)
    return user, pwd


class DealfrontScraper:
    """Scrape company names and websites from a saved Dealfront search.

    The scraper drives an existing Selenium session: it logs in, opens the
    saved search named by ``TARGET_SEARCH_NAME`` and walks the paginated
    result table.

    Args:
        driver: Selenium WebDriver used for all browser interaction.
        wait: ``WebDriverWait`` used for every explicit wait.
        username: Login name typed into the first e-mail/text input.
        password: Password typed into the password input.
    """

    # Implicit-wait values in seconds: a short one while table rows are
    # scanned, and the value the driver is set back to afterwards.
    _ROW_SCAN_IMPLICIT_WAIT = 1
    _DEFAULT_IMPLICIT_WAIT = 10

    # CSS selectors that are needed in more than one place.
    _USER_INPUT_CSS = "input[type='email'], input[type='text']"
    _NAME_LINK_CSS = ".sticky-column a.t-highlight-text"
    _ACTIVE_PAGE_CSS = "nav.eb-pagination a.eb-pagination-button.active"

    def __init__(self, driver, wait, username, password):
        self.driver = driver
        self.wait = wait
        self.username = username
        self.password = password

    def _click_link(self, link_text, fallback_locator):
        """Click the link with ``link_text``, or the fallback element.

        If no clickable link with that exact text appears before the wait
        times out, the element matching ``fallback_locator`` is used instead.
        The click is issued through JavaScript.
        """
        try:
            element = self.wait.until(
                EC.element_to_be_clickable((By.LINK_TEXT, link_text))
            )
        except TimeoutException:
            element = self.wait.until(
                EC.presence_of_element_located(fallback_locator)
            )
        self.driver.execute_script("arguments[0].click();", element)

    def login_and_find_list(self):
        """Log in and navigate to the result list of the target search.

        Returns once the first company link of the result table is visible.

        Raises:
            TimeoutException: If an expected element does not show up within
                the timeout of ``self.wait``.
        """
        # 1) Open the login page
        self.driver.get(LOGIN_URL)

        # 2) Fill in the credentials and submit. The element returned by the
        #    wait is reused instead of being looked up a second time.
        user_field = self.wait.until(
            EC.visibility_of_element_located((By.CSS_SELECTOR, self._USER_INPUT_CSS))
        )
        user_field.send_keys(self.username)
        self.driver.find_element(
            By.CSS_SELECTOR, "input[type='password']"
        ).send_keys(self.password)
        self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()

        # 3) Click the quick link "Prospects finden" (falls back to an href
        #    match when the link text is not found)
        self._click_link(
            "Prospects finden", (By.CSS_SELECTOR, "a[href*='prospects']")
        )

        # 4) Click the desired predefined search (falls back to a
        #    contains-text XPath match)
        xpath = f"//a[contains(normalize-space(.), '{TARGET_SEARCH_NAME}')]"
        self._click_link(TARGET_SEARCH_NAME, (By.XPATH, xpath))

        # 5) Wait for the first data element, then pause for one more second
        self.wait.until(
            EC.visibility_of_element_located((By.CSS_SELECTOR, self._NAME_LINK_CSS))
        )
        time.sleep(1)

    def extract_current_page_results(self):
        """Collect name and website of every row on the current result page.

        Rows without a company-name link are skipped.

        Returns:
            A list of ``{"name": ..., "website": ...}`` dicts; ``website`` is
            an empty string when the row has no third cell.
        """
        # Lower the implicit wait while scanning rows.
        self.driver.implicitly_wait(self._ROW_SCAN_IMPLICIT_WAIT)
        try:
            # Wait for at least one table row, then extract all of them
            rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
            self.wait.until(EC.presence_of_all_elements_located(rows_sel))

            results = []
            for row in self.driver.find_elements(*rows_sel):
                # Name: prefer the title attribute, fall back to the link text
                name_links = row.find_elements(By.CSS_SELECTOR, self._NAME_LINK_CSS)
                if not name_links:
                    continue
                link = name_links[0]
                name = (link.get_attribute("title") or link.text).strip()

                # Website from the third column: link text if there is a
                # link, otherwise the plain cell text
                site_links = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
                if site_links:
                    site = site_links[0].text.strip()
                else:
                    cells = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
                    site = cells[0].text.strip() if cells else ""

                results.append({"name": name, "website": site})
            return results
        finally:
            # Restore the implicit wait even when the extraction fails, so a
            # failure does not leave the session on the short value.
            self.driver.implicitly_wait(self._DEFAULT_IMPLICIT_WAIT)

    def click_next_page(self):
        """Advance to the next result page.

        The last paginator button is treated as the "next" button.

        Returns:
            ``True`` after the active page label has changed, ``False`` when
            there is no paginator or the last button is disabled.

        Raises:
            TimeoutException: If the active page label does not change within
                the timeout of ``self.wait``.
        """
        buttons = self.driver.find_elements(
            By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button"
        )
        if not buttons:
            return False

        next_button = buttons[-1]
        if not next_button.is_enabled():
            return False
        # get_attribute() returns None when the attribute is absent; the
        # substring test would then raise TypeError.
        if "disabled" in (next_button.get_attribute("class") or ""):
            return False

        current = self.driver.find_element(
            By.CSS_SELECTOR, self._ACTIVE_PAGE_CSS
        ).text
        next_button.click()

        def page_changed(driver):
            # The paginator may be re-rendered between lookup and .text; a
            # stale reference just means "not ready yet", so poll again.
            try:
                label = driver.find_element(
                    By.CSS_SELECTOR, self._ACTIVE_PAGE_CSS
                ).text
            except StaleElementReferenceException:
                return False
            return label != current

        self.wait.until(page_changed)
        return True

    def run(self):
        """Log in, scrape every result page and return all collected rows.

        Returns:
            The concatenated per-page lists produced by
            ``extract_current_page_results``.
        """
        self.login_and_find_list()
        all_results = []
        while True:
            all_results.extend(self.extract_current_page_results())
            if not self.click_next_page():
                break
        return all_results


def main():
    """Run the scraper once and store the results as JSON.

    Loads the credentials, starts a headless Chrome session, scrapes all
    result pages and writes them to ``results.json`` inside ``OUTPUT_DIR``.
    The browser is closed even when scraping fails.
    """
    username, password = load_credentials(CREDENTIALS_FILE)

    # Initialise the WebDriver
    opts = Options()
    for flag in ("--headless", "--no-sandbox", "--disable-dev-shm-usage"):
        opts.add_argument(flag)
    driver = webdriver.Chrome(service=Service(CHROMEDRIVER_PATH), options=opts)
    wait = WebDriverWait(driver, 30)

    try:
        results = DealfrontScraper(driver, wait, username, password).run()
    finally:
        driver.quit()

    # Save the results
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    out = os.path.join(OUTPUT_DIR, "results.json")
    with open(out, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    print(f"✅ Fertig: {len(results)} Einträge in '{out}'")


if __name__ == "__main__":
    main()