314 lines
15 KiB
Python
314 lines
15 KiB
Python
import os
|
|
import json
|
|
import time
|
|
import logging
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.chrome.options import Options as ChromeOptions
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import TimeoutException, NoSuchElementException
|
|
from selenium.webdriver.common.keys import Keys
|
|
import pandas as pd
|
|
|
|
from config import Config, DEALFRONT_LOGIN_URL, DEALFRONT_CREDENTIALS_FILE, DEALFRONT_TARGET_URL, TARGET_SEARCH_NAME
|
|
OUTPUT_DIR = "/app/output"
|
|
|
|
# Logging-Konfiguration
|
|
LOG_LEVEL = logging.DEBUG if Config.DEBUG else logging.INFO
|
|
LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s'
|
|
|
|
# Root-Logger konfigurieren (gilt für alle Bibliotheken)
|
|
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT, force=True)
|
|
|
|
# Logger für Selenium auf INFO setzen, um Spam zu vermeiden
|
|
logging.getLogger("selenium").setLevel(logging.INFO)
|
|
|
|
# Logger für unser eigenes Skript
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# FileHandler hinzufügen, um in eine Datei zu loggen
|
|
log_filename = f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.log"
|
|
log_filepath = os.path.join(OUTPUT_DIR, log_filename)
|
|
file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8')
|
|
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
|
|
logging.getLogger().addHandler(file_handler)
|
|
|
|
OUTPUT_DIR = "/app/output"
|
|
|
|
class DealfrontScraper:
|
|
def __init__(self):
|
|
logger.info("Initialisiere DealfrontScraper und Chrome WebDriver.")
|
|
chrome_options = ChromeOptions()
|
|
|
|
# NEU: Lade-Optimierungen
|
|
prefs = {"profile.managed_default_content_settings.images": 2}
|
|
chrome_options.add_experimental_option("prefs", prefs)
|
|
|
|
chrome_options.add_argument("--headless")
|
|
chrome_options.add_argument("--no-sandbox")
|
|
chrome_options.add_argument("--disable-dev-shm-usage")
|
|
chrome_options.add_argument("--window-size=1920,1080")
|
|
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
|
|
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
|
|
chrome_options.add_experimental_option('useAutomationExtension', False)
|
|
|
|
try:
|
|
self.driver = webdriver.Chrome(options=chrome_options)
|
|
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
|
|
except Exception as e:
|
|
logger.critical(f"WebDriver konnte nicht initialisiert werden. Fehler: {e}", exc_info=True)
|
|
self.driver = None
|
|
raise
|
|
|
|
self.wait = WebDriverWait(self.driver, 30)
|
|
logger.info("WebDriver erfolgreich initialisiert (Bild-Laden deaktiviert).")
|
|
|
|
def _load_credentials(self):
|
|
# (Diese Methode bleibt unverändert)
|
|
try:
|
|
with open(DEALFRONT_CREDENTIALS_FILE, 'r') as f:
|
|
creds = json.load(f)
|
|
username = creds.get("username")
|
|
password = creds.get("password")
|
|
if not username or "DEIN_DEALFRONT_BENUTZERNAME" in username or not password or "DEIN_DEALFRONT_PASSWORT" in password:
|
|
logger.error(f"Zugangsdaten in '{DEALFRONT_CREDENTIALS_FILE}' sind ungültig.")
|
|
return None, None
|
|
return username, password
|
|
except FileNotFoundError:
|
|
logger.error(f"Credentials-Datei nicht gefunden: '{DEALFRONT_CREDENTIALS_FILE}'")
|
|
return None, None
|
|
except json.JSONDecodeError:
|
|
logger.error(f"Fehler beim Parsen der Credentials-Datei: '{DEALFRONT_CREDENTIALS_FILE}'")
|
|
return None, None
|
|
|
|
def _save_debug_artifacts(self):
|
|
# (Diese Methode bleibt unverändert, aber mit neuem Namen)
|
|
try:
|
|
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
timestamp = time.strftime("%Y%m%d-%H%M%S")
|
|
|
|
screenshot_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.png")
|
|
self.driver.save_screenshot(screenshot_filepath)
|
|
logger.error(f"Screenshot '{screenshot_filepath}' wurde für die Analyse gespeichert.")
|
|
|
|
html_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.html")
|
|
with open(html_filepath, "w", encoding="utf-8") as f:
|
|
f.write(self.driver.page_source)
|
|
logger.error(f"HTML-Quellcode '{html_filepath}' wurde für die Analyse gespeichert.")
|
|
except Exception as e:
|
|
logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}")
|
|
|
|
def login_and_navigate_to_target(self):
|
|
"""
|
|
Führt den Login durch und klickt dann auf dem Dashboard auf den
|
|
"Quick Link", um zur Target-Seite zu gelangen.
|
|
"""
|
|
if not self.driver: return False
|
|
username, password = self._load_credentials()
|
|
if not username or not password: return False
|
|
|
|
try:
|
|
# SCHRITT 1: LOGIN
|
|
logger.info(f"Navigiere zur Login-Seite: {DEALFRONT_LOGIN_URL}")
|
|
self.driver.get(DEALFRONT_LOGIN_URL)
|
|
|
|
self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(username)
|
|
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(password)
|
|
self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
|
|
logger.info("Login-Befehl gesendet.")
|
|
|
|
# SCHRITT 2: AUF QUICK-LINK-KACHEL WARTEN UND KLICKEN
|
|
logger.info("Warte auf Dashboard und 'Prospects finden'-Link in den Quick Links.")
|
|
|
|
# Dieser XPath ist sehr spezifisch für den Link in der "Quick links"-Kachel
|
|
prospects_link_selector = (By.XPATH, "//a[@data-test-target-product-tile]")
|
|
prospects_link = self.wait.until(EC.element_to_be_clickable(prospects_link_selector))
|
|
|
|
logger.info("'Prospects finden'-Link gefunden. Klicke darauf...")
|
|
prospects_link.click()
|
|
|
|
# SCHRITT 3: NAVIGATION VERIFIZIEREN
|
|
logger.info("Warte auf die finale Target-Seite...")
|
|
verification_target_selector = (By.XPATH, "//button[normalize-space()='+ Neue Suche']")
|
|
self.wait.until(EC.visibility_of_element_located(verification_target_selector))
|
|
|
|
logger.info("'Target'-Seite erfolgreich und vollständig geladen.")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.critical(f"Login- oder Navigationsprozess fehlgeschlagen: {type(e).__name__}", exc_info=True)
|
|
self._save_debug_artifacts()
|
|
return False
|
|
|
|
def navigate_to_target(self):
|
|
"""Navigiert direkt zur Target-URL und wartet auf Bestätigung."""
|
|
try:
|
|
logger.info(f"Navigiere direkt zur Target-URL: {Config.DEALFRONT_TARGET_URL}")
|
|
self.driver.get(Config.DEALFRONT_TARGET_URL)
|
|
|
|
# Warten, bis die URL sich tatsächlich ändert.
|
|
self.wait.until(EC.url_contains("/t/prospector/"))
|
|
logger.info(f"URL-Wechsel bestätigt. Aktuelle URL: {self.driver.current_url}")
|
|
|
|
# Warten, bis die Seite gerendert ist.
|
|
verification_selector = (By.XPATH, "//button[normalize-space()='+ Neue Suche']")
|
|
self.wait.until(EC.visibility_of_element_located(verification_selector))
|
|
|
|
logger.info("'Target'-Seite erfolgreich und vollständig geladen.")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.critical(f"Navigation zur 'Target'-Seite fehlgeschlagen: {type(e).__name__}", exc_info=True)
|
|
self._save_debug_artifacts()
|
|
return False
|
|
|
|
def navigate_to_target(self):
|
|
"""
|
|
Navigiert zum 'Target'-Bereich und verifiziert den Erfolg in drei Schritten.
|
|
Dieser Ansatz ist maximal robust gegen Timing-Probleme von SPAs.
|
|
"""
|
|
try:
|
|
# SCHRITT 1: Befehl zur Navigation geben
|
|
logger.info(f"Gebe Navigationsbefehl zur Target-URL: {Config.DEALFRONT_TARGET_URL}")
|
|
self.driver.get(Config.DEALFRONT_TARGET_URL)
|
|
|
|
# SCHRITT 2: Warten, bis die URL in der Adresszeile sich tatsächlich ändert.
|
|
url_part_to_wait_for = "/t/prospector/"
|
|
logger.info(f"Warte, bis die Browser-URL '{url_part_to_wait_for}' enthält...")
|
|
self.wait.until(EC.url_contains(url_part_to_wait_for))
|
|
logger.info(f"URL-Wechsel bestätigt. Aktuelle URL: {self.driver.current_url}")
|
|
|
|
# SCHRITT 3: ERST JETZT auf ein sichtbares Element auf der neuen Seite warten.
|
|
verification_selector = (By.XPATH, "//button[normalize-space()='+ Neue Suche']")
|
|
logger.info(f"Warte auf Sichtbarkeit des Verifizierungs-Elements auf der Target-Seite: {verification_selector}")
|
|
self.wait.until(EC.visibility_of_element_located(verification_selector))
|
|
|
|
logger.info("'Target'-Seite erfolgreich und vollständig geladen.")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.critical(f"Navigation zur 'Target'-Seite endgültig fehlgeschlagen: {type(e).__name__}", exc_info=True)
|
|
self._save_debug_artifacts()
|
|
return False
|
|
|
|
def load_search(self, search_name):
|
|
"""Lädt eine vordefinierte Suche anhand ihres Namens."""
|
|
try:
|
|
logger.info(f"Warte, bis die Liste der gespeicherten Suchen sichtbar ist...")
|
|
# Wir warten auf den Container, der alle Such-Items enthält.
|
|
# Ein guter Selektor ist das `ul`-Element mit der ID, die mit `eb-popup` beginnt.
|
|
# Oder allgemeiner: Ein Element, das den Text einer bekannten Suche enthält.
|
|
search_list_container_selector = (By.XPATH, f"//div[normalize-space()='{search_name}']")
|
|
self.wait.until(EC.visibility_of_element_located(search_list_container_selector))
|
|
logger.info(f"Liste gefunden. Klicke jetzt auf die vordefinierte Suche: '{search_name}'")
|
|
|
|
# Der XPath sucht nach einem Element (div, a, button), dessen Text exakt übereinstimmt.
|
|
search_link_selector = (By.XPATH, f"//*[normalize-space()='{search_name}']")
|
|
search_link = self.wait.until(EC.element_to_be_clickable(search_link_selector))
|
|
search_link.click()
|
|
|
|
# Verifizieren, dass die Ergebnisse geladen sind, indem wir auf die Ergebnistabelle warten.
|
|
results_table_header_selector = (By.XPATH, "//th[normalize-space()='Firma']")
|
|
self.wait.until(EC.visibility_of_element_located(results_table_header_selector))
|
|
logger.info(f"Suche '{search_name}' erfolgreich geladen und Ergebnisse angezeigt.")
|
|
|
|
time.sleep(3) # Feste Wartezeit, damit alle Daten nachgeladen sind.
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.critical(f"Laden der Suche '{search_name}' fehlgeschlagen: {type(e).__name__}", exc_info=True)
|
|
self._save_debug_artifacts()
|
|
return False
|
|
|
|
def extract_current_page_results(self):
|
|
"""
|
|
Extrahiert die Firmennamen und Webseiten von der aktuellen Ergebnisseite
|
|
mit einem finalen, robusten XPath-Selektor für die Datenzeilen.
|
|
"""
|
|
try:
|
|
logger.info("Extrahiere Ergebnisse von der aktuellen Seite...")
|
|
results = []
|
|
|
|
# Warten, bis die Tabelle (bzw. ihr Body) da ist
|
|
table_body_selector = (By.CSS_SELECTOR, "table#t-result-table tbody")
|
|
self.wait.until(EC.presence_of_element_located(table_body_selector))
|
|
time.sleep(3) # Letzte Pause, um das JS-Rendering abzuwarten
|
|
|
|
# Finaler, robuster Selektor für die Datenzeilen:
|
|
# Wir suchen alle `tr`-Elemente innerhalb des `tbody`, die eine `id` haben.
|
|
rows_selector = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
|
|
rows = self.driver.find_elements(*rows_selector)
|
|
|
|
if not rows:
|
|
logger.warning("Keine Ergebniszeilen (tr[id]) auf der Seite gefunden. HTML-Struktur hat sich möglicherweise geändert.")
|
|
self._save_debug_artifacts()
|
|
return []
|
|
|
|
logger.info(f"{len(rows)} Firmen-Datenzeilen zur Verarbeitung gefunden.")
|
|
|
|
for i, row in enumerate(rows, 1):
|
|
try:
|
|
# Wir verwenden die stabilen Selektoren, die Sie identifiziert haben.
|
|
company_name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip()
|
|
website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip()
|
|
results.append({'name': company_name, 'website': website})
|
|
except NoSuchElementException:
|
|
logger.warning(f"Zeile {i}: Konnte Name oder Webseite nicht extrahieren. Überspringe.")
|
|
continue
|
|
|
|
logger.info(f"Extraktion abgeschlossen. {len(results)} Firmen gefunden.")
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Schwerwiegender Fehler bei der Extraktion: {type(e).__name__}", exc_info=True)
|
|
self._save_debug_artifacts()
|
|
return []
|
|
|
|
def close(self):
|
|
if self.driver:
|
|
logger.info("Schließe den WebDriver.")
|
|
self.driver.quit()
|
|
|
|
if __name__ == "__main__":
|
|
logger.info("Starte Dealfront Automatisierung - Finaler Workflow")
|
|
|
|
scraper = None
|
|
try:
|
|
scraper = DealfrontScraper()
|
|
if not scraper.driver:
|
|
raise Exception("WebDriver konnte nicht initialisiert werden.")
|
|
|
|
# === EINZIGER AUFRUF FÜR LOGIN UND NAVIGATION ===
|
|
if not scraper.login_and_navigate_to_target():
|
|
raise Exception("Login und Navigation fehlgeschlagen.")
|
|
|
|
# Ab hier geht es weiter wie geplant...
|
|
if not scraper.load_search(Config.TARGET_SEARCH_NAME):
|
|
raise Exception(f"Laden der Suche '{Config.TARGET_SEARCH_NAME}' fehlgeschlagen.")
|
|
|
|
companies = scraper.extract_current_page_results()
|
|
if companies:
|
|
df = pd.DataFrame(companies)
|
|
pd.set_option('display.max_rows', None)
|
|
pd.set_option('display.max_columns', None)
|
|
pd.set_option('display.width', 1000)
|
|
pd.set_option('display.max_colwidth', None)
|
|
print("\n" + "="*80)
|
|
print(" EXTRAHIERTE FIRMEN (ERSTE SEITE) ".center(80, "="))
|
|
print("="*80)
|
|
print(df.to_string(index=False))
|
|
print("="*80 + "\n")
|
|
else:
|
|
logger.warning("Keine Firmen auf der ersten Seite extrahiert oder gefunden.")
|
|
|
|
logger.info("Test erfolgreich abgeschlossen. Warte vor dem Schließen...")
|
|
time.sleep(10)
|
|
|
|
except Exception as e:
|
|
logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=False)
|
|
finally:
|
|
if scraper:
|
|
scraper.close()
|
|
|
|
logger.info("Dealfront Automatisierung beendet.") |