From e8c901a081da1e98fe22ab2583d080b0b7e50ad3 Mon Sep 17 00:00:00 2001
From: Floke
Date: Sun, 20 Jul 2025 07:40:30 +0000
Subject: [PATCH] Update data_processor.py

---
 data_processor.py | 181 +++++++++++++++++++++++++---------------------
 1 file changed, 100 insertions(+), 81 deletions(-)

diff --git a/data_processor.py b/data_processor.py
index 846e3e67..826c0365 100644
--- a/data_processor.py
+++ b/data_processor.py
@@ -27,6 +27,7 @@ from sklearn.ensemble import RandomForestClassifier
 from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
 from imblearn.over_sampling import SMOTE
 from imblearn.pipeline import Pipeline as ImbPipeline
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 # Import der abhängigen Module
 from config import Config, COLUMN_MAP, MODEL_FILE, IMPUTER_FILE, PATTERNS_FILE_JSON
@@ -458,7 +459,7 @@ class DataProcessor:
                     updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Wikipedia Timestamp") + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]})
 
-# --- 3. ChatGPT Evaluationen (Branch, FSM, etc.) & Plausi ---
+        # --- 3. ChatGPT Evaluationen (Branch, FSM, etc.) & Plausi ---
         run_chat_step = 'chat' in steps_to_run
         # KORREKTUR: Initialisiere chat_steps_to_run, um den NameError zu beheben
         chat_steps_to_run = set()
@@ -754,7 +755,7 @@ class DataProcessor:
 # === Abschluss der _process_single_row Verarbeitung ==================
 # ======================================================================
 
-# --- 5. Abschliessende Updates (Version, Tokens, ReEval Flag) ---
+        # --- 5. Abschliessende Updates (Version, Tokens, ReEval Flag) ---
         if any_processing_done:
             version_col_idx = get_col_idx("Version") # KORRIGIERT
             if version_col_idx is not None:
@@ -811,6 +812,59 @@ class DataProcessor:
 # === Prozess Methoden (Sequentiell & Re-Evaluation) =====================
 # ==========================================================================
 
+    def _scrape_and_summarize_task(self, task_info):
+        """
+        Internal worker function for parallel scraping and summarizing.
+        ALWAYS returns a dictionary with all relevant website data.
+        """
+        row_num = task_info['row_num']
+        company_name = task_info['company_name']
+        website_url = task_info['url']
+        self.logger.debug(f" -> Scrape task started for row {row_num}: {website_url}")
+
+        result = {
+            'raw_text': 'k.A.',
+            'meta_text': 'k.A.',
+            'summary': 'k.A.',
+            'url_pruefstatus': 'URL_UNPROCESSED',
+            'final_url': website_url # Keep the original URL; may be replaced by a SERP lookup
+        }
+
+        try:
+            # 1. SERP lookup if no URL is available
+            if not website_url or website_url.lower() == 'k.a.':
+                found_url = serp_website_lookup(company_name)
+                if found_url and 'k.a.' not in found_url.lower():
+                    website_url = found_url
+                    result['final_url'] = found_url
+                    result['url_pruefstatus'] = "URL_OK_SERP"
+                else:
+                    result['url_pruefstatus'] = "URL_SERP_FAILED"
+                    return result # Stop here if no URL could be found
+
+            # 2. Scrape the raw text
+            raw_text = get_website_raw(website_url)
+            result['raw_text'] = raw_text
+
+            # 3. Evaluate the scraping result
+            if raw_text == URL_CHECK_MARKER:
+                result['url_pruefstatus'] = URL_CHECK_MARKER
+            elif raw_text and 'k.a.' not in raw_text.lower():
+                result['url_pruefstatus'] = "URL_OK_SCRAPED"
+                # 4. Scrape meta details and create the summary only on success
+                result['meta_text'] = scrape_website_details(website_url) or 'k.A.'
+                result['summary'] = summarize_website_content(raw_text, company_name) or 'k.A.'
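+                # Note: meta details and the summary are generated only for a
+                # successful scrape, so failed or banner-only pages never reach
+                # the summarizer.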
+            else:
+                result['url_pruefstatus'] = "URL_SCRAPE_EMPTY_OR_BANNER"
+
+            return result
+
+        except Exception as e:
+            self.logger.error(f" -> Critical error in scrape task for row {row_num}: {e}")
+            result['raw_text'] = f"FEHLER: {type(e).__name__}"
+            result['url_pruefstatus'] = "URL_SCRAPE_ERROR"
+            return result
+
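+    # Usage sketch for the worker (hypothetical values; the real tasks are
+    # built in process_website_scraping_batch below):
+    #   task = {'row_num': 42, 'company_name': 'Beispiel AG', 'url': 'https://example.com'}
+    #   result = self._scrape_and_summarize_task(task)
+    #   result['url_pruefstatus'] # e.g. "URL_OK_SCRAPED" on success
+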
     def process_rows_sequentially(
         self,
         start_sheet_row,
@@ -2032,97 +2086,62 @@ class DataProcessor:
 
     def process_website_scraping_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
         """
-        Batch-Prozess NUR fuer Website-Scraping (Rohtext AR).
-        Basiert auf der Logik aus v1.7.9, angepasst an die neue modulare Struktur und fehlerbereinigt.
+        Efficient batch process that scrapes websites in parallel, extracts
+        meta details and creates summaries.
         """
-        self.logger.info(f"Starte Website-Scraping (Batch). Bereich: {start_sheet_row or 'Start'}-{end_sheet_row or 'Ende'}, Limit: {limit or 'Unbegrenzt'}")
+        self.logger.info(f"Starting website scraping & summarizing (batch). Limit: {limit or 'unlimited'}")
 
-        # --- Daten laden und Startzeile ermitteln ---
         if start_sheet_row is None:
-            start_data_idx = self.sheet_handler.get_start_row_index(check_column_key="Website Scrape Timestamp")
-            if start_data_idx == -1:
-                self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.")
-                return
+            start_data_idx = self.sheet_handler.get_start_row_index("Website Scrape Timestamp")
+            if start_data_idx == -1:
+                self.logger.error("Could not determine the start row automatically. Aborting batch.")
+                return
             start_sheet_row = start_data_idx + self.sheet_handler._header_rows + 1
 
         if not self.sheet_handler.load_data(): return
         all_data = self.sheet_handler.get_all_data_with_headers()
-        header_rows = self.sheet_handler._header_rows
-        total_sheet_rows = len(all_data)
-        effective_end_row = end_sheet_row if end_sheet_row is not None else total_sheet_rows
-        self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {effective_end_row}.")
-        if start_sheet_row > effective_end_row:
-            self.logger.info("Start liegt nach dem Ende. Keine Zeilen zu verarbeiten.")
-            return
-
-        # --- Hauptlogik: Iteriere und sammle Batches ---
-        processing_batch_size = getattr(Config, 'PROCESSING_BATCH_SIZE', 20)
-        max_scraping_workers = getattr(Config, 'MAX_SCRAPING_WORKERS', 10)
-        update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50)
-
-        tasks_for_processing_batch = []
-        all_sheet_updates = []
-        processed_count = 0
+        # Honor end_sheet_row so an explicitly requested range is not ignored
+        end_idx = min(len(all_data), end_sheet_row) if end_sheet_row is not None else len(all_data)
+
+        tasks = []
+        for i in range(start_sheet_row - 1, end_idx):
+            if limit is not None and len(tasks) >= limit:
+                break
+            row_data = all_data[i]
+            if self._needs_website_processing(row_data, force_reeval=False):
+                tasks.append({
+                    'row_num': i + 1,
+                    'company_name': self._get_cell_value_safe(row_data, "CRM Name"),
+                    'url': self._get_cell_value_safe(row_data, "CRM Website")
+                })
 
-        for i in range(start_sheet_row, effective_end_row + 1):
-            row_index_in_list = i - 1
-            if row_index_in_list >= total_sheet_rows: break
+        if not tasks:
+            self.logger.info("No rows found that require website processing.")
+            return
+
+        all_updates = []
+        now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+        with ThreadPoolExecutor(max_workers=getattr(Config, 'MAX_SCRAPING_WORKERS', 5)) as executor:
+            self.logger.info(f"Starting parallel processing for {len(tasks)} websites...")
+            future_to_task = {executor.submit(self._scrape_and_summarize_task, task): task for task in tasks}
 
-            row = all_data[row_index_in_list]
-            if not any(cell and str(cell).strip() for cell in row): continue
-
-            # --- Pruefung, ob Verarbeitung noetig ---
-            if self._needs_website_processing(row, force_reeval=False):
-                website_url = self._get_cell_value_safe(row, "CRM Website").strip()
-                if website_url and website_url.lower() not in ["k.a.", "http:"]:
-                    if limit is not None and processed_count >= limit:
-                        self.logger.info(f"Verarbeitungslimit ({limit}) erreicht.")
-                        break
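+            # as_completed() yields futures in completion order, not submission
+            # order; future_to_task maps each result back to its sheet row.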
+            for future in as_completed(future_to_task):
+                row_num = future_to_task[future]['row_num']
+                try:
+                    result_dict = future.result()
 
-                    tasks_for_processing_batch.append({"row_num": i, "url": website_url})
-                    processed_count += 1
+                    # Write all results back to the sheet
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("CRM Website") + 1)}{row_num}', 'values': [[result_dict.get('final_url')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)}{row_num}', 'values': [[result_dict.get('raw_text')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Meta-Details") + 1)}{row_num}', 'values': [[result_dict.get('meta_text')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Zusammenfassung") + 1)}{row_num}', 'values': [[result_dict.get('summary')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("URL Prüfstatus") + 1)}{row_num}', 'values': [[result_dict.get('url_pruefstatus')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)}{row_num}', 'values': [[now_timestamp]]})
+
+                except Exception as e_future:
+                    self.logger.error(f"Error retrieving the result for row {row_num}: {e_future}")
 
-            # --- Verarbeite den Batch, wenn voll ---
-            if len(tasks_for_processing_batch) >= processing_batch_size or (i == effective_end_row and tasks_for_processing_batch):
-                self.logger.debug(f"--- Starte Website-Scraping Batch ({len(tasks_for_processing_batch)} Tasks) ---")
-                scraping_results = {}
-
-                with ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
-                    future_to_task = {executor.submit(self._scrape_raw_text_task, task, get_website_raw): task for task in tasks_for_processing_batch}
-                    for future in as_completed(future_to_task):
-                        try:
-                            result = future.result()
-                            scraping_results[result['row_num']] = result['raw_text']
-                        except Exception as exc:
-                            task = future_to_task[future]
-                            self.logger.error(f"Unerwarteter Fehler bei Ergebnisabfrage für Zeile {task['row_num']}: {exc}")
-                            scraping_results[task['row_num']] = "k.A. (Unerwarteter Fehler Task)"
+        if all_updates:
+            self.logger.info(f"Sending batch update for {len(tasks)} processed websites...")
+            self.sheet_handler.batch_update_cells(all_updates)
 
-                if scraping_results:
-                    current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                    current_version = getattr(Config, 'VERSION', 'unknown')
-                    batch_sheet_updates = []
-                    for row_num, raw_text_res in scraping_results.items():
-                        # KORRIGIERT: Nutze die sichere `get_col_idx`-Funktion
-                        batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)}{row_num}', 'values': [[raw_text_res]]})
-                        batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)}{row_num}', 'values': [[current_timestamp]]})
-                        batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Version") + 1)}{row_num}', 'values': [[current_version]]})
-                    all_sheet_updates.extend(batch_sheet_updates)
-
-                    tasks_for_processing_batch = []
-
-                    if len(all_sheet_updates) >= (update_batch_row_limit * 3): # 3 Updates pro Zeile
-                        self.logger.info(f"Sende gesammelte Sheet-Updates ({len(all_sheet_updates) // 3} Zeilen)...")
-                        self.sheet_handler.batch_update_cells(all_sheet_updates)
-                        all_sheet_updates = []
-
-        # --- Finale Sheet Updates senden ---
-        if all_sheet_updates:
-            self.logger.info(f"Sende FINALE gesammelte Sheet-Updates ({len(all_sheet_updates) // 3} Zeilen)...")
-            self.sheet_handler.batch_update_cells(all_sheet_updates)
-
-        self.logger.info(f"Website-Scraping (Batch) abgeschlossen. {processed_count} Zeilen zur Verarbeitung ausgewählt.")
+        self.logger.info(f"Website processing (batch) finished. {len(tasks)} rows processed.")
 
     def process_website_scraping(
         self,