Files
Brancheneinstufung2/dealfront_enrichment.py
2025-07-03 06:41:00 +00:00

152 lines
7.1 KiB
Python

import os
import json
import time
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# Wichtig: Die zentralen Imports zuerst
from config import Config, DEALFRONT_LOGIN_URL, DEALFRONT_CREDENTIALS_FILE
# Logging-Konfiguration, eigenständig für dieses Skript
LOG_LEVEL = logging.DEBUG if Config.DEBUG else logging.INFO
LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s'
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__)
# Definiere einen festen Ausgabeordner, der im Dockerfile erstellt wird
OUTPUT_DIR = "/app/output"
class DealfrontScraper:
def __init__(self):
logger.info("Initialisiere den DealfrontScraper und den Chrome WebDriver.")
chrome_options = ChromeOptions()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
try:
self.driver = webdriver.Chrome(options=chrome_options)
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
except Exception as e:
logger.critical(f"WebDriver konnte nicht initialisiert werden. Fehler: {e}", exc_info=True)
self.driver = None
raise
self.wait = WebDriverWait(self.driver, 20)
logger.info("WebDriver erfolgreich initialisiert.")
def _load_credentials(self):
try:
with open(DEALFRONT_CREDENTIALS_FILE, 'r') as f:
creds = json.load(f)
username = creds.get("username")
password = creds.get("password")
if not username or "DEIN_DEALFRONT_BENUTZERNAME" in username or not password or "DEIN_DEALFRONT_PASSWORT" in password:
logger.error(f"Zugangsdaten in '{DEALFRONT_CREDENTIALS_FILE}' sind ungültig.")
return None, None
return username, password
except FileNotFoundError:
logger.error(f"Credentials-Datei nicht gefunden: '{DEALFRONT_CREDENTIALS_FILE}'")
return None, None
except json.JSONDecodeError:
logger.error(f"Fehler beim Parsen der Credentials-Datei: '{DEALFRONT_CREDENTIALS_FILE}'")
return None, None
def _save_error_screenshot(self):
"""Speichert einen Screenshot im Fehlerfall in das definierte OUTPUT_DIR mit Zeitstempel."""
try:
# Sicherstellen, dass der Ausgabeordner im Container existiert
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = time.strftime("%Y%m%d-%H%M%S")
filepath = os.path.join(OUTPUT_DIR, f"login_error_{timestamp}.png")
self.driver.save_screenshot(filepath)
logger.error(f"Screenshot '{filepath}' wurde für die Analyse gespeichert.")
except Exception as e_ss:
logger.error(f"Konnte Screenshot nicht speichern: {e_ss}")
def login(self):
"""
Führt den Login-Prozess auf der Dealfront-Plattform durch.
Diese Version wartet explizit auf jedes Element, bevor interagiert wird,
und verwendet die verifizierten Selektoren.
"""
if not self.driver:
return False
username, password = self._load_credentials()
if not username or not password:
return False
try:
logger.info(f"Navigiere zur Login-Seite: {DEALFRONT_LOGIN_URL}")
self.driver.get(DEALFRONT_LOGIN_URL)
# --- SCHRITT 1: Warten auf das E-Mail-Feld und ausfüllen ---
# Wir verwenden 'name' als Selektor, da dieser im HTML verifiziert ist.
email_selector = (By.NAME, "email")
logger.debug(f"Warte auf die Sichtbarkeit des E-Mail-Feldes mit Selektor: {email_selector}")
email_field = self.wait.until(EC.visibility_of_element_located(email_selector))
email_field.send_keys(username)
logger.info("E-Mail-Feld gefunden und ausgefüllt.")
# --- SCHRITT 2: Passwort-Feld finden und ausfüllen ---
# Wir nehmen an, dass das Passwortfeld ebenfalls 'name="password"' hat.
password_selector = (By.NAME, "password")
logger.debug(f"Suche Passwort-Feld mit Selektor: {password_selector}")
password_field = self.driver.find_element(*password_selector) # * entpackt das Tupel
password_field.send_keys(password)
logger.info("Passwort-Feld gefunden und ausgefüllt.")
# --- SCHRITT 3: Warten auf den Anmelde-Button und klicken ---
# Wir suchen nach einem Button, der den Text 'Anmelden' enthält. Das ist sehr robust.
login_button_selector = (By.XPATH, "//button[normalize-space()='Anmelden']")
logger.debug(f"Warte darauf, dass der Anmelde-Button klickbar ist: {login_button_selector}")
login_button = self.wait.until(EC.element_to_be_clickable(login_button_selector))
login_button.click()
logger.info("Anmelde-Button geklickt. Warte auf die Verifizierung...")
# --- SCHRITT 4: Login-Erfolg verifizieren ---
# Wir warten auf das Suchfeld im Dashboard.
verification_element_selector = (By.XPATH, "//input[@data-cy='header-search-input']")
logger.debug(f"Warte auf das Verifizierungs-Element: {verification_element_selector}")
self.wait.until(EC.visibility_of_element_located(verification_element_selector))
logger.info("Login ERFOLGREICH! Dashboard-Element gefunden.")
return True
except Exception as e:
logger.critical(f"Ein Fehler ist während des Logins aufgetreten: {type(e).__name__}", exc_info=True)
self._save_error_screenshot()
return False
def close(self):
if self.driver:
logger.info("Schließe den WebDriver.")
self.driver.quit()
if __name__ == "__main__":
logger.info("Starte Dealfront Automatisierung - Phase 1: Login-Test")
scraper = None
try:
scraper = DealfrontScraper()
if scraper.driver:
if scraper.login():
logger.info("Login-Prozess erfolgreich abgeschlossen.")
time.sleep(5)
else:
logger.error("Login-Prozess ist fehlgeschlagen.")
except Exception as e:
logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten.", exc_info=True)
finally:
if scraper:
scraper.close()
logger.info("Login-Test beendet.")