dealfront_enrichment.py aktualisiert

This commit is contained in:
2025-07-13 11:23:09 +00:00
parent 2c702fb58a
commit 41414fc6d8

View File

@@ -14,7 +14,7 @@ from selenium.common.exceptions import TimeoutException, NoSuchElementException
class Config:
LOGIN_URL = "https://app.dealfront.com/login"
TARGET_URL = "https://app.dealfront.com/t/prospector/companies"
SEARCH_NAME = "Facility Management"
SEARCH_NAME = "Facility Management" # <-- PASSEN SIE DIES AN IHRE GESPEICHERTE SUCHE AN
CREDENTIALS_FILE = "/app/dealfront_credentials.json"
OUTPUT_DIR = "/app/output"
@@ -34,6 +34,7 @@ class DealfrontScraper:
def __init__(self):
logger.info("Initialisiere WebDriver...")
chrome_options = ChromeOptions()
chrome_options.add_experimental_option("prefs", {"profile.managed_default_content_settings.images": 2})
chrome_options.add_argument("--headless=new")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
@@ -48,7 +49,7 @@ class DealfrontScraper:
self.wait = WebDriverWait(self.driver, 30)
self.username, self.password = self._load_credentials()
if not self.username or not self.password:
raise ValueError("Credentials konnten nicht geladen werden.")
raise ValueError("Credentials konnten nicht geladen werden. Breche ab.")
logger.info("WebDriver erfolgreich initialisiert.")
def _load_credentials(self):
@@ -61,7 +62,15 @@ class DealfrontScraper:
return None, None
def _save_debug_artifacts(self, suffix=""):
pass # Vorerst deaktiviert, um Logs sauber zu halten
try:
timestamp = time.strftime("%Y%m%d-%H%M%S")
filename_base = os.path.join(Config.OUTPUT_DIR, f"error_{suffix}_{timestamp}")
self.driver.save_screenshot(f"{filename_base}.png")
with open(f"{filename_base}.html", "w", encoding="utf-8") as f:
f.write(self.driver.page_source)
logger.error(f"Debug-Artefakte gespeichert: {filename_base}.*")
except Exception as e:
logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}")
def login(self):
try:
@@ -70,27 +79,28 @@ class DealfrontScraper:
self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username)
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
logger.info("Login-Befehl gesendet. Warte auf Session...")
logger.info("Login-Befehl gesendet. Warte 5 Sekunden auf Session-Etablierung.")
time.sleep(5)
# Verifizierung, dass wir nicht mehr auf der Login-Seite sind
if "login" in self.driver.current_url:
raise Exception("Weiterleitung nach Login fehlgeschlagen.")
logger.info("Login erfolgreich.")
return True
except Exception as e:
logger.critical("Login-Prozess fehlgeschlagen.", exc_info=True)
logger.critical(f"Login-Prozess fehlgeschlagen.", exc_info=True)
self._save_debug_artifacts("login_exception")
return False
def navigate_and_load_search(self, search_name):
try:
logger.info(f"Navigiere direkt zur Target-Seite und lade Suche...")
logger.info(f"Navigiere direkt zur Target-Seite und lade die Suche...")
self.driver.get(Config.TARGET_URL)
self.wait.until(EC.url_contains("/t/prospector/"))
search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
self.wait.until(EC.element_to_be_clickable(search_item_selector)).click()
logger.info("Suche geladen. Warte auf Ergebnistabelle.")
logger.info("Suche geladen. Warte auf das Rendern der Ergebnistabelle.")
self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table tbody tr")))
return True
except Exception as e:
@@ -98,34 +108,64 @@ class DealfrontScraper:
self._save_debug_artifacts("navigation_or_search_load")
return False
def scrape_all_pages(self, max_pages=6):
"""
Scrollt die Seite schrittweise nach unten und extrahiert neue Daten,
bis keine neuen Firmen mehr geladen werden oder das Seitenlimit erreicht ist.
"""
logger.info("Starte finalen Scroll- und Extraktionsprozess...")
all_companies = {}
last_found_count = 0
def extract_current_page_results(self):
results = []
rows_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]")
for i in range(max_pages * 5): # Mache maximal 5 Scroll-Versuche pro Seite
try:
data_rows = self.driver.find_elements(*rows_selector)
for row in data_rows:
try:
name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip()
website = "N/A"
try:
website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip()
except NoSuchElementException:
pass
results.append({'name': name, 'website': website})
except NoSuchElementException:
continue
except Exception as e:
logger.error(f"Fehler bei der Extraktion auf der aktuellen Seite: {e}")
return results
def scrape_all_pages(self, max_pages=6):
all_companies = {}
for page_number in range(1, max_pages + 1):
logger.info(f"--- Verarbeite Seite {page_number} ---")
self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table")))
# Scrollen, um alle 20 Zeilen zu laden
logger.info("Scrolle Seite nach unten, um alle Zeilen zu laden...")
self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2) # Pause für das Rendern
page_results = self.extract_current_page_results()
for company in page_results:
unique_key = (company.get('name'), company.get('website'))
all_companies[unique_key] = company
# Prüfen, ob neue Firmen hinzugekommen sind
if len(all_companies) > last_found_count:
logger.info(f"Scrolle... {len(all_companies)} einzigartige Firmen gefunden.")
last_found_count = len(all_companies)
self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(3) # Wartezeit für das Nachladen
else:
logger.info("Keine neuen Firmen nach dem Scrollen gefunden. Paginierung scheint abgeschlossen.")
break # Die Schleife beenden, wenn Scrollen keine neuen Ergebnisse bringt
logger.info(f"Seite {page_number}: {len(page_results)} Firmen extrahiert. Gesamt einzigartig: {len(all_companies)}")
try:
# Eindeutiger XPath für den "Weiter"-Pfeil-Button, der nicht deaktiviert ist
next_button_selector = (By.XPATH, "//nav//a[not(contains(@class, 'disabled')) and .//svg[contains(@class, 'fa-angle-right')]]")
next_button = self.driver.find_element(*next_button_selector)
self.driver.execute_script("arguments[0].scrollIntoView(true);", next_button)
time.sleep(1)
next_button.click()
logger.info(f"Auf Seite {page_number + 1} geblättert. Warte auf neue Inhalte...")
time.sleep(4) # Feste Wartezeit für Seitenwechsel
except NoSuchElementException:
logger.info("Kein klickbarer 'Weiter'-Button mehr gefunden. Paginierung abgeschlossen.")
break
return list(all_companies.values())
def close(self):
if self.driver:
self.driver.quit()