# File: Brancheneinstufung2/dealfront_enrichment.py
# (Python source; the repository file viewer listed it as 278 lines, 13 KiB.)

import os
import json
import time
import logging
import tempfile
import shutil
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# --- Konfiguration ---
class Config:
    """Static settings for the Dealfront scraping run.

    All values are plain class attributes read by the rest of this module.
    """

    # Page on which the e-mail/password login form is filled in.
    LOGIN_URL = "https://app.dealfront.com/login"
    # Page that is opened after login and from which the saved search is picked.
    TARGET_URL = "https://app.dealfront.com/t/prospector/companies"
    # Display name of the saved search to load -- adjust this to your own saved search.
    SEARCH_NAME = "Facility Management"
    # JSON file from which the "username" and "password" keys are read.
    CREDENTIALS_FILE = "/app/dealfront_credentials.json"
    # Directory that receives the run log, debug artifacts and the result CSV.
    OUTPUT_DIR = "/app/output"
# --- Logging setup ---
# Root logger: INFO level with a shared format; force=True replaces any
# handlers that were configured on the root logger before this point.
LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s'
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO, force=True)
# Only WARNING and above is let through from the "selenium.webdriver.remote" logger.
logging.getLogger("selenium.webdriver.remote").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)

# In addition, every run writes its own time-stamped log file into the output
# directory (created here if it does not exist yet).
os.makedirs(Config.OUTPUT_DIR, exist_ok=True)
log_filepath = os.path.join(
    Config.OUTPUT_DIR,
    "dealfront_run_" + time.strftime('%Y%m%d-%H%M%S') + ".log",
)
file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8')
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
logging.root.addHandler(file_handler)
class DealfrontScraper:
    """Selenium-driven scraper for a saved Dealfront Prospector search.

    Intended call order: construct, ``login()``,
    ``navigate_and_load_search(name)``, ``scrape_all_pages()`` and finally
    ``close()``.

    Attributes set by ``__init__``:
        username, password: values read from ``Config.CREDENTIALS_FILE``.
        driver: the Chrome WebDriver instance.
        wait: ``WebDriverWait`` bound to ``driver`` with a 30 second timeout.
    """

    def __init__(self):
        """Load the credentials, then start Chrome with a throw-away profile.

        Raises:
            ValueError: if username or password could not be loaded.
            Exception: whatever ``webdriver.Chrome`` raises on start-up; it is
                re-raised after the temporary profile directory was removed.
        """
        # Credentials are validated *before* the browser is started so that a
        # missing or broken credentials file cannot leave a running Chrome
        # process and a temp profile behind (the caller never gets an instance
        # it could close when the constructor raises).
        self.username, self.password = self._load_credentials()
        if not self.username or not self.password:
            raise ValueError("Credentials konnten nicht geladen werden. Breche ab.")

        logger.info("Initialisiere WebDriver...")
        chrome_options = ChromeOptions()
        # NOTE(review): this pref is presumably meant to block image loading
        # (content setting value 2) -- confirm against the Chrome version used.
        chrome_options.add_experimental_option(
            "prefs", {"profile.managed_default_content_settings.images": 2}
        )
        # Headless mode is deliberately disabled for debugging. Re-enable with:
        # chrome_options.add_argument("--headless=new")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--window-size=1920,1200")
        # Dedicated temporary user-data directory to avoid profile conflicts.
        self._tmpdir = tempfile.mkdtemp()
        chrome_options.add_argument(f"--user-data-dir={self._tmpdir}")
        try:
            self.driver = webdriver.Chrome(options=chrome_options)
        except Exception:
            logger.critical("WebDriver konnte nicht initialisiert werden.", exc_info=True)
            shutil.rmtree(self._tmpdir, ignore_errors=True)
            raise
        self.wait = WebDriverWait(self.driver, 30)
        logger.info("WebDriver erfolgreich initialisiert.")

    def _load_credentials(self):
        """Read ``username`` and ``password`` from ``Config.CREDENTIALS_FILE``.

        Returns:
            A ``(username, password)`` tuple. Either element is ``None`` when
            its key is missing; ``(None, None)`` when the file could not be
            read or parsed (the error is logged, not raised).
        """
        try:
            with open(Config.CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
                creds = json.load(f)
            return creds.get("username"), creds.get("password")
        except Exception as e:
            logger.error(f"Credentials-Datei {Config.CREDENTIALS_FILE} konnte nicht geladen werden: {e}")
            return None, None

    @staticmethod
    def _xpath_literal(value):
        """Return *value* as an XPath 1.0 string literal.

        XPath 1.0 has no escape character, so a value containing both quote
        kinds has to be assembled with ``concat()``.
        """
        if "'" not in value:
            return f"'{value}'"
        if '"' not in value:
            return f'"{value}"'
        pieces = (f"'{piece}'" for piece in value.split("'"))
        return "concat(" + ", \"'\", ".join(pieces) + ")"

    def _write_page_snapshot(self, filename_base):
        """Write ``<filename_base>.png`` (screenshot) and ``<filename_base>.html`` (page source)."""
        self.driver.save_screenshot(f"{filename_base}.png")
        with open(f"{filename_base}.html", "w", encoding="utf-8") as f:
            f.write(self.driver.page_source)

    def _save_debug_artifacts(self, suffix=""):
        """Best-effort dump of a screenshot and the page HTML into the output directory.

        Args:
            suffix: label embedded in the file name (``error_<suffix>_<timestamp>``).

        Failures while saving are logged and never raised.
        """
        try:
            timestamp = time.strftime("%Y%m%d-%H%M%S")
            filename_base = os.path.join(Config.OUTPUT_DIR, f"error_{suffix}_{timestamp}")
            self._write_page_snapshot(filename_base)
            logger.error(f"Debug-Artefakte gespeichert: {filename_base}.*")
        except Exception as e:
            logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}")

    def login(self):
        """Fill in and submit the login form.

        Returns:
            ``True`` when, five seconds after submitting, the current URL no
            longer contains ``"login"``; ``False`` otherwise or on any
            exception (debug artifacts are saved in both failure cases).
        """
        try:
            logger.info(f"Navigiere zur Login-Seite: {Config.LOGIN_URL}")
            self.driver.get(Config.LOGIN_URL)
            email_field = self.wait.until(EC.visibility_of_element_located((By.NAME, "email")))
            email_field.send_keys(self.username)
            self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
            self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
            logger.info("Login-Befehl gesendet. Warte 5 Sekunden auf Session-Etablierung.")
            time.sleep(5)
            if "login" not in self.driver.current_url:
                logger.info("Login erfolgreich, URL hat sich geändert.")
                return True
            self._save_debug_artifacts("login_stuck")
            return False
        except Exception:
            logger.critical("Login-Prozess fehlgeschlagen.", exc_info=True)
            self._save_debug_artifacts("login_exception")
            return False

    def scroll_table_slowly(self, steps=10, pause=0.3):
        """Scroll the result table down in several small steps.

        The intent is to give a virtualised / lazily rendered table the chance
        to render all of its rows. Errors are logged as warnings, not raised.

        Args:
            steps: number of scroll increments.
            pause: seconds to sleep after each increment.
        """
        try:
            table = self.driver.find_element(By.CSS_SELECTOR, "table#t-result-table")
            table_height = table.size['height']
            for i in range(steps):
                # Evenly spaced offsets: height/steps, 2*height/steps, ..., height.
                y = int(table_height * (i + 1) / steps)
                self.driver.execute_script("arguments[0].scrollTop = arguments[1];", table, y)
                time.sleep(pause)
            logger.info("Tabelle langsam nach unten gescrollt.")
        except Exception as e:
            logger.warning(f"Fehler beim langsamen Scrollen: {e}")

    def navigate_and_load_search(self, search_name):
        """Open the target page and click the saved search called *search_name*.

        Returns:
            ``True`` once the first result row is visible; ``False`` on any
            exception (debug artifacts are saved).
        """
        try:
            logger.info("Navigiere direkt zur Target-Seite und lade die Suche...")
            self.driver.get(Config.TARGET_URL)
            self.wait.until(EC.url_contains("/t/prospector/"))
            # The name is quoted via _xpath_literal so that names containing
            # quote characters do not produce an invalid XPath expression.
            search_item_selector = (
                By.XPATH,
                "//div[contains(@class, 'truncate') and "
                f"normalize-space()={self._xpath_literal(search_name)}]",
            )
            self.wait.until(EC.element_to_be_clickable(search_item_selector)).click()
            logger.info("Suche geladen. Warte auf das Rendern der Ergebnistabelle.")
            self.wait.until(
                EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table tbody tr"))
            )
            return True
        except Exception:
            logger.critical("Navigation oder Laden der Suche fehlgeschlagen.", exc_info=True)
            self._save_debug_artifacts("navigation_or_search_load")
            return False

    def extract_visible_firmennamen_js(self):
        """Extract company names and websites from the rendered table via JavaScript.

        Returns:
            A list of ``{"name": ..., "website": ...}`` dicts, one per table
            row that contains a name link; an empty list when the script
            yields nothing.
        """
        # The script carries its own final `return result;`. It must NOT be
        # prefixed with "return ": the text starts with a line break, and
        # JavaScript's automatic semicolon insertion would turn
        # "return <newline> ..." into "return;", i.e. an undefined result.
        script = """
            let rows = document.querySelectorAll('table#t-result-table tbody tr');
            let result = [];
            for (let row of rows) {
                let nameElem = row.querySelector('.sticky-column a.t-highlight-text');
                let websiteElem = row.querySelector('a.text-gray-400.t-highlight-text');
                if (nameElem) {
                    result.push({
                        name: nameElem.getAttribute('title') || nameElem.innerText,
                        website: websiteElem ? websiteElem.innerText : ''
                    });
                }
            }
            return result;
        """
        return self.driver.execute_script(script) or []

    def _find_next_button(self, page_number):
        """Log all pagination buttons and return the one treated as "next", or ``None``.

        A button qualifies when its HTML contains "svg", its text is empty and
        its class attribute does not contain "disabled"; the first qualifying
        button wins.
        """
        # NOTE(review): an enabled "previous" arrow would presumably satisfy
        # the same three conditions and, if it precedes "next" in the DOM,
        # would be picked on later pages -- verify against the real markup.
        try:
            pagination_nav = self.driver.find_element(By.CSS_SELECTOR, "nav.eb-pagination")
        except NoSuchElementException:
            logger.warning("Keine Pagination-Buttons gefunden.")
            return None
        buttons = pagination_nav.find_elements(By.CSS_SELECTOR, "a.eb-pagination-button")
        logger.info(f"Gefundene Paginierungs-Buttons auf Seite {page_number}: {len(buttons)}")

        next_button = None
        next_index = None
        for idx, btn in enumerate(buttons):
            # Each attribute is read once and used for logging and selection.
            btn_text = btn.text.strip()
            btn_classes = btn.get_attribute('class') or ""
            btn_html = btn.get_attribute('outerHTML') or ""
            has_svg = "svg" in btn_html
            logger.info(f"Button {idx}: Text='{btn_text}', Klassen='{btn_classes}', SVG={has_svg}, HTML-Start={btn_html[:120]}...")
            if next_button is None and has_svg and btn_text == "" and "disabled" not in btn_classes:
                next_button, next_index = btn, idx
        if next_button is not None:
            logger.info(f"Als Weiter-Button erkannt: Button {next_index}")
        return next_button

    def _go_to_next_page(self, next_button, previous_first_name):
        """Click *next_button* and wait until the first company name differs.

        Returns:
            ``True`` when the page change was verified; ``False`` on any error
            (a screenshot and the page HTML are then saved best-effort).
        """
        def page_changed(driver):
            # Truthy as soon as the first row shows a non-empty name that is
            # different from the one seen before the click. Script errors are
            # treated as "not changed yet" so the wait keeps polling.
            try:
                name = driver.execute_script("""
                    let row = document.querySelector('table#t-result-table tbody tr');
                    if (!row) return '';
                    let nameElem = row.querySelector('.sticky-column a.t-highlight-text');
                    return nameElem ? (nameElem.getAttribute('title') || nameElem.innerText) : '';
                """)
                return name and name != previous_first_name
            except Exception:
                return False

        try:
            self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", next_button)
            time.sleep(0.5)
            # Clicked through JavaScript rather than WebElement.click().
            self.driver.execute_script("arguments[0].click();", next_button)
            logger.info("Klick auf Weiter-Button ausgeführt.")
            self.wait.until(page_changed)
            logger.info("Seitenwechsel erfolgreich verifiziert (erster Firmenname hat sich geändert).")
            return True
        except Exception as e:
            logger.error(f"Fehler beim Klicken auf den Weiter-Button oder beim Warten auf neue Seite: {e}")
            try:
                timestamp = time.strftime("%Y%m%d-%H%M%S")
                self._write_page_snapshot(
                    os.path.join(Config.OUTPUT_DIR, f"pagination_error_{timestamp}")
                )
                logger.info("Screenshot und HTML der Seite nach Pagination-Fehler gespeichert.")
            except Exception as ee:
                logger.error(f"Fehler beim Speichern von Screenshot/HTML: {ee}")
            return False

    def scrape_all_pages(self, max_pages=10):
        """Collect the companies of up to *max_pages* result pages.

        For every page: wait for the table, pause, scroll through it, extract
        the rows via JavaScript and tag each row with its page number. Then
        move on with the "next" pagination button until there is none, a page
        change fails, or *max_pages* is reached.

        Returns:
            A list of dicts with the keys ``name``, ``website`` and ``page``.
        """
        all_companies = []
        for page_number in range(1, max_pages + 1):
            logger.info(f"--- Verarbeite Seite {page_number} ---")
            try:
                self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table")))
            except TimeoutException:
                logger.error("Ergebnistabelle wurde nicht geladen. Breche ab.")
                break
            logger.info("Warte 5 Sekunden, um sicherzugehen, dass alle Daten geladen sind...")
            time.sleep(5)
            # Scroll to the top of the window, then slowly down through the table.
            self.driver.execute_script("window.scrollTo(0, 0);")
            time.sleep(0.5)
            self.scroll_table_slowly()
            logger.info("Warte nach Scrollen nochmals 2 Sekunden...")
            time.sleep(2)

            page_results = self.extract_visible_firmennamen_js()
            for r in page_results:
                r['page'] = page_number
            logger.info(f"Seite {page_number}: {len(page_results)} Firmen gefunden. Erste Firmen: {[r['name'] for r in page_results[:3]]}")
            all_companies.extend(page_results)

            if page_number == max_pages:
                # The page limit is reached: do not click "next" and wait for a
                # page whose content would never be read.
                logger.info(f"Maximale Seitenzahl ({max_pages}) erreicht. Paginierung beendet.")
                break

            next_button = self._find_next_button(page_number)
            if not next_button:
                logger.info("Kein klickbarer 'Weiter'-Button mehr gefunden. Paginierung abgeschlossen.")
                break
            logger.info("Klicke auf 'Weiter'-Button...")
            # Reference value for detecting that the table content was replaced.
            previous_first_name = page_results[0]['name'] if page_results else ""
            if not self._go_to_next_page(next_button, previous_first_name):
                break
        return all_companies

    def close(self):
        """Quit the browser and remove the temporary profile directory.

        The directory is removed even when ``driver.quit()`` raises; that
        exception still propagates to the caller afterwards.
        """
        driver = getattr(self, "driver", None)
        try:
            if driver:
                driver.quit()
        finally:
            tmpdir = getattr(self, "_tmpdir", None)
            if tmpdir:
                shutil.rmtree(tmpdir, ignore_errors=True)
