From 07d8596eb949df2dc2e7832a7541ee235d526bb5 Mon Sep 17 00:00:00 2001
From: Floke
Date: Sun, 20 Jul 2025 07:52:24 +0000
Subject: [PATCH] data_processor.py aktualisiert

---
 data_processor.py | 122 +++++++++++++++++++++++++++++-----------------
 1 file changed, 77 insertions(+), 45 deletions(-)

diff --git a/data_processor.py b/data_processor.py
index 27d3b7dd..cd7d5d20 100644
--- a/data_processor.py
+++ b/data_processor.py
@@ -2069,29 +2069,15 @@ class DataProcessor:
             'meta_details': meta_details_result
         }
 
-    def _scrape_raw_text_task(self, task_info):
-        """
-        Interne Worker-Funktion für paralleles Scraping, die IMMER ein Dictionary zurückgibt.
-        """
-        row_num = task_info['row_num']
-        url = task_info['url']
-        self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num}: {url}")
-        try:
-            # Ruft die robuste get_website_raw Funktion auf
-            raw_text = get_website_raw(url)
-            # Gibt das Ergebnis immer als Dictionary zurück
-            return {'row_num': row_num, 'raw_text': raw_text}
-        except Exception as e:
-            self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num}: {e}")
-            return {'row_num': row_num, 'raw_text': f"FEHLER im Task: {e}"}
 
     def process_website_scraping_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
         """
-        Batch-Prozess NUR für Website-Scraping (Rohtext). Basiert auf der
-        bewährten Logik aus v1.7.9, angepasst an die neue modulare Struktur und fehlerbereinigt.
+        Stabiler und robuster Batch-Prozess für Website-Scraping.
+        Führt Scraping, Meta-Extraktion und Zusammenfassung in einem parallelen Lauf aus.
         """
-        self.logger.info(f"Starte Website-Scraping (Batch). Limit: {limit or 'Unbegrenzt'}")
-
+        self.logger.info(f"Starte Website-Scraping & Summarizing (Batch). Limit: {limit or 'Unbegrenzt'}")
+
+        # --- 1. Tasks sammeln ---
         if start_sheet_row is None:
             start_data_idx = self.sheet_handler.get_start_row_index("Website Scrape Timestamp")
             start_sheet_row = start_data_idx + self.sheet_handler._header_rows + 1
@@ -2100,52 +2086,98 @@ class DataProcessor:
         all_data = self.sheet_handler.get_all_data_with_headers()
 
         tasks = []
-        processed_count = 0
         for i in range(start_sheet_row - 1, len(all_data)):
-            if limit is not None and processed_count >= limit:
-                break
-
+            if limit is not None and len(tasks) >= limit: break
             row_data = all_data[i]
             if self._needs_website_processing(row_data, force_reeval=False):
-                website_url = self._get_cell_value_safe(row_data, "CRM Website").strip()
-                if website_url and website_url.lower() not in ["k.a.", "http:"]:
-                    tasks.append({"row_num": i + 1, "url": website_url})
-                    processed_count += 1
+                tasks.append({
+                    'row_num': i + 1,
+                    'company_name': self._get_cell_value_safe(row_data, "CRM Name"),
+                    'url': self._get_cell_value_safe(row_data, "CRM Website")
+                })
 
         if not tasks:
-            self.logger.info("Keine Zeilen für Website-Scraping gefunden.")
+            self.logger.info("Keine Zeilen gefunden, die Website-Verarbeitung erfordern.")
             return
 
+        # --- 2. Worker-Funktion definieren ---
+        def _scrape_worker(task):
+            """Interne Worker-Funktion. Gibt IMMER ein Dictionary zurück."""
+            company_name = task['company_name']
+            website_url = task['url']
+
+            result = {'raw_text': 'k.A.', 'meta_text': 'k.A.', 'summary': 'k.A.', 'url_pruefstatus': 'URL_UNPROCESSED', 'final_url': website_url}
+
+            try:
+                if not website_url or website_url.lower() == 'k.a.':
+                    found_url = serp_website_lookup(company_name)
+                    if found_url and 'k.a.' not in found_url.lower():
+                        website_url = found_url
+                        result['final_url'] = found_url
+                        result['url_pruefstatus'] = "URL_OK_SERP"
+                    else:
+                        result['url_pruefstatus'] = "URL_SERP_FAILED"
+                        return result
+
+                if website_url and 'k.a.' not in website_url.lower():
+                    raw_text = get_website_raw(website_url)
+                    result['raw_text'] = raw_text
+
+                    if raw_text == URL_CHECK_MARKER:
+                        result['url_pruefstatus'] = URL_CHECK_MARKER
+                    elif raw_text and 'k.a.' not in raw_text.lower():
+                        result['url_pruefstatus'] = "URL_OK_SCRAPED"
+                        result['meta_text'] = scrape_website_details(website_url) or 'k.A.'
+                        result['summary'] = summarize_website_content(raw_text, company_name) or 'k.A.'
+                    else:
+                        result['url_pruefstatus'] = "URL_SCRAPE_EMPTY_OR_BANNER"
+                else:
+                    result['url_pruefstatus'] = "URL_MISSING"
+
+                return result
+            except Exception as e:
+                self.logger.error(f" -> Kritischer Fehler im Scrape-Worker für {company_name}: {e}")
+                result['raw_text'] = f"FEHLER: {type(e).__name__}"
+                result['url_pruefstatus'] = "URL_SCRAPE_ERROR"
+                return result
+
+        # --- 3. Parallele Ausführung & Ergebnisse sammeln ---
         all_updates = []
         now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-        current_version = getattr(Config, 'VERSION', 'unknown')
-
+
         from concurrent.futures import ThreadPoolExecutor, as_completed
-        with ThreadPoolExecutor(max_workers=getattr(Config, 'MAX_SCRAPING_WORKERS', 10)) as executor:
-            self.logger.info(f"Starte paralleles Scraping für {len(tasks)} Websites...")
-            future_to_task = {executor.submit(self._scrape_raw_text_task, task): task for task in tasks}
-
+        with ThreadPoolExecutor(max_workers=getattr(Config, 'MAX_SCRAPING_WORKERS', 5)) as executor:
+            self.logger.info(f"Starte parallele Verarbeitung für {len(tasks)} Websites...")
+            future_to_task = {executor.submit(_scrape_worker, task): task for task in tasks}
+
             for future in as_completed(future_to_task):
-                task = future_to_task[future]
-                row_num = task['row_num']
+                row_num = future_to_task[future]['row_num']
                 try:
-                    # Hier erhalten wir jetzt garantiert ein Dictionary
                     result_dict = future.result()
-                    raw_text_res = result_dict['raw_text']
-                    # Updates vorbereiten
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)}{row_num}', 'values': [[raw_text_res]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)}{row_num}', 'values': [[now_timestamp]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Version") + 1)}{row_num}', 'values': [[current_version]]})
-                except Exception as e_future:
-                    self.logger.error(f" -> Fehler beim Verarbeiten des Ergebnisses für Zeile {row_num}: {e_future}")
+                    # Stelle sicher, dass result_dict ein Dictionary ist
+                    if not isinstance(result_dict, dict):
+                        self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {row_num}: Worker gab keinen Dictionary zurück. Bekam {type(result_dict)}.")
+                        continue
 
+                    # Bereite die Updates vor
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("CRM Website") + 1)}{row_num}', 'values': [[result_dict.get('final_url', '')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)}{row_num}', 'values': [[result_dict.get('raw_text', '')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Meta-Details") + 1)}{row_num}', 'values': [[result_dict.get('meta_text', '')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Zusammenfassung") + 1)}{row_num}', 'values': [[result_dict.get('summary', '')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("URL Prüfstatus") + 1)}{row_num}', 'values': [[result_dict.get('url_pruefstatus', '')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)}{row_num}', 'values': [[now_timestamp]]})
+
+                except Exception as e_future:
+                    self.logger.error(f"Fehler beim Abrufen des Ergebnisses für Zeile {row_num}: {e_future}", exc_info=True)
+
+        # --- 4. Finales Schreiben ---
         if all_updates:
             self.logger.info(f"Sende Batch-Update für {len(tasks)} verarbeitete Websites...")
             self.sheet_handler.batch_update_cells(all_updates)
 
-        self.logger.info(f"Website-Scraping (Batch) abgeschlossen. {len(tasks)} Zeilen verarbeitet.")
+        self.logger.info(f"Website-Verarbeitung (Batch) abgeschlossen. {len(tasks)} Zeilen verarbeitet.")
 
     def process_website_scraping(
         self,