Files
Brancheneinstufung2/dealfront_enrichment.py

256 lines
12 KiB
Python

import os
import json
import time
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.common.keys import Keys
import pandas as pd
from config import Config, DEALFRONT_LOGIN_URL, DEALFRONT_CREDENTIALS_FILE, DEALFRONT_TARGET_URL, TARGET_SEARCH_NAME
# Logging-Konfiguration
LOG_LEVEL = logging.DEBUG if Config.DEBUG else logging.INFO
LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s'
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__)
OUTPUT_DIR = "/app/output"
class DealfrontScraper:
def __init__(self):
logger.info("Initialisiere DealfrontScraper und Chrome WebDriver.")
chrome_options = ChromeOptions()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
try:
self.driver = webdriver.Chrome(options=chrome_options)
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
except Exception as e:
logger.critical(f"WebDriver konnte nicht initialisiert werden. Fehler: {e}", exc_info=True)
self.driver = None
raise
self.wait = WebDriverWait(self.driver, 30) # Timeout auf 30s erhöht für mehr Stabilität
logger.info("WebDriver erfolgreich initialisiert.")
def _load_credentials(self):
# (Diese Methode bleibt unverändert)
try:
with open(DEALFRONT_CREDENTIALS_FILE, 'r') as f:
creds = json.load(f)
username = creds.get("username")
password = creds.get("password")
if not username or "DEIN_DEALFRONT_BENUTZERNAME" in username or not password or "DEIN_DEALFRONT_PASSWORT" in password:
logger.error(f"Zugangsdaten in '{DEALFRONT_CREDENTIALS_FILE}' sind ungültig.")
return None, None
return username, password
except FileNotFoundError:
logger.error(f"Credentials-Datei nicht gefunden: '{DEALFRONT_CREDENTIALS_FILE}'")
return None, None
except json.JSONDecodeError:
logger.error(f"Fehler beim Parsen der Credentials-Datei: '{DEALFRONT_CREDENTIALS_FILE}'")
return None, None
def _save_debug_artifacts(self):
# (Diese Methode bleibt unverändert, aber mit neuem Namen)
try:
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = time.strftime("%Y%m%d-%H%M%S")
screenshot_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.png")
self.driver.save_screenshot(screenshot_filepath)
logger.error(f"Screenshot '{screenshot_filepath}' wurde für die Analyse gespeichert.")
html_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.html")
with open(html_filepath, "w", encoding="utf-8") as f:
f.write(self.driver.page_source)
logger.error(f"HTML-Quellcode '{html_filepath}' wurde für die Analyse gespeichert.")
except Exception as e:
logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}")
def login_and_navigate_to_target(self):
"""
Führt den Login durch und navigiert zur Target-URL.
Dieser kombinierte Ansatz wartet nach jeder Aktion explizit auf den Erfolg.
"""
if not self.driver:
return False
username, password = self._load_credentials()
if not username or not password:
return False
try:
# --- SCHRITT 1: LOGIN-SEITE LADEN ---
logger.info(f"Navigiere zur Login-Seite: {DEALFRONT_LOGIN_URL}")
self.driver.get(DEALFRONT_LOGIN_URL)
# --- SCHRITT 2: DATEN EINGEBEN UND KLICKEN ---
logger.info("Warte auf Login-Formular und fülle es aus...")
email_field = self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "input[name='email']")))
email_field.send_keys(username)
password_field = self.driver.find_element(By.CSS_SELECTOR, "input[type='password']")
password_field.send_keys(password)
login_button = self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']")
login_button.click()
logger.info("Login-Button geklickt.")
# --- SCHRITT 3: AUF ERFOLGREICHEN LOGIN WARTEN (Dashboard) ---
logger.info("Warte auf Dashboard nach dem Login...")
verification_dashboard_selector = (By.XPATH, "//input[@data-cy='header-search-input']")
self.wait.until(EC.visibility_of_element_located(verification_dashboard_selector))
logger.info("Dashboard erfolgreich erreicht.")
time.sleep(2) # Eine kleine Pause, damit alle Hintergrund-Skripte laden können.
# --- SCHRITT 4: DIREKTE NAVIGATION ZUR TARGET-URL ---
logger.info(f"Navigiere direkt zur Target-URL: {Config.DEALFRONT_TARGET_URL}")
self.driver.get(Config.DEALFRONT_TARGET_URL)
# --- SCHRITT 5: ERFOLG DER NAVIGATION VERIFIZIEREN ---
logger.info("Warte auf Bestätigung der Target-Seite...")
verification_target_selector = (By.XPATH, "//button[normalize-space()='+ Neue Suche']")
self.wait.until(EC.visibility_of_element_located(verification_target_selector))
logger.info("'Target'-Seite erfolgreich und vollständig geladen.")
return True
except Exception as e:
logger.critical(f"Login- oder Navigationsprozess fehlgeschlagen: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return False
def load_search(self, search_name):
# (Diese Methode bleibt vorerst unverändert)
try:
logger.info(f"Suche und lade die vordefinierte Suche: '{search_name}'")
search_link_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
search_link = self.wait.until(EC.element_to_be_clickable(search_link_selector))
search_link.click()
results_table_header_selector = (By.XPATH, "//th[normalize-space()='Firma']")
self.wait.until(EC.visibility_of_element_located(results_table_header_selector))
logger.info(f"Suche '{search_name}' erfolgreich geladen und Ergebnisse angezeigt.")
time.sleep(3)
return True
except Exception as e:
logger.critical(f"Laden der Suche '{search_name}' fehlgeschlagen: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return False
def extract_current_page_results(self):
"""
Extrahiert die Firmennamen und Webseiten von der aktuellen Ergebnisseite
unter Verwendung der verifizierten und präzisen CSS-Selektoren.
"""
try:
logger.info("Extrahiere Ergebnisse von der aktuellen Seite...")
results = []
# 1. Finde alle Zeilen der Tabelle.
# Jede Zeile ist ein `<tr>`-Element mit einer einzigartigen ID.
rows_selector = (By.CSS_SELECTOR, "table#t-result-table tbody > tr[id]")
# Wir warten explizit, bis mindestens eine Zeile geladen ist.
self.wait.until(EC.presence_of_element_located(rows_selector))
time.sleep(3) # Kurze zusätzliche Pause, um sicherzustellen, dass JS die Tabelle vollständig rendert.
rows = self.driver.find_elements(*rows_selector)
if not rows:
logger.warning("Keine Ergebniszeilen (tr[id]) auf der Seite gefunden.")
self._save_debug_artifacts()
return []
logger.info(f"{len(rows)} Ergebniszeilen zur Verarbeitung gefunden.")
# 2. Iteriere durch jede Zeile und extrahiere die Daten.
for i, row in enumerate(rows):
company_name = ""
website = ""
try:
# --- KORRIGIERTER FIRMENNAMEN-SELEKTOR ---
company_name_selector = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
company_name_element = row.find_element(*company_name_selector)
# Wir holen uns das 'title'-Attribut, da der Text abgeschnitten sein könnte.
company_name = company_name_element.get_attribute("title").strip()
# --- KORRIGIERTER WEBSEITEN-SELEKTOR ---
website_selector = (By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text")
website_element = row.find_element(*website_selector)
website = website_element.text.strip()
if company_name and website:
results.append({'name': company_name, 'website': website})
else:
logger.warning(f"Zeile {i+1}: Unvollständige Daten (Name: '{company_name}', Webseite: '{website}').")
except NoSuchElementException:
logger.warning(f"Zeile {i+1}: Ein erwartetes Element (Name oder Webseite) wurde nicht gefunden. Überspringe diese Zeile.")
continue
logger.info(f"{len(results)} Firmen erfolgreich extrahiert.")
return results
except Exception as e:
logger.error(f"Ein schwerwiegender Fehler ist bei der Extraktion der Ergebnisse aufgetreten: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return []
def close(self):
if self.driver:
logger.info("Schließe den WebDriver.")
self.driver.quit()
if __name__ == "__main__":
logger.info("Starte Dealfront Automatisierung - Phase 2a: Suche und Extraktion")
scraper = None
try:
scraper = DealfrontScraper()
if not scraper.driver:
raise Exception("WebDriver konnte nicht initialisiert werden.")
# === NEUER AUFRUF DER KOMBINIERTEN FUNKTION ===
if not scraper.login_and_navigate_to_target():
raise Exception("Login und Navigation fehlgeschlagen.")
# Suche laden
if not scraper.load_search(Config.TARGET_SEARCH_NAME):
raise Exception(f"Laden der Suche '{Config.TARGET_SEARCH_NAME}' fehlgeschlagen.")
# Ergebnisse extrahieren und ausgeben
companies = scraper.extract_current_page_results()
if companies:
df = pd.DataFrame(companies)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
pd.set_option('display.max_colwidth', None)
print("\n" + "="*80)
print(" EXTRAHIERTE FIRMEN (ERSTE SEITE) ".center(80, "="))
print("="*80)
print(df.to_string(index=False))
print("="*80 + "\n")
else:
logger.warning("Keine Firmen auf der ersten Seite extrahiert oder gefunden.")
logger.info("Phase 2a Test erfolgreich abgeschlossen. Warte vor dem Schließen...")
time.sleep(10)
except Exception as e:
logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=False)
finally:
if scraper:
scraper.close()
logger.info("Dealfront Automatisierung beendet.")