diff --git a/dealfront_enrichment.py b/dealfront_enrichment.py index 0913ee9a..6a24b5b3 100644 --- a/dealfront_enrichment.py +++ b/dealfront_enrichment.py @@ -13,152 +13,140 @@ from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC -# ──────────────────────────────────────────────────────────────────────────── +# ──────────────────────────────────────────────────────────────── # Konstanten LOGIN_URL = "https://app.dealfront.com/login" -TARGET_SEARCH_NAME = "Facility Management" -CREDENTIALS_FILE = "dealfront_credentials.json" +SEARCH_NAME = "Facility Management" +CREDS_FILE = "dealfront_credentials.json" OUTPUT_DIR = "output" CHROMEDRIVER_PATH = "/usr/bin/chromedriver" -LOG_FORMAT = "%(asctime)s - %(levelname)-8s - %(message)s" -# ──────────────────────────────────────────────────────────────────────────── +LOG_FORMAT = "%(asctime)s %(levelname)-8s %(message)s" +# ──────────────────────────────────────────────────────────────── -# Logging logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True) logger = logging.getLogger(__name__) logging.getLogger("selenium").setLevel(logging.WARNING) -def load_credentials(path): - if not os.path.isfile(path): - logger.error(f"Credentials-Datei nicht gefunden: {path}") +def load_creds(path): + if not os.path.exists(path): + logger.error("Credentials-Datei nicht gefunden: %s", path) sys.exit(1) with open(path, encoding="utf-8") as f: creds = json.load(f) - user = creds.get("username") - pwd = creds.get("password") - if not user or not pwd: - logger.error("Credentials-Datei enthält keinen username/password.") + u = creds.get("username") + p = creds.get("password") + if not u or not p: + logger.error("Username oder Passwort fehlt in %s", path) sys.exit(1) - return user, pwd + return u, p class DealfrontScraper: - def __init__(self, driver, wait, username, password): + def __init__(self, driver, wait, user, pwd): self.driver = driver self.wait = wait - self.username = username - self.password = password + self.user = user + self.pwd = pwd - def login_and_find_list(self): + def login_and_select_search(self): # 1) Login self.driver.get(LOGIN_URL) self.wait.until(EC.visibility_of_element_located( (By.CSS_SELECTOR, "input[type='email'], input[type='text']") )) - self.driver.find_element(By.CSS_SELECTOR, "input[type='email'], input[type='text']")\ - .send_keys(self.username) - self.driver.find_element(By.CSS_SELECTOR, "input[type='password']")\ - .send_keys(self.password) + self.driver.find_element(By.CSS_SELECTOR, "input[type='email'], input[type='text']").send_keys(self.user) + self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.pwd) self.driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click() - # 2) Quick-Link "Prospects finden" per XPath-Text suchen - prospects_xpath = "//*[contains(normalize-space(.), 'Prospects finden')]" - btn = self.wait.until(EC.element_to_be_clickable((By.XPATH, prospects_xpath))) - btn.click() + # 2) Auf Sidebar-Liste warten + ul_selector = "ul[data-userpilot-id='sidebar-searches-list']" + self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, ul_selector))) - # 3) Vordefinierte Suche anklicken - search_xpath = f"//*[contains(normalize-space(.), '{TARGET_SEARCH_NAME}')]" - btn2 = self.wait.until(EC.element_to_be_clickable((By.XPATH, search_xpath))) - btn2.click() + # 3) Genaue Suche anklicken über das
+ xpath = f"//ul[@data-userpilot-id='sidebar-searches-list']//div[@title='{SEARCH_NAME}']" + elem = self.wait.until(EC.element_to_be_clickable((By.XPATH, xpath))) + # JS-Click, damit alle Listener feuern + self.driver.execute_script("arguments[0].click();", elem) - # 4) Erstes Daten-Element abwarten + # 4) Auf erste Datenzeile warten first = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") self.wait.until(EC.visibility_of_element_located(first)) time.sleep(1) - def extract_current_page_results(self): - # kurz Implicit-Wait 1 s + def extract_page(self): + # kurz Implicit-Wait 1 s, damit find_elements nicht blocken self.driver.implicitly_wait(1) - - # auf ≥1 Zeile warten rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]") self.wait.until(EC.presence_of_all_elements_located(rows_sel)) rows = self.driver.find_elements(*rows_sel) - results = [] + out = [] for row in rows: - # Name ne = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") if not ne: continue name = (ne[0].get_attribute("title") or ne[0].text).strip() - # Website aus dritter Spalte we = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a") if we: - site = we[0].text.strip() + site = we[0].get_attribute("href").split("://")[-1].rstrip("/") else: td3 = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)") site = td3[0].text.strip() if td3 else "" - results.append({"name": name, "website": site}) + out.append({"name": name, "website": site}) - # Implicit-Wait zurück auf 10 s + # reset Implicit-Wait self.driver.implicitly_wait(10) - return results + return out - def click_next_page(self): - # Paginator-Buttons (Prev, 1…n, Next) + def click_next(self): btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button") if not btns: return False nxt = btns[-1] - if (not nxt.is_enabled()) or ("disabled" in nxt.get_attribute("class")): + if not nxt.is_enabled() or "disabled" in nxt.get_attribute("class"): return False - current = self.driver.find_element( By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active" ).text nxt.click() - # auf neue aktive Seite warten self.wait.until(lambda d: d.find_element( By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active" ).text != current) return True def run(self): - self.login_and_find_list() - all_res = [] + self.login_and_select_search() + all_data = [] while True: - all_res.extend(self.extract_current_page_results()) - if not self.click_next_page(): + all_data.extend(self.extract_page()) + if not self.click_next(): break - return all_res + return all_data def main(): - user, pwd = load_credentials(CREDENTIALS_FILE) + user, pwd = load_creds(CREDS_FILE) - # ChromeDriver starten - opts = Options() + opts = Options() opts.add_argument("--headless") opts.add_argument("--no-sandbox") opts.add_argument("--disable-dev-shm-usage") service = Service(CHROMEDRIVER_PATH) - driver = webdriver.Chrome(service=service, options=opts) - wait = WebDriverWait(driver, 30) + driver = webdriver.Chrome(service=service, options=opts) + wait = WebDriverWait(driver, 30) try: scraper = DealfrontScraper(driver, wait, user, pwd) - data = scraper.run() + data = scraper.run() finally: driver.quit() - # speichern os.makedirs(OUTPUT_DIR, exist_ok=True) out = os.path.join(OUTPUT_DIR, "results.json") with open(out, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) - print(f"✅ Fertig: {len(data)} Einträge in '{out}'") + print(f"✅ Fertig: {len(data)} Einträge in {out}") if __name__ == "__main__": main()