dealfront_enrichment.py aktualisiert

This commit is contained in:
2025-07-14 07:38:41 +00:00
parent 85f559b684
commit 908563a2a5

View File

@@ -35,17 +35,15 @@ class DealfrontScraper:
logger.info("Initialisiere WebDriver...") logger.info("Initialisiere WebDriver...")
chrome_options = ChromeOptions() chrome_options = ChromeOptions()
chrome_options.add_experimental_option("prefs", {"profile.managed_default_content_settings.images": 2}) chrome_options.add_experimental_option("prefs", {"profile.managed_default_content_settings.images": 2})
chrome_options.add_argument("--headless=new") # chrome_options.add_argument("--headless=new") # Headless DEAKTIVIERT für Debugging!
chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1200") chrome_options.add_argument("--window-size=1920,1200")
try: try:
self.driver = webdriver.Chrome(options=chrome_options) self.driver = webdriver.Chrome(options=chrome_options)
except Exception as e: except Exception as e:
logger.critical("WebDriver konnte nicht initialisiert werden.", exc_info=True) logger.critical("WebDriver konnte nicht initialisiert werden.", exc_info=True)
raise raise
self.wait = WebDriverWait(self.driver, 30) self.wait = WebDriverWait(self.driver, 30)
self.username, self.password = self._load_credentials() self.username, self.password = self._load_credentials()
if not self.username or not self.password: if not self.username or not self.password:
@@ -112,10 +110,8 @@ class DealfrontScraper:
logger.info(f"Navigiere direkt zur Target-Seite und lade die Suche...") logger.info(f"Navigiere direkt zur Target-Seite und lade die Suche...")
self.driver.get(Config.TARGET_URL) self.driver.get(Config.TARGET_URL)
self.wait.until(EC.url_contains("/t/prospector/")) self.wait.until(EC.url_contains("/t/prospector/"))
search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']") search_item_selector = (By.XPATH, f"//div[contains(@class, 'truncate') and normalize-space()='{search_name}']")
self.wait.until(EC.element_to_be_clickable(search_item_selector)).click() self.wait.until(EC.element_to_be_clickable(search_item_selector)).click()
logger.info("Suche geladen. Warte auf das Rendern der Ergebnistabelle.") logger.info("Suche geladen. Warte auf das Rendern der Ergebnistabelle.")
self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table tbody tr"))) self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "table#t-result-table tbody tr")))
return True return True
@@ -124,35 +120,30 @@ class DealfrontScraper:
self._save_debug_artifacts("navigation_or_search_load") self._save_debug_artifacts("navigation_or_search_load")
return False return False
def extract_current_page_results(self, page_number=None): def extract_visible_firmennamen_js(self):
results = [] """
rows_selector = (By.XPATH, "//table[@id='t-result-table']/tbody/tr[.//a[contains(@class, 't-highlight-text')]]") Extrahiert die sichtbaren Firmennamen und Websites direkt per JavaScript aus der Tabelle.
try: """
data_rows = [row for row in self.driver.find_elements(*rows_selector) if row.is_displayed()] script = """
for row in data_rows: let rows = document.querySelectorAll('table#t-result-table tbody tr');
try: let result = [];
name = row.find_element(By.CSS_SELECTOR, ".sticky-column a.t-highlight-text").get_attribute("title").strip() for (let row of rows) {
website = "N/A" let nameElem = row.querySelector('.sticky-column a.t-highlight-text');
try: let websiteElem = row.querySelector('a.text-gray-400.t-highlight-text');
website = row.find_element(By.CSS_SELECTOR, "a.text-gray-400.t-highlight-text").text.strip() if (nameElem) {
except NoSuchElementException: result.push({
pass name: nameElem.getAttribute('title') || nameElem.innerText,
res = {'name': name, 'website': website} website: websiteElem ? websiteElem.innerText : ''
if page_number is not None: });
res['page'] = page_number }
results.append(res) }
except NoSuchElementException: return result;
continue """
except Exception as e: return self.driver.execute_script("return " + script)
logger.error(f"Fehler bei der Extraktion auf der aktuellen Seite: {e}")
return results
def scrape_all_pages(self, max_pages=10): def scrape_all_pages(self, max_pages=10):
"""
Iteriert durch alle Ergebnisseiten, wartet nach jedem Paginieren explizit auf neue Seitenzahl,
wartet nach jedem Seitenwechsel und Scrollen ausreichend, und sammelt alle Firmenzeilen seitengetreu.
"""
all_companies = [] all_companies = []
previous_first_name = None
for page_number in range(1, max_pages + 1): for page_number in range(1, max_pages + 1):
logger.info(f"--- Verarbeite Seite {page_number} ---") logger.info(f"--- Verarbeite Seite {page_number} ---")
try: try:
@@ -164,21 +155,17 @@ class DealfrontScraper:
logger.info("Warte 5 Sekunden, um sicherzugehen, dass alle Daten geladen sind...") logger.info("Warte 5 Sekunden, um sicherzugehen, dass alle Daten geladen sind...")
time.sleep(5) time.sleep(5)
# Vor Pagination: Aktive Seitenzahl merken # Scroll an den Anfang und dann langsam nach unten
try: self.driver.execute_script("window.scrollTo(0, 0);")
active_page_elem = self.driver.find_element(By.CSS_SELECTOR, "a.eb-pagination-button.active") time.sleep(0.5)
active_page_num = active_page_elem.text.strip()
logger.info(f"Vor Pagination: Aktive Seite: {active_page_num}")
except NoSuchElementException:
active_page_num = None
logger.warning("Vor Pagination: Konnte aktive Seite nicht ermitteln.")
self.scroll_table_slowly() self.scroll_table_slowly()
logger.info("Warte nach Scrollen nochmals 3 Sekunden...") logger.info("Warte nach Scrollen nochmals 2 Sekunden...")
time.sleep(3) time.sleep(2)
# Jetzt erst Daten extrahieren # Jetzt per JS extrahieren
page_results = self.extract_current_page_results(page_number=page_number) page_results = self.extract_visible_firmennamen_js()
for r in page_results:
r['page'] = page_number
logger.info(f"Seite {page_number}: {len(page_results)} Firmen gefunden. Erste Firmen: {[r['name'] for r in page_results[:3]]}") logger.info(f"Seite {page_number}: {len(page_results)} Firmen gefunden. Erste Firmen: {[r['name'] for r in page_results[:3]]}")
all_companies.extend(page_results) all_companies.extend(page_results)
@@ -221,19 +208,24 @@ class DealfrontScraper:
self.driver.execute_script("arguments[0].click();", next_button) self.driver.execute_script("arguments[0].click();", next_button)
logger.info("Klick auf Weiter-Button ausgeführt.") logger.info("Klick auf Weiter-Button ausgeführt.")
# Warte auf neue aktive Seitenzahl # Warte auf Änderung des ersten Firmennamens
if page_results:
previous_first_name = page_results[0]['name']
else:
previous_first_name = ""
def page_changed(driver): def page_changed(driver):
try: try:
elem = driver.find_element(By.CSS_SELECTOR, "a.eb-pagination-button.active") name = driver.execute_script("""
page_num = elem.text.strip() let row = document.querySelector('table#t-result-table tbody tr');
logger.debug(f"Nach Pagination: Aktive Seite: {page_num}") if (!row) return '';
return page_num != active_page_num let nameElem = row.querySelector('.sticky-column a.t-highlight-text');
return nameElem ? (nameElem.getAttribute('title') || nameElem.innerText) : '';
""")
return name and name != previous_first_name
except Exception: except Exception:
return False return False
self.wait.until(page_changed) self.wait.until(page_changed)
logger.info("Seitenwechsel erfolgreich verifiziert (aktive Seitenzahl hat sich geändert).") logger.info("Seitenwechsel erfolgreich verifiziert (erster Firmenname hat sich geändert).")
except Exception as e: except Exception as e:
logger.error(f"Fehler beim Klicken auf den Weiter-Button oder beim Warten auf neue Seite: {e}") logger.error(f"Fehler beim Klicken auf den Weiter-Button oder beim Warten auf neue Seite: {e}")
try: try:
@@ -263,8 +255,6 @@ if __name__ == "__main__":
if all_companies: if all_companies:
df = pd.DataFrame(all_companies) df = pd.DataFrame(all_companies)
# Optional: Deduplizieren nach Name, Website und Seite
# df = df.drop_duplicates(subset=["name", "website", "page"])
output_csv_path = os.path.join(Config.OUTPUT_DIR, f"dealfront_results_{time.strftime('%Y%m%d-%H%M%S')}.csv") output_csv_path = os.path.join(Config.OUTPUT_DIR, f"dealfront_results_{time.strftime('%Y%m%d-%H%M%S')}.csv")
df.to_csv(output_csv_path, index=False, sep=';', encoding='utf-8-sig') df.to_csv(output_csv_path, index=False, sep=';', encoding='utf-8-sig')
logger.info(f"Ergebnisse ({len(df)} Firmen) erfolgreich in '{output_csv_path}' gespeichert.") logger.info(f"Ergebnisse ({len(df)} Firmen) erfolgreich in '{output_csv_path}' gespeichert.")