Files
Brancheneinstufung2/dealfront_enrichment.py

269 lines
12 KiB
Python

import os
import json
import time
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.common.keys import Keys
import pandas as pd
from config import Config, DEALFRONT_LOGIN_URL, DEALFRONT_CREDENTIALS_FILE, DEALFRONT_TARGET_URL, TARGET_SEARCH_NAME
# Logging-Konfiguration
LOG_LEVEL = logging.DEBUG if Config.DEBUG else logging.INFO
LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s'
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT, force=True)
logger = logging.getLogger(__name__)
OUTPUT_DIR = "/app/output"
class DealfrontScraper:
def __init__(self):
logger.info("Initialisiere DealfrontScraper und Chrome WebDriver.")
chrome_options = ChromeOptions()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
try:
self.driver = webdriver.Chrome(options=chrome_options)
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
except Exception as e:
logger.critical(f"WebDriver konnte nicht initialisiert werden. Fehler: {e}", exc_info=True)
self.driver = None
raise
self.wait = WebDriverWait(self.driver, 30) # Timeout auf 30s erhöht für mehr Stabilität
logger.info("WebDriver erfolgreich initialisiert.")
def _load_credentials(self):
# (Diese Methode bleibt unverändert)
try:
with open(DEALFRONT_CREDENTIALS_FILE, 'r') as f:
creds = json.load(f)
username = creds.get("username")
password = creds.get("password")
if not username or "DEIN_DEALFRONT_BENUTZERNAME" in username or not password or "DEIN_DEALFRONT_PASSWORT" in password:
logger.error(f"Zugangsdaten in '{DEALFRONT_CREDENTIALS_FILE}' sind ungültig.")
return None, None
return username, password
except FileNotFoundError:
logger.error(f"Credentials-Datei nicht gefunden: '{DEALFRONT_CREDENTIALS_FILE}'")
return None, None
except json.JSONDecodeError:
logger.error(f"Fehler beim Parsen der Credentials-Datei: '{DEALFRONT_CREDENTIALS_FILE}'")
return None, None
def _save_debug_artifacts(self):
# (Diese Methode bleibt unverändert, aber mit neuem Namen)
try:
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = time.strftime("%Y%m%d-%H%M%S")
screenshot_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.png")
self.driver.save_screenshot(screenshot_filepath)
logger.error(f"Screenshot '{screenshot_filepath}' wurde für die Analyse gespeichert.")
html_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.html")
with open(html_filepath, "w", encoding="utf-8") as f:
f.write(self.driver.page_source)
logger.error(f"HTML-Quellcode '{html_filepath}' wurde für die Analyse gespeichert.")
except Exception as e:
logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}")
def login_and_navigate_to_target(self):
"""
Führt Login durch und navigiert zur Target-Seite via Klick auf den "Quick Link".
Dieser Ansatz ist maximal robust.
"""
if not self.driver:
return False
username, password = self._load_credentials()
if not username or not password:
return False
try:
# --- SCHRITT 1: LOGIN ---
logger.info(f"Navigiere zur Login-Seite: {DEALFRONT_LOGIN_URL}")
self.driver.get(DEALFRONT_LOGIN_URL)
email_field = self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "input[name='email']")))
email_field.send_keys(username)
password_field = self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "input[type='password']")))
password_field.send_keys(password)
login_button = self.wait.until(EC.element_to_be_clickable((By.XPATH, "//button[normalize-space()='Log in']")))
login_button.click()
# --- SCHRITT 2: NAVIGATION VIA QUICK-LINK-KACHEL ---
logger.info("Login-Befehl gesendet. Warte auf Dashboard und 'Prospects finden'-Link.")
# Dieser XPath zielt auf den Link in der "Quick links"-Kachel
prospects_link_selector = (By.XPATH, "//a[@data-test-target-product-tile]")
prospects_link = self.wait.until(EC.element_to_be_clickable(prospects_link_selector))
logger.info("'Prospects finden'-Link gefunden. Klicke darauf...")
prospects_link.click()
# --- SCHRITT 3: NAVIGATION VERIFIZIEREN ---
url_part_to_wait_for = "/t/prospector/"
logger.info(f"Warte, bis die Browser-URL '{url_part_to_wait_for}' enthält...")
self.wait.until(EC.url_contains(url_part_to_wait_for))
logger.info(f"URL-Wechsel bestätigt. Aktuelle URL: {self.driver.current_url}")
verification_target_selector = (By.XPATH, "//button[normalize-space()='+ Neue Suche']")
self.wait.until(EC.visibility_of_element_located(verification_target_selector))
logger.info("'Target'-Seite erfolgreich und vollständig geladen.")
return True
except Exception as e:
logger.critical(f"Login- oder Navigationsprozess fehlgeschlagen: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return False
def load_search(self, search_name):
# (Diese Methode bleibt vorerst unverändert)
try:
logger.info(f"Suche und lade die vordefinierte Suche: '{search_name}'")
search_link_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
search_link = self.wait.until(EC.element_to_be_clickable(search_link_selector))
search_link.click()
results_table_header_selector = (By.XPATH, "//th[normalize-space()='Firma']")
self.wait.until(EC.visibility_of_element_located(results_table_header_selector))
logger.info(f"Suche '{search_name}' erfolgreich geladen und Ergebnisse angezeigt.")
time.sleep(3)
return True
except Exception as e:
logger.critical(f"Laden der Suche '{search_name}' fehlgeschlagen: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return False
def extract_current_page_results(self):
"""
Extrahiert die Firmennamen und Webseiten von der aktuellen Ergebnisseite
unter Verwendung der verifizierten und präzisen CSS-Selektoren.
"""
try:
logger.info("Extrahiere Ergebnisse von der aktuellen Seite...")
results = []
# 1. Finde alle Zeilen der Tabelle.
# Jede Zeile ist ein `<tr>`-Element mit einer einzigartigen ID.
rows_selector = (By.CSS_SELECTOR, "table#t-result-table tbody > tr[id]")
# Wir warten explizit, bis mindestens eine Zeile geladen ist.
self.wait.until(EC.presence_of_element_located(rows_selector))
time.sleep(3) # Kurze zusätzliche Pause, um sicherzustellen, dass JS die Tabelle vollständig rendert.
rows = self.driver.find_elements(*rows_selector)
if not rows:
logger.warning("Keine Ergebniszeilen (tr[id]) auf der Seite gefunden.")
self._save_debug_artifacts()
return []
logger.info(f"{len(rows)} Ergebniszeilen zur Verarbeitung gefunden.")
# 2. Iteriere durch jede Zeile und extrahiere die Daten.
for i, row in enumerate(rows):
company_name = ""
website = ""
try:
# --- KORRIGIERTER FIRMENNAMEN-SELEKTOR ---
company_name_selector = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
company_name_element = row.find_element(*company_name_selector)
# Wir holen uns das 'title'-Attribut, da der Text abgeschnitten sein könnte.
company_name = company_name_element.get_attribute("title").strip()
# --- KORRIGIERTER WEBSEITEN-SELEKTOR ---
website_selector = (By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text")
website_element = row.find_element(*website_selector)
website = website_element.text.strip()
if company_name and website:
results.append({'name': company_name, 'website': website})
else:
logger.warning(f"Zeile {i+1}: Unvollständige Daten (Name: '{company_name}', Webseite: '{website}').")
except NoSuchElementException:
logger.warning(f"Zeile {i+1}: Ein erwartetes Element (Name oder Webseite) wurde nicht gefunden. Überspringe diese Zeile.")
continue
logger.info(f"{len(results)} Firmen erfolgreich extrahiert.")
return results
except Exception as e:
logger.error(f"Ein schwerwiegender Fehler ist bei der Extraktion der Ergebnisse aufgetreten: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return []
def close(self):
if self.driver:
logger.info("Schließe den WebDriver.")
self.driver.quit()
if __name__ == "__main__":
logger.info("Starte Dealfront Automatisierung - Phase 2a: Suche und Extraktion")
scraper = None
try:
scraper = DealfrontScraper()
if not scraper.driver:
raise Exception("WebDriver konnte nicht initialisiert werden.")
# Führe den kombinierten Login- und Navigationsschritt aus
if not scraper.login_and_navigate_to_target():
raise Exception("Login und Navigation fehlgeschlagen.")
# Suche laden
if not scraper.load_search(Config.TARGET_SEARCH_NAME):
raise Exception(f"Laden der Suche '{Config.TARGET_SEARCH_NAME}' fehlgeschlagen.")
# Ergebnisse extrahieren
companies = scraper.extract_current_page_results()
# === NEUE, SAUBERE AUSGABE ===
if companies:
# Erstelle einen pandas DataFrame aus der Ergebnisliste
df = pd.DataFrame(companies)
# Konfiguriere pandas, um den vollen Text in den Spalten anzuzeigen
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
pd.set_option('display.max_colwidth', None)
print("\n" + "="*80)
print(" EXTRAHIERTE FIRMEN (ERSTE SEITE) ".center(80, "="))
print("="*80)
if not df.empty:
print(df.to_string(index=False))
else:
print(" DataFrame ist leer, obwohl Ergebnisse vorhanden waren. Überprüfung nötig. ".center(80, "-"))
print("="*80 + "\n")
logger.info(f"{len(df)} Firmen erfolgreich in der Konsole ausgegeben.")
else:
logger.warning("Keine Firmen auf der ersten Seite extrahiert oder gefunden.")
logger.info("Phase 2a Test erfolgreich abgeschlossen. Warte vor dem Schließen...")
time.sleep(10)
except Exception as e:
logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=False)
finally:
if scraper:
scraper.close()
logger.info("Dealfront Automatisierung beendet.")