import os import json import time import logging import pandas as pd from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options as ChromeOptions from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException # --- Konfiguration --- class Config: LOGIN_URL = "https://app.dealfront.com/login" TARGET_URL = "https://app.dealfront.com/t/prospector/companies" SEARCH_NAME = "Facility Management" # <-- PASSEN SIE DIES AN IHRE GESPEICHERTE SUCHE AN CREDENTIALS_FILE = "/app/dealfront_credentials.json" OUTPUT_DIR = "/app/output" # --- Logging Setup --- LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s' logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True) logging.getLogger("selenium.webdriver.remote").setLevel(logging.WARNING) logger = logging.getLogger(__name__) os.makedirs(Config.OUTPUT_DIR, exist_ok=True) log_filepath = os.path.join(Config.OUTPUT_DIR, f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.log") file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8') file_handler.setFormatter(logging.Formatter(LOG_FORMAT)) logging.getLogger().addHandler(file_handler) class DealfrontScraper: def __init__(self): logger.info("Initialisiere WebDriver...") chrome_options = ChromeOptions() chrome_options.add_experimental_option("prefs", {"profile.managed_default_content_settings.images": 2}) chrome_options.add_argument("--headless=new") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--window-size=1920,1200") try: self.driver = webdriver.Chrome(options=chrome_options) except Exception as e: logger.critical("WebDriver konnte nicht initialisiert werden.", exc_info=True) raise self.wait = WebDriverWait(self.driver, 30) self.username, self.password = self._load_credentials() if not self.username or not self.password: raise ValueError("Credentials konnten nicht geladen werden. Breche ab.") logger.info("WebDriver erfolgreich initialisiert.") def _load_credentials(self): try: with open(Config.CREDENTIALS_FILE, 'r', encoding='utf-8') as f: creds = json.load(f) return creds.get("username"), creds.get("password") except Exception as e: logger.error(f"Credentials-Datei {Config.CREDENTIALS_FILE} konnte nicht geladen werden: {e}") return None, None def _save_debug_artifacts(self, suffix=""): try: timestamp = time.strftime("%Y%m%d-%H%M%S") filename_base = os.path.join(Config.OUTPUT_DIR, f"error_{suffix}_{timestamp}") self.driver.save_screenshot(f"{filename_base}.png") with open(f"{filename_base}.html", "w", encoding="utf-8") as f: f.write(self.driver.page_source) logger.error(f"Debug-Artefakte gespeichert: {filename_base}.*") except Exception as e: logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}") def login(self): try: logger.info(f"Navigiere zur Login-Seite: {Config.LOGIN_URL}") self.driver.get(Config.LOGIN_URL) self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username) self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password) self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click() logger.info("Login-Befehl gesendet. Warte 5 Sekunden auf Session-Etablierung.") time.sleep(5) # Verifizierung, dass wir nicht mehr auf der Login-Seite sind if "login" not in self.driver.current_url: logger.info("Login erfolgreich, URL hat sich geändert.") return True self._save_debug_artifacts("login_stuck") return False except Exception as e: logger.critical("Login-Prozess fehlgeschlagen.", exc_info=True) self._save_debug_artifacts("login_exception") return False def scroll_table_to_bottom(self): """ Scrollt das Tabellen-Element und die Seite nach unten, damit alle Zeilen sichtbar und geladen sind. """ try: table = self.driver.find_element(By.CSS_SELECTOR, "table#t-result-table") self.driver.execute_script("arguments[0].scrollIntoView(false);", table) self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") time.sleep(1) # Kurze Pause, damit ggf. Inhalte nachgeladen werden logger.info("Nach Seitenwechsel nach unten gescrollt.") except Exception as e: logger.warning(f"Fehler beim Scrollen nach unten: {e}") def navigate_and_load_search(self, search_name): try: logger.info(f"Navigiere direkt zur Target-Seite und lade die Suche...") self.driver.get(Config.TARGET_URL) self.wait.until(EC.url_contains("/t/prospector/")) search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']") self.wait.until(EC.element_to_be_clickable(search_item_selector)).click() logger.info("Suche geladen. Warte auf das Rendern der Ergebnistabelle.") self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table tbody tr"))) return True except Exception as e: logger.critical("Navigation oder Laden der Suche fehlgeschlagen.", exc_info=True) self._save_debug_artifacts("navigation_or_search_load") return False def extract_current_page_results(self): results = [] rows_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]") try: data_rows = self.driver.find_elements(*rows_selector) for row in data_rows: try: name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip() website = "N/A" try: website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip() except NoSuchElementException: pass results.append({'name': name, 'website': website}) except NoSuchElementException: continue except Exception as e: logger.error(f"Fehler bei der Extraktion auf der aktuellen Seite: {e}") return results def scrape_all_pages(self, max_pages=10): """ Iteriert durch alle Ergebnisseiten, wartet nach jedem Paginieren explizit auf neue Daten, scrollt nach jedem Seitenwechsel nach unten und sammelt alle Firmenzeilen seitengetreu. """ all_companies = [] for page_number in range(1, max_pages + 1): logger.info(f"--- Verarbeite Seite {page_number} ---") try: self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table"))) except TimeoutException: logger.error("Ergebnistabelle wurde nicht geladen. Breche ab.") break # Vor dem Klick: IDs aller Zeilen merken try: rows_before = self.driver.find_elements(By.CSS_SELECTOR, "table#t-result-table tbody tr[id]") ids_before = [r.get_attribute("id") for r in rows_before] logger.info(f"Vor Pagination: IDs der Zeilen: {ids_before}") except NoSuchElementException: ids_before = [] logger.warning("Vor Pagination: Keine Zeilen gefunden.") # Extrahiere aktuelle Seite page_results = self.extract_current_page_results() logger.info(f"Seite {page_number}: {len(page_results)} Firmen gefunden. Gesamt bisher: {len(all_companies) + len(page_results)}") all_companies.extend(page_results) # Pagination-Buttons loggen try: pagination_nav = self.driver.find_element(By.CSS_SELECTOR, "nav.eb-pagination") buttons = pagination_nav.find_elements(By.CSS_SELECTOR, "a.eb-pagination-button") logger.info(f"Gefundene Paginierungs-Buttons auf Seite {page_number}: {len(buttons)}") for idx, btn in enumerate(buttons): btn_text = btn.text.strip() btn_classes = btn.get_attribute('class') btn_html = btn.get_attribute('outerHTML') has_svg = "svg" in btn_html logger.info(f"Button {idx}: Text='{btn_text}', Klassen='{btn_classes}', SVG={has_svg}, HTML-Start={btn_html[:120]}...") except NoSuchElementException: logger.warning("Keine Pagination-Buttons gefunden.") buttons = [] # Suche nach dem Weiter-Button next_button = None for idx, btn in enumerate(buttons): btn_html = btn.get_attribute('outerHTML') btn_text = btn.text.strip() btn_classes = btn.get_attribute('class') has_svg = "svg" in btn_html is_disabled = "disabled" in btn_classes if has_svg and not is_disabled and btn_text == "": next_button = btn logger.info(f"Als Weiter-Button erkannt: Button {idx}") break if not next_button: logger.info("Kein klickbarer 'Weiter'-Button mehr gefunden. Paginierung abgeschlossen.") break logger.info("Klicke auf 'Weiter'-Button...") try: # Button ins Sichtfeld scrollen! self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", next_button) time.sleep(0.5) # Kurze Pause, damit das Scrollen abgeschlossen ist # Button wirklich klicken self.driver.execute_script("arguments[0].click();", next_button) logger.info("Klick auf Weiter-Button ausgeführt.") # Warte auf neue Seite: Änderung der IDs der Zeilen def page_changed(driver): try: rows_after = driver.find_elements(By.CSS_SELECTOR, "table#t-result-table tbody tr[id]") ids_after = [r.get_attribute("id") for r in rows_after] logger.debug(f"Nach Pagination: IDs der Zeilen: {ids_after}") return ids_after != ids_before and len(ids_after) > 0 except Exception: return False self.wait.until(page_changed) logger.info("Seitenwechsel erfolgreich verifiziert (IDs der Zeilen haben sich geändert).") # NEU: Nach jedem Seitenwechsel nach unten scrollen! self.scroll_table_to_bottom() # Logge die aktuelle aktive Seite try: active_page = self.driver.find_element(By.CSS_SELECTOR, "a.eb-pagination-button.active").text.strip() logger.info(f"Aktive Seite nach Pagination: {active_page}") except NoSuchElementException: logger.warning("Konnte aktive Seite nach Pagination nicht ermitteln.") except Exception as e: logger.error(f"Fehler beim Klicken auf den Weiter-Button oder beim Warten auf neue Seite: {e}") # Screenshot und HTML speichern try: timestamp = time.strftime("%Y%m%d-%H%M%S") self.driver.save_screenshot(f"/app/output/pagination_error_{timestamp}.png") with open(f"/app/output/pagination_error_{timestamp}.html", "w", encoding="utf-8") as f: f.write(self.driver.page_source) logger.info(f"Screenshot und HTML der Seite nach Pagination-Fehler gespeichert.") except Exception as ee: logger.error(f"Fehler beim Speichern von Screenshot/HTML: {ee}") # Zusätzlich: Logge alle Zeilen nach dem Fehler (nur Text und HTML-Start) try: all_rows = self.driver.find_elements(By.CSS_SELECTOR, "table#t-result-table tbody tr") logger.info(f"Nach Pagination-Fehler: Es sind {len(all_rows)} Zeilen im DOM.") for i, r in enumerate(all_rows): html_snippet = r.get_attribute('outerHTML')[:120].replace('\n', ' ') logger.info(f"Zeile {i}: Text='{r.text.strip()}', HTML-Start={html_snippet}...") except Exception as eee: logger.error(f"Fehler beim Loggen der Zeilen nach Fehler: {eee}") break return all_companies def close(self): if self.driver: self.driver.quit() if __name__ == "__main__": scraper = None try: scraper = DealfrontScraper() if not scraper.login(): raise Exception("Login fehlgeschlagen") if not scraper.navigate_and_load_search(Config.SEARCH_NAME): raise Exception("Navigation/Suche fehlgeschlagen") all_companies = scraper.scrape_all_pages(max_pages=6) # Limitiere auf 6 Seiten if all_companies: df = pd.DataFrame(all_companies) output_csv_path = os.path.join(Config.OUTPUT_DIR, f"dealfront_results_{time.strftime('%Y%m%d-%H%M%S')}.csv") df.to_csv(output_csv_path, index=False, sep=';', encoding='utf-8-sig') logger.info(f"Ergebnisse ({len(df)} Firmen) erfolgreich in '{output_csv_path}' gespeichert.") else: logger.warning("Keine Firmen konnten extrahiert werden.") except Exception as e: logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=True) finally: if scraper: scraper.close() logger.info("Dealfront Automatisierung beendet.")