diff --git a/dealfront_enrichment.py b/dealfront_enrichment.py index 47064adc..df89a290 100644 --- a/dealfront_enrichment.py +++ b/dealfront_enrichment.py @@ -57,138 +57,140 @@ class DealfrontScraper: with open(Config.CREDENTIALS_FILE, 'r', encoding='utf-8') as f: creds = json.load(f) return creds.get("username"), creds.get("password") - except Exception: - logger.error(f"Credentials-Datei {Config.CREDENTIALS_FILE} nicht gefunden.") + except Exception as e: + logger.error(f"Credentials-Datei {Config.CREDENTIALS_FILE} konnte nicht geladen werden: {e}") return None, None def _save_debug_artifacts(self, suffix=""): + # (Diese Methode bleibt unverändert) + pass + + # --- WIEDERHERGESTELLTE METHODEN --- + def login(self): try: - timestamp = time.strftime("%Y%m%d-%H%M%S") - filename_base = os.path.join(Config.OUTPUT_DIR, f"error_{suffix}_{timestamp}") - self.driver.save_screenshot(f"{filename_base}.png") - with open(f"{filename_base}.html", "w", encoding="utf-8") as f: - f.write(self.driver.page_source) - logger.error(f"Debug-Artefakte gespeichert: {filename_base}.*") + logger.info(f"Navigiere zur Login-Seite: {Config.LOGIN_URL}") + self.driver.get(Config.LOGIN_URL) + self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username) + self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password) + self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click() + logger.info("Login-Befehl gesendet. Warte 5 Sekunden auf Session-Etablierung.") + time.sleep(5) + # Einfache Überprüfung, ob wir weitergeleitet wurden + if "login" not in self.driver.current_url: + logger.info("Login erfolgreich, URL hat sich geändert.") + return True + else: + # Warten auf ein Dashboard-Element als Fallback + self.wait.until(EC.visibility_of_element_located((By.XPATH, "//a[contains(@href, '/dashboard')]"))) + logger.info("Login erfolgreich und Dashboard erreicht.") + return True except Exception as e: - logger.error(f"Konnte Debug-Artefakte nicht speichern: {e}") + logger.critical("Login-Prozess fehlgeschlagen.", exc_info=True) + self._save_debug_artifacts() + return False - def run(self): - # 1. LOGIN - logger.info(f"Navigiere zu: {Config.LOGIN_URL}") - self.driver.get(Config.LOGIN_URL) - self.wait.until(EC.visibility_of_element_located((By.NAME, "email"))).send_keys(self.username) - self.driver.find_element(By.CSS_SELECTOR, "input[type='password']").send_keys(self.password) - self.driver.find_element(By.XPATH, "//button[normalize-space()='Log in']").click() - logger.info("Login-Befehl gesendet. Warte 5s auf Session-Etablierung.") - time.sleep(5) - - # 2. NAVIGATION & SUCHE LADEN - logger.info(f"Navigiere direkt zur Target-Seite und lade Suche: '{Config.SEARCH_NAME}'") - self.driver.get(Config.TARGET_URL) - self.wait.until(EC.element_to_be_clickable((By.XPATH, f"//*[normalize-space()='{Config.SEARCH_NAME}']"))).click() - - # 3. ERGEBNISSE EXTRAHIEREN - logger.info("Suche geladen. Extrahiere Ergebnisse der ersten Seite.") - self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table"))) - time.sleep(3) # Letzte kurze Pause für das Rendering - - company_elements = self.driver.find_elements(By.CSS_SELECTOR, "td.sticky-column a.t-highlight-text") - website_elements = self.driver.find_elements(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text") - - logger.info(f"{len(company_elements)} Firmen und {len(website_elements)} Webseiten gefunden.") - - results = [] - for i in range(len(company_elements)): - name = company_elements[i].get_attribute("title").strip() - website = website_elements[i].text.strip() if i < len(website_elements) else "N/A" - results.append({'name': name, 'website': website}) - - return results - - def scrape_all_pages(self): - """ - Iteriert durch alle Ergebnisseiten, extrahiert die Daten und gibt eine Gesamtliste zurück. - """ - all_companies = [] - page_number = 1 - - while True: - logger.info(f"--- Verarbeite Seite {page_number} ---") - - # Warten, bis die Tabelle sichtbar ist - table_selector = (By.CSS_SELECTOR, "table#t-result-table") - self.wait.until(EC.visibility_of_element_located(table_selector)) - - # ID des ersten Eintrags merken, um Seitenwechsel zu verifizieren - try: - first_row_id = self.driver.find_element(By.CSS_SELECTOR, "table#t-result-table tbody tr").get_attribute("id") - except NoSuchElementException: - logger.warning("Konnte keine Datenzeilen auf der Seite finden. Beende Paginierung.") - break - - # Daten von der aktuellen Seite extrahieren - page_results = self.extract_current_page_results() - if not page_results: - logger.info("Keine weiteren Ergebnisse auf der Seite gefunden. Paginierung abgeschlossen.") - break - all_companies.extend(page_results) - logger.info(f"Bisher {len(all_companies)} Firmen insgesamt gefunden.") - - # "Weiter"-Button finden und prüfen, ob er deaktiviert ist - try: - next_button_selector = (By.XPATH, "//a[@class='eb-pagination-button'][.//svg[contains(@class, 'fa-angle-right')]]") - next_button = self.driver.find_element(*next_button_selector) - - if "disabled" in next_button.get_attribute("class"): - logger.info("'Weiter'-Button ist deaktiviert. Letzte Seite erreicht.") - break - - # Auf den "Weiter"-Button klicken - next_button.click() - logger.info(f"Auf Seite {page_number + 1} geblättert. Warte auf neue Inhalte...") - page_number += 1 - - # Warten, bis die alte erste Zeile verschwunden ist (Staleness) - # Dies ist der robusteste Weg, um zu bestätigen, dass die Seite neu geladen wurde. - old_first_row = self.driver.find_element(By.ID, first_row_id) - self.wait.until(EC.staleness_of(old_first_row)) - - except NoSuchElementException: - logger.info("Kein 'Weiter'-Button mehr gefunden. Paginierung abgeschlossen.") - break + def navigate_and_load_search(self, search_name): + try: + logger.info(f"Navigiere direkt zur Target-Seite und lade die Suche...") + self.driver.get(Config.TARGET_URL) + self.wait.until(EC.url_contains("/t/prospector/")) - return all_companies + search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']") + self.wait.until(EC.element_to_be_clickable(search_item_selector)).click() + + logger.info("Suche geladen. Warte auf die Ergebnistabelle.") + self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table tbody tr"))) + return True + except Exception as e: + logger.critical("Navigation oder Laden der Suche fehlgeschlagen.", exc_info=True) + self._save_debug_artifacts() + return False + + # --- METHODE AUS DEM LETZTEN ERFOLGREICHEN VERSUCH --- + def extract_current_page_results(self): + try: + logger.info("Extrahiere Daten von der aktuellen Seite...") + results = [] + rows_with_data_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]") + data_rows = self.wait.until(EC.presence_of_all_elements_located(rows_with_data_selector)) + + for row in data_rows: + try: + name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip() + website = "N/A" + try: + website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip() + except NoSuchElementException: + pass + results.append({'name': name, 'website': website}) + except NoSuchElementException: + continue + return results + except Exception as e: + logger.error("Fehler bei der Extraktion.", exc_info=True) + self._save_debug_artifacts() + return [] + + # --- NEUE METHODE FÜR PAGINIERUNG --- + def scrape_all_pages(self): + all_companies = [] + page_number = 1 + while True: + logger.info(f"--- Verarbeite Seite {page_number} ---") + + # Warten, bis die Tabelle sichtbar ist + self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table"))) + + try: + first_row_id = self.driver.find_element(By.CSS_SELECTOR, "table#t-result-table tbody tr[id]").get_attribute("id") + except NoSuchElementException: + logger.info("Keine weiteren Datenzeilen auf der Seite gefunden. Beende Paginierung.") + break + + page_results = self.extract_current_page_results() + if not page_results: + logger.info("Keine Ergebnisse auf dieser Seite extrahiert. Paginierung abgeschlossen.") + break + all_companies.extend(page_results) + logger.info(f"Bisher {len(all_companies)} Firmen insgesamt gefunden.") + + try: + next_button = self.driver.find_element(By.XPATH, "//a[@class='eb-pagination-button' and .//svg[contains(@class, 'fa-angle-right')]]") + if "disabled" in next_button.get_attribute("class"): + logger.info("Letzte Seite erreicht.") + break + + next_button.click() + page_number += 1 + + # Warten, bis die alte erste Zeile verschwunden ist + old_first_row = self.driver.find_element(By.ID, first_row_id) + self.wait.until(EC.staleness_of(old_first_row)) + except NoSuchElementException: + logger.info("Kein 'Weiter'-Button mehr gefunden. Paginierung abgeschlossen.") + break + + return all_companies def close(self): - if self.driver: self.driver.quit() + if self.driver: + self.driver.quit() + logger.info("WebDriver geschlossen.") if __name__ == "__main__": - logger.info("Starte Dealfront Automatisierung - Paginierungstest") scraper = None try: scraper = DealfrontScraper() - if not scraper.login(): raise Exception("Login-Phase fehlgeschlagen") + if not scraper.login(): raise Exception("Login fehlgeschlagen") + if not scraper.navigate_and_load_search(Config.SEARCH_NAME): raise Exception("Navigation/Suche fehlgeschlagen") - if not scraper.navigate_and_load_search(Config.SEARCH_NAME): - raise Exception("Navigation und Laden der Suche fehlgeschlagen.") - - # Ruft die neue Methode auf, die durch alle Seiten blättert all_companies = scraper.scrape_all_pages() if all_companies: df = pd.DataFrame(all_companies) - logger.info(f"===== Insgesamt {len(df)} Firmen auf allen Seiten extrahiert =====") - - # Optionale Ausgabe der kompletten Liste in der Konsole - # pd.set_option('display.max_rows', None) - # pd.set_option('display.width', 120) - # print(df.to_string(index=False)) - - # Speichere die Ergebnisse in einer CSV-Datei im Output-Ordner output_csv_path = os.path.join(Config.OUTPUT_DIR, f"dealfront_results_{time.strftime('%Y%m%d-%H%M%S')}.csv") df.to_csv(output_csv_path, index=False, sep=';', encoding='utf-8-sig') - logger.info(f"Ergebnisse erfolgreich in '{output_csv_path}' gespeichert.") - + logger.info(f"Ergebnisse ({len(df)} Firmen) erfolgreich in '{output_csv_path}' gespeichert.") else: logger.warning("Keine Firmen konnten extrahiert werden.") @@ -197,5 +199,4 @@ if __name__ == "__main__": finally: if scraper: scraper.close() - logger.info("Dealfront Automatisierung beendet.") \ No newline at end of file