#!/usr/bin/env python3
"""Scrape company name/website pairs from a saved Dealfront prospect search.

Logs in with credentials from CREDENTIALS_FILE, opens the predefined search
TARGET_SEARCH_NAME, paginates through every result page and writes the rows
to OUTPUT_DIR/results.json.

NOTE(review): this file was stored as a whitespace-mangled git diff whose
patch also wrapped the module in markdown code fences (```python ... ```),
which would make the patched file a SyntaxError. This is the reconstructed
post-image with the fences removed.
"""
import os
import sys
import json
import time
import logging

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# ───────────────────────────────────────────────────────────────────────────────
# Constants
LOGIN_URL = "https://app.dealfront.com/login"
TARGET_SEARCH_NAME = "Facility Management"
CREDENTIALS_FILE = "dealfront_credentials.json"
OUTPUT_DIR = "output"
CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
LOG_FORMAT = "%(asctime)s - %(levelname)-8s - %(message)s"
# ───────────────────────────────────────────────────────────────────────────────

# Configure logging; silence selenium's own chatter.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__)
logging.getLogger("selenium").setLevel(logging.WARNING)


def load_credentials(path):
    """Return ``(username, password)`` read from the JSON file at *path*.

    Exits the process with status 1 when the file is missing or does not
    contain both a non-empty ``username`` and ``password`` key.
    """
    if not os.path.isfile(path):
        # Lazy %-args: formatting only happens if the record is emitted.
        logger.error("Credentials-Datei nicht gefunden: %s", path)
        sys.exit(1)
    with open(path, encoding="utf-8") as f:
        creds = json.load(f)
    user = creds.get("username")
    pwd = creds.get("password")
    if not user or not pwd:
        logger.error("Credentials-Datei enthält keinen username/password.")
        sys.exit(1)
    return user, pwd


class DealfrontScraper:
    """Drives a logged-in Dealfront session and harvests a predefined search."""

    def __init__(self, driver, wait, username, password):
        self.driver = driver
        self.wait = wait
        self.username = username
        self.password = password

    def login_and_find_list(self, search_name):
        """Log in and navigate to the predefined search *search_name*."""
        # Open the login page.
        self.driver.get(LOGIN_URL)

        # Wait for the login form, then fill e-mail/username and password.
        self.wait.until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, "input[type='email'], input[type='text']")
        ))
        # NOTE(review): the next lines were unchanged context hidden in a
        # hunk gap of the stored diff; reconstructed from the surrounding
        # `.send_keys(self.password)` context — confirm against the pre-image.
        self.driver.find_element(
            By.CSS_SELECTOR, "input[type='email'], input[type='text']"
        ).send_keys(self.username)
        self.driver.find_element(By.CSS_SELECTOR, "input[type='password']") \
            .send_keys(self.password)
        self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()

        # Click the "Prospects finden" quick link.
        btn = self.wait.until(EC.element_to_be_clickable((
            By.XPATH, "//a[normalize-space()='Prospects finden']"
        )))
        btn.click()

        # Select the desired predefined search by its visible label.
        btn = self.wait.until(EC.element_to_be_clickable((
            By.XPATH, f"//a[normalize-space()='{search_name}']"
        )))
        btn.click()

        # Wait for the first data row to render before extraction starts.
        first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
        self.wait.until(EC.visibility_of_element_located(first))
        time.sleep(1)

    def extract_current_page_results(self):
        """Return ``[{'name': ..., 'website': ...}, ...]`` for the current page.

        Rows without a company-name cell are skipped; a missing website link
        falls back to the raw text of the third column (empty string if the
        column is absent).
        """
        # Temporarily lower the implicit wait so missing cells are skipped
        # quickly instead of blocking for the default timeout per row.
        self.driver.implicitly_wait(1)
        try:
            # Ensure at least one table row is present in the DOM.
            rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
            self.wait.until(EC.presence_of_all_elements_located(rows_sel))
            rows = self.driver.find_elements(*rows_sel)

            results = []
            for row in rows:
                # Company name — prefer the full title attribute over the
                # (possibly truncated) visible text.
                ne = row.find_elements(
                    By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
                if not ne:
                    continue
                name = (ne[0].get_attribute("title") or ne[0].text).strip()

                # Website from the 3rd column; fall back to plain cell text.
                we = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
                if we:
                    site = we[0].text.strip()
                else:
                    td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
                    site = td3[0].text.strip() if td3 else ""

                results.append({"name": name, "website": site})
        finally:
            # Restore the implicit wait even when extraction raises — the
            # original only restored it on the success path, leaving the
            # driver misconfigured at 1 s after any exception.
            self.driver.implicitly_wait(10)
        return results

    def click_next_page(self):
        """Advance to the next results page; return False when on the last one."""
        # Pagination buttons are: Prev, page numbers, Next (last element).
        btns = self.driver.find_elements(
            By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
        if not btns:
            return False
        nxt = btns[-1]
        # Stop when the Next button is disabled or not clickable.
        if (not nxt.is_enabled()) or ("disabled" in nxt.get_attribute("class")):
            return False
        # Remember the active page number, click, then wait for it to change
        # so the new page's rows are loaded before extraction continues.
        current = self.driver.find_element(
            By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
        ).text
        nxt.click()
        self.wait.until(lambda d: d.find_element(
            By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
        ).text != current)
        return True

    def run(self, search_name):
        """Log in, open *search_name* and return the rows from all pages."""
        self.login_and_find_list(search_name)
        all_results = []
        while True:
            all_results.extend(self.extract_current_page_results())
            # NOTE(review): loop exit was hidden in a hunk gap of the stored
            # diff; reconstructed from the `return all_results` context.
            if not self.click_next_page():
                break
        return all_results


def main():
    username, password = load_credentials(CREDENTIALS_FILE)

    # Headless Chrome suited for container environments.
    opts = Options()
    opts.add_argument("--headless")
    opts.add_argument("--no-sandbox")
    opts.add_argument("--disable-dev-shm-usage")
    service = Service(CHROMEDRIVER_PATH)
    driver = webdriver.Chrome(service=service, options=opts)
    wait = WebDriverWait(driver, 30)

    try:
        scraper = DealfrontScraper(driver, wait, username, password)
        results = scraper.run(TARGET_SEARCH_NAME)

        # Persist results as pretty-printed UTF-8 JSON.
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        outfile = os.path.join(OUTPUT_DIR, "results.json")
        with open(outfile, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print(f"✅ Fertig: {len(results)} Einträge in {outfile}")
    finally:
        # Always release the browser — the original skipped driver.quit()
        # on any exception and leaked the headless Chrome process.
        driver.quit()


if __name__ == "__main__":
    main()