def main():
    """Run one scraping session and write the results to a CSV file.

    Steps: create the scraper, log in, load the saved search named by
    ``Config.SEARCH_NAME``, scrape at most six result pages and store the
    rows as a ``;``-separated CSV in ``Config.OUTPUT_DIR``. Every exception is
    logged as critical rather than propagated; the scraper is always closed.
    """
    scraper = None
    try:
        scraper = DealfrontScraper()
        if not scraper.login():
            raise RuntimeError("Login fehlgeschlagen")
        if not scraper.navigate_and_load_search(Config.SEARCH_NAME):
            raise RuntimeError("Navigation/Suche fehlgeschlagen")
        all_companies = scraper.scrape_all_pages(max_pages=6)  # limit the run to 6 pages
        if all_companies:
            df = pd.DataFrame(all_companies)
            output_csv_path = os.path.join(Config.OUTPUT_DIR, f"dealfront_results_{time.strftime('%Y%m%d-%H%M%S')}.csv")
            # "utf-8-sig" writes a byte-order mark in front of the CSV content.
            df.to_csv(output_csv_path, index=False, sep=';', encoding='utf-8-sig')
            logger.info(f"Ergebnisse ({len(df)} Firmen) erfolgreich in '{output_csv_path}' gespeichert.")
        else:
            logger.warning("Keine Firmen konnten extrahiert werden.")
    except Exception as e:
        logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=True)
    finally:
        # scraper is still None when the constructor itself raised.
        if scraper:
            scraper.close()
        logger.info("Dealfront Automatisierung beendet.")


if __name__ == "__main__":
    main()