# Brancheneinstufung2/dealfront_enrichment.py
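"""Selenium automation for Dealfront: logs in, opens a saved Prospector
search, pages through the result table, and exports the collected company
names and websites to a CSV file."""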

import os
import json
import time
import logging
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# --- Configuration ---
class Config:
    LOGIN_URL = "https://app.dealfront.com/login"
    TARGET_URL = "https://app.dealfront.com/t/prospector/companies"
    SEARCH_NAME = "Facility Management"  # <-- adjust this to match your saved search
    CREDENTIALS_FILE = "/app/dealfront_credentials.json"
    OUTPUT_DIR = "/app/output"
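
# The credentials file is expected to be a JSON object with "username" and
# "password" keys (see _load_credentials below), for example:
#   {"username": "user@example.com", "password": "secret"}
# (the values above are placeholders).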
# --- Logging Setup ---
LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logging.getLogger("selenium.webdriver.remote").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
os.makedirs(Config.OUTPUT_DIR, exist_ok=True)
log_filepath = os.path.join(Config.OUTPUT_DIR, f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.log")
file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8')
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
logging.getLogger().addHandler(file_handler)
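
# Note: log output goes both to the console (basicConfig above) and to a
# timestamped per-run log file in Config.OUTPUT_DIR (file handler above).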
class DealfrontScraper:
    def __init__(self):
        logger.info("Initializing WebDriver...")
        chrome_options = ChromeOptions()
        # Disable image loading to speed up page rendering.
        chrome_options.add_experimental_option("prefs", {"profile.managed_default_content_settings.images": 2})
        chrome_options.add_argument("--headless=new")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--window-size=1920,1200")
        try:
            self.driver = webdriver.Chrome(options=chrome_options)
        except Exception:
            logger.critical("WebDriver could not be initialized.", exc_info=True)
            raise
        self.wait = WebDriverWait(self.driver, 30)
        self.username, self.password = self._load_credentials()
        if not self.username or not self.password:
            raise ValueError("Credentials could not be loaded. Aborting.")
        logger.info("WebDriver initialized successfully.")
    def _load_credentials(self):
        try:
            with open(Config.CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
                creds = json.load(f)
            return creds.get("username"), creds.get("password")
        except Exception as e:
            logger.error(f"Could not load credentials file {Config.CREDENTIALS_FILE}: {e}")
            return None, None
    def _save_debug_artifacts(self, suffix=""):
        # Save a screenshot and the page source to help diagnose failures.
        try:
            timestamp = time.strftime("%Y%m%d-%H%M%S")
            filename_base = os.path.join(Config.OUTPUT_DIR, f"error_{suffix}_{timestamp}")
            self.driver.save_screenshot(f"{filename_base}.png")
            with open(f"{filename_base}.html", "w", encoding="utf-8") as f:
                f.write(self.driver.page_source)
            logger.error(f"Debug artifacts saved: {filename_base}.*")
        except Exception as e:
            logger.error(f"Could not save debug artifacts: {e}")
    def login(self):
        try:
            logger.info(f"Navigating to login page: {Config.LOGIN_URL}")
            self.driver.get(Config.LOGIN_URL)
            self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username)
            self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
            self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
            logger.info("Login submitted. Waiting 5 seconds for the session to be established.")
            time.sleep(5)
            # Verify that we are no longer on the login page.
            if "login" not in self.driver.current_url:
                logger.info("Login successful, URL has changed.")
                return True
            self._save_debug_artifacts("login_stuck")
            return False
        except Exception:
            logger.critical("Login process failed.", exc_info=True)
            self._save_debug_artifacts("login_exception")
            return False
    def navigate_and_load_search(self, search_name):
        try:
            logger.info("Navigating directly to the target page and loading the saved search...")
            self.driver.get(Config.TARGET_URL)
            self.wait.until(EC.url_contains("/t/prospector/"))
            search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
            self.wait.until(EC.element_to_be_clickable(search_item_selector)).click()
            logger.info("Search loaded. Waiting for the result table to render.")
            self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table tbody tr")))
            return True
        except Exception:
            logger.critical("Navigation or loading of the saved search failed.", exc_info=True)
            self._save_debug_artifacts("navigation_or_search_load")
            return False
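
    # Note: the CSS/XPath selectors used throughout (the 'truncate' div,
    # '#t-result-table', the pagination nav) are tied to Dealfront's current
    # DOM and will likely need adjusting when the UI changes.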
    def extract_current_page_results(self):
        results = []
        # Only rows that contain a company link count as data rows.
        rows_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]")
        try:
            data_rows = self.driver.find_elements(*rows_selector)
            for row in data_rows:
                try:
                    name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip()
                    website = "N/A"
                    try:
                        website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip()
                    except NoSuchElementException:
                        pass  # Not every company has a website listed.
                    results.append({'name': name, 'website': website})
                except NoSuchElementException:
                    continue
        except Exception as e:
            logger.error(f"Extraction failed on the current page: {e}")
        return results
    def scrape_all_pages(self, max_pages=10):
        """
        Iterates through all result pages, explicitly waits for new data after
        each pagination click, and collects all company rows page by page
        (no deduplication).
        """
        all_companies = []
        for page_number in range(1, max_pages + 1):
            logger.info(f"--- Processing page {page_number} ---")
            self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table")))
            # Before clicking: remember the id and text of the first row so a
            # change in either signals that the table has re-rendered.
            try:
                first_row_element = self.driver.find_element(By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
                first_row_id = first_row_element.get_attribute("id")
                first_row_text = first_row_element.text.strip()
                logger.info(f"Before pagination: first row id: {first_row_id}, text: '{first_row_text}'")
            except Exception as e:
                first_row_id = None
                first_row_text = None
                logger.warning(f"Before pagination: no row id found: {e}")
            # Extract the current page.
            page_results = self.extract_current_page_results()
            all_companies.extend(page_results)
            logger.info(f"Page {page_number}: {len(page_results)} companies found. Total so far: {len(all_companies)}")
            # Log the pagination buttons for diagnostics.
            try:
                pagination_nav = self.driver.find_element(By.CSS_SELECTOR, "nav.eb-pagination")
                buttons = pagination_nav.find_elements(By.CSS_SELECTOR, "a.eb-pagination-button")
                logger.info(f"Pagination buttons found on page {page_number}: {len(buttons)}")
                for idx, btn in enumerate(buttons):
                    btn_text = btn.text.strip()
                    btn_classes = btn.get_attribute('class')
                    btn_html = btn.get_attribute('outerHTML')
                    has_svg = "svg" in btn_html
                    logger.info(f"Button {idx}: text='{btn_text}', classes='{btn_classes}', SVG={has_svg}, HTML={btn_html}")
            except Exception as e:
                logger.warning(f"Could not fully log the pagination buttons: {e}")
                buttons = []
            # Find the "next" button: an enabled button without text whose only
            # content is an SVG icon.
            next_button = None
            for idx, btn in enumerate(buttons):
                btn_html = btn.get_attribute('outerHTML')
                if ("svg" in btn_html) and ("disabled" not in btn.get_attribute('class')) and (btn.text.strip() == ""):
                    next_button = btn
                    logger.info(f"Identified as next button: button {idx}")
                    break
            if not next_button:
                logger.info("No clickable 'next' button found. Pagination finished.")
                break
            logger.info("Clicking the 'next' button...")
            try:
                # Scroll the button into view first!
                self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", next_button)
                time.sleep(0.5)  # Short pause so the scroll can finish.
                # Click via JavaScript to bypass potential overlays.
                self.driver.execute_script("arguments[0].click();", next_button)
                logger.info("Click on the next button executed.")
                # Wait for the new page: the id OR the text of the first row changes.
                def page_changed(driver):
                    try:
                        row = driver.find_element(By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
                        return (row.get_attribute("id") != first_row_id) or (row.text.strip() != first_row_text)
                    except Exception:
                        return False
                self.wait.until(page_changed)
                logger.info("New page loaded.")
            except Exception as e:
                logger.error(f"Error while clicking the next button or waiting for the new page: {e}")
                # Save a screenshot and the page HTML.
                try:
                    timestamp = time.strftime("%Y%m%d-%H%M%S")
                    self.driver.save_screenshot(os.path.join(Config.OUTPUT_DIR, f"pagination_error_{timestamp}.png"))
                    with open(os.path.join(Config.OUTPUT_DIR, f"pagination_error_{timestamp}.html"), "w", encoding="utf-8") as f:
                        f.write(self.driver.page_source)
                    logger.info("Screenshot and HTML saved after the pagination error.")
                except Exception as ee:
                    logger.error(f"Error while saving screenshot/HTML: {ee}")
                # Additionally: log all rows after the error (text and HTML prefix only).
                try:
                    all_rows = self.driver.find_elements(By.CSS_SELECTOR, "table#t-result-table tbody tr")
                except Exception as eee:
                    logger.error(f"Error while locating the table rows after the failure: {eee}")
                    all_rows = []
                logger.info(f"After pagination error: {len(all_rows)} rows in the DOM.")
                for i, r in enumerate(all_rows):
                    html_snippet = r.get_attribute('outerHTML')[:120].replace('\n', ' ')
                    logger.info(f"Row {i}: text='{r.text.strip()}', HTML prefix={html_snippet}...")
                break
        return all_companies
    def close(self):
        if self.driver:
            self.driver.quit()
if __name__ == "__main__":
    scraper = None
    try:
        scraper = DealfrontScraper()
        if not scraper.login():
            raise Exception("Login failed")
        if not scraper.navigate_and_load_search(Config.SEARCH_NAME):
            raise Exception("Navigation/search failed")
        all_companies = scraper.scrape_all_pages(max_pages=6)  # Limit to 6 pages.
        if all_companies:
            df = pd.DataFrame(all_companies)
            output_csv_path = os.path.join(Config.OUTPUT_DIR, f"dealfront_results_{time.strftime('%Y%m%d-%H%M%S')}.csv")
            df.to_csv(output_csv_path, index=False, sep=';', encoding='utf-8-sig')
            logger.info(f"Results ({len(df)} companies) successfully saved to '{output_csv_path}'.")
        else:
            logger.warning("No companies could be extracted.")
    except Exception as e:
        logger.critical(f"A critical error occurred in the main process: {e}", exc_info=True)
    finally:
        if scraper:
            scraper.close()
        logger.info("Dealfront automation finished.")
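
# A minimal sketch of a typical run, assuming the container layout implied by
# the hard-coded paths above (/app/dealfront_credentials.json, /app/output):
#
#   python dealfront_enrichment.py
#
# On success this writes a timestamped, semicolon-separated CSV (UTF-8 with
# BOM) and a run log to /app/output; debug screenshots and HTML dumps are
# written there when login, navigation, or pagination fails.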