import os import json import time import logging from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options as ChromeOptions from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException # Wichtig: Die zentralen Imports zuerst from config import Config, DEALFRONT_LOGIN_URL, DEALFRONT_CREDENTIALS_FILE # Logging-Konfiguration, eigenständig für dieses Skript LOG_LEVEL = logging.DEBUG if Config.DEBUG else logging.INFO LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s' logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT, force=True) logger = logging.getLogger(__name__) # Definiere einen festen Ausgabeordner, der im Dockerfile erstellt wird OUTPUT_DIR = "/app/output" class DealfrontScraper: def __init__(self): logger.info("Initialisiere den DealfrontScraper und den Chrome WebDriver.") chrome_options = ChromeOptions() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--window-size=1920,1080") chrome_options.add_argument("--disable-blink-features=AutomationControlled") chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) chrome_options.add_experimental_option('useAutomationExtension', False) try: self.driver = webdriver.Chrome(options=chrome_options) self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") except Exception as e: logger.critical(f"WebDriver konnte nicht initialisiert werden. Fehler: {e}", exc_info=True) self.driver = None raise self.wait = WebDriverWait(self.driver, 20) logger.info("WebDriver erfolgreich initialisiert.") def _load_credentials(self): try: with open(DEALFRONT_CREDENTIALS_FILE, 'r') as f: creds = json.load(f) username = creds.get("username") password = creds.get("password") if not username or "DEIN_DEALFRONT_BENUTZERNAME" in username or not password or "DEIN_DEALFRONT_PASSWORT" in password: logger.error(f"Zugangsdaten in '{DEALFRONT_CREDENTIALS_FILE}' sind ungültig.") return None, None return username, password except FileNotFoundError: logger.error(f"Credentials-Datei nicht gefunden: '{DEALFRONT_CREDENTIALS_FILE}'") return None, None except json.JSONDecodeError: logger.error(f"Fehler beim Parsen der Credentials-Datei: '{DEALFRONT_CREDENTIALS_FILE}'") return None, None def _save_debug_artifacts(self): """Speichert einen Screenshot UND den HTML-Quellcode im Fehlerfall.""" try: os.makedirs(OUTPUT_DIR, exist_ok=True) timestamp = time.strftime("%Y%m%d-%H%M%S") # 1. Screenshot speichern screenshot_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.png") self.driver.save_screenshot(screenshot_filepath) logger.error(f"Screenshot '{screenshot_filepath}' wurde für die Analyse gespeichert.") # 2. HTML-Quellcode speichern html_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.html") with open(html_filepath, "w", encoding="utf-8") as f: f.write(self.driver.page_source) logger.error(f"HTML-Quellcode '{html_filepath}' wurde für die Analyse gespeichert.") except Exception as e: logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}") def login(self): """ Führt den Login-Prozess auf der Dealfront-Plattform durch. Diese Version wartet explizit auf jedes Element und verwendet die verifizierten Selektoren aus dem HTML-Dump. """ if not self.driver: return False username, password = self._load_credentials() if not username or not password: return False try: logger.info(f"Navigiere zur Login-Seite: {DEALFRONT_LOGIN_URL}") self.driver.get(DEALFRONT_LOGIN_URL) # --- SCHRITT 1: Warten auf das E-Mail-Feld und ausfüllen --- email_selector = (By.CSS_SELECTOR, "input[name='email']") logger.debug(f"Warte auf die Sichtbarkeit des E-Mail-Feldes: {email_selector}") email_field = self.wait.until(EC.visibility_of_element_located(email_selector)) email_field.send_keys(username) logger.info("E-Mail-Feld gefunden und ausgefüllt.") # --- SCHRITT 2: Warten auf das Passwort-Feld und ausfüllen --- # KORRIGIERT: Wir verwenden den CSS-Selektor für das 'type'-Attribut, # da kein 'name'-Attribut vorhanden ist. password_selector = (By.CSS_SELECTOR, "input[type='password']") logger.debug(f"Warte auf die Sichtbarkeit des Passwort-Feldes: {password_selector}") password_field = self.wait.until(EC.visibility_of_element_located(password_selector)) password_field.send_keys(password) logger.info("Passwort-Feld gefunden und ausgefüllt.") # --- SCHRITT 3: Warten auf den Anmelde-Button und klicken --- login_button_selector = (By.XPATH, "//button[normalize-space()='Log in']") # Englische Version, falls Sprache umschaltet logger.debug(f"Warte darauf, dass der Anmelde-Button klickbar ist: {login_button_selector}") login_button = self.wait.until(EC.element_to_be_clickable(login_button_selector)) login_button.click() logger.info("Anmelde-Button geklickt. Warte auf die Verifizierung...") # --- SCHRITT 4: Login-Erfolg verifizieren --- verification_element_selector = (By.XPATH, "//input[@data-cy='header-search-input']") logger.debug(f"Warte auf das Verifizierungs-Element: {verification_element_selector}") self.wait.until(EC.visibility_of_element_located(verification_element_selector)) logger.info("LOGIN ERFOLGREICH! Dashboard-Element gefunden.") return True except Exception as e: logger.critical(f"Ein Fehler ist während des Logins aufgetreten: {type(e).__name__}", exc_info=True) self._save_debug_artifacts() return False def navigate_to_target(self): """Navigiert zum 'Target'-Bereich der Anwendung.""" try: logger.info("Navigiere zum 'Target'-Bereich...") target_button_selector = (By.XPATH, "//a[contains(@class, 'text-base') and .//span[normalize-space()='Target']] | //a[.//div[normalize-space()='Target']]") # Alternative, falls der obere Selektor nicht klappt: # target_button_selector = (By.CSS_SELECTOR, "a[data-test-product-link-for='Target']") target_button = self.wait.until(EC.element_to_be_clickable(target_button_selector)) target_button.click() # Verifizieren, dass die Seite geladen ist, indem wir auf den "+ Neue Suche"-Button warten new_search_button_selector = (By.XPATH, "//button[normalize-space()='+ Neue Suche']") self.wait.until(EC.visibility_of_element_located(new_search_button_selector)) logger.info("'Target'-Seite erfolgreich geladen.") return True except Exception as e: logger.critical(f"Navigation zur 'Target'-Seite fehlgeschlagen: {type(e).__name__}", exc_info=True) self._save_debug_artifacts() return False def load_search(self, search_name): """Lädt eine vordefinierte Suche anhand ihres Namens.""" try: logger.info(f"Suche und lade die vordefinierte Suche: '{search_name}'") # Der XPath sucht nach einem Link, dessen Text den Suchnamen enthält search_link_selector = (By.XPATH, f"//a[contains(., '{search_name}')] | //div[contains(., '{search_name}') and contains(@class, 'list-item')]") search_link = self.wait.until(EC.element_to_be_clickable(search_link_selector)) search_link.click() # Verifizieren, dass die Ergebnisse geladen sind, indem wir auf die Ergebnistabelle warten. # Ein guter Indikator ist die Tabellenüberschrift "Firma". results_table_header_selector = (By.XPATH, "//th[normalize-space()='Firma']") self.wait.until(EC.visibility_of_element_located(results_table_header_selector)) logger.info(f"Suche '{search_name}' erfolgreich geladen und Ergebnisse angezeigt.") # Eine kleine, feste Wartezeit kann helfen, damit alle Daten nachgeladen sind. time.sleep(3) return True except Exception as e: logger.critical(f"Laden der Suche '{search_name}' fehlgeschlagen: {type(e).__name__}", exc_info=True) self._save_debug_artifacts() return False def extract_current_page_results(self): """Extrahiert die Firmennamen und Webseiten von der aktuellen Ergebnisseite.""" try: logger.info("Extrahiere Ergebnisse von der aktuellen Seite...") results = [] # Finde alle Zeilen in der Ergebnistabelle. Wir gehen davon aus, dass es `tr`-Elemente sind. # Ein stabiler Selektor könnte die `data-test`-Attribute nutzen, falls vorhanden. rows_selector = (By.XPATH, "//tbody/tr") rows = self.driver.find_elements(*rows_selector) if not rows: logger.warning("Keine Ergebniszeilen auf der Seite gefunden.") return [] logger.info(f"{len(rows)} Ergebniszeilen gefunden.") for row in rows: try: # Extrahiere Firmenname und Webseite. # Diese XPaths sind Annahmen und müssen ggf. nach Analyse des HTML angepasst werden. company_name = row.find_element(By.XPATH, ".//td[2]//a").text.strip() website = row.find_element(By.XPATH, ".//td[3]//a").text.strip() if company_name and website: results.append({'name': company_name, 'website': website}) except NoSuchElementException: logger.warning("Konnte Name oder Webseite in einer Zeile nicht finden, überspringe Zeile.") continue logger.info(f"{len(results)} Firmen erfolgreich extrahiert.") return results except Exception as e: logger.error(f"Fehler bei der Extraktion der Ergebnisse: {type(e).__name__}", exc_info=True) self._save_debug_artifacts() return [] def close(self): if self.driver: logger.info("Schließe den WebDriver.") self.driver.quit() if __name__ == "__main__": logger.info("Starte Dealfront Automatisierung - Phase 2a: Suche und Extraktion") scraper = None try: scraper = DealfrontScraper() if scraper.driver: # Schritt 1: Login if not scraper.login(): raise Exception("Login fehlgeschlagen.") logger.info("Login erfolgreich. Pausiere kurz vor der Navigation.") time.sleep(3) # Schritt 2: Navigation if not scraper.navigate_to_target(): raise Exception("Navigation zur Target-Seite fehlgeschlagen.") # Schritt 3: Suche laden if not scraper.load_search(Config.TARGET_SEARCH_NAME): raise Exception(f"Laden der Suche '{Config.TARGET_SEARCH_NAME}' fehlgeschlagen.") # Schritt 4: Ergebnisse extrahieren companies = scraper.extract_current_page_results() if companies: logger.info("===== Extrahierte Firmen (erste Seite) =====") for company in companies: logger.info(f" - Name: {company['name']}, Webseite: {company['website']}") logger.info("===========================================") else: logger.warning("Keine Firmen auf der ersten Seite extrahiert.") logger.info("Phase 2a Test erfolgreich abgeschlossen. Warte vor dem Schließen...") time.sleep(10) except Exception as e: logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=False) finally: if scraper: scraper.close() logger.info("Dealfront Automatisierung beendet.")