diff --git a/dealfront_enrichment.py b/dealfront_enrichment.py index e5f4650a..cd4f214e 100644 --- a/dealfront_enrichment.py +++ b/dealfront_enrichment.py @@ -1,41 +1,22 @@ import os -import json import time +import json import logging -import pandas as pd + from selenium import webdriver -from selenium.webdriver.common.by import By +from selenium.webdriver.chrome.options import Options, ChromeOptions from selenium.webdriver.chrome.service import Service -from selenium.webdriver.chrome.options import Options as ChromeOptions +from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC -from selenium.common.exceptions import TimeoutException, NoSuchElementException -# ============================================================================== -# TEMPORÄRE, AUTARKE KONFIGURATION -# ============================================================================== -class TempConfig: - # --- Direkt hier definierte Werte, um config.py zu umgehen --- - DEALFRONT_LOGIN_URL = "https://app.dealfront.com/login" - TARGET_SEARCH_NAME = "Facility Management" # <-- BITTE AN IHRE SUCHE ANPASSEN - DEALFRONT_CREDENTIALS_FILE = "/app/dealfront_credentials.json" -# ============================================================================== +from config import TempConfig # Import deiner Konfigurationsklasse -OUTPUT_DIR = "/app/output" -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s', force=True) -logging.getLogger("selenium").setLevel(logging.WARNING) +# Logging konfigurieren +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(name)s - %(message)s' +logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) logger = logging.getLogger(__name__) -log_filename = f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.txt" -log_filepath = os.path.join(OUTPUT_DIR, log_filename) -try: - file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8') - file_handler.setLevel(logging.DEBUG) - file_handler.setFormatter(logging.Formatter(LOG_FORMAT)) - logging.getLogger().addHandler(file_handler) - logger.info(f"Logging konfiguriert. Log-Datei: {log_filepath}") -except Exception as e: - logger.error(f"Konnte Log-Datei nicht erstellen: {e}") class DealfrontScraper: def __init__(self): @@ -47,175 +28,135 @@ class DealfrontScraper: chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--window-size=1920,1080") - service = Service(executable_path='/usr/bin/chromedriver') + service = Service(executable_path=TempConfig.CHROMEDRIVER_PATH) try: self.driver = webdriver.Chrome(service=service, options=chrome_options) - except Exception as e: - logger.critical(f"WebDriver konnte nicht initialisiert werden.", exc_info=True) + except Exception: + logger.critical("WebDriver konnte nicht initialisiert werden.", exc_info=True) raise - self.wait = WebDriverWait(self.driver, 30) + self.wait = WebDriverWait(self.driver, TempConfig.DEFAULT_TIMEOUT) self.username, self.password = self._load_credentials() logger.info("WebDriver erfolgreich initialisiert.") def _load_credentials(self): try: - with open(TempConfig.DEALFRONT_CREDENTIALS_FILE, 'r') as f: + with open(TempConfig.DEALFRONT_CREDENTIALS_FILE, 'r', encoding='utf-8') as f: creds = json.load(f) - return creds.get("username"), creds.get("password") + return creds.get('username'), creds.get('password') except Exception as e: logger.error(f"Credentials-Datei konnte nicht geladen werden: {e}") - return None, None + raise def _save_debug_artifacts(self): - # ... (Diese Methode bleibt unverändert) ... try: - os.makedirs(OUTPUT_DIR, exist_ok=True) - timestamp = time.strftime("%Y%m%d-%H%M%S") - screenshot_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.png") - html_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.html") - self.driver.save_screenshot(screenshot_filepath) - logger.error(f"Screenshot '{screenshot_filepath}' wurde für die Analyse gespeichert.") - with open(html_filepath, "w", encoding="utf-8") as f: + os.makedirs(TempConfig.OUTPUT_DIR, exist_ok=True) + ts = time.strftime("%Y%m%d-%H%M%S") + png = os.path.join(TempConfig.OUTPUT_DIR, f"error_{ts}.png") + html = os.path.join(TempConfig.OUTPUT_DIR, f"error_{ts}.html") + self.driver.save_screenshot(png) + logger.error(f"Screenshot '{png}' gespeichert.") + with open(html, 'w', encoding='utf-8') as f: f.write(self.driver.page_source) - logger.error(f"HTML-Quellcode '{html_filepath}' wurde für die Analyse gespeichert.") + logger.error(f"HTML-Source '{html}' gespeichert.") except Exception as e: - logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}") + logger.error(f"Debug-Artefakte konnten nicht gespeichert werden: {e}") def login_and_find_list(self, search_name): - # ... (Diese Methode bleibt unverändert, verwendet aber jetzt TempConfig) ... - try: - logger.info(f"Navigiere zur Login-Seite: {TempConfig.DEALFRONT_LOGIN_URL}") - self.driver.get(TempConfig.DEALFRONT_LOGIN_URL) - self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username) - self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password) - self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click() - logger.info("Login-Befehl gesendet.") - logger.info("Warte auf Dashboard und den 'Prospects finden' Quick-Link...") - prospects_link_selector = (By.XPATH, "//a[@data-test-target-product-tile]") - prospects_link = self.wait.until(EC.element_to_be_clickable(prospects_link_selector)) - prospects_link.click() - logger.info("'Prospects finden' geklickt.") - logger.info(f"Warte auf die Liste der Suchen und klicke auf '{search_name}'...") - search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']") - search_item = self.wait.until(EC.element_to_be_clickable(search_item_selector)) - search_item.click() - logger.info(f"Suche '{search_name}' geladen. Warte auf das Rendern der Ergebnistabelle.") - first_row_locator = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") - self.wait.until( - EC.visibility_of_element_located(first_row_locator) - ) - time.sleep(5) - logger.info("Zielseite mit Ergebnissen erfolgreich erreicht.") - return True - except Exception as e: - logger.critical(f"Der Prozess ist fehlgeschlagen: {type(e).__name__}", exc_info=True) - self._save_debug_artifacts() - return False + # Login + logger.info(f"Navigiere zur Login-Seite: {TempConfig.DEALFRONT_LOGIN_URL}") + self.driver.get(TempConfig.DEALFRONT_LOGIN_URL) + self.wait.until(EC.visibility_of_element_located((By.NAME, 'email'))).send_keys(self.username) + self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password) + self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click() + logger.info("Login gesendet.") + + # 'Prospects finden' + tile = self.wait.until( + EC.element_to_be_clickable((By.CSS_SELECTOR, "a[data-test-target-product-tile]")) + ) + tile.click() + logger.info("'Prospects finden' geklickt.") + + # Vordefinierte Suche auswählen + sel = (By.XPATH, f"//div[contains(@class,'truncate') and normalize-space()='{search_name}']") + item = self.wait.until(EC.element_to_be_clickable(sel)) + item.click() + logger.info(f"Suche '{search_name}' geladen.") def extract_current_page_results(self): - # 1) Kurzes Absenken des Implicit-Waits + # 1) Kurzer Implicit-Wait self.driver.implicitly_wait(1) - # 2) Warten auf erstes Firmen-Element - first_row_locator = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") - self.wait.until(EC.visibility_of_element_located(first_row_locator)) - time.sleep(1) + # 2) Warten auf erstes Daten-Element + first_locator = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") + self.wait.until(EC.visibility_of_element_located(first_locator)) - logger.info("Extrahiere Ergebnisse von der aktuellen Seite...") + logger.info("Extrahiere aktuelle Seite...") results = [] - # 3) Warten auf mindestens eine Tabellenzeile - rows_selector = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]") - self.wait.until(EC.presence_of_all_elements_located(rows_selector)) - rows = self.driver.find_elements(*rows_selector) - logger.info(f"{len(rows)} Firmen-Zeilen gefunden.") + # 3) Auf mindestens eine Zeile warten + rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]") + self.wait.until(EC.presence_of_all_elements_located(rows_sel)) + rows = self.driver.find_elements(*rows_sel) + logger.info(f"{len(rows)} Zeilen gefunden.") - # 4) Daten sammeln, ohne weitere Sleeps/Exceptions + # 4) Namen & Websites extrahieren for i, row in enumerate(rows, 1): - name_elems = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") - if not name_elems: - logger.warning(f"Zeile {i}: Kein Name-Element gefunden. Überspringe.") + names = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text") + if not names: + logger.warning(f"Zeile {i}: Kein Name gefunden.") continue - name_elem = name_elems[0] - company_name = (name_elem.get_attribute("title") or name_elem.text).strip() + name_elem = names[0] + name = (name_elem.get_attribute('title') or name_elem.text).strip() - web_elems = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a") - if web_elems: - website = web_elems[0].text.strip() + webs = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a") + if webs: + web = webs[0].text.strip() else: cell = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)") - website = cell[0].text.strip() if cell else "" + web = cell[0].text.strip() if cell else '' - results.append({'name': company_name, 'website': website}) + results.append({'name': name, 'website': web}) logger.info(f"Extraktion abgeschlossen: {len(results)} Firmen.") + # Implicit-Wait reset + self.driver.implicitly_wait(TempConfig.IMPLICIT_WAIT) return results def click_next_page(self) -> bool: - """ - Klickt auf den 'Next'-Paginator-Button. - Gibt False zurück, wenn kein Next-Button (mehr) klickbar ist. - """ - # alle Buttons (Prev, Seiten, Next) abgreifen - buttons = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button") - next_btn = buttons[-1] # letzter ist Next - # Ist er deaktiviert? - if not next_btn.is_enabled() or "disabled" in next_btn.get_attribute("class"): + btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button") + if not btns: return False - - # Merke aktuelle Seite - current = self.driver.find_element( - By.CSS_SELECTOR, - "nav.eb-pagination a.eb-pagination-button.active" + nxt = btns[-1] + if not nxt.is_enabled() or 'disabled' in nxt.get_attribute('class'): + return False + curr = self.driver.find_element( + By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active" ).text - next_btn.click() - - # Warte, bis die aktive Seite sich ändert - WebDriverWait(self.driver, 10).until( + nxt.click() + self.wait.until( lambda d: d.find_element( - By.CSS_SELECTOR, - "nav.eb-pagination a.eb-pagination-button.active" - ).text != current + By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active" + ).text != curr ) return True def run(self, search_name): - # 1) Login & Suche laden - self.login_and_find_list(search_name) - - # 2) Alle Seiten durchgehen - all_results = [] - while True: - page_results = self.extract_current_page_results() - all_results.extend(page_results) - if not self.click_next_page(): - break - - return all_results + try: + self.login_and_find_list(search_name) + all_res = [] + while True: + page_res = self.extract_current_page_results() + all_res.extend(page_res) + if not self.click_next_page(): + break + return all_res + finally: + self.driver.quit() -if __name__ == "__main__": - from selenium import webdriver - from selenium.webdriver.chrome.options import Options - from selenium.webdriver.support.ui import WebDriverWait - - # 1) WebDriver konfigurieren - chrome_options = Options() - chrome_options.add_argument("--headless") # headless mode - chrome_options.add_argument("--no-sandbox") # für Docker - chrome_options.add_argument("--disable-dev-shm-usage") - driver = webdriver.Chrome(options=chrome_options) - - # 2) Explicit Wait erstellen - wait = WebDriverWait(driver, 30) - - # 3) Scraper starten - scraper = DealfrontScraper(driver, wait) - results = scraper.run("Facility Management") - - # 4) Ergebnis ausgeben oder weiterverarbeiten - for entry in results: - print(entry) - - # 5) Aufräumen - driver.quit() \ No newline at end of file +if __name__ == '__main__': + scraper = DealfrontScraper() + data = scraper.run('Facility Management') + for d in data: + print(d)