Files
Brancheneinstufung2/dealfront_enrichment.py

295 lines
15 KiB
Python

import os
import json
import time
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.common.keys import Keys
import pandas as pd
from config import Config, DEALFRONT_LOGIN_URL, DEALFRONT_CREDENTIALS_FILE, DEALFRONT_TARGET_URL, TARGET_SEARCH_NAME
OUTPUT_DIR = "/app/output"
# Logging-Konfiguration
LOG_LEVEL = logging.INFO # Wir setzen den Standard auf INFO für eine saubere Ausgabe
LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s'
# Root-Logger für die Konsolenausgabe konfigurieren
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT, force=True, handlers=[logging.StreamHandler()])
# Selenium-Logger auf WARNING setzen, um den Spam zu unterdrücken
logging.getLogger("selenium").setLevel(logging.WARNING)
# Eigener Logger für unser Skript
logger = logging.getLogger(__name__)
# FileHandler hinzufügen, um ALLES (inkl. DEBUG) in eine .txt-Datei zu schreiben
# Wichtig: Dieser Handler hat sein eigenes Level (DEBUG), um alles zu erfassen.
log_filename = f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.txt"
log_filepath = os.path.join(OUTPUT_DIR, log_filename)
file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8')
file_handler.setLevel(logging.DEBUG) # Alles in die Datei schreiben
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
logging.getLogger().addHandler(file_handler)
logger.info(f"Logging konfiguriert. Konsolenausgabe auf Level {logging.getLevelName(LOG_LEVEL)}. Log-Datei: {log_filepath}")
OUTPUT_DIR = "/app/output"
class DealfrontScraper:
def __init__(self):
"""
Initialisiert den WebDriver und den WebDriverWait.
Verwendet explizit den im Dockerfile installierten chromedriver.
"""
logger.info("Initialisiere den DealfrontScraper und den Chrome WebDriver.")
chrome_options = ChromeOptions()
# Lade-Optimierungen und Headless-Argumente
prefs = {"profile.managed_default_content_settings.images": 2}
chrome_options.add_experimental_option("prefs", prefs)
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
# --- ENTSCHEIDENDE ÄNDERUNG ---
# Wir geben den Pfad zum funktionierenden, system-installierten Treiber explizit an.
# Dies umgeht den fehlerhaften webdriver-manager vollständig.
from selenium.webdriver.chrome.service import Service
service = Service(executable_path='/usr/bin/chromedriver')
try:
self.driver = webdriver.Chrome(service=service, options=chrome_options)
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
except Exception as e:
logger.critical(f"WebDriver konnte nicht initialisiert werden. Fehler: {e}", exc_info=True)
self.driver = None
raise
self.wait = WebDriverWait(self.driver, 30)
logger.info("WebDriver erfolgreich initialisiert.")
def _load_credentials(self):
# (Diese Methode bleibt unverändert)
try:
with open(DEALFRONT_CREDENTIALS_FILE, 'r') as f:
creds = json.load(f)
username = creds.get("username")
password = creds.get("password")
if not username or "DEIN_DEALFRONT_BENUTZERNAME" in username or not password or "DEIN_DEALFRONT_PASSWORT" in password:
logger.error(f"Zugangsdaten in '{DEALFRONT_CREDENTIALS_FILE}' sind ungültig.")
return None, None
return username, password
except FileNotFoundError:
logger.error(f"Credentials-Datei nicht gefunden: '{DEALFRONT_CREDENTIALS_FILE}'")
return None, None
except json.JSONDecodeError:
logger.error(f"Fehler beim Parsen der Credentials-Datei: '{DEALFRONT_CREDENTIALS_FILE}'")
return None, None
def _save_debug_artifacts(self):
# (Diese Methode bleibt unverändert, aber mit neuem Namen)
try:
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = time.strftime("%Y%m%d-%H%M%S")
screenshot_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.png")
self.driver.save_screenshot(screenshot_filepath)
logger.error(f"Screenshot '{screenshot_filepath}' wurde für die Analyse gespeichert.")
html_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.html")
with open(html_filepath, "w", encoding="utf-8") as f:
f.write(self.driver.page_source)
logger.error(f"HTML-Quellcode '{html_filepath}' wurde für die Analyse gespeichert.")
except Exception as e:
logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}")
def login_and_find_list(self, search_name):
"""
Führt den gesamten Prozess vom Login bis zum Laden der Zielliste robust aus.
"""
try:
# === LOGIN ===
logger.info(f"Navigiere zur Login-Seite: {DEALFRONT_LOGIN_URL}")
self.driver.get(DEALFRONT_LOGIN_URL)
self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username)
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
logger.info("Login-Befehl gesendet.")
# === NAVIGATION ZUM TARGET BEREICH ===
logger.info("Warte auf Dashboard und den 'Prospects finden' Quick-Link...")
# Wir warten geduldig (bis zu 30s) auf die Kachel, die uns zur Target-Seite bringt
prospects_link_selector = (By.XPATH, "//a[@data-test-target-product-tile]")
prospects_link = self.wait.until(EC.element_to_be_clickable(prospects_link_selector))
prospects_link.click()
logger.info("'Prospects finden' geklickt. Navigiere zur Target-Seite.")
# === LADEN DER SPEZIFISCHEN SUCHE ===
logger.info(f"Warte auf die Liste der Suchen und klicke auf '{search_name}'...")
search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
search_item = self.wait.until(EC.element_to_be_clickable(search_item_selector))
search_item.click()
# === VERIFIZIERUNG UND WARTEN AUF TABELLENDATEN ===
logger.info(f"Suche '{search_name}' geladen. Warte auf das Rendern der Ergebnistabelle.")
table_header_selector = (By.XPATH, "//th[normalize-space()='Firma']")
self.wait.until(EC.visibility_of_element_located(table_header_selector))
time.sleep(5) # Finale, großzügige Pause für das Laden der Tabellen-Daten via JS
logger.info("Zielseite mit Ergebnissen erfolgreich erreicht.")
return True
except Exception as e:
logger.critical(f"Der Prozess ist fehlgeschlagen: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts() # Speichert Screenshot und HTML für die Analyse
return False
def handle_overlays(self):
"""Sucht nach bekannten Popups/Overlays und schließt sie."""
try:
# Wir geben der Seite einen Moment, um mögliche Popups zu laden.
# Timeout auf 5 Sekunden gesetzt, da wir nicht ewig warten wollen.
short_wait = WebDriverWait(self.driver, 5)
# Selektor für den Schließen-Button (das 'x') des CRM-Popups
# Der XPath sucht nach einem beliebigen Button, der ein SVG-Icon mit einer passenden "d"-Path-Definition enthält.
# Dies ist sehr robust gegen Klassen-Änderungen.
close_button_xpath = "//button[@aria-label='Schließen' or @aria-label='Close'] | //div[contains(@class, 't-prospector-backdrop')]//button"
logger.info("Suche nach bekannten Overlays/Popups...")
close_button = short_wait.until(EC.element_to_be_clickable((By.XPATH, close_button_xpath)))
logger.info("Schließen-Button für Overlay gefunden. Klicke darauf.")
close_button.click()
time.sleep(1) # Kurze Pause, damit das Overlay verschwindet.
except TimeoutException:
# Das ist der Normalfall, wenn kein Popup da ist.
logger.info("Kein Overlay/Popup gefunden. Fahre fort.")
except Exception as e:
logger.warning(f"Fehler beim Schließen des Overlays, ignoriere und fahre fort: {e}")
self._save_debug_artifacts()
def extract_current_page_results(self):
"""
Extrahiert Firmennamen und Webseiten direkt von der Seite
mithilfe der verifizierten, präzisen CSS-Selektoren.
"""
try:
logger.info("Extrahiere Ergebnisse von der aktuellen Seite...")
results = []
# Warten, bis das erste Element, das wir suchen (ein Firmenname), vorhanden ist.
# Das ist ein stabiler Indikator, dass die Liste geladen ist.
company_name_selector = ".sticky-column a.t-highlight-text"
logger.info(f"Warte auf das erste Firmenelement mit Selektor: '{company_name_selector}'")
self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, company_name_selector)))
# Kurze, feste Pause, damit alle Elemente vollständig gerendert werden können.
time.sleep(3)
# === DIREKTE EXTRAKTION ALLER ELEMENTE MIT IHREN SELEKTOREN ===
company_elements = self.driver.find_elements(By.CSS_SELECTOR, company_name_selector)
website_elements = self.driver.find_elements(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text")
logger.info(f"{len(company_elements)} Firmennamen und {len(website_elements)} Webseiten-Elemente gefunden.")
if not company_elements:
logger.warning("Keine Firmen mit dem angegebenen Selektor gefunden. Speichere Debug-Artefakte.")
self._save_debug_artifacts()
return []
# Wir iterieren über die gefundenen Firmen-Elemente
for i, company_element in enumerate(company_elements):
try:
# Firmenname aus dem 'title'-Attribut extrahieren (verhindert abgeschnittenen Text)
company_name = company_element.get_attribute("title").strip()
# Zugehörige Webseite finden, indem wir von der Zeile (tr) des Firmen-Elements ausgehen
row = company_element.find_element(By.XPATH, "./ancestor::tr")
website_element = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text")
website = website_element.text.strip()
if company_name and website:
results.append({'name': company_name, 'website': website})
else:
logger.warning(f"Zeile {i+1}: Unvollständige Daten (Name: '{company_name}', Webseite: '{website}').")
except NoSuchElementException:
logger.warning(f"Zeile {i+1}: Konnte Webseite für Firma '{company_name}' nicht finden. Überspringe.")
continue
logger.info(f"Extraktion abgeschlossen. {len(results)} Firmen erfolgreich zugeordnet.")
return results
except Exception as e:
logger.error(f"Ein schwerwiegender Fehler ist bei der Extraktion der Ergebnisse aufgetreten: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return []
def close(self):
if self.driver:
logger.info("Schließe den WebDriver.")
self.driver.quit()
if __name__ == "__main__":
logger.info("Starte Dealfront Automatisierung - Finaler Durchbruchsversuch")
scraper = None
try:
scraper = DealfrontScraper()
if not scraper.driver:
raise Exception("WebDriver konnte nicht initialisiert werden.")
# Ein einziger, robuster Aufruf für den gesamten Prozess bis zur Anzeige der Ergebnistabelle
if not scraper.login_and_find_list(Config.TARGET_SEARCH_NAME):
# Der Fehler wird bereits innerhalb der Methode geloggt und Artefakte werden gespeichert.
# Wir müssen hier nur noch den Prozess beenden.
raise Exception("Der Prozess vom Login bis zum Laden der Liste ist fehlgeschlagen. Details siehe Log.")
# Wenn wir hier ankommen, sind wir garantiert auf der richtigen Seite.
# Jetzt extrahieren wir die Daten.
companies = scraper.extract_current_page_results()
# Saubere Ausgabe der Ergebnisse in einer Tabelle
if companies:
df = pd.DataFrame(companies)
# Pandas-Optionen für eine vollständige, ungeschnittene Ausgabe
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
pd.set_option('display.max_colwidth', None)
print("\n" + "="*80)
print(" EXTRAHIERTE FIRMEN (ERSTE SEITE) ".center(80, "="))
print("="*80)
print(df.to_string(index=False))
print("="*80 + "\n")
logger.info(f"{len(df)} Firmen erfolgreich extrahiert und in der Konsole ausgegeben.")
else:
logger.warning("Obwohl die Seite geladen wurde, konnten keine Firmen extrahiert werden. Bitte HTML-Dump prüfen.")
logger.info("Phase 2a Test erfolgreich abgeschlossen. Warte vor dem Schließen...")
time.sleep(10)
except Exception as e:
# Hier fangen wir nur noch die Exceptions aus dem __main__-Block selbst
logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=False)
finally:
if scraper:
scraper.close()
logger.info("Dealfront Automatisierung beendet.")