From 5d34e284f67a11e4f7d64a2037db11b89a0ad99f Mon Sep 17 00:00:00 2001 From: Floke Date: Wed, 9 Jul 2025 12:57:48 +0000 Subject: [PATCH] dealfront_enrichment.py aktualisiert --- dealfront_enrichment.py | 98 ++++++++++++++++++++++------------------- 1 file changed, 53 insertions(+), 45 deletions(-) diff --git a/dealfront_enrichment.py b/dealfront_enrichment.py index 9b410f37..7c2d6b89 100644 --- a/dealfront_enrichment.py +++ b/dealfront_enrichment.py @@ -14,9 +14,9 @@ from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC # ──────────────────────────────────────────────────────────────── -# Einstellung +# Konstanten LOGIN_URL = "https://app.dealfront.com/login" -TARGET_PAGE = "https://app.dealfront.com/target" +TARGET_TAB = "Target" SEARCH_NAME = "Facility Management" CREDS_FILE = "dealfront_credentials.json" OUTPUT_DIR = "output" @@ -32,9 +32,8 @@ def load_creds(path): if not os.path.exists(path): logger.error("Credentials-Datei nicht gefunden: %s", path) sys.exit(1) - with open(path, encoding="utf-8") as f: - j = json.load(f) - u, p = j.get("username"), j.get("password") + creds = json.load(open(path, encoding="utf-8")) + u, p = creds.get("username"), creds.get("password") if not u or not p: logger.error("username/password fehlen in %s", path) sys.exit(1) @@ -47,69 +46,72 @@ class DealfrontScraper: self.user = user self.pwd = pwd - def login_and_load_target(self): - # 1) Login + def login_and_select_search(self): + # 1) Login-Seite aufrufen self.driver.get(LOGIN_URL) - self.wait.until(EC.visibility_of_element_located( - (By.CSS_SELECTOR, "input[type='email'],input[type='text']") - )) + self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "input[type='email'],input[type='text']"))) + + # 2) Credentials eintragen self.driver.find_element(By.CSS_SELECTOR, "input[type='email'],input[type='text']").send_keys(self.user) self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.pwd) self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click() - # 2) Warten bis URL sich ändert (Login abgeschlossen) - self.wait.until(lambda d: d.current_url != LOGIN_URL) + # 3) Auf Target-Tab klicken + self.wait.until(EC.element_to_be_clickable((By.LINK_TEXT, TARGET_TAB))).click() - # 3) Direkt zur Target-Seite navigieren - self.driver.get(TARGET_PAGE) + # 4) Sidebar mit Such-List laden + sidebar_sel = "ul[data-userpilot-id='sidebar-searches-list']" + self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, sidebar_sel))) - # 4) Auf die Sidebar mit deinen vordefinierten Suchen warten - sidebar_xpath = "//ul[contains(@class,'sidebar-tree-view')]" - self.wait.until(EC.visibility_of_element_located((By.XPATH, sidebar_xpath))) + # 5) Deine Suche anklicken (div[title=…]) + div_sel = f"div[title='{SEARCH_NAME}']" + el = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, div_sel))) + self.driver.execute_script("arguments[0].click()", el) - # 5) Deine Suche anklicken über das div[@title] - search_xpath = f"//ul[contains(@class,'sidebar-tree-view')]//div[@title='{SEARCH_NAME}']" - elem = self.wait.until(EC.element_to_be_clickable((By.XPATH, search_xpath))) - self.driver.execute_script("arguments[0].click();", elem) - - # 6) Auf das erste Daten-Element warten, bevor wir extrahieren - first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") - self.wait.until(EC.visibility_of_element_located(first)) + # 6) Kurzes Warten, bis erste Zeile da ist + self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "a.t-highlight-text.t-highlight-text-snippet"))) time.sleep(1) def extract_current_page_results(self): - # kurz Implicit-Wait absenken + # kurzer Implicit-Wait für schnelles Fallback self.driver.implicitly_wait(1) - rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]") - self.wait.until(EC.presence_of_all_elements_located(rows_sel)) - rows = self.driver.find_elements(*rows_sel) + # auf ≥1 Zeile warten + rows = self.wait.until(EC.presence_of_all_elements_located(( + By.CSS_SELECTOR, "table#t-result-table tbody tr[id]" + ))) - out = [] + data = [] for row in rows: - ne = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") + # Name-Element + ne = row.find_elements(By.CSS_SELECTOR, "a.t-highlight-text.t-highlight-text-snippet") if not ne: continue name = (ne[0].get_attribute("title") or ne[0].text).strip() - we = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a") + # Website-Element + we = row.find_elements(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text") if we: site = we[0].get_attribute("href").split("://")[-1].rstrip("/") else: - td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)") - site = td3[0].text.strip() if td3 else "" + # Fallback: Zellen-Text + txt = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)") + site = txt[0].text.strip() if txt else "" - out.append({"name": name, "website": site}) + data.append({"name": name, "website": site}) # Implicit-Wait zurücksetzen self.driver.implicitly_wait(10) - return out + logger.info(f" Extrahiert: {len(data)} Zeilen") + return data def click_next_page(self): + # Paginator-Buttons greifen btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button") if not btns: return False nxt = btns[-1] + # Ende erreicht? if not nxt.is_enabled() or "disabled" in nxt.get_attribute("class"): return False @@ -117,19 +119,25 @@ class DealfrontScraper: By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active" ).text nxt.click() + # auf Seitenwechsel warten self.wait.until(lambda d: d.find_element( By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active" ).text != current) return True def run(self): - self.login_and_load_target() - all_data = [] + logger.info("Starte Login und Sucheauswahl…") + self.login_and_select_search() + + all_res = [] + page = 1 while True: - all_data.extend(self.extract_current_page_results()) + logger.info(f"Seite {page}: Extrahiere Daten…") + all_res.extend(self.extract_current_page_results()) if not self.click_next_page(): break - return all_data + page += 1 + return all_res def main(): user, pwd = load_creds(CREDS_FILE) @@ -144,16 +152,16 @@ def main(): try: scraper = DealfrontScraper(driver, wait, user, pwd) - data = scraper.run() + results = scraper.run() finally: driver.quit() os.makedirs(OUTPUT_DIR, exist_ok=True) - out = os.path.join(OUTPUT_DIR, "results.json") - with open(out, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=2) + path = os.path.join(OUTPUT_DIR, "results.json") + with open(path, "w", encoding="utf-8") as f: + json.dump(results, f, ensure_ascii=False, indent=2) - logger.info("✅ Fertig: %d Einträge in %s", len(data), out) + logger.info(f"✅ Fertig: {len(results)} Einträge in {path}") if __name__ == "__main__": main()