dealfront_enrichment.py aktualisiert

This commit is contained in:
2025-07-08 19:01:20 +00:00
parent 6607f0112e
commit 5e09b785be

View File

@@ -1,41 +1,22 @@
import os import os
import json
import time import time
import json
import logging import logging
import pandas as pd
from selenium import webdriver from selenium import webdriver
from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options, ChromeOptions
from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options as ChromeOptions from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from config import TempConfig  # Central project configuration (paths, URLs, timeouts)

# Logging configuration: one shared format string so any handlers added
# later (e.g. a file handler) stay consistent with the console output.
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger(__name__)
class DealfrontScraper: class DealfrontScraper:
def __init__(self): def __init__(self):
@@ -47,175 +28,135 @@ class DealfrontScraper:
chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080") chrome_options.add_argument("--window-size=1920,1080")
service = Service(executable_path='/usr/bin/chromedriver') service = Service(executable_path=TempConfig.CHROMEDRIVER_PATH)
try: try:
self.driver = webdriver.Chrome(service=service, options=chrome_options) self.driver = webdriver.Chrome(service=service, options=chrome_options)
except Exception as e: except Exception:
logger.critical(f"WebDriver konnte nicht initialisiert werden.", exc_info=True) logger.critical("WebDriver konnte nicht initialisiert werden.", exc_info=True)
raise raise
self.wait = WebDriverWait(self.driver, 30) self.wait = WebDriverWait(self.driver, TempConfig.DEFAULT_TIMEOUT)
self.username, self.password = self._load_credentials() self.username, self.password = self._load_credentials()
logger.info("WebDriver erfolgreich initialisiert.") logger.info("WebDriver erfolgreich initialisiert.")
def _load_credentials(self):
    """Load the Dealfront username/password from the JSON credentials file.

    Returns:
        tuple: ``(username, password)`` as stored in the file; either value
        may be ``None`` if the corresponding key is missing.

    Raises:
        Exception: re-raises any error (missing file, invalid JSON) after
        logging it — the scraper cannot continue without credentials.
    """
    try:
        with open(TempConfig.DEALFRONT_CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
            creds = json.load(f)
        return creds.get('username'), creds.get('password')
    except Exception as e:
        logger.error(f"Credentials-Datei konnte nicht geladen werden: {e}")
        raise
def _save_debug_artifacts(self):
    """Dump a screenshot and the current page HTML for post-mortem analysis.

    Best-effort by design: any failure is logged but never propagated, so
    this debug helper can never mask the original error that triggered it.
    Files are written to ``TempConfig.OUTPUT_DIR`` with a timestamped name.
    """
    try:
        os.makedirs(TempConfig.OUTPUT_DIR, exist_ok=True)
        ts = time.strftime("%Y%m%d-%H%M%S")
        png = os.path.join(TempConfig.OUTPUT_DIR, f"error_{ts}.png")
        html = os.path.join(TempConfig.OUTPUT_DIR, f"error_{ts}.html")
        self.driver.save_screenshot(png)
        logger.error(f"Screenshot '{png}' gespeichert.")
        with open(html, 'w', encoding='utf-8') as f:
            f.write(self.driver.page_source)
        logger.error(f"HTML-Source '{html}' gespeichert.")
    except Exception as e:
        logger.error(f"Debug-Artefakte konnten nicht gespeichert werden: {e}")
def login_and_find_list(self, search_name):
    """Log in to Dealfront and open the saved prospect search *search_name*.

    Args:
        search_name: visible name of the predefined search to open.

    Raises:
        Exception: any Selenium failure (timeout, missing element) is
        re-raised after writing debug artifacts, so the caller's
        ``finally`` can still quit the driver.
    """
    try:
        # --- Login ---
        logger.info(f"Navigiere zur Login-Seite: {TempConfig.DEALFRONT_LOGIN_URL}")
        self.driver.get(TempConfig.DEALFRONT_LOGIN_URL)
        self.wait.until(EC.visibility_of_element_located((By.NAME, 'email'))).send_keys(self.username)
        self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
        self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
        logger.info("Login gesendet.")

        # --- Open the 'Prospects finden' product tile on the dashboard ---
        tile = self.wait.until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "a[data-test-target-product-tile]"))
        )
        tile.click()
        logger.info("'Prospects finden' geklickt.")

        # --- Select the predefined search by its visible (truncated) label ---
        sel = (By.XPATH, f"//div[contains(@class,'truncate') and normalize-space()='{search_name}']")
        item = self.wait.until(EC.element_to_be_clickable(sel))
        item.click()
        logger.info(f"Suche '{search_name}' geladen.")
    except Exception:
        # Capture screenshot + HTML before propagating, for offline debugging.
        logger.critical("Login/Navigation fehlgeschlagen.", exc_info=True)
        self._save_debug_artifacts()
        raise
