Files
Brancheneinstufung2/dealfront_enrichment.py

209 lines
10 KiB
Python

import os
import json
import time
import logging
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# ==============================================================================
# TEMPORÄRE, AUTARKE KONFIGURATION
# ==============================================================================
class TempConfig:
# --- Direkt hier definierte Werte, um config.py zu umgehen ---
DEALFRONT_LOGIN_URL = "https://app.dealfront.com/login"
TARGET_SEARCH_NAME = "Facility Management" # <-- BITTE AN IHRE SUCHE ANPASSEN
DEALFRONT_CREDENTIALS_FILE = "/app/dealfront_credentials.json"
# ==============================================================================
OUTPUT_DIR = "/app/output"
LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s' # <-- DEFINITION HIER
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logging.getLogger("selenium").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
log_filename = f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.txt"
log_filepath = os.path.join(OUTPUT_DIR, log_filename)
try:
file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
logging.getLogger().addHandler(file_handler)
logger.info(f"Logging konfiguriert. Log-Datei: {log_filepath}")
except Exception as e:
logger.error(f"Konnte Log-Datei nicht erstellen: {e}")
class DealfrontScraper:
def __init__(self):
logger.info("Initialisiere den DealfrontScraper...")
chrome_options = ChromeOptions()
prefs = {"profile.managed_default_content_settings.images": 2}
chrome_options.add_experimental_option("prefs", prefs)
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
service = Service(executable_path='/usr/bin/chromedriver')
try:
self.driver = webdriver.Chrome(service=service, options=chrome_options)
except Exception as e:
logger.critical(f"WebDriver konnte nicht initialisiert werden.", exc_info=True)
raise
self.wait = WebDriverWait(self.driver, 30)
self.username, self.password = self._load_credentials()
logger.info("WebDriver erfolgreich initialisiert.")
def _load_credentials(self):
try:
with open(TempConfig.DEALFRONT_CREDENTIALS_FILE, 'r') as f:
creds = json.load(f)
return creds.get("username"), creds.get("password")
except Exception as e:
logger.error(f"Credentials-Datei konnte nicht geladen werden: {e}")
return None, None
def _save_debug_artifacts(self):
# ... (Diese Methode bleibt unverändert) ...
try:
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = time.strftime("%Y%m%d-%H%M%S")
screenshot_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.png")
html_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.html")
self.driver.save_screenshot(screenshot_filepath)
logger.error(f"Screenshot '{screenshot_filepath}' wurde für die Analyse gespeichert.")
with open(html_filepath, "w", encoding="utf-8") as f:
f.write(self.driver.page_source)
logger.error(f"HTML-Quellcode '{html_filepath}' wurde für die Analyse gespeichert.")
except Exception as e:
logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}")
def login_and_find_list(self, search_name):
"""
Führt den gesamten Prozess vom Login bis zum Auffinden der Suchergebnisliste durch.
Diese Methode ist für Geschwindigkeit und Stabilität optimiert.
"""
try:
# Schritt 1: Login
logger.info(f"Navigiere zur Login-Seite: {TempConfig.DEALFRONT_LOGIN_URL}")
self.driver.get(TempConfig.DEALFRONT_LOGIN_URL)
self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username)
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
logger.info("Login-Befehl gesendet.")
# Schritt 2: Navigation zur Target-Seite via Klick auf den Quick-Link
logger.info("Warte auf Dashboard und den 'Prospects finden' Quick-Link...")
prospects_link_selector = (By.XPATH, "//a[@data-test-target-product-tile]")
prospects_link = self.wait.until(EC.element_to_be_clickable(prospects_link_selector))
prospects_link.click()
logger.info("'Prospects finden' geklickt.")
# Schritt 3: Vordefinierte Suche laden
logger.info(f"Warte auf die Liste der Suchen und klicke auf '{search_name}'...")
search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
self.wait.until(EC.element_to_be_clickable(search_item_selector)).click()
# Schritt 4: Erfolg verifizieren durch Warten auf die Tabelle
logger.info(f"Suche '{search_name}' geladen. Warte auf das Rendern der Ergebnistabelle.")
first_row_locator = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
self.wait.until(EC.visibility_of_element_located(first_row_locator))
logger.info("Zielseite mit Ergebnissen erfolgreich erreicht.")
return True
except Exception as e:
logger.critical(f"Der Prozess bis zum Laden der Liste ist fehlgeschlagen: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return False
def extract_current_page_results(self):
"""
Extrahiert Daten mit einem schnellen, direkten Ansatz, der Geister-Zeilen ignoriert.
"""
try:
logger.info("Extrahiere Ergebnisse mit dem finalen, direkten Selektor-Ansatz...")
results = []
# 1. Warten, bis die erste Daten-Zelle (Firmenname) sichtbar ist.
# Dies ist unser einziger Wartepunkt und bestätigt, dass die Daten geladen sind.
first_company_link_selector = (By.CSS_SELECTOR, "td.sticky-column a.t-highlight-text")
self.wait.until(EC.visibility_of_element_located(first_company_link_selector))
# 2. Finde ALLE Firmen-Links und ALLE Website-Links auf der Seite auf einmal.
# Das ist extrem schnell, da es nur zwei Suchbefehle an den Browser sind.
company_elements = self.driver.find_elements(By.CSS_SELECTOR, "td.sticky-column a.t-highlight-text")
website_elements = self.driver.find_elements(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text")
logger.info(f"{len(company_elements)} Firmennamen und {len(website_elements)} Webseiten-Elemente gefunden.")
# 3. Ordne die Ergebnisse anhand ihrer Reihenfolge im DOM zu.
# Wir gehen davon aus, dass die Anzahl übereinstimmt.
if not company_elements:
logger.warning("Keine Firmen mit dem angegebenen Selektor gefunden.")
self._save_debug_artifacts()
return []
for i in range(len(company_elements)):
company_name = company_elements[i].get_attribute("title").strip()
# Wir ordnen die Webseite anhand des Indexes zu.
# Wenn es weniger Webseiten als Firmen gibt, fangen wir das ab.
website = website_elements[i].text.strip() if i < len(website_elements) else "N/A"
results.append({'name': company_name, 'website': website})
logger.info(f"Extraktion abgeschlossen. {len(results)} Firmen verarbeitet.")
return results
except Exception as e:
logger.error(f"Schwerwiegender Fehler bei der Extraktion: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return []
def close(self):
if self.driver:
logger.info("Schließe den WebDriver.")
self.driver.quit()
if __name__ == "__main__":
logger.info("Starte Dealfront Automatisierung - DEBUG-MODUS")
scraper = None
try:
scraper = DealfrontScraper()
if not scraper.driver:
raise Exception("WebDriver konnte nicht initialisiert werden.")
if not scraper.login_and_find_list(TempConfig.TARGET_SEARCH_NAME):
raise Exception("Der Prozess vom Login bis zum Laden der Liste ist fehlgeschlagen.")
# In dieser Version gibt es keine handle_overlays Methode mehr
# scraper.handle_overlays()
companies = scraper.extract_current_page_results()
if companies:
df = pd.DataFrame(companies)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
pd.set_option('display.max_colwidth', None)
print("\n" + "="*80)
print(" EXTRAHIERTE FIRMEN (ERSTE SEITE) ".center(80, "="))
print("="*80)
print(df.to_string(index=False))
print("="*80 + "\n")
else:
logger.warning("Obwohl die Seite geladen wurde, konnten keine Firmen extrahiert werden.")
logger.info("Test erfolgreich abgeschlossen. Warte vor dem Schließen...")
time.sleep(10)
except Exception as e:
logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=False)
finally:
if scraper:
scraper.close()
logger.info("Dealfront Automatisierung beendet.")