import os import json import time import logging import pandas as pd from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options as ChromeOptions from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException # --- Konfiguration --- class Config: LOGIN_URL = "https://app.dealfront.com/login" TARGET_URL = "https://app.dealfront.com/t/prospector/companies" SEARCH_NAME = "Facility Management" # <-- PASSEN SIE DIES AN IHRE GESPEICHERTE SUCHE AN CREDENTIALS_FILE = "/app/dealfront_credentials.json" OUTPUT_DIR = "/app/output" # --- Logging Setup --- LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s' logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True) logging.getLogger("selenium.webdriver.remote").setLevel(logging.WARNING) logger = logging.getLogger(__name__) os.makedirs(Config.OUTPUT_DIR, exist_ok=True) log_filepath = os.path.join(Config.OUTPUT_DIR, f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.log") file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8') file_handler.setFormatter(logging.Formatter(LOG_FORMAT)) logging.getLogger().addHandler(file_handler) class DealfrontScraper: def __init__(self): logger.info("Initialisiere WebDriver...") chrome_options = ChromeOptions() chrome_options.add_experimental_option("prefs", {"profile.managed_default_content_settings.images": 2}) chrome_options.add_argument("--headless=new") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--window-size=1920,1200") try: self.driver = webdriver.Chrome(options=chrome_options) except Exception as e: logger.critical("WebDriver konnte nicht initialisiert werden.", exc_info=True) raise self.wait = WebDriverWait(self.driver, 30) self.username, self.password = self._load_credentials() if not self.username or not self.password: raise ValueError("Credentials konnten nicht geladen werden. Breche ab.") logger.info("WebDriver erfolgreich initialisiert.") def _load_credentials(self): try: with open(Config.CREDENTIALS_FILE, 'r', encoding='utf-8') as f: creds = json.load(f) return creds.get("username"), creds.get("password") except Exception as e: logger.error(f"Credentials-Datei {Config.CREDENTIALS_FILE} konnte nicht geladen werden: {e}") return None, None def _save_debug_artifacts(self, suffix=""): try: timestamp = time.strftime("%Y%m%d-%H%M%S") filename_base = os.path.join(Config.OUTPUT_DIR, f"error_{suffix}_{timestamp}") self.driver.save_screenshot(f"{filename_base}.png") with open(f"{filename_base}.html", "w", encoding="utf-8") as f: f.write(self.driver.page_source) logger.error(f"Debug-Artefakte gespeichert: {filename_base}.*") except Exception as e: logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}") def login(self): try: logger.info(f"Navigiere zur Login-Seite: {Config.LOGIN_URL}") self.driver.get(Config.LOGIN_URL) self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username) self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password) self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click() logger.info("Login-Befehl gesendet. Warte 5 Sekunden auf Session-Etablierung.") time.sleep(5) # Verifizierung, dass wir nicht mehr auf der Login-Seite sind if "login" not in self.driver.current_url: logger.info("Login erfolgreich, URL hat sich geändert.") return True self._save_debug_artifacts("login_stuck") return False except Exception as e: logger.critical("Login-Prozess fehlgeschlagen.", exc_info=True) self._save_debug_artifacts("login_exception") return False def navigate_and_load_search(self, search_name): try: logger.info(f"Navigiere direkt zur Target-Seite und lade die Suche...") self.driver.get(Config.TARGET_URL) self.wait.until(EC.url_contains("/t/prospector/")) search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']") self.wait.until(EC.element_to_be_clickable(search_item_selector)).click() logger.info("Suche geladen. Warte auf das Rendern der Ergebnistabelle.") self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table tbody tr"))) return True except Exception as e: logger.critical("Navigation oder Laden der Suche fehlgeschlagen.", exc_info=True) self._save_debug_artifacts("navigation_or_search_load") return False def extract_current_page_results(self): results = [] rows_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]") try: data_rows = self.driver.find_elements(*rows_selector) for row in data_rows: try: name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip() website = "N/A" try: website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip() except NoSuchElementException: pass results.append({'name': name, 'website': website}) except NoSuchElementException: continue except Exception as e: logger.error(f"Fehler bei der Extraktion auf der aktuellen Seite: {e}") return results def scrape_all_pages(self, max_pages=6): """ Iteriert durch alle Ergebnisseiten, scrollt auf jeder Seite nach unten, um alle Daten zu laden, und extrahiert sie dann. Verhindert Duplikate. """ all_companies = {} # Verwende ein Dictionary, um Duplikate zu vermeiden for page_number in range(1, max_pages + 1): logger.info(f"--- Verarbeite Seite {page_number} ---") # Warten, bis die Tabelle grundsätzlich geladen ist table_selector = (By.CSS_SELECTOR, "table#t-result-table") self.wait.until(EC.visibility_of_element_located(table_selector)) # --- NEU: Intelligente Scroll-Schleife --- logger.info("Scrolle Seite nach unten, um alle Firmen zu laden...") last_height = self.driver.execute_script("return document.querySelector('.scroll-viewport').scrollHeight") while True: self.driver.execute_script("document.querySelector('.scroll-viewport').scrollTo(0, document.querySelector('.scroll-viewport').scrollHeight);") time.sleep(2) # Wartezeit für das Nachladen der Inhalte new_height = self.driver.execute_script("return document.querySelector('.scroll-viewport').scrollHeight") if new_height == last_height: break last_height = new_height logger.info("Seitenende erreicht, alle Zeilen sollten geladen sein.") # Extrahiere die Daten von der komplett geladenen Seite page_results = self.extract_current_page_results() if not page_results: logger.warning("Konnte auf dieser Seite keine Ergebnisse extrahieren, obwohl gescrollt wurde.") for company in page_results: unique_key = (company.get('name'), company.get('website')) if unique_key not in all_companies: all_companies[unique_key] = company logger.info(f"Seite {page_number}: {len(page_results)} Firmen gefunden. Gesamt einzigartig: {len(all_companies)}") # --- Paginierungs-Logik --- try: # Wir merken uns die ID der LETZTEN Zeile, um den Wechsel zu verifizieren last_row_id = self.driver.find_element(By.XPATH, "(//table[@id='t-result-table']/tbody/tr[@id])[last()]").get_attribute("id") next_button_selector = (By.XPATH, "//nav[contains(@class, 'eb-pagination')]//a[not(contains(@class, 'disabled'))][last()]") next_button = self.driver.find_element(*next_button_selector) if "fa-angle-right" not in next_button.get_attribute('innerHTML'): logger.info("Letzte Seite erreicht (kein 'Weiter'-Pfeil im letzten Button).") break self.driver.execute_script("arguments[0].click();", next_button) logger.info(f"Klicke auf 'Weiter' zu Seite {page_number + 1}...") # Warte darauf, dass die letzte Zeile der alten Seite verschwindet self.wait.until(EC.staleness_of(self.driver.find_element(By.ID, last_row_id))) logger.info("Seitenwechsel erfolgreich verifiziert.") except (NoSuchElementException, TimeoutException): logger.info("Kein klickbarer 'Weiter'-Button mehr gefunden. Paginierung abgeschlossen.") break return list(all_companies.values()) def close(self): if self.driver: self.driver.quit() if __name__ == "__main__": scraper = None try: scraper = DealfrontScraper() if not scraper.login(): raise Exception("Login fehlgeschlagen") if not scraper.navigate_and_load_search(Config.SEARCH_NAME): raise Exception("Navigation/Suche fehlgeschlagen") all_companies = scraper.scrape_all_pages(max_pages=6) # Limitiere auf 6 Seiten if all_companies: df = pd.DataFrame(all_companies) output_csv_path = os.path.join(Config.OUTPUT_DIR, f"dealfront_results_{time.strftime('%Y%m%d-%H%M%S')}.csv") df.to_csv(output_csv_path, index=False, sep=';', encoding='utf-8-sig') logger.info(f"Ergebnisse ({len(df)} Firmen) erfolgreich in '{output_csv_path}' gespeichert.") else: logger.warning("Keine Firmen konnten extrahiert werden.") except Exception as e: logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=True) finally: if scraper: scraper.close() logger.info("Dealfront Automatisierung beendet.")