def extract_current_page_results(self):
    """Scrape company name and website from every row of the current page.

    Returns:
        list[dict]: one ``{'name': ..., 'website': ...}`` dict per row;
        rows without a name element are skipped with a warning.
    """
    # 1) Temporarily lower the implicit wait so the per-row
    #    find_elements() probes below fail fast instead of blocking.
    self.driver.implicitly_wait(1)

    # 2) Wait until the first company link has rendered.
    first_locator = (By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
    self.wait.until(EC.visibility_of_element_located(first_locator))

    logger.info("Extrahiere aktuelle Seite...")
    results = []

    # 3) Wait for at least one data row of the result table.
    rows_sel = (By.CSS_SELECTOR, "table#t-result-table tbody tr[id]")
    self.wait.until(EC.presence_of_all_elements_located(rows_sel))
    rows = self.driver.find_elements(*rows_sel)
    logger.info(f"{len(rows)} Zeilen gefunden.")

    # 4) Extract names & websites row by row.
    for i, row in enumerate(rows, 1):
        names = row.find_elements(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text")
        if not names:
            logger.warning(f"Zeile {i}: Kein Name gefunden.")
            continue
        name_elem = names[0]
        # Prefer the title attribute: the visible text may be truncated.
        name = (name_elem.get_attribute('title') or name_elem.text).strip()
        webs = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3) a")
        if webs:
            web = webs[0].text.strip()
        else:
            # Fall back to the cell's plain text when no link is present.
            cell = row.find_elements(By.CSS_SELECTOR, "td:nth-of-type(3)")
            web = cell[0].text.strip() if cell else ''
        results.append({'name': name, 'website': web})

    logger.info(f"Extraktion abgeschlossen: {len(results)} Firmen.")
    # Restore the configured implicit wait before returning.
    self.driver.implicitly_wait(TempConfig.IMPLICIT_WAIT)
    return results
def click_next_page(self) -> bool:
    """Advance the result table to the next page, if there is one.

    Returns:
        bool: ``True`` after the active page number has changed,
        ``False`` when no pagination exists or the last ("Next" disabled)
        page is already shown.
    """
    btns = self.driver.find_elements(By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button")
    if not btns:
        return False
    # The last pagination button is the "Next" control.
    nxt = btns[-1]
    if not nxt.is_enabled() or 'disabled' in nxt.get_attribute('class'):
        return False
    # Remember the currently active page label so we can detect the change.
    curr = self.driver.find_element(
        By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
    ).text
    nxt.click()
    # Block until the active-page marker actually moved.
    self.wait.until(
        lambda d: d.find_element(
            By.CSS_SELECTOR, "nav.eb-pagination a.eb-pagination-button.active"
        ).text != curr
    )
    return True
def run(self, search_name):
    """Execute the full scrape: login, open the search, paginate, collect.

    Args:
        search_name: visible name of the predefined Dealfront search.

    Returns:
        list[dict]: all ``{'name': ..., 'website': ...}`` entries across
        every result page.

    The WebDriver is always quit, even when an exception escapes.
    """
    try:
        self.login_and_find_list(search_name)
        all_res = []
        while True:
            all_res.extend(self.extract_current_page_results())
            if not self.click_next_page():
                break
        return all_res
    finally:
        self.driver.quit()
if __name__ == '__main__':
    # Standalone execution: the scraper creates and owns its own WebDriver
    # (see __init__) and quits it in run(); no external setup is required.
    scraper = DealfrontScraper()
    data = scraper.run('Facility Management')
    for d in data:
        print(d)