277 lines
14 KiB
Python
277 lines
14 KiB
Python
import os
|
|
import json
|
|
import time
|
|
import logging
|
|
import pandas as pd
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.chrome.options import Options as ChromeOptions
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import TimeoutException, NoSuchElementException
|
|
|
|
# --- Konfiguration ---
|
|
class Config:
|
|
LOGIN_URL = "https://app.dealfront.com/login"
|
|
TARGET_URL = "https://app.dealfront.com/t/prospector/companies"
|
|
SEARCH_NAME = "Facility Management" # <-- PASSEN SIE DIES AN IHRE GESPEICHERTE SUCHE AN
|
|
CREDENTIALS_FILE = "/app/dealfront_credentials.json"
|
|
OUTPUT_DIR = "/app/output"
|
|
|
|
# --- Logging Setup ---
|
|
LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s'
|
|
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
|
|
logging.getLogger("selenium.webdriver.remote").setLevel(logging.WARNING)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
os.makedirs(Config.OUTPUT_DIR, exist_ok=True)
|
|
log_filepath = os.path.join(Config.OUTPUT_DIR, f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.log")
|
|
file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8')
|
|
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
|
|
logging.getLogger().addHandler(file_handler)
|
|
|
|
class DealfrontScraper:
|
|
def __init__(self):
|
|
logger.info("Initialisiere WebDriver...")
|
|
chrome_options = ChromeOptions()
|
|
chrome_options.add_experimental_option("prefs", {"profile.managed_default_content_settings.images": 2})
|
|
chrome_options.add_argument("--headless=new")
|
|
chrome_options.add_argument("--no-sandbox")
|
|
chrome_options.add_argument("--disable-dev-shm-usage")
|
|
chrome_options.add_argument("--window-size=1920,1200")
|
|
|
|
try:
|
|
self.driver = webdriver.Chrome(options=chrome_options)
|
|
except Exception as e:
|
|
logger.critical("WebDriver konnte nicht initialisiert werden.", exc_info=True)
|
|
raise
|
|
|
|
self.wait = WebDriverWait(self.driver, 30)
|
|
self.username, self.password = self._load_credentials()
|
|
if not self.username or not self.password:
|
|
raise ValueError("Credentials konnten nicht geladen werden. Breche ab.")
|
|
logger.info("WebDriver erfolgreich initialisiert.")
|
|
|
|
def _load_credentials(self):
|
|
try:
|
|
with open(Config.CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
|
|
creds = json.load(f)
|
|
return creds.get("username"), creds.get("password")
|
|
except Exception as e:
|
|
logger.error(f"Credentials-Datei {Config.CREDENTIALS_FILE} konnte nicht geladen werden: {e}")
|
|
return None, None
|
|
|
|
def _save_debug_artifacts(self, suffix=""):
|
|
# (Diese Methode bleibt unverändert)
|
|
pass
|
|
|
|
# --- WIEDERHERGESTELLTE METHODEN ---
|
|
def login(self):
|
|
try:
|
|
logger.info(f"Navigiere zur Login-Seite: {Config.LOGIN_URL}")
|
|
self.driver.get(Config.LOGIN_URL)
|
|
self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username)
|
|
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
|
|
self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
|
|
logger.info("Login-Befehl gesendet. Warte 5 Sekunden auf Session-Etablierung.")
|
|
time.sleep(5)
|
|
# Einfache Überprüfung, ob wir weitergeleitet wurden
|
|
if "login" not in self.driver.current_url:
|
|
logger.info("Login erfolgreich, URL hat sich geändert.")
|
|
return True
|
|
else:
|
|
# Warten auf ein Dashboard-Element als Fallback
|
|
self.wait.until(EC.visibility_of_element_located((By.XPATH, "//a[contains(@href, '/dashboard')]")))
|
|
logger.info("Login erfolgreich und Dashboard erreicht.")
|
|
return True
|
|
except Exception as e:
|
|
logger.critical("Login-Prozess fehlgeschlagen.", exc_info=True)
|
|
self._save_debug_artifacts()
|
|
return False
|
|
|
|
def navigate_and_load_search(self, search_name):
|
|
try:
|
|
logger.info(f"Navigiere direkt zur Target-Seite und lade die Suche...")
|
|
self.driver.get(Config.TARGET_URL)
|
|
self.wait.until(EC.url_contains("/t/prospector/"))
|
|
|
|
search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
|
|
self.wait.until(EC.element_to_be_clickable(search_item_selector)).click()
|
|
|
|
logger.info("Suche geladen. Warte auf die Ergebnistabelle.")
|
|
self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table tbody tr")))
|
|
return True
|
|
except Exception as e:
|
|
logger.critical("Navigation oder Laden der Suche fehlgeschlagen.", exc_info=True)
|
|
self._save_debug_artifacts()
|
|
return False
|
|
|
|
# --- METHODE AUS DEM LETZTEN ERFOLGREICHEN VERSUCH ---
|
|
def extract_current_page_results(self):
|
|
"""
|
|
Extrahiert Daten NUR aus den sichtbaren Ergebniszeilen und optimiert die Wartezeiten.
|
|
"""
|
|
try:
|
|
logger.info("Extrahiere Ergebnisse mit dem finalen, präzisen Selektor...")
|
|
results = []
|
|
|
|
# 1. Warten, bis die erste Daten-Zelle (Firmenname) sichtbar ist.
|
|
first_company_link_selector = (By.CSS_SELECTOR, "td.sticky-column a.t-highlight-text")
|
|
self.wait.until(EC.visibility_of_element_located(first_company_link_selector))
|
|
|
|
# 2. Finde NUR die Zeilen, die tatsächlich einen Firmennamen-Link enthalten.
|
|
# Dieser XPath ist extrem robust und filtert "Geister"-Zeilen heraus.
|
|
rows_with_data_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]")
|
|
rows = self.driver.find_elements(*rows_with_data_selector)
|
|
logger.info(f"{len(rows)} gültige Datenzeilen zur Verarbeitung gefunden.")
|
|
|
|
for row in rows:
|
|
try:
|
|
# Innerhalb dieser garantiert validen Zeilen können wir nun sicher extrahieren.
|
|
company_name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip()
|
|
|
|
# Versuche, die Webseite zu finden. Wenn nicht vorhanden, wird sie "N/A".
|
|
try:
|
|
website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip()
|
|
except NoSuchElementException:
|
|
website = "N/A"
|
|
|
|
results.append({'name': company_name, 'website': website})
|
|
except Exception as e:
|
|
# Dieser Fall sollte jetzt kaum noch auftreten.
|
|
logger.warning(f"Konnte Daten aus einer validen Zeile nicht extrahieren: {e}")
|
|
continue
|
|
|
|
logger.info(f"Extraktion abgeschlossen. {len(results)} Firmen verarbeitet.")
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Schwerwiegender Fehler bei der Extraktion: {type(e).__name__}", exc_info=True)
|
|
self._save_debug_artifacts()
|
|
return []
|
|
|
|
# --- NEUE METHODE FÜR PAGINIERUNG ---
|
|
def scrape_all_pages(self):
|
|
"""
|
|
Iteriert durch alle Ergebnisseiten, indem auf den 'Weiter'-Button geklickt wird.
|
|
"""
|
|
all_companies = []
|
|
page_number = 1
|
|
|
|
while True:
|
|
logger.info(f"--- Verarbeite Seite {page_number} ---")
|
|
|
|
# Warten auf die erste Datenzeile auf der aktuellen Seite
|
|
company_name_selector = (By.CSS_SELECTOR, "td.sticky-column a.t-highlight-text")
|
|
self.wait.until(EC.visibility_of_element_located(company_name_selector))
|
|
|
|
# Daten der aktuellen Seite extrahieren
|
|
page_results = self.extract_current_page_results()
|
|
if not page_results:
|
|
logger.warning("Konnte keine Ergebnisse auf der aktuellen Seite extrahieren.")
|
|
break # Stoppen, wenn keine Daten gefunden wurden
|
|
|
|
all_companies.extend(page_results)
|
|
logger.info(f"Seite {page_number}: {len(page_results)} Firmen gefunden. Gesamt: {len(all_companies)}")
|
|
|
|
# Weiter-Button finden
|
|
# Wir suchen nach einem Link, der ein SVG-Icon mit der Klasse 'fa-angle-right' enthält.
|
|
next_page_button_selector = (By.CSS_SELECTOR, "a.eb-pagination-button:not([disabled]) > svg.fa-angle-right")
|
|
|
|
try:
|
|
next_button = self.wait.until(EC.element_to_be_clickable(next_page_button_selector))
|
|
|
|
# Prüfen, ob der Button wirklich klickbar ist und nicht nur vorhanden
|
|
if next_button.is_displayed() and next_button.is_enabled():
|
|
logger.info("Weiter-Button gefunden und klickbar. Klicke auf Weiter...")
|
|
self.driver.execute_script("arguments[0].click();", next_button)
|
|
page_number += 1
|
|
|
|
# Nach dem Klicken auf "Weiter" warten wir wieder auf das Erscheinen des ersten Elements der NEUEN Seite.
|
|
# Das ist robuster als eine feste Zeit.
|
|
self.wait.until(EC.visibility_of_element_located(company_name_selector))
|
|
logger.info("Neue Seite geladen.")
|
|
else:
|
|
logger.info("Weiter-Button ist vorhanden, aber deaktiviert. Letzte Seite erreicht.")
|
|
break
|
|
except TimeoutException:
|
|
logger.info("Kein klickbarer 'Weiter'-Button gefunden. Letzte Seite erreicht.")
|
|
break
|
|
except NoSuchElementException:
|
|
logger.info("Kein 'Weiter'-Button gefunden. Letzte Seite erreicht.")
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Unerwarteter Fehler beim Navigieren zur nächsten Seite: {e}")
|
|
self._save_debug_artifacts()
|
|
break
|
|
|
|
return all_companies
|
|
|
|
# Die extract_current_page_results Methode bleibt, wie sie zuletzt war, mit den robusten Selektoren
|
|
def extract_current_page_results(self):
|
|
"""
|
|
Extrahiert Daten NUR aus den sichtbaren Ergebniszeilen und optimiert die Wartezeiten.
|
|
"""
|
|
try:
|
|
logger.info("Extrahiere Ergebnisse von der aktuellen Seite...")
|
|
results = []
|
|
|
|
first_company_link_selector = (By.CSS_SELECTOR, "td.sticky-column a.t-highlight-text")
|
|
self.wait.until(EC.visibility_of_element_located(first_company_link_selector))
|
|
|
|
rows_with_data_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]")
|
|
rows = self.driver.find_elements(*rows_with_data_selector)
|
|
|
|
if not rows:
|
|
logger.warning("Keine gültigen Datenzeilen gefunden.")
|
|
return []
|
|
|
|
for row in rows:
|
|
try:
|
|
company_name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip()
|
|
website = "N/A"
|
|
try:
|
|
website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip()
|
|
except NoSuchElementException:
|
|
pass
|
|
results.append({'name': company_name, 'website': website})
|
|
except NoSuchElementException:
|
|
logger.warning("Konnte Daten aus einer Zeile nicht extrahieren.")
|
|
continue
|
|
|
|
logger.info(f"{len(results)} Firmen extrahiert.")
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei der Extraktion: {type(e).__name__}", exc_info=True)
|
|
self._save_debug_artifacts()
|
|
return []
|
|
|
|
def close(self):
|
|
if self.driver:
|
|
self.driver.quit()
|
|
logger.info("WebDriver geschlossen.")
|
|
|
|
if __name__ == "__main__":
|
|
scraper = None
|
|
try:
|
|
scraper = DealfrontScraper()
|
|
if not scraper.login(): raise Exception("Login fehlgeschlagen")
|
|
if not scraper.navigate_and_load_search(Config.SEARCH_NAME): raise Exception("Navigation/Suche fehlgeschlagen")
|
|
|
|
all_companies = scraper.scrape_all_pages()
|
|
|
|
if all_companies:
|
|
df = pd.DataFrame(all_companies)
|
|
output_csv_path = os.path.join(Config.OUTPUT_DIR, f"dealfront_results_{time.strftime('%Y%m%d-%H%M%S')}.csv")
|
|
df.to_csv(output_csv_path, index=False, sep=';', encoding='utf-8-sig')
|
|
logger.info(f"Ergebnisse ({len(df)} Firmen) erfolgreich in '{output_csv_path}' gespeichert.")
|
|
else:
|
|
logger.warning("Keine Firmen konnten extrahiert werden.")
|
|
|
|
except Exception as e:
|
|
logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=True)
|
|
finally:
|
|
if scraper:
|
|
scraper.close()
|
|
logger.info("Dealfront Automatisierung beendet.") |