From b0a7b8893a47b30bf720c901a819ae410985cc84 Mon Sep 17 00:00:00 2001
From: Floke
Date: Sun, 20 Jul 2025 08:47:54 +0000
Subject: [PATCH] data_processor.py aktualisiert

---
 data_processor.py | 204 ++++++++++++++++++++++++----------------------
 1 file changed, 105 insertions(+), 99 deletions(-)

diff --git a/data_processor.py b/data_processor.py
index 3b7d4f6e..3478596e 100644
--- a/data_processor.py
+++ b/data_processor.py
@@ -2070,114 +2070,123 @@ class DataProcessor:
         }
 
-    def process_website_scraping_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
+    def process_website_scraping_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
         """
-        Stabiler und robuster Batch-Prozess für Website-Scraping.
-        Führt Scraping, Meta-Extraktion und Zusammenfassung in einem parallelen Lauf aus.
-        FINALE, GETESTETE VERSION.
+        Batch-Prozess NUR für Website-Scraping (Rohtext). Basiert auf der
+        bewährten Logik aus v1.7.9, angepasst an die neue modulare Struktur und fehlerbereinigt.
         """
-        self.logger.info(f"Starte Website-Scraping & Summarizing (Batch). Limit: {limit or 'Unbegrenzt'}")
+        self.logger.info(f"Starte Website-Scraping (Batch). Bereich: {start_sheet_row or 'Start'}-{end_sheet_row or 'Ende'}, Limit: {limit or 'Unbegrenzt'}")
 
-        # --- 1. Tasks sammeln ---
+        # --- Daten laden und Startzeile ermitteln ---
         if start_sheet_row is None:
-            start_data_idx = self.sheet_handler.get_start_row_index("Website Scrape Timestamp")
-            if start_data_idx == -1: return
+            self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren Timestamp...")
+            start_data_idx = self.sheet_handler.get_start_row_index(check_column_key="Website Scrape Timestamp")
+            if start_data_idx == -1:
+                self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.")
+                return
             start_sheet_row = start_data_idx + self.sheet_handler._header_rows + 1
+            self.logger.info(f"Automatisch ermittelte Startzeile: {start_sheet_row}")
 
-        if not self.sheet_handler.load_data(): return
-        all_data = self.sheet_handler.get_all_data_with_headers()
-
-        tasks = []
-        for i in range(start_sheet_row - 1, len(all_data)):
-            if limit is not None and len(tasks) >= limit: break
-            row_data = all_data[i]
-            # VERWENDET IHRE EXAKTE LOGIK:
-            if self._needs_website_processing(row_data, force_reeval=False):
-                tasks.append({
-                    'row_num': i + 1,
-                    'company_name': self._get_cell_value_safe(row_data, "CRM Name"),
-                    'url': self._get_cell_value_safe(row_data, "CRM Website")
-                })
-
-        if not tasks:
-            self.logger.info("Keine Zeilen gefunden, die Website-Verarbeitung erfordern.")
+        if not self.sheet_handler.load_data():
+            self.logger.error("FEHLER beim Laden der Daten für Batch-Verarbeitung.")
             return
 
-        # --- 2. Worker-Funktion definieren ---
-        def _scrape_worker(task):
-            """Interne Worker-Funktion. Gibt IMMER ein Dictionary zurück."""
-            company_name = task['company_name']
-            website_url = task['url']
-
-            result = {'raw_text': 'k.A.', 'meta_text': 'k.A.', 'summary': 'k.A.', 'url_pruefstatus': 'URL_UNPROCESSED', 'final_url': website_url}
-
-            try:
-                if not website_url or website_url.lower() == 'k.a.':
-                    found_url = serp_website_lookup(company_name)
-                    if found_url and 'k.a.' not in found_url.lower():
-                        website_url = found_url
-                        result['final_url'] = found_url
-                        result['url_pruefstatus'] = "URL_OK_SERP"
-                    else:
-                        result['url_pruefstatus'] = "URL_SERP_FAILED"; return result
-
-                if website_url and 'k.a.' not in website_url.lower():
-                    raw_text = get_website_raw(website_url)
-                    result['raw_text'] = raw_text
-
-                    if raw_text == URL_CHECK_MARKER:
-                        result['url_pruefstatus'] = URL_CHECK_MARKER
-                    elif raw_text and 'k.a.' not in raw_text.lower():
-                        result['url_pruefstatus'] = "URL_OK_SCRAPED"
-                        result['meta_text'] = scrape_website_details(website_url) or 'k.A.'
-                        result['summary'] = summarize_website_content(raw_text, company_name) or 'k.A.'
-                    else:
-                        result['url_pruefstatus'] = "URL_SCRAPE_EMPTY_OR_BANNER"
-                else:
-                    result['url_pruefstatus'] = "URL_MISSING"
-
-                return result
-            except Exception as e:
-                self.logger.error(f" -> Kritischer Fehler im Scrape-Worker für {company_name}: {e}")
-                result['raw_text'] = f"FEHLER: {type(e).__name__}"
-                result['url_pruefstatus'] = "URL_SCRAPE_ERROR"
-                return result
-
-        # --- 3. Parallele Ausführung & Ergebnisse sammeln ---
-        all_updates = []
-        now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        all_data = self.sheet_handler.get_all_data_with_headers()
+        header_rows = self.sheet_handler._header_rows
+        total_sheet_rows = len(all_data)
+        effective_end_row = end_sheet_row if end_sheet_row is not None else total_sheet_rows
 
-        from concurrent.futures import ThreadPoolExecutor, as_completed
+        self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {effective_end_row}.")
+        if start_sheet_row > effective_end_row:
+            self.logger.info("Start liegt nach dem Ende. Keine Zeilen zu verarbeiten.")
+            return
 
-        with ThreadPoolExecutor(max_workers=getattr(Config, 'MAX_SCRAPING_WORKERS', 5)) as executor:
-            self.logger.info(f"Starte parallele Verarbeitung für {len(tasks)} Websites...")
-            future_to_task = {executor.submit(_scrape_worker, task): task for task in tasks}
+        # --- Indizes und Buchstaben ---
+        rohtext_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)
+        version_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Version") + 1)
+        timestamp_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)
+
+        # --- Hauptlogik: Iteriere und sammle Batches ---
+        processing_batch_size = getattr(Config, 'PROCESSING_BATCH_SIZE', 20)
+        max_scraping_workers = getattr(Config, 'MAX_SCRAPING_WORKERS', 10)
+        update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50)
+
+        tasks_for_processing_batch = []
+        all_sheet_updates = []
+        processed_count = 0
+        skipped_count = 0
+
+        for i in range(start_sheet_row, effective_end_row + 1):
+            row_index_in_list = i - 1
+            if row_index_in_list >= total_sheet_rows: break
 
-            for future in as_completed(future_to_task):
-                row_num = future_to_task[future]['row_num']
-                try:
-                    result_dict = future.result()
+            row = all_data[row_index_in_list]
+            if not any(cell and str(cell).strip() for cell in row):
+                skipped_count += 1
+                continue
+
+            if self._needs_website_processing(row, force_reeval=False):
+                website_url = self._get_cell_value_safe(row, "CRM Website").strip()
+                if website_url and website_url.lower() not in ["k.a.", "http:"]:
+                    if limit is not None and processed_count >= limit:
+                        self.logger.info(f"Verarbeitungslimit ({limit}) erreicht.")
+                        break
 
-                    if not isinstance(result_dict, dict):
-                        self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {row_num}: Worker gab keinen Dictionary zurück. Bekam {type(result_dict)}. Überspringe.")
-                        continue
+                    # WICHTIG: row_num muss 1-basiert sein für die Ausgabe
+                    tasks_for_processing_batch.append({"row_num": i, "url": website_url})
+                    processed_count += 1
+                else:
+                    skipped_count += 1
+            else:
+                skipped_count += 1
 
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("CRM Website") + 1)}{row_num}', 'values': [[result_dict.get('final_url')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)}{row_num}', 'values': [[result_dict.get('raw_text')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Meta-Details") + 1)}{row_num}', 'values': [[result_dict.get('meta_text')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Zusammenfassung") + 1)}{row_num}', 'values': [[result_dict.get('summary')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("URL Prüfstatus") + 1)}{row_num}', 'values': [[result_dict.get('url_pruefstatus')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)}{row_num}', 'values': [[now_timestamp]]})
-
-                except Exception as e_future:
-                    self.logger.error(f"Fehler beim Abrufen des Ergebnisses für Zeile {row_num}: {e_future}", exc_info=True)
+            if len(tasks_for_processing_batch) >= processing_batch_size or (i == effective_end_row and tasks_for_processing_batch):
+                self.logger.debug(f"--- Starte Website-Scraping Batch ({len(tasks_for_processing_batch)} Tasks) ---")
+                scraping_results = {}
+                batch_error_count = 0
+
+                with ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
+                    future_to_task = {executor.submit(self._scrape_raw_text_task, task, get_website_raw): task for task in tasks_for_processing_batch}
+                    for future in as_completed(future_to_task):
+                        try:
+                            result_dict = future.result()
+                            if isinstance(result_dict, dict):
+                                scraping_results[result_dict['row_num']] = result_dict['raw_text']
+                                if result_dict.get('error'):
+                                    batch_error_count += 1
+                            else:
+                                task = future_to_task[future]
+                                self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {task['row_num']}: Erwartete dict, bekam {type(result_dict)}. Überspringe.")
+                                scraping_results[task['row_num']] = "FEHLER (Inkonsistenter Rückgabetyp)"
+                                batch_error_count += 1
+                        except Exception as exc:
+                            task = future_to_task[future]
+                            self.logger.error(f"Unerwarteter Fehler bei Ergebnisabfrage für Zeile {task['row_num']}: {exc}")
+                            scraping_results[task['row_num']] = "FEHLER (Task Exception)"
+                            batch_error_count += 1
+
+                self.logger.debug(f" Scraping für Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler).")
 
-        # --- 4. Finales Schreiben ---
-        if all_updates:
-            self.logger.info(f"Sende Batch-Update für {len(tasks)} verarbeitete Websites...")
-            self.sheet_handler.batch_update_cells(all_updates)
+                if scraping_results:
+                    current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                    current_version = getattr(Config, 'VERSION', 'unknown')
+                    for row_num, raw_text_res in scraping_results.items():
+                        all_sheet_updates.append({'range': f'{rohtext_col_letter}{row_num}', 'values': [[raw_text_res]]})
+                        all_sheet_updates.append({'range': f'{timestamp_col_letter}{row_num}', 'values': [[current_timestamp]]})
+                        all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]})
 
-        self.logger.info(f"Website-Verarbeitung (Batch) abgeschlossen. {len(tasks)} Zeilen verarbeitet.")
+                tasks_for_processing_batch = []
+
+                if len(all_sheet_updates) >= (update_batch_row_limit * 3):
+                    self.logger.info(f"Sende gesammelte Sheet-Updates ({len(all_sheet_updates) // 3} Zeilen)...")
+                    self.sheet_handler.batch_update_cells(all_sheet_updates)
+                    all_sheet_updates = []
+
+        if all_sheet_updates:
+            self.logger.info(f"Sende FINALE gesammelte Sheet-Updates ({len(all_sheet_updates) // 3} Zeilen)...")
+            self.sheet_handler.batch_update_cells(all_sheet_updates)
+
+        self.logger.info(f"Website-Scraping (Batch) abgeschlossen. {processed_count} Zeilen zur Verarbeitung ausgewählt, {skipped_count} Zeilen übersprungen.")
 
     def process_website_scraping(
         self,
@@ -2612,20 +2621,17 @@ class DataProcessor:
 
     def _scrape_raw_text_task(self, task_info, scraper_function):
         """
-        Worker-Funktion für Threading. Gibt IMMER ein Dictionary zurück,
-        um den `Erwartete dict, bekam `-Fehler zu beheben.
+        Worker-Funktion für Threading. Gibt IMMER ein Dictionary zurück.
         """
         row_num = task_info['row_num']
         url = task_info['url']
-        self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num + 1}: {url}")
+        self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num}: {url}")
 
         try:
             raw_text = scraper_function(url)
-            # Das ist der entscheidende Fix: Das Ergebnis wird immer in ein Dictionary verpackt.
            is_error = "k.A." in raw_text or "FEHLER" in raw_text
             return {'row_num': row_num, 'raw_text': raw_text, 'error': is_error}
         except Exception as e:
-            self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num + 1}: {e}")
-            # Auch im absoluten Fehlerfall wird ein Dictionary zurückgegeben.
+            self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num}: {e}")
             return {'row_num': row_num, 'raw_text': f"FEHLER im Task: {e}", 'error': True}
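
For reference, the collection pattern the new batch method relies on (every worker returns a dict, results are gathered via as_completed, and three cells per row are queued for one batch update) can be exercised in isolation. The sketch below is only an approximation under assumptions: fake_scraper, print_batch_update, the column letters H/I/J and the version string are hypothetical stand-ins for get_website_raw, sheet_handler.batch_update_cells and the Config values, not names from data_processor.py.

# Minimal, self-contained sketch of the worker/as_completed/batch-update pattern.
# fake_scraper, print_batch_update, the column letters and the version string are
# illustrative assumptions, not part of data_processor.py.
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime


def fake_scraper(url):
    # Stand-in for get_website_raw(): just returns dummy text.
    return f"raw text for {url}"


def scrape_task(task):
    # Mirrors the contract of _scrape_raw_text_task: always return a dict.
    try:
        raw_text = fake_scraper(task["url"])
        return {"row_num": task["row_num"], "raw_text": raw_text, "error": False}
    except Exception as exc:
        return {"row_num": task["row_num"], "raw_text": f"FEHLER im Task: {exc}", "error": True}


def print_batch_update(updates):
    # Stand-in for sheet_handler.batch_update_cells(): prints instead of writing.
    for update in updates:
        print(update["range"], "->", str(update["values"][0][0])[:40])


if __name__ == "__main__":
    tasks = [
        {"row_num": 5, "url": "https://example.com"},
        {"row_num": 6, "url": "https://example.org"},
    ]

    # Collect results keyed by row number, exactly one dict per submitted task.
    results = {}
    with ThreadPoolExecutor(max_workers=2) as executor:
        future_to_task = {executor.submit(scrape_task, t): t for t in tasks}
        for future in as_completed(future_to_task):
            result = future.result()
            results[result["row_num"]] = result["raw_text"]

    # Queue three cells per row (raw text, timestamp, version), then flush once.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    updates = []
    for row_num, raw_text in results.items():
        updates.append({"range": f"H{row_num}", "values": [[raw_text]]})
        updates.append({"range": f"I{row_num}", "values": [[timestamp]]})
        updates.append({"range": f"J{row_num}", "values": [["v-sketch"]]})
    print_batch_update(updates)

Run on its own it just prints three update ranges per row; swapping the stand-ins for the real scraper and sheet handler reproduces the flow of process_website_scraping_batch under those assumptions.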