dealfront_enrichment.py aktualisiert

This commit is contained in:
2025-07-08 10:28:45 +00:00
parent a04426daaa
commit e6667f423d

View File

@@ -2,294 +2,218 @@ import os
import json import json
import time import time
import logging import logging
import pandas as pd
from selenium import webdriver from selenium import webdriver
from selenium.webdriver.common.by import By from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options as ChromeOptions from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.common.keys import Keys
import pandas as pd
# Importiere Konfigurationen
from config import Config, DEALFRONT_LOGIN_URL, DEALFRONT_CREDENTIALS_FILE, DEALFRONT_TARGET_URL, TARGET_SEARCH_NAME from config import Config, DEALFRONT_LOGIN_URL, DEALFRONT_CREDENTIALS_FILE, DEALFRONT_TARGET_URL, TARGET_SEARCH_NAME
# Definiere einen festen Ausgabeordner, der im Dockerfile erstellt wird
OUTPUT_DIR = "/app/output" OUTPUT_DIR = "/app/output"
# Logging-Konfiguration # Logging-Konfiguration
LOG_LEVEL = logging.INFO # Wir setzen den Standard auf INFO für eine saubere Ausgabe LOG_LEVEL = logging.INFO
LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s' LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s'
# Root-Logger für die Konsolenausgabe konfigurieren
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT, force=True, handlers=[logging.StreamHandler()]) logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT, force=True, handlers=[logging.StreamHandler()])
logging.getLogger("selenium").setLevel(logging.WARNING) # Selenium-Spam reduzieren
# Selenium-Logger auf WARNING setzen, um den Spam zu unterdrücken
logging.getLogger("selenium").setLevel(logging.WARNING)
# Eigener Logger für unser Skript
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# FileHandler hinzufügen, um ALLES (inkl. DEBUG) in eine .txt-Datei zu schreiben # FileHandler hinzufügen, um in eine .txt-Datei zu loggen
# Wichtig: Dieser Handler hat sein eigenes Level (DEBUG), um alles zu erfassen.
log_filename = f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.txt" log_filename = f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.txt"
log_filepath = os.path.join(OUTPUT_DIR, log_filename) log_filepath = os.path.join(OUTPUT_DIR, log_filename)
file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8') try:
file_handler.setLevel(logging.DEBUG) # Alles in die Datei schreiben file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8')
file_handler.setFormatter(logging.Formatter(LOG_FORMAT)) file_handler.setLevel(logging.DEBUG) # Alles in die Datei schreiben
logging.getLogger().addHandler(file_handler) file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
logging.getLogger().addHandler(file_handler)
logger.info(f"Logging konfiguriert. Konsole auf Level {logging.getLevelName(LOG_LEVEL)}. Log-Datei: {log_filepath}")
except FileNotFoundError:
logger.error(f"Konnte Log-Datei nicht erstellen. Das Verzeichnis '{OUTPUT_DIR}' existiert möglicherweise nicht im Container.")
logger.error("Stellen Sie sicher, dass das Volume-Mapping korrekt ist: -v \"$(pwd)/output:/app/output\"")
logger.info(f"Logging konfiguriert. Konsolenausgabe auf Level {logging.getLevelName(LOG_LEVEL)}. Log-Datei: {log_filepath}")
OUTPUT_DIR = "/app/output"
class DealfrontScraper: class DealfrontScraper:
"""
Kapselt alle Interaktionen mit der Dealfront-Plattform mittels Selenium.
"""
def __init__(self): def __init__(self):
""" logger.info("Initialisiere den DealfrontScraper...")
Initialisiert den WebDriver und den WebDriverWait.
Verwendet explizit den im Dockerfile installierten chromedriver.
"""
logger.info("Initialisiere den DealfrontScraper und den Chrome WebDriver.")
chrome_options = ChromeOptions() chrome_options = ChromeOptions()
# Lade-Optimierungen und Headless-Argumente
prefs = {"profile.managed_default_content_settings.images": 2} prefs = {"profile.managed_default_content_settings.images": 2}
chrome_options.add_experimental_option("prefs", prefs) chrome_options.add_experimental_option("prefs", prefs)
chrome_options.add_argument("--headless") chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080") chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
# --- ENTSCHEIDENDE ÄNDERUNG --- # Service-Objekt, das explizit den system-installierten Treiber verwendet
# Wir geben den Pfad zum funktionierenden, system-installierten Treiber explizit an.
# Dies umgeht den fehlerhaften webdriver-manager vollständig.
from selenium.webdriver.chrome.service import Service
service = Service(executable_path='/usr/bin/chromedriver') service = Service(executable_path='/usr/bin/chromedriver')
try: try:
self.driver = webdriver.Chrome(service=service, options=chrome_options) self.driver = webdriver.Chrome(service=service, options=chrome_options)
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
except Exception as e: except Exception as e:
logger.critical(f"WebDriver konnte nicht initialisiert werden. Fehler: {e}", exc_info=True) logger.critical(f"WebDriver konnte nicht initialisiert werden. Fehler: {e}", exc_info=True)
self.driver = None self.driver = None
raise raise
self.wait = WebDriverWait(self.driver, 30) self.wait = WebDriverWait(self.driver, 30)
self.username, self.password = self._load_credentials()
logger.info("WebDriver erfolgreich initialisiert.") logger.info("WebDriver erfolgreich initialisiert.")
def _load_credentials(self): def _load_credentials(self):
# (Diese Methode bleibt unverändert)
try: try:
with open(DEALFRONT_CREDENTIALS_FILE, 'r') as f: with open(DEALFRONT_CREDENTIALS_FILE, 'r') as f:
creds = json.load(f) creds = json.load(f)
username = creds.get("username") username = creds.get("username")
password = creds.get("password") password = creds.get("password")
if not username or "DEIN_DEALFRONT_BENUTZERNAME" in username or not password or "DEIN_DEALFRONT_PASSWORT" in password: if not username or "DEIN_DEALFRONT_BENUTZERNAME" in username or not password:
logger.error(f"Zugangsdaten in '{DEALFRONT_CREDENTIALS_FILE}' sind ungültig.") logger.error(f"Zugangsdaten in '{DEALFRONT_CREDENTIALS_FILE}' sind ungültig.")
return None, None return None, None
return username, password return username, password
except FileNotFoundError: except FileNotFoundError:
logger.error(f"Credentials-Datei nicht gefunden: '{DEALFRONT_CREDENTIALS_FILE}'") logger.error(f"Credentials-Datei nicht gefunden: '{DEALFRONT_CREDENTIALS_FILE}'")
return None, None return None, None
except json.JSONDecodeError: return None, None
logger.error(f"Fehler beim Parsen der Credentials-Datei: '{DEALFRONT_CREDENTIALS_FILE}'")
return None, None
def _save_debug_artifacts(self): def _save_debug_artifacts(self):
# (Diese Methode bleibt unverändert, aber mit neuem Namen)
try: try:
os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = time.strftime("%Y%m%d-%H%M%S") timestamp = time.strftime("%Y%m%d-%H%M%S")
screenshot_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.png") screenshot_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.png")
html_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.html")
self.driver.save_screenshot(screenshot_filepath) self.driver.save_screenshot(screenshot_filepath)
logger.error(f"Screenshot '{screenshot_filepath}' wurde für die Analyse gespeichert.") logger.error(f"Screenshot '{screenshot_filepath}' wurde für die Analyse gespeichert.")
html_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.html")
with open(html_filepath, "w", encoding="utf-8") as f: with open(html_filepath, "w", encoding="utf-8") as f:
f.write(self.driver.page_source) f.write(self.driver.page_source)
logger.error(f"HTML-Quellcode '{html_filepath}' wurde für die Analyse gespeichert.") logger.error(f"HTML-Quellcode '{html_filepath}' wurde für die Analyse gespeichert.")
except Exception as e: except Exception as e:
logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}") logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}")
def login_and_find_list(self, search_name): def login_and_find_list(self, search_name):
""" """Führt den gesamten Prozess vom Login bis zum Laden der Zielliste robust aus."""
Führt den gesamten Prozess vom Login bis zum Laden der Zielliste robust aus. try:
""" # === LOGIN ===
try: logger.info(f"Navigiere zur Login-Seite: {DEALFRONT_LOGIN_URL}")
# === LOGIN === self.driver.get(DEALFRONT_LOGIN_URL)
logger.info(f"Navigiere zur Login-Seite: {DEALFRONT_LOGIN_URL}") self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username)
self.driver.get(DEALFRONT_LOGIN_URL) self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username) self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password) logger.info("Login-Befehl gesendet.")
self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
logger.info("Login-Befehl gesendet.")
# === NAVIGATION ZUM TARGET BEREICH === # === NAVIGATION ZUM TARGET BEREICH ===
logger.info("Warte auf Dashboard und den 'Prospects finden' Quick-Link...") logger.info("Warte auf Dashboard und den 'Prospects finden' Quick-Link...")
# Wir warten geduldig (bis zu 30s) auf die Kachel, die uns zur Target-Seite bringt prospects_link_selector = (By.XPATH, "//a[@data-test-target-product-tile]")
prospects_link_selector = (By.XPATH, "//a[@data-test-target-product-tile]") prospects_link = self.wait.until(EC.element_to_be_clickable(prospects_link_selector))
prospects_link = self.wait.until(EC.element_to_be_clickable(prospects_link_selector)) prospects_link.click()
prospects_link.click() logger.info("'Prospects finden' geklickt.")
logger.info("'Prospects finden' geklickt. Navigiere zur Target-Seite.")
# === LADEN DER SPEZIFISCHEN SUCHE ===
# === LADEN DER SPEZIFISCHEN SUCHE === logger.info(f"Warte auf die Liste der Suchen und klicke auf '{search_name}'...")
logger.info(f"Warte auf die Liste der Suchen und klicke auf '{search_name}'...") search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']") search_item = self.wait.until(EC.element_to_be_clickable(search_item_selector))
search_item = self.wait.until(EC.element_to_be_clickable(search_item_selector)) search_item.click()
search_item.click()
# === VERIFIZIERUNG UND WARTEN AUF TABELLENDATEN ===
# === VERIFIZIERUNG UND WARTEN AUF TABELLENDATEN === logger.info(f"Suche '{search_name}' geladen. Warte auf das Rendern der Ergebnistabelle.")
logger.info(f"Suche '{search_name}' geladen. Warte auf das Rendern der Ergebnistabelle.") table_header_selector = (By.XPATH, "//th[normalize-space()='Firma']")
table_header_selector = (By.XPATH, "//th[normalize-space()='Firma']") self.wait.until(EC.visibility_of_element_located(table_header_selector))
self.wait.until(EC.visibility_of_element_located(table_header_selector)) time.sleep(5)
time.sleep(5) # Finale, großzügige Pause für das Laden der Tabellen-Daten via JS
logger.info("Zielseite mit Ergebnissen erfolgreich erreicht.")
logger.info("Zielseite mit Ergebnissen erfolgreich erreicht.") return True
return True except Exception as e:
logger.critical(f"Der Prozess vom Login bis zum Finden der Liste ist fehlgeschlagen: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return False
except Exception as e:
logger.critical(f"Der Prozess ist fehlgeschlagen: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts() # Speichert Screenshot und HTML für die Analyse
return False
def handle_overlays(self): def handle_overlays(self):
"""Sucht nach bekannten Popups/Overlays und schließt sie.""" """Sucht nach bekannten Popups/Overlays und schließt sie."""
try: try:
# Wir geben der Seite einen Moment, um mögliche Popups zu laden.
# Timeout auf 5 Sekunden gesetzt, da wir nicht ewig warten wollen.
short_wait = WebDriverWait(self.driver, 5) short_wait = WebDriverWait(self.driver, 5)
close_button_xpath = "//button[@aria-label='Schließen' or @aria-label='Close']"
# Selektor für den Schließen-Button (das 'x') des CRM-Popups
# Der XPath sucht nach einem beliebigen Button, der ein SVG-Icon mit einer passenden "d"-Path-Definition enthält.
# Dies ist sehr robust gegen Klassen-Änderungen.
close_button_xpath = "//button[@aria-label='Schließen' or @aria-label='Close'] | //div[contains(@class, 't-prospector-backdrop')]//button"
logger.info("Suche nach bekannten Overlays/Popups...") logger.info("Suche nach bekannten Overlays/Popups...")
close_button = short_wait.until(EC.element_to_be_clickable((By.XPATH, close_button_xpath))) close_button = short_wait.until(EC.element_to_be_clickable((By.XPATH, close_button_xpath)))
logger.info("Schließen-Button für Overlay gefunden. Klicke darauf.") logger.info("Schließen-Button für Overlay gefunden. Klicke darauf.")
close_button.click() close_button.click()
time.sleep(1) # Kurze Pause, damit das Overlay verschwindet. time.sleep(1)
except TimeoutException: except TimeoutException:
# Das ist der Normalfall, wenn kein Popup da ist.
logger.info("Kein Overlay/Popup gefunden. Fahre fort.") logger.info("Kein Overlay/Popup gefunden. Fahre fort.")
except Exception as e: except Exception as e:
logger.warning(f"Fehler beim Schließen des Overlays, ignoriere und fahre fort: {e}") logger.warning(f"Fehler beim Schließen des Overlays, ignoriere und fahre fort: {e}")
self._save_debug_artifacts()
def extract_current_page_results(self):
"""
Extrahiert Firmennamen und Webseiten direkt von der Seite
mithilfe der verifizierten, präzisen CSS-Selektoren.
"""
try:
logger.info("Extrahiere Ergebnisse von der aktuellen Seite...")
results = []
# Warten, bis das erste Element, das wir suchen (ein Firmenname), vorhanden ist.
# Das ist ein stabiler Indikator, dass die Liste geladen ist.
company_name_selector = ".sticky-column a.t-highlight-text"
logger.info(f"Warte auf das erste Firmenelement mit Selektor: '{company_name_selector}'")
self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, company_name_selector)))
# Kurze, feste Pause, damit alle Elemente vollständig gerendert werden können.
time.sleep(3)
# === DIREKTE EXTRAKTION ALLER ELEMENTE MIT IHREN SELEKTOREN === def extract_current_page_results(self):
company_elements = self.driver.find_elements(By.CSS_SELECTOR, company_name_selector) """Extrahiert die Firmennamen und Webseiten."""
website_elements = self.driver.find_elements(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text") try:
logger.info("Extrahiere Ergebnisse von der aktuellen Seite...")
logger.info(f"{len(company_elements)} Firmennamen und {len(website_elements)} Webseiten-Elemente gefunden.") results = []
rows_selector = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
if not company_elements: self.wait.until(EC.presence_of_element_located(rows_selector))
logger.warning("Keine Firmen mit dem angegebenen Selektor gefunden. Speichere Debug-Artefakte.") time.sleep(3)
self._save_debug_artifacts() rows = self.driver.find_elements(*rows_selector)
return [] if not rows:
logger.warning("Keine Ergebniszeilen (tr[id]) gefunden.")
# Wir iterieren über die gefundenen Firmen-Elemente
for i, company_element in enumerate(company_elements):
try:
# Firmenname aus dem 'title'-Attribut extrahieren (verhindert abgeschnittenen Text)
company_name = company_element.get_attribute("title").strip()
# Zugehörige Webseite finden, indem wir von der Zeile (tr) des Firmen-Elements ausgehen
row = company_element.find_element(By.XPATH, "./ancestor::tr")
website_element = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text")
website = website_element.text.strip()
if company_name and website:
results.append({'name': company_name, 'website': website})
else:
logger.warning(f"Zeile {i+1}: Unvollständige Daten (Name: '{company_name}', Webseite: '{website}').")
except NoSuchElementException:
logger.warning(f"Zeile {i+1}: Konnte Webseite für Firma '{company_name}' nicht finden. Überspringe.")
continue
logger.info(f"Extraktion abgeschlossen. {len(results)} Firmen erfolgreich zugeordnet.")
return results
except Exception as e:
logger.error(f"Ein schwerwiegender Fehler ist bei der Extraktion der Ergebnisse aufgetreten: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return [] return []
logger.info(f"{len(rows)} Firmen-Datenzeilen zur Verarbeitung gefunden.")
for i, row in enumerate(rows, 1):
try:
company_name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip()
website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip()
results.append({'name': company_name, 'website': website})
except NoSuchElementException:
logger.warning(f"Zeile {i}: Name oder Webseite nicht extrahierbar. Überspringe.")
continue
logger.info(f"Extraktion abgeschlossen. {len(results)} Firmen gefunden.")
return results
except Exception as e:
logger.error(f"Schwerwiegender Fehler bei der Extraktion: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return []
def close(self): def close(self):
if self.driver: if self.driver:
logger.info("Schließe den WebDriver.") logger.info("Schließe den WebDriver.")
self.driver.quit() self.driver.quit()
if __name__ == "__main__": if __name__ == "__main__":
logger.info("Starte Dealfront Automatisierung - Finaler Durchbruchsversuch") logger.info("Starte Dealfront Automatisierung - Finaler Durchbruchsversuch")
scraper = None scraper = None
try: try:
scraper = DealfrontScraper() scraper = DealfrontScraper()
if not scraper.driver: if not scraper.driver:
raise Exception("WebDriver konnte nicht initialisiert werden.") raise Exception("WebDriver konnte nicht initialisiert werden.")
# Ein einziger, robuster Aufruf für den gesamten Prozess bis zur Anzeige der Ergebnistabelle
if not scraper.login_and_find_list(Config.TARGET_SEARCH_NAME): if not scraper.login_and_find_list(Config.TARGET_SEARCH_NAME):
# Der Fehler wird bereits innerhalb der Methode geloggt und Artefakte werden gespeichert.
# Wir müssen hier nur noch den Prozess beenden.
raise Exception("Der Prozess vom Login bis zum Laden der Liste ist fehlgeschlagen. Details siehe Log.") raise Exception("Der Prozess vom Login bis zum Laden der Liste ist fehlgeschlagen. Details siehe Log.")
# Wenn wir hier ankommen, sind wir garantiert auf der richtigen Seite. scraper.handle_overlays()
# Jetzt extrahieren wir die Daten.
companies = scraper.extract_current_page_results() companies = scraper.extract_current_page_results()
# Saubere Ausgabe der Ergebnisse in einer Tabelle
if companies: if companies:
df = pd.DataFrame(companies) df = pd.DataFrame(companies)
# Pandas-Optionen für eine vollständige, ungeschnittene Ausgabe
pd.set_option('display.max_rows', None) pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None) pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000) pd.set_option('display.width', 1000)
pd.set_option('display.max_colwidth', None) pd.set_option('display.max_colwidth', None)
print("\n" + "="*80) print("\n" + "="*80)
print(" EXTRAHIERTE FIRMEN (ERSTE SEITE) ".center(80, "=")) print(" EXTRAHIERTE FIRMEN (ERSTE SEITE) ".center(80, "="))
print("="*80) print("="*80)
print(df.to_string(index=False)) print(df.to_string(index=False))
print("="*80 + "\n") print("="*80 + "\n")
logger.info(f"{len(df)} Firmen erfolgreich extrahiert und in der Konsole ausgegeben.")
else: else:
logger.warning("Obwohl die Seite geladen wurde, konnten keine Firmen extrahiert werden. Bitte HTML-Dump prüfen.") logger.warning("Obwohl die Seite geladen wurde, konnten keine Firmen extrahiert werden. Bitte HTML-Dump prüfen.")
logger.info("Phase 2a Test erfolgreich abgeschlossen. Warte vor dem Schließen...") logger.info("Phase 2a Test erfolgreich abgeschlossen. Warte vor dem Schließen...")
time.sleep(10) time.sleep(10)
except Exception as e: except Exception as e:
# Hier fangen wir nur noch die Exceptions aus dem __main__-Block selbst
logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=False) logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=False)
finally: finally:
if scraper: if scraper:
scraper.close() scraper.close()
logger.info("Dealfront Automatisierung beendet.") logger.info("Dealfront Automatisierung beendet.")