Files
Brancheneinstufung2/dealfront_enrichment.py

236 lines
11 KiB
Python

import os
import json
import time
import logging
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# ==============================================================================
# TEMPORÄRE, AUTARKE KONFIGURATION
# ==============================================================================
class TempConfig:
# --- Direkt hier definierte Werte, um config.py zu umgehen ---
DEALFRONT_LOGIN_URL = "https://app.dealfront.com/login"
TARGET_SEARCH_NAME = "Facility Management" # <-- BITTE AN IHRE SUCHE ANPASSEN
DEALFRONT_CREDENTIALS_FILE = "/app/dealfront_credentials.json"
# ==============================================================================
OUTPUT_DIR = "/app/output"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s', force=True)
logging.getLogger("selenium").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
log_filename = f"dealfront_run_{time.strftime('%Y%m%d-%H%M%S')}.txt"
log_filepath = os.path.join(OUTPUT_DIR, log_filename)
try:
file_handler = logging.FileHandler(log_filepath, mode='w', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
logging.getLogger().addHandler(file_handler)
logger.info(f"Logging konfiguriert. Log-Datei: {log_filepath}")
except Exception as e:
logger.error(f"Konnte Log-Datei nicht erstellen: {e}")
class DealfrontScraper:
def __init__(self):
logger.info("Initialisiere den DealfrontScraper...")
chrome_options = ChromeOptions()
prefs = {"profile.managed_default_content_settings.images": 2}
chrome_options.add_experimental_option("prefs", prefs)
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
service = Service(executable_path='/usr/bin/chromedriver')
try:
self.driver = webdriver.Chrome(service=service, options=chrome_options)
except Exception as e:
logger.critical(f"WebDriver konnte nicht initialisiert werden.", exc_info=True)
raise
self.wait = WebDriverWait(self.driver, 30)
self.username, self.password = self._load_credentials()
logger.info("WebDriver erfolgreich initialisiert.")
def _load_credentials(self):
try:
with open(TempConfig.DEALFRONT_CREDENTIALS_FILE, 'r') as f:
creds = json.load(f)
return creds.get("username"), creds.get("password")
except Exception as e:
logger.error(f"Credentials-Datei konnte nicht geladen werden: {e}")
return None, None
def _save_debug_artifacts(self):
# ... (Diese Methode bleibt unverändert) ...
try:
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = time.strftime("%Y%m%d-%H%M%S")
screenshot_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.png")
html_filepath = os.path.join(OUTPUT_DIR, f"error_{timestamp}.html")
self.driver.save_screenshot(screenshot_filepath)
logger.error(f"Screenshot '{screenshot_filepath}' wurde für die Analyse gespeichert.")
with open(html_filepath, "w", encoding="utf-8") as f:
f.write(self.driver.page_source)
logger.error(f"HTML-Quellcode '{html_filepath}' wurde für die Analyse gespeichert.")
except Exception as e:
logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}")
def login_and_find_list(self, search_name):
# ... (Diese Methode bleibt unverändert, verwendet aber jetzt TempConfig) ...
try:
logger.info(f"Navigiere zur Login-Seite: {TempConfig.DEALFRONT_LOGIN_URL}")
self.driver.get(TempConfig.DEALFRONT_LOGIN_URL)
self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username)
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
logger.info("Login-Befehl gesendet.")
logger.info("Warte auf Dashboard und den 'Prospects finden' Quick-Link...")
prospects_link_selector = (By.XPATH, "//a[@data-test-target-product-tile]")
prospects_link = self.wait.until(EC.element_to_be_clickable(prospects_link_selector))
prospects_link.click()
logger.info("'Prospects finden' geklickt.")
logger.info(f"Warte auf die Liste der Suchen und klicke auf '{search_name}'...")
search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
search_item = self.wait.until(EC.element_to_be_clickable(search_item_selector))
search_item.click()
logger.info(f"Suche '{search_name}' geladen. Warte auf das Rendern der Ergebnistabelle.")
first_row_locator = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
self.wait.until(
EC.visibility_of_element_located(first_row_locator)
)
time.sleep(5)
logger.info("Zielseite mit Ergebnissen erfolgreich erreicht.")
return True
except Exception as e:
logger.critical(f"Der Prozess ist fehlgeschlagen: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return False
def extract_current_page_results(self):
# 1) Kurzes Absenken des Implicit-Waits
self.driver.implicitly_wait(1)
# 2) Warten auf erstes Firmen-Element
first_row_locator = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
self.wait.until(EC.visibility_of_element_located(first_row_locator))
time.sleep(1)
logger.info("Extrahiere Ergebnisse von der aktuellen Seite...")
results = []
# 3) Warten auf mindestens eine Tabellenzeile
rows_selector = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
self.wait.until(EC.presence_of_all_elements_located(rows_selector))
rows = self.driver.find_elements(*rows_selector)
logger.info(f"{len(rows)} Firmen-Zeilen gefunden.")
# 4) Daten sammeln, ohne weitere Sleeps/Exceptions
for i, row in enumerate(rows, 1):
name_elems = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
if not name_elems:
logger.warning(f"Zeile {i}: Kein Name-Element gefunden. Überspringe.")
continue
name_elem = name_elems[0]
company_name = (name_elem.get_attribute("title") or name_elem.text).strip()
web_elems = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
if web_elems:
website = web_elems[0].text.strip()
else:
cell = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
website = cell[0].text.strip() if cell else ""
results.append({'name': company_name, 'website': website})
logger.info(f"Extraktion abgeschlossen: {len(results)} Firmen.")
return results
def click_next_page(self) -> bool:
"""
Klickt auf den 'Next'-Paginator-Button.
Gibt False zurück, wenn kein Next-Button (mehr) klickbar ist.
"""
# alle Buttons (Prev, Seiten, Next) abgreifen
buttons = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
next_btn = buttons[-1] # letzter ist Next
# Ist er deaktiviert?
if not next_btn.is_enabled() or "disabled" in next_btn.get_attribute("class"):
return False
# Merke aktuelle Seite
current = self.driver.find_element(
By.CSS_SELECTOR,
"nav.eb-pagination a.eb-pagination-button.active"
).text
next_btn.click()
# Warte, bis die aktive Seite sich ändert
WebDriverWait(self.driver, 10).until(
lambda d: d.find_element(
By.CSS_SELECTOR,
"nav.eb-pagination a.eb-pagination-button.active"
).text != current
)
return True
def run(self, search_name):
# 1) Login & Suche laden
self.login_and_find_list(search_name)
# 2) Alle Seiten durchgehen
all_results = []
while True:
page_results = self.extract_current_page_results()
all_results.extend(page_results)
if not self.click_next_page():
break
return all_results
if __name__ == "__main__":
logger.info("Starte Dealfront Automatisierung - DEBUG-MODUS")
scraper = None
try:
scraper = DealfrontScraper(driver, wait)
results = scraper.run("Facility Management")
if not scraper.driver:
raise Exception("WebDriver konnte nicht initialisiert werden.")
if not scraper.login_and_find_list(TempConfig.TARGET_SEARCH_NAME):
raise Exception("Der Prozess vom Login bis zum Laden der Liste ist fehlgeschlagen.")
# In dieser Version gibt es keine handle_overlays Methode mehr
# scraper.handle_overlays()
companies = scraper.extract_current_page_results()
if companies:
df = pd.DataFrame(companies)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
pd.set_option('display.max_colwidth', None)
print("\n" + "="*80)
print(" EXTRAHIERTE FIRMEN (ERSTE SEITE) ".center(80, "="))
print("="*80)
print(df.to_string(index=False))
print("="*80 + "\n")
else:
logger.warning("Obwohl die Seite geladen wurde, konnten keine Firmen extrahiert werden.")
logger.info("Test erfolgreich abgeschlossen. Warte vor dem Schließen...")
time.sleep(10)
except Exception as e:
logger.critical(f"Ein kritischer Fehler ist im Hauptprozess aufgetreten: {e}", exc_info=False)
finally:
if scraper:
scraper.close()
logger.info("Dealfront Automatisierung beendet.")