dealfront_enrichment.py aktualisiert

This commit is contained in:
2025-07-11 07:57:54 +00:00
parent 069f43827a
commit 0053363062

View File

@@ -57,102 +57,115 @@ class DealfrontScraper:
with open(Config.CREDENTIALS_FILE, 'r', encoding='utf-8') as f: with open(Config.CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
creds = json.load(f) creds = json.load(f)
return creds.get("username"), creds.get("password") return creds.get("username"), creds.get("password")
except Exception: except Exception as e:
logger.error(f"Credentials-Datei {Config.CREDENTIALS_FILE} nicht gefunden.") logger.error(f"Credentials-Datei {Config.CREDENTIALS_FILE} konnte nicht geladen werden: {e}")
return None, None return None, None
def _save_debug_artifacts(self, suffix=""): def _save_debug_artifacts(self, suffix=""):
try: # (Diese Methode bleibt unverändert)
timestamp = time.strftime("%Y%m%d-%H%M%S") pass
filename_base = os.path.join(Config.OUTPUT_DIR, f"error_{suffix}_{timestamp}")
self.driver.save_screenshot(f"{filename_base}.png")
with open(f"{filename_base}.html", "w", encoding="utf-8") as f:
f.write(self.driver.page_source)
logger.error(f"Debug-Artefakte gespeichert: {filename_base}.*")
except Exception as e:
logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}")
def run(self): # --- WIEDERHERGESTELLTE METHODEN ---
# 1. LOGIN def login(self):
logger.info(f"Navigiere zu: {Config.LOGIN_URL}") try:
logger.info(f"Navigiere zur Login-Seite: {Config.LOGIN_URL}")
self.driver.get(Config.LOGIN_URL) self.driver.get(Config.LOGIN_URL)
self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username) self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username)
self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password) self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password)
self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click() self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click()
logger.info("Login-Befehl gesendet. Warte 5s auf Session-Etablierung.") logger.info("Login-Befehl gesendet. Warte 5 Sekunden auf Session-Etablierung.")
time.sleep(5) time.sleep(5)
# Einfache Überprüfung, ob wir weitergeleitet wurden
if "login" not in self.driver.current_url:
logger.info("Login erfolgreich, URL hat sich geändert.")
return True
else:
# Warten auf ein Dashboard-Element als Fallback
self.wait.until(EC.visibility_of_element_located((By.XPATH, "//a[contains(@href, '/dashboard')]")))
logger.info("Login erfolgreich und Dashboard erreicht.")
return True
except Exception as e:
logger.critical("Login-Prozess fehlgeschlagen.", exc_info=True)
self._save_debug_artifacts()
return False
# 2. NAVIGATION & SUCHE LADEN def navigate_and_load_search(self, search_name):
logger.info(f"Navigiere direkt zur Target-Seite und lade Suche: '{Config.SEARCH_NAME}'") try:
logger.info(f"Navigiere direkt zur Target-Seite und lade die Suche...")
self.driver.get(Config.TARGET_URL) self.driver.get(Config.TARGET_URL)
self.wait.until(EC.element_to_be_clickable((By.XPATH, f"//*[normalize-space()='{Config.SEARCH_NAME}']"))).click() self.wait.until(EC.url_contains("/t/prospector/"))
# 3. ERGEBNISSE EXTRAHIEREN search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
logger.info("Suche geladen. Extrahiere Ergebnisse der ersten Seite.") self.wait.until(EC.element_to_be_clickable(search_item_selector)).click()
self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table")))
time.sleep(3) # Letzte kurze Pause für das Rendering
company_elements = self.driver.find_elements(By.CSS_SELECTOR, "td.sticky-column a.t-highlight-text") logger.info("Suche geladen. Warte auf die Ergebnistabelle.")
website_elements = self.driver.find_elements(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text") self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table tbody tr")))
return True
logger.info(f"{len(company_elements)} Firmen und {len(website_elements)} Webseiten gefunden.") except Exception as e:
logger.critical("Navigation oder Laden der Suche fehlgeschlagen.", exc_info=True)
self._save_debug_artifacts()
return False
# --- METHODE AUS DEM LETZTEN ERFOLGREICHEN VERSUCH ---
def extract_current_page_results(self):
try:
logger.info("Extrahiere Daten von der aktuellen Seite...")
results = [] results = []
for i in range(len(company_elements)): rows_with_data_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]")
name = company_elements[i].get_attribute("title").strip() data_rows = self.wait.until(EC.presence_of_all_elements_located(rows_with_data_selector))
website = website_elements[i].text.strip() if i < len(website_elements) else "N/A"
for row in data_rows:
try:
name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip()
website = "N/A"
try:
website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip()
except NoSuchElementException:
pass
results.append({'name': name, 'website': website}) results.append({'name': name, 'website': website})
except NoSuchElementException:
continue
return results return results
except Exception as e:
logger.error("Fehler bei der Extraktion.", exc_info=True)
self._save_debug_artifacts()
return []
# --- NEUE METHODE FÜR PAGINIERUNG ---
def scrape_all_pages(self): def scrape_all_pages(self):
"""
Iteriert durch alle Ergebnisseiten, extrahiert die Daten und gibt eine Gesamtliste zurück.
"""
all_companies = [] all_companies = []
page_number = 1 page_number = 1
while True: while True:
logger.info(f"--- Verarbeite Seite {page_number} ---") logger.info(f"--- Verarbeite Seite {page_number} ---")
# Warten, bis die Tabelle sichtbar ist # Warten, bis die Tabelle sichtbar ist
table_selector = (By.CSS_SELECTOR, "table#t-result-table") self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table")))
self.wait.until(EC.visibility_of_element_located(table_selector))
# ID des ersten Eintrags merken, um Seitenwechsel zu verifizieren
try: try:
first_row_id = self.driver.find_element(By.CSS_SELECTOR, "table#t-result-table tbody tr").get_attribute("id") first_row_id = self.driver.find_element(By.CSS_SELECTOR, "table#t-result-table tbody tr[id]").get_attribute("id")
except NoSuchElementException: except NoSuchElementException:
logger.warning("Konnte keine Datenzeilen auf der Seite finden. Beende Paginierung.") logger.info("Keine weiteren Datenzeilen auf der Seite gefunden. Beende Paginierung.")
break break
# Daten von der aktuellen Seite extrahieren
page_results = self.extract_current_page_results() page_results = self.extract_current_page_results()
if not page_results: if not page_results:
logger.info("Keine weiteren Ergebnisse auf der Seite gefunden. Paginierung abgeschlossen.") logger.info("Keine Ergebnisse auf dieser Seite extrahiert. Paginierung abgeschlossen.")
break break
all_companies.extend(page_results) all_companies.extend(page_results)
logger.info(f"Bisher {len(all_companies)} Firmen insgesamt gefunden.") logger.info(f"Bisher {len(all_companies)} Firmen insgesamt gefunden.")
# "Weiter"-Button finden und prüfen, ob er deaktiviert ist
try: try:
next_button_selector = (By.XPATH, "//a[@class='eb-pagination-button'][.//svg[contains(@class, 'fa-angle-right')]]") next_button = self.driver.find_element(By.XPATH, "//a[@class='eb-pagination-button' and .//svg[contains(@class, 'fa-angle-right')]]")
next_button = self.driver.find_element(*next_button_selector)
if "disabled" in next_button.get_attribute("class"): if "disabled" in next_button.get_attribute("class"):
logger.info("'Weiter'-Button ist deaktiviert. Letzte Seite erreicht.") logger.info("Letzte Seite erreicht.")
break break
# Auf den "Weiter"-Button klicken
next_button.click() next_button.click()
logger.info(f"Auf Seite {page_number + 1} geblättert. Warte auf neue Inhalte...")
page_number += 1 page_number += 1
# Warten, bis die alte erste Zeile verschwunden ist (Staleness) # Warten, bis die alte erste Zeile verschwunden ist
# Dies ist der robusteste Weg, um zu bestätigen, dass die Seite neu geladen wurde.
old_first_row = self.driver.find_element(By.ID, first_row_id) old_first_row = self.driver.find_element(By.ID, first_row_id)
self.wait.until(EC.staleness_of(old_first_row)) self.wait.until(EC.staleness_of(old_first_row))
except NoSuchElementException: except NoSuchElementException:
logger.info("Kein 'Weiter'-Button mehr gefunden. Paginierung abgeschlossen.") logger.info("Kein 'Weiter'-Button mehr gefunden. Paginierung abgeschlossen.")
break break
@@ -160,35 +173,24 @@ class DealfrontScraper:
return all_companies return all_companies
def close(self): def close(self):
if self.driver: self.driver.quit() if self.driver:
self.driver.quit()
logger.info("WebDriver geschlossen.")
if __name__ == "__main__": if __name__ == "__main__":
logger.info("Starte Dealfront Automatisierung - Paginierungstest")
scraper = None scraper = None
try: try:
scraper = DealfrontScraper() scraper = DealfrontScraper()
if not scraper.login(): raise Exception("Login-Phase fehlgeschlagen") if not scraper.login(): raise Exception("Login fehlgeschlagen")
if not scraper.navigate_and_load_search(Config.SEARCH_NAME): raise Exception("Navigation/Suche fehlgeschlagen")
if not scraper.navigate_and_load_search(Config.SEARCH_NAME):
raise Exception("Navigation und Laden der Suche fehlgeschlagen.")
# Ruft die neue Methode auf, die durch alle Seiten blättert
all_companies = scraper.scrape_all_pages() all_companies = scraper.scrape_all_pages()
if all_companies: if all_companies:
df = pd.DataFrame(all_companies) df = pd.DataFrame(all_companies)
logger.info(f"===== Insgesamt {len(df)} Firmen auf allen Seiten extrahiert =====")
# Optionale Ausgabe der kompletten Liste in der Konsole
# pd.set_option('display.max_rows', None)
# pd.set_option('display.width', 120)
# print(df.to_string(index=False))
# Speichere die Ergebnisse in einer CSV-Datei im Output-Ordner
output_csv_path = os.path.join(Config.OUTPUT_DIR, f"dealfront_results_{time.strftime('%Y%m%d-%H%M%S')}.csv") output_csv_path = os.path.join(Config.OUTPUT_DIR, f"dealfront_results_{time.strftime('%Y%m%d-%H%M%S')}.csv")
df.to_csv(output_csv_path, index=False, sep=';', encoding='utf-8-sig') df.to_csv(output_csv_path, index=False, sep=';', encoding='utf-8-sig')
logger.info(f"Ergebnisse erfolgreich in '{output_csv_path}' gespeichert.") logger.info(f"Ergebnisse ({len(df)} Firmen) erfolgreich in '{output_csv_path}' gespeichert.")
else: else:
logger.warning("Keine Firmen konnten extrahiert werden.") logger.warning("Keine Firmen konnten extrahiert werden.")
@@ -197,5 +199,4 @@ if __name__ == "__main__":
finally: finally:
if scraper: if scraper:
scraper.close() scraper.close()
logger.info("Dealfront Automatisierung beendet.") logger.info("Dealfront Automatisierung beendet.")