Files
Brancheneinstufung2/dealfront_enrichment.py

211 lines
10 KiB
Python

import os
import json
import time
import logging
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# --- Konfiguration ---
class Config:
LOGIN_URL = "https://app.dealfront.com/login"
TARGET_URL = "https://app.dealfront.com/t/prospector/companies"
SEARCH_NAME = "Facility Management" # <-- PASSEN SIE DIES AN IHRE GESPEICHERTE SUCHE AN
CREDENTIALS_FILE = "/app/dealfront_credentials.json"
OUTPUT_DIR = "/app/output"
# --- Logging Setup ---
LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
logging.getLogger("selenium.webdriver.remote").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
os.makedirs(Config.OUTPUT_DIR, exist_ok=True)
log_filepath = os.path.join(Config.OUTPUT_DIR, f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.log")
file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8')
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
logging.getLogger().addHandler(file_handler)
class DealfrontScraper:
def __init__(self):
logger.info("Initialisiere WebDriver...")
chrome_options = ChromeOptions()
chrome_options.add_experimental_option("prefs", {"profile.managed_default_content_settings.images": 2})
chrome_options.add_argument("--headless=new")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1200")
try:
self.driver = webdriver.Chrome(options=chrome_options)
except Exception as e:
logger.critical("WebDriver konnte nicht initialisiert werden.", exc_info=True)
raise
self.wait = WebDriverWait(self.driver, 30)
self.username, self.password = self._load_credentials()
if not self.username or not self.password:
raise ValueError("Credentials konnten nicht geladen werden. Breche ab.")
logger.info("WebDriver erfolgreich initialisiert.")
def _load_credentials(self):
try:
with open(Config.CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
creds = json.load(f)
return creds.get("username"), creds.get("password")
except Exception as e:
logger.error(f"Credentials-Datei {Config.CREDENTIALS_FILE} konnte nicht geladen werden: {e}")
return None, None
def _save_debug_artifacts(self, suffix=""):
try:
timestamp = time.strftime("%Y%m%d-%H%M%S")
filename_base = os.path.join(Config.OUTPUT_DIR, f"error_{suffix}_{timestamp}")
self.driver.save_screenshot(f"{filename_base}.png")
with open(f"{filename_base}.html", "w", encoding="utf-8") as f:
f.write(self.driver.page_source)
logger.error(f"Debug-Artefakte gespeichert: {filename_base}.*")
except Exception as e:
logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}")
def login(self):
try:
logger.info(f"Navigiere zur Login-Seite: {Config.LOGIN_URL}")
self.driver.get(Config.LOGIN_URL)
self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username)
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
logger.info("Login-Befehl gesendet. Warte 5 Sekunden auf Session-Etablierung.")
time.sleep(5)
# Verifizierung, dass wir nicht mehr auf der Login-Seite sind
if "login" not in self.driver.current_url:
logger.info("Login erfolgreich, URL hat sich geändert.")
return True
self._save_debug_artifacts("login_stuck")
return False
except Exception as e:
logger.critical("Login-Prozess fehlgeschlagen.", exc_info=True)
self._save_debug_artifacts("login_exception")
return False
def navigate_and_load_search(self, search_name):
try:
logger.info(f"Navigiere direkt zur Target-Seite und lade die Suche...")
self.driver.get(Config.TARGET_URL)
self.wait.until(EC.url_contains("/t/prospector/"))
search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
self.wait.until(EC.element_to_be_clickable(search_item_selector)).click()
logger.info("Suche geladen. Warte auf das Rendern der Ergebnistabelle.")
self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table tbody tr")))
return True
except Exception as e:
logger.critical("Navigation oder Laden der Suche fehlgeschlagen.", exc_info=True)
self._save_debug_artifacts("navigation_or_search_load")
return False
def extract_current_page_results(self):
results = []
rows_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]")
data_rows = self.driver.find_elements(*rows_selector)
for row in data_rows:
try:
name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip()
website = "N/A"
try:
website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip()
except NoSuchElementException:
pass
results.append({'name': name, 'website': website})
except NoSuchElementException:
continue
return results
def scrape_all_pages(self, max_pages=10): # Sicherheitslimit für Seiten
"""
Iteriert durch alle Ergebnisseiten, indem auf den 'Weiter'-Button geklickt wird.
Scrollt vor jeder Extraktion, um alle Elemente zu laden.
"""
all_companies = {}
for page_number in range(1, max_pages + 1):
logger.info(f"--- Verarbeite Seite {page_number} ---")
# 1. Warten, bis die Tabelle grundsätzlich geladen ist
table_selector = (By.CSS_SELECTOR, "table#t-result-table tbody")
self.wait.until(EC.visibility_of_element_located(table_selector))
# 2. SCROLLEN: Scrolle die Seite nach unten, um alle Zeilen zu laden
logger.info("Scrolle nach unten, um alle Zeilen der Seite zu laden...")
self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2) # Kurze Pause, damit die neuen Zeilen rendern können
self.driver.execute_script("window.scrollTo(0, 0);") # Optional: wieder nach oben scrollen
time.sleep(1)
# 3. DATEN EXTRAHIEREN
page_results = self.extract_current_page_results()
if not page_results:
logger.info("Keine Ergebnisse auf dieser Seite extrahiert. Breche ab.")
break
for company in page_results:
# Verwende eine Kombination aus Name und Webseite als eindeutigen Schlüssel
unique_key = (company.get('name'), company.get('website'))
if unique_key not in all_companies:
all_companies[unique_key] = company
logger.info(f"Seite {page_number}: {len(page_results)} Firmen gefunden. Gesamt einzigartig: {len(all_companies)}")
# 4. PAGINIERUNG
try:
# Robusterer Selektor für den "Weiter"-Button
next_button_selector = (By.XPATH, "//a[contains(@class, 'eb-pagination-button') and not(contains(@class, 'disabled'))][last()]")
next_button = self.driver.find_element(*next_button_selector)
# Prüfen, ob der gefundene Button der "Weiter"-Button ist (enthält Pfeil nach rechts)
if "fa-angle-right" not in next_button.get_attribute('innerHTML'):
logger.info("Letzte Seite erreicht (kein 'Weiter'-Pfeil im letzten Button).")
break
logger.info("Klicke auf 'Weiter'...")
self.driver.execute_script("arguments[0].click();", next_button)
time.sleep(4) # Feste Wartezeit für den Seitenwechsel
except NoSuchElementException:
logger.info("Kein klickbarer 'Weiter'-Button mehr gefunden. Paginierung abgeschlossen.")
break
return list(all_companies.values())
def close(self):
if self.driver:
self.driver.quit()
if __name__ == "__main__":
scraper = None
try:
scraper = DealfrontScraper()
if not scraper.login(): raise Exception("Login fehlgeschlagen")
if not scraper.navigate_and_load_search(Config.SEARCH_NAME): raise Exception("Navigation/Suche fehlgeschlagen")
all_companies = scraper.scrape_all_pages()
if all_companies:
df = pd.DataFrame(all_companies)
output_csv_path = os.path.join(Config.OUTPUT_DIR, f"dealfront_results_{time.strftime('%Y%m%d-%H%M%S')}.csv")
df.to_csv(output_csv_path, index=False, sep=';', encoding='utf-8-sig')
logger.info(f"Ergebnisse ({len(df)} Firmen) erfolgreich in '{output_csv_path}' gespeichert.")
else:
logger.warning("Keine Firmen konnten extrahiert werden.")
except Exception as e:
logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=True)
finally:
if scraper:
scraper.close()
logger.info("Dealfront Automatisierung beendet.")