dealfront_enrichment.py aktualisiert

This commit is contained in:
2025-07-11 09:04:21 +00:00
parent 0bf54699b1
commit c4154b49b7

View File

@@ -62,10 +62,16 @@ class DealfrontScraper:
return None, None
def _save_debug_artifacts(self, suffix=""):
# (Diese Methode bleibt unverändert)
pass
try:
timestamp = time.strftime("%Y%m%d-%H%M%S")
filename_base = os.path.join(Config.OUTPUT_DIR, f"error_{suffix}_{timestamp}")
self.driver.save_screenshot(f"{filename_base}.png")
with open(f"{filename_base}.html", "w", encoding="utf-8") as f:
f.write(self.driver.page_source)
logger.error(f"Debug-Artefakte gespeichert: {filename_base}.*")
except Exception as e:
logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}")
# --- WIEDERHERGESTELLTE METHODEN ---
def login(self):
try:
logger.info(f"Navigiere zur Login-Seite: {Config.LOGIN_URL}")
@@ -75,18 +81,15 @@ class DealfrontScraper:
self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
logger.info("Login-Befehl gesendet. Warte 5 Sekunden auf Session-Etablierung.")
time.sleep(5)
# Einfache Überprüfung, ob wir weitergeleitet wurden
# Verifizierung, dass wir nicht mehr auf der Login-Seite sind
if "login" not in self.driver.current_url:
logger.info("Login erfolgreich, URL hat sich geändert.")
return True
else:
# Warten auf ein Dashboard-Element als Fallback
self.wait.until(EC.visibility_of_element_located((By.XPATH, "//a[contains(@href, '/dashboard')]")))
logger.info("Login erfolgreich und Dashboard erreicht.")
return True
self._save_debug_artifacts("login_stuck")
return False
except Exception as e:
logger.critical("Login-Prozess fehlgeschlagen.", exc_info=True)
self._save_debug_artifacts()
self._save_debug_artifacts("login_exception")
return False
def navigate_and_load_search(self, search_name):
@@ -98,159 +101,72 @@ class DealfrontScraper:
search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
self.wait.until(EC.element_to_be_clickable(search_item_selector)).click()
logger.info("Suche geladen. Warte auf die Ergebnistabelle.")
logger.info("Suche geladen. Warte auf das Rendern der Ergebnistabelle.")
self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table tbody tr")))
return True
except Exception as e:
logger.critical("Navigation oder Laden der Suche fehlgeschlagen.", exc_info=True)
self._save_debug_artifacts()
self._save_debug_artifacts("navigation_or_search_load")
return False
# --- METHODE AUS DEM LETZTEN ERFOLGREICHEN VERSUCH ---
def extract_current_page_results(self):
"""
Extrahiert Daten NUR aus den sichtbaren Ergebniszeilen und optimiert die Wartezeiten.
"""
try:
logger.info("Extrahiere Ergebnisse mit dem finalen, präzisen Selektor...")
results = []
# 1. Warten, bis die erste Daten-Zelle (Firmenname) sichtbar ist.
first_company_link_selector = (By.CSS_SELECTOR, "td.sticky-column a.t-highlight-text")
self.wait.until(EC.visibility_of_element_located(first_company_link_selector))
# 2. Finde NUR die Zeilen, die tatsächlich einen Firmennamen-Link enthalten.
# Dieser XPath ist extrem robust und filtert "Geister"-Zeilen heraus.
rows_with_data_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]")
rows = self.driver.find_elements(*rows_with_data_selector)
logger.info(f"{len(rows)} gültige Datenzeilen zur Verarbeitung gefunden.")
for row in rows:
results = []
rows_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]")
data_rows = self.driver.find_elements(*rows_selector)
for row in data_rows:
try:
name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip()
website = "N/A"
try:
# Innerhalb dieser garantiert validen Zeilen können wir nun sicher extrahieren.
company_name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip()
# Versuche, die Webseite zu finden. Wenn nicht vorhanden, wird sie "N/A".
try:
website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip()
except NoSuchElementException:
website = "N/A"
results.append({'name': company_name, 'website': website})
except Exception as e:
# Dieser Fall sollte jetzt kaum noch auftreten.
logger.warning(f"Konnte Daten aus einer validen Zeile nicht extrahieren: {e}")
continue
logger.info(f"Extraktion abgeschlossen. {len(results)} Firmen verarbeitet.")
return results
except Exception as e:
logger.error(f"Schwerwiegender Fehler bei der Extraktion: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return []
# --- NEUE METHODE FÜR PAGINIERUNG ---
def scrape_all_pages(self):
"""
Iteriert durch alle Ergebnisseiten, indem auf den 'Weiter'-Button geklickt wird.
"""
all_companies = []
page_number = 1
while True:
logger.info(f"--- Verarbeite Seite {page_number} ---")
# Warten auf die erste Datenzeile auf der aktuellen Seite
company_name_selector = (By.CSS_SELECTOR, "td.sticky-column a.t-highlight-text")
self.wait.until(EC.visibility_of_element_located(company_name_selector))
# Daten der aktuellen Seite extrahieren
page_results = self.extract_current_page_results()
if not page_results:
logger.warning("Konnte keine Ergebnisse auf der aktuellen Seite extrahieren.")
break # Stoppen, wenn keine Daten gefunden wurden
all_companies.extend(page_results)
logger.info(f"Seite {page_number}: {len(page_results)} Firmen gefunden. Gesamt: {len(all_companies)}")
# Weiter-Button finden
# Wir suchen nach einem Link, der ein SVG-Icon mit der Klasse 'fa-angle-right' enthält.
next_page_button_selector = (By.CSS_SELECTOR, "a.eb-pagination-button:not([disabled]) > svg.fa-angle-right")
try:
next_button = self.wait.until(EC.element_to_be_clickable(next_page_button_selector))
# Prüfen, ob der Button wirklich klickbar ist und nicht nur vorhanden
if next_button.is_displayed() and next_button.is_enabled():
logger.info("Weiter-Button gefunden und klickbar. Klicke auf Weiter...")
self.driver.execute_script("arguments[0].click();", next_button)
page_number += 1
# Nach dem Klicken auf "Weiter" warten wir wieder auf das Erscheinen des ersten Elements der NEUEN Seite.
# Das ist robuster als eine feste Zeit.
self.wait.until(EC.visibility_of_element_located(company_name_selector))
logger.info("Neue Seite geladen.")
else:
logger.info("Weiter-Button ist vorhanden, aber deaktiviert. Letzte Seite erreicht.")
break
except TimeoutException:
logger.info("Kein klickbarer 'Weiter'-Button gefunden. Letzte Seite erreicht.")
break
website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip()
except NoSuchElementException:
logger.info("Kein 'Weiter'-Button gefunden. Letzte Seite erreicht.")
break
except Exception as e:
logger.error(f"Unerwarteter Fehler beim Navigieren zur nächsten Seite: {e}")
self._save_debug_artifacts()
break
return all_companies
pass
results.append({'name': name, 'website': website})
except NoSuchElementException:
continue
return results
# Die extract_current_page_results Methode bleibt, wie sie zuletzt war, mit den robusten Selektoren
def extract_current_page_results(self):
"""
Extrahiert Daten NUR aus den sichtbaren Ergebniszeilen und optimiert die Wartezeiten.
"""
try:
logger.info("Extrahiere Ergebnisse von der aktuellen Seite...")
results = []
def scrape_all_pages(self):
all_companies = {}
page_number = 1
while True:
logger.info(f"--- Verarbeite Seite {page_number} ---")
first_company_link_selector = (By.CSS_SELECTOR, "td.sticky-column a.t-highlight-text")
self.wait.until(EC.visibility_of_element_located(first_company_link_selector))
self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table")))
rows_with_data_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]")
rows = self.driver.find_elements(*rows_with_data_selector)
if not rows:
logger.warning("Keine gültigen Datenzeilen gefunden.")
return []
try:
first_row_id = self.driver.find_element(By.CSS_SELECTOR, "table#t-result-table tbody tr[id]").get_attribute("id")
except NoSuchElementException:
logger.warning("Konnte keine Datenzeilen auf der Seite finden. Beende Paginierung.")
break
for row in rows:
try:
company_name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip()
website = "N/A"
try:
website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip()
except NoSuchElementException:
pass
results.append({'name': company_name, 'website': website})
except NoSuchElementException:
logger.warning("Konnte Daten aus einer Zeile nicht extrahieren.")
continue
page_results = self.extract_current_page_results()
if not page_results and page_number > 1:
logger.info("Keine weiteren Ergebnisse auf der Seite extrahiert. Paginierung abgeschlossen.")
break
for company in page_results:
if company.get('website') != 'N/A' and company.get('website'):
all_companies[company['website']] = company
logger.info(f"{len(results)} Firmen extrahiert.")
return results
except Exception as e:
logger.error(f"Fehler bei der Extraktion: {type(e).__name__}", exc_info=True)
self._save_debug_artifacts()
return []
logger.info(f"Seite {page_number}: {len(page_results)} Firmen gefunden. Gesamt einzigartig: {len(all_companies)}")
try:
next_button = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "a.eb-pagination-button:not(.disabled) svg[data-icon='angle-right']")))
self.driver.execute_script("arguments[0].click();", next_button)
page_number += 1
# Warten, bis die alte erste Zeile verschwunden ist
self.wait.until(EC.staleness_of(self.driver.find_element(By.ID, first_row_id)))
except TimeoutException:
logger.info("Kein klickbarer 'Weiter'-Button mehr gefunden. Paginierung abgeschlossen.")
break
return list(all_companies.values())
def close(self):
if self.driver:
self.driver.quit()
logger.info("WebDriver geschlossen.")
if __name__ == "__main__":
scraper = None