diff --git a/data_processor.py b/data_processor.py index 3ef337a7..3b7d4f6e 100644 --- a/data_processor.py +++ b/data_processor.py @@ -2078,6 +2078,7 @@ class DataProcessor: """ self.logger.info(f"Starte Website-Scraping & Summarizing (Batch). Limit: {limit or 'Unbegrenzt'}") + # --- 1. Tasks sammeln --- if start_sheet_row is None: start_data_idx = self.sheet_handler.get_start_row_index("Website Scrape Timestamp") if start_data_idx == -1: return @@ -2090,6 +2091,7 @@ class DataProcessor: for i in range(start_sheet_row - 1, len(all_data)): if limit is not None and len(tasks) >= limit: break row_data = all_data[i] + # VERWENDET IHRE EXAKTE LOGIK: if self._needs_website_processing(row_data, force_reeval=False): tasks.append({ 'row_num': i + 1, @@ -2101,10 +2103,12 @@ class DataProcessor: self.logger.info("Keine Zeilen gefunden, die Website-Verarbeitung erfordern.") return + # --- 2. Worker-Funktion definieren --- def _scrape_worker(task): """Interne Worker-Funktion. Gibt IMMER ein Dictionary zurück.""" company_name = task['company_name'] website_url = task['url'] + result = {'raw_text': 'k.A.', 'meta_text': 'k.A.', 'summary': 'k.A.', 'url_pruefstatus': 'URL_UNPROCESSED', 'final_url': website_url} try: @@ -2139,6 +2143,7 @@ class DataProcessor: result['url_pruefstatus'] = "URL_SCRAPE_ERROR" return result + # --- 3. Parallele Ausführung & Ergebnisse sammeln --- all_updates = [] now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -2154,7 +2159,7 @@ class DataProcessor: result_dict = future.result() if not isinstance(result_dict, dict): - self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {row_num}: Worker gab keinen Dictionary zurück. Überspringe.") + self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {row_num}: Worker gab keinen Dictionary zurück. Bekam {type(result_dict)}. Überspringe.") continue all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("CRM Website") + 1)}{row_num}', 'values': [[result_dict.get('final_url')]]}) @@ -2167,6 +2172,7 @@ class DataProcessor: except Exception as e_future: self.logger.error(f"Fehler beim Abrufen des Ergebnisses für Zeile {row_num}: {e_future}", exc_info=True) + # --- 4. Finales Schreiben --- if all_updates: self.logger.info(f"Sende Batch-Update für {len(tasks)} verarbeitete Websites...") self.sheet_handler.batch_update_cells(all_updates) @@ -2606,23 +2612,17 @@ class DataProcessor: def _scrape_raw_text_task(self, task_info, scraper_function): """ - Interne Worker-Funktion für paralleles Scraping. - Gibt IMMER ein Dictionary zurück, um den `Erwartete dict, bekam `-Fehler zu beheben. + Worker-Funktion für Threading. Gibt IMMER ein Dictionary zurück, + um den `Erwartete dict, bekam `-Fehler zu beheben. """ row_num = task_info['row_num'] url = task_info['url'] self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num + 1}: {url}") - try: - # Ruft die übergebene scraper_function (get_website_raw) auf raw_text = scraper_function(url) - # Das ist der entscheidende Fix: Das Ergebnis wird immer in ein Dictionary verpackt. - # Wir prüfen auch, ob das Ergebnis auf einen Fehler hindeutet. is_error = "k.A." in raw_text or "FEHLER" in raw_text - return {'row_num': row_num, 'raw_text': raw_text, 'error': is_error} - except Exception as e: self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num + 1}: {e}") # Auch im absoluten Fehlerfall wird ein Dictionary zurückgegeben. @@ -3197,14 +3197,33 @@ class DataProcessor: self.evaluate_branch_task, task, openai_sem): task for task in batch_tasks_to_run} - for future in concurrent.futures.as_completed(future_map): - task_info = future_map[future] - try: - res_data = future.result() - current_batch_results.append(res_data) - if res_data.get('error'): - current_batch_errors += 1 - except Exception as exc_future: + for future in concurrent.futures.as_completed(future_to_task): + task = future_to_task[future] + try: + # HIER KOMMT JETZT GARANTIERT EIN DICTIONARY AN + result_dict = future.result() + + # Prüfe explizit, ob es wirklich ein Dictionary ist (doppelte Sicherheit) + if not isinstance(result_dict, dict): + self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {task['row_num']}: Worker gab keinen Dictionary zurück. Bekam {type(result_dict)}. Überspringe.") + batch_error_count += 1 + continue + + # Lese die Werte aus dem Dictionary, anstatt das ganze Objekt zu speichern + row_num_from_result = result_dict['row_num'] + raw_text_res = result_dict['raw_text'] + + scraping_results[row_num_from_result] = raw_text_res + + if result_dict.get('error'): + batch_error_count += 1 + + except Exception as exc: + row_num = task['row_num'] + err_msg = f"Unerwarteter Fehler bei Ergebnisabfrage für Zeile {row_num} ({task['url'][:100]}): {exc}" + self.logger.error(err_msg) + scraping_results[row_num] = "k.A. (Unerwarteter Fehler Task)" + batch_error_count += 1 self.logger.error( f"Exception im Future für Zeile {task_info['row_num']}: {exc_future}") current_batch_results.append(