import os import json import time import logging import pandas as pd from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options as ChromeOptions from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException # ============================================================================== # TEMPORÄRE, AUTARKE KONFIGURATION # ============================================================================== class TempConfig: # --- Direkt hier definierte Werte, um config.py zu umgehen --- DEALFRONT_LOGIN_URL = "https://app.dealfront.com/login" TARGET_SEARCH_NAME = "Facility Management" # <-- BITTE AN IHRE SUCHE ANPASSEN DEALFRONT_CREDENTIALS_FILE = "/app/dealfront_credentials.json" # ============================================================================== OUTPUT_DIR = "/app/output" LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s' # <-- DEFINITION HIER logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True) logging.getLogger("selenium").setLevel(logging.WARNING) logger = logging.getLogger(__name__) log_filename = f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.txt" log_filepath = os.path.join(OUTPUT_DIR, log_filename) try: file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8') file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(logging.Formatter(LOG_FORMAT)) logging.getLogger().addHandler(file_handler) logger.info(f"Logging konfiguriert. Log-Datei: {log_filepath}") except Exception as e: logger.error(f"Konnte Log-Datei nicht erstellen: {e}") class DealfrontScraper: def __init__(self): logger.info("Initialisiere den DealfrontScraper...") chrome_options = ChromeOptions() prefs = {"profile.managed_default_content_settings.images": 2} chrome_options.add_experimental_option("prefs", prefs) chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--window-size=1920,1080") service = Service(executable_path='/usr/bin/chromedriver') try: self.driver = webdriver.Chrome(service=service, options=chrome_options) except Exception as e: logger.critical(f"WebDriver konnte nicht initialisiert werden.", exc_info=True) raise self.wait = WebDriverWait(self.driver, 30) self.username, self.password = self._load_credentials() logger.info("WebDriver erfolgreich initialisiert.") def _load_credentials(self): try: with open(TempConfig.DEALFRONT_CREDENTIALS_FILE, 'r') as f: creds = json.load(f) return creds.get("username"), creds.get("password") except Exception as e: logger.error(f"Credentials-Datei konnte nicht geladen werden: {e}") return None, None def _save_debug_artifacts(self): # ... (Diese Methode bleibt unverändert) ... try: os.makedirs(OUTPUT_DIR, exist_ok=True) timestamp = time.strftime("%Y%m%d-%H%M%S") screenshot_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.png") html_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.html") self.driver.save_screenshot(screenshot_filepath) logger.error(f"Screenshot '{screenshot_filepath}' wurde für die Analyse gespeichert.") with open(html_filepath, "w", encoding="utf-8") as f: f.write(self.driver.page_source) logger.error(f"HTML-Quellcode '{html_filepath}' wurde für die Analyse gespeichert.") except Exception as e: logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}") def login_and_find_list(self, search_name): try: logger.info(f"Navigiere zur Login-Seite: {TempConfig.DEALFRONT_LOGIN_URL}") self.driver.get(TempConfig.DEALFRONT_LOGIN_URL) self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username) self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password) self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click() logger.info("Login-Befehl gesendet. Warte auf Navigation...") prospects_link_selector = (By.XPATH, "//a[@data-test-target-product-tile]") prospects_link = self.wait.until(EC.element_to_be_clickable(prospects_link_selector)) prospects_link.click() logger.info("'Prospects finden' geklickt. Lade Suchliste...") search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']") self.wait.until(EC.element_to_be_clickable(search_item_selector)).click() logger.info(f"Suche '{search_name}' geladen. Warte auf Ergebnisse.") first_row_locator = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]") self.wait.until(EC.visibility_of_element_located(first_row_locator)) logger.info("Zielseite mit Ergebnissen erfolgreich erreicht.") return True except Exception as e: logger.critical(f"Prozess bis zum Laden der Liste fehlgeschlagen: {type(e).__name__}", exc_info=True) self._save_debug_artifacts() return False def extract_current_page_results(self): """ Extrahiert Daten NUR aus den sichtbaren Ergebniszeilen und optimiert die Wartezeiten. """ try: logger.info("Extrahiere Ergebnisse von der aktuellen Seite (optimierter Ansatz)...") results = [] # Warten, bis die erste Daten-Zelle (Firmenname) sichtbar ist. Das ist unser Anker. first_company_cell_selector = (By.CSS_SELECTOR, "td.sticky-after-checkbox a.t-highlight-text") self.wait.until(EC.visibility_of_element_located(first_company_cell_selector)) # Finde alle Tabellenzeilen. all_rows = self.driver.find_elements(By.CSS_SELECTOR, "table#t-result-table tbody tr") logger.info(f"Insgesamt {len(all_rows)} -Elemente im tbody gefunden. Filtere nach sichtbaren Datenzeilen...") for i, row in enumerate(all_rows, 1): try: # Wir prüfen pro Zeile, ob sie die benötigten Elemente enthält. # Das ist viel robuster als alle Elemente auf einmal zu suchen. company_name_element = row.find_element(By.CSS_SELECTOR, "td.sticky-after-checkbox a.t-highlight-text") company_name = company_name_element.get_attribute("title").strip() website_element = row.find_element(By.CSS_SELECTOR, "td:nth-of-type(3) a") website = website_element.text.strip() if company_name: results.append({'name': company_name, 'website': website or "N/A"}) except NoSuchElementException: # Das ist jetzt erwartet für "Geister"-Zeilen. Wir loggen das nicht mehr als Warning. continue logger.info(f"Extraktion abgeschlossen. {len(results)} gültige Firmen gefunden.") return results except Exception as e: logger.error(f"Schwerwiegender Fehler bei der Extraktion: {type(e).__name__}", exc_info=True) self._save_debug_artifacts() return [] def close(self): if self.driver: logger.info("Schließe den WebDriver.") self.driver.quit() if __name__ == "__main__": logger.info("Starte Dealfront Automatisierung - DEBUG-MODUS") scraper = None try: scraper = DealfrontScraper() if not scraper.driver: raise Exception("WebDriver konnte nicht initialisiert werden.") if not scraper.login_and_find_list(TempConfig.TARGET_SEARCH_NAME): raise Exception("Der Prozess vom Login bis zum Laden der Liste ist fehlgeschlagen.") # In dieser Version gibt es keine handle_overlays Methode mehr # scraper.handle_overlays() companies = scraper.extract_current_page_results() if companies: df = pd.DataFrame(companies) pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option('display.width', 1000) pd.set_option('display.max_colwidth', None) print("\n" + "="*80) print(" EXTRAHIERTE FIRMEN (ERSTE SEITE) ".center(80, "=")) print("="*80) print(df.to_string(index=False)) print("="*80 + "\n") else: logger.warning("Obwohl die Seite geladen wurde, konnten keine Firmen extrahiert werden.") logger.info("Test erfolgreich abgeschlossen. Warte vor dem Schließen...") time.sleep(10) except Exception as e: logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=False) finally: if scraper: scraper.close() logger.info("Dealfront Automatisierung beendet.")