#!/usr/bin/env python3
"""Scrape company names and websites from a saved Dealfront "Target" search.

The script logs in at ``LOGIN_URL`` with credentials read from ``CREDS_FILE``,
opens the saved search called ``SEARCH_NAME``, walks through every result page
and writes the collected rows to ``OUTPUT_DIR/results.json``.
"""
import os
import sys
import json
import time
import logging

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# ────────────────────────────────────────────────────────────────
# Constants
LOGIN_URL = "https://app.dealfront.com/login"
TARGET_TAB = "Target"
SEARCH_NAME = "Facility Management"
CREDS_FILE = "dealfront_credentials.json"
OUTPUT_DIR = "output"
CHROMEDRIVER_PATH = "/usr/bin/chromedriver"
LOG_FORMAT = "%(asctime)s %(levelname)-8s %(message)s"
# ────────────────────────────────────────────────────────────────

logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__)
# Keep Selenium's own log output below WARNING out of the script's log.
logging.getLogger("selenium").setLevel(logging.WARNING)


def load_creds(path):
    """Read the login credentials from the JSON file at *path*.

    The file must contain non-empty ``username`` and ``password`` entries.

    Args:
        path: Location of the credentials JSON file.

    Returns:
        A ``(username, password)`` tuple.

    Exits the process with status 1 (after logging an error) when the file
    does not exist or either entry is missing/empty.
    """
    if not os.path.exists(path):
        logger.error("Credentials-Datei nicht gefunden: %s", path)
        sys.exit(1)
    # Context manager so the file handle is closed deterministically.
    with open(path, encoding="utf-8") as fh:
        creds = json.load(fh)
    u, p = creds.get("username"), creds.get("password")
    if not u or not p:
        logger.error("username/password fehlen in %s", path)
        sys.exit(1)
    return u, p


class DealfrontScraper:
    """Drives a Selenium session through login, search selection and paging."""

    def __init__(self, driver, wait, user, pwd):
        """Store the WebDriver, its WebDriverWait and the login credentials."""
        self.driver = driver
        self.wait = wait
        self.user = user
        self.pwd = pwd

    def login_and_select_search(self):
        """Log in, open the Target tab and click the saved search.

        Returns once the first result link is visible (plus a one-second
        settling pause).
        """
        # 1) Open the login page and wait for the user-name field.
        user_sel = "input[type='email'],input[type='text']"
        self.driver.get(LOGIN_URL)
        self.wait.until(
            EC.visibility_of_element_located((By.CSS_SELECTOR, user_sel))
        )

        # 2) Fill in the credentials and submit the form.
        self.driver.find_element(By.CSS_SELECTOR, user_sel).send_keys(self.user)
        self.driver.find_element(
            By.CSS_SELECTOR, "input[type='password']"
        ).send_keys(self.pwd)
        self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()

        # 3) Click the Target tab.
        self.wait.until(
            EC.element_to_be_clickable((By.LINK_TEXT, TARGET_TAB))
        ).click()

        # 4) Wait for the sidebar holding the list of saved searches.
        sidebar_sel = "ul[data-userpilot-id='sidebar-searches-list']"
        self.wait.until(
            EC.visibility_of_element_located((By.CSS_SELECTOR, sidebar_sel))
        )

        # 5) Click the saved search (div[title=…]); the click is issued
        #    through JavaScript rather than WebElement.click().
        div_sel = f"div[title='{SEARCH_NAME}']"
        el = self.wait.until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, div_sel))
        )
        self.driver.execute_script("arguments[0].click()", el)

        # 6) Wait until the first result row is there, then a short buffer.
        self.wait.until(
            EC.visibility_of_element_located(
                (By.CSS_SELECTOR, "a.t-highlight-text.t-highlight-text-snippet")
            )
        )
        time.sleep(1)

    def extract_current_page_results(self):
        """Collect name and website of every company row on the current page.

        Rows without a name link are skipped with a warning.

        Returns:
            A list of ``{'name': ..., 'website': ...}`` dicts; ``website`` is
            an empty string when the row has no website link.
        """
        # 1) Lower the implicit wait so lookups that find nothing return fast.
        self.driver.implicitly_wait(1)
        try:
            # 2) Wait for the first company link, then a short buffer.
            first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
            self.wait.until(EC.visibility_of_element_located(first))
            time.sleep(1)

            logger.info("Extrahiere Ergebnisse von der aktuellen Seite...")
            results = []

            # 3) Wait for at least one table row.
            rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
            self.wait.until(EC.presence_of_all_elements_located(rows_sel))
            rows = self.driver.find_elements(*rows_sel)
            logger.info("%d Firmen-Zeilen gefunden.", len(rows))

            # 4) Per-row extraction without any sleeps.
            for i, row in enumerate(rows, 1):
                name_elems = row.find_elements(
                    By.CSS_SELECTOR, ".sticky-column a.t-highlight-text"
                )
                if not name_elems:
                    logger.warning(
                        "Zeile %d: Kein Name-Element gefunden. Überspringe.", i
                    )
                    continue
                ne = name_elems[0]
                # Prefer the title attribute; fall back to the visible text.
                company_name = (ne.get_attribute("title") or ne.text).strip()

                web_elems = row.find_elements(
                    By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text"
                )
                website = web_elems[0].text.strip() if web_elems else ""

                results.append({'name': company_name, 'website': website})

            logger.info("Extraktion abgeschlossen: %d Firmen.", len(results))
            return results
        finally:
            # 5) Put the implicit wait back to 10 s, also when extraction fails.
            # NOTE(review): main() never sets an implicit wait itself, so this
            # leaves the driver at 10 s after the first call — confirm intended.
            self.driver.implicitly_wait(10)

    def click_next_page(self):
        """Advance to the next result page.

        Returns:
            ``True`` after the active page indicator has changed, ``False``
            when there is no paginator or its last button is disabled.
        """
        pager_btn_sel = "nav.eb-pagination a.eb-pagination-button"
        active_sel = pager_btn_sel + ".active"

        btns = self.driver.find_elements(By.CSS_SELECTOR, pager_btn_sel)
        if not btns:
            return False
        # The last paginator button is used as the "next" control.
        nxt = btns[-1]

        # End reached? get_attribute() returns None when the attribute is
        # absent, so fall back to an empty string before the substring test.
        css_classes = nxt.get_attribute("class") or ""
        if not nxt.is_enabled() or "disabled" in css_classes:
            return False

        current = self.driver.find_element(By.CSS_SELECTOR, active_sel).text
        nxt.click()
        # Wait for the page switch: the active button's label must change.
        self.wait.until(
            lambda d: d.find_element(By.CSS_SELECTOR, active_sel).text != current
        )
        return True

    def run(self):
        """Log in, select the search and scrape every result page.

        Returns:
            The concatenated result dicts of all pages.
        """
        logger.info("Starte Login und Sucheauswahl…")
        self.login_and_select_search()
        all_res = []
        page = 1
        while True:
            logger.info("Seite %d: Extrahiere Daten…", page)
            all_res.extend(self.extract_current_page_results())
            if not self.click_next_page():
                break
            page += 1
        return all_res


def main():
    """Run the scraper in headless Chrome and write the results as JSON."""
    user, pwd = load_creds(CREDS_FILE)

    opts = Options()
    opts.add_argument("--headless")
    opts.add_argument("--no-sandbox")
    opts.add_argument("--disable-dev-shm-usage")
    service = Service(CHROMEDRIVER_PATH)
    driver = webdriver.Chrome(service=service, options=opts)
    wait = WebDriverWait(driver, 30)

    try:
        scraper = DealfrontScraper(driver, wait, user, pwd)
        results = scraper.run()
    finally:
        # Always shut the browser down, even when scraping raised.
        driver.quit()

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    path = os.path.join(OUTPUT_DIR, "results.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    logger.info("✅ Fertig: %d Einträge in %s", len(results), path)


if __name__ == "__main__":
    main()