diff --git a/brancheneinstufung.py b/brancheneinstufung.py index d87a80cf..660d33ab 100644 --- a/brancheneinstufung.py +++ b/brancheneinstufung.py @@ -2496,17 +2496,16 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ """ Batch-Prozess für Brancheneinschätzung mit paralleler Verarbeitung via Threads. Prüft Timestamp AO, führt evaluate_branche_chatgpt parallel aus (limitiert), - setzt W, X, Y, AO + AP und sendet Sheet-Updates gebündelt. + setzt W, X, Y, AO + AP und sendet Sheet-Updates GEBÜNDELT PRO VERARBEITUNGS-BATCH. """ debug_print(f"Starte Brancheneinschätzung (Parallel Batch) für Zeilen {start_row_index_in_sheet} bis {end_row_index_in_sheet}...") - # --- Lade Daten --- if not sheet_handler.load_data(): return all_data = sheet_handler.get_all_data_with_headers() if not all_data or len(all_data) <= 5: return header_rows = 5 - # --- Indizes und Buchstaben --- + # Indizes und Buchstaben holen timestamp_col_key = "Timestamp letzte Prüfung" timestamp_col_index = COLUMN_MAP.get(timestamp_col_key) branche_crm_idx = COLUMN_MAP.get("CRM Branche") @@ -2521,60 +2520,44 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ required_indices = [timestamp_col_index, branche_crm_idx, beschreibung_idx, branche_wiki_idx, kategorien_wiki_idx, summary_web_idx, version_col_index, branch_w_idx, branch_x_idx, branch_y_idx] - if None in required_indices: return debug_print(f"FEHLER: Indizes für process_branch_batch fehlen.") + if None in required_indices: return debug_print(f"FEHLER: Indizes fehlen.") ts_col_letter = sheet_handler._get_col_letter(timestamp_col_index + 1) - version_col_letter = sheet_handler._get_col_letter(version_col_index + 1) + version_col_letter = sheet_handler._get_col_letter(version_col_idx + 1) branch_w_letter = sheet_handler._get_col_letter(branch_w_idx + 1) branch_x_letter = sheet_handler._get_col_letter(branch_x_idx + 1) branch_y_letter = sheet_handler._get_col_letter(branch_y_idx + 1) - # --- Konfiguration für Parallelisierung --- - MAX_BRANCH_WORKERS = 10 # Wie viele Threads für parallele Bewertung? - # Limit für gleichzeitige OpenAI Calls (wichtig wegen Rate Limits!) - OPENAI_CONCURRENCY_LIMIT = 5 + # Konfiguration für Parallelisierung + MAX_BRANCH_WORKERS = Config.MAX_BRANCH_WORKERS + OPENAI_CONCURRENCY_LIMIT = Config.OPENAI_CONCURRENCY_LIMIT # Hole aus Config openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT) - # Batch-Größe für das Sammeln von Aufgaben, bevor der ThreadPool gestartet wird - PROCESSING_BRANCH_BATCH_SIZE = Config.PROCESSING_BATCH_SIZE # z.B. 20 aus Config nehmen - # Batch-Größe für das Senden von Sheet-Updates - update_batch_row_limit = Config.UPDATE_BATCH_ROW_LIMIT # z.B. 50 aus Config + PROCESSING_BRANCH_BATCH_SIZE = Config.PROCESSING_BATCH_SIZE - # --- Worker Funktion --- + # Worker Funktion (unverändert) def evaluate_branch_task(task_data): row_num = task_data['row_num'] result = {"branch": "k.A. (Fehler Task)", "consistency": "error", "justification": "Fehler in Worker-Task"} error = None try: - # Limitiere OpenAI Calls mit Semaphore with openai_semaphore_branch: - # debug_print(f"Task Zeile {row_num}: Starte evaluate_branche_chatgpt...") # Kann sehr laut sein - # Füge kleine Pause hinzu, um Limits zu respektieren - time.sleep(0.3) # Kleine Pause vor jedem Call + time.sleep(0.3) result = evaluate_branche_chatgpt( - task_data['crm_branche'], - task_data['beschreibung'], - task_data['wiki_branche'], - task_data['wiki_kategorien'], - task_data['website_summary'] + task_data['crm_branche'], task_data['beschreibung'], task_data['wiki_branche'], + task_data['wiki_kategorien'], task_data['website_summary'] ) - # debug_print(f"Task Zeile {row_num}: evaluate_branche_chatgpt beendet.") except Exception as e: - error = f"Fehler bei Branchenevaluation Zeile {row_num}: {e}" - debug_print(error) - # Füge Fehlerinfo zum Ergebnis hinzu, falls evaluate_branche_chatgpt selbst keinen Fehler liefert - result['justification'] = error[:500] # Schreibe Fehler in Begründung (gekürzt) - result['consistency'] = 'error_task' + error = f"Fehler Brancheneval Zeile {row_num}: {e}" + debug_print(error); result['justification'] = error[:500]; result['consistency'] = 'error_task' return {"row_num": row_num, "result": result, "error": error} - # --- Hauptverarbeitung --- + # Hauptverarbeitung tasks_for_processing_batch = [] - all_sheet_updates = [] total_processed_count = 0 total_skipped_count = 0 total_error_count = 0 - # Stelle sicher, dass Branchenschema geladen ist if not ALLOWED_TARGET_BRANCHES: load_target_schema(); - if not ALLOWED_TARGET_BRANCHES: return debug_print("FEHLER: Ziel-Schema nicht geladen. Abbruch Branch-Batch.") + if not ALLOWED_TARGET_BRANCHES: return debug_print("FEHLER: Ziel-Schema nicht geladen.") for i in range(start_row_index_in_sheet, end_row_index_in_sheet + 1): row_index_in_list = i - 1 @@ -2590,7 +2573,7 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ total_skipped_count += 1 continue - # Daten für den Task sammeln + # Task sammeln task_data = { "row_num": i, "crm_branche": row[branche_crm_idx] if len(row) > branche_crm_idx else "", @@ -2601,7 +2584,7 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ } tasks_for_processing_batch.append(task_data) - # --- Verarbeitungs-Batch ausführen, wenn voll oder letzte Zeile --- + # --- Verarbeitungs-Batch ausführen --- if len(tasks_for_processing_batch) >= PROCESSING_BRANCH_BATCH_SIZE or i == end_row_index_in_sheet: if tasks_for_processing_batch: batch_start_row = tasks_for_processing_batch[0]['row_num'] @@ -2609,8 +2592,7 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ batch_task_count = len(tasks_for_processing_batch) debug_print(f"\n--- Starte Branch-Evaluation Batch ({batch_task_count} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") - # --- Parallele Ausführung --- - results_list = [] # Liste der Ergebnis-Dicts + results_list = [] batch_error_count = 0 debug_print(f" Evaluiere {batch_task_count} Zeilen parallel (max {MAX_BRANCH_WORKERS} worker, {OPENAI_CONCURRENCY_LIMIT} OpenAI gleichzeitig)...") with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor: @@ -2619,13 +2601,10 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ task = future_to_task[future] try: result_data = future.result() - results_list.append(result_data) # Füge komplettes Ergebnis hinzu + results_list.append(result_data) if result_data['error']: batch_error_count += 1; total_error_count +=1 except Exception as exc: - row_num = task['row_num'] - err_msg = f"Generischer Fehler Branch Task Zeile {row_num}: {exc}" - debug_print(err_msg) - # Füge Fehler-Ergebnis hinzu + row_num = task['row_num']; err_msg = f"Generischer Fehler Branch Task Zeile {row_num}: {exc}"; debug_print(err_msg) results_list.append({"row_num": row_num, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg[:500]}, "error": err_msg}) batch_error_count += 1; total_error_count +=1 @@ -2633,11 +2612,11 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ total_processed_count += current_batch_processed_count debug_print(f" Branch-Evaluation für Batch beendet. {current_batch_processed_count} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).") - # --- Sheet Updates vorbereiten --- + # Sheet Updates vorbereiten FÜR DIESEN BATCH if results_list: current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") current_version = Config.VERSION - batch_sheet_updates = [] + batch_sheet_updates = [] # Nur Updates für diesen Batch for res_data in results_list: row_num = res_data['row_num'] result = res_data['result'] @@ -2649,22 +2628,24 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ {'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]} # AP Version ] batch_sheet_updates.extend(row_updates) - all_sheet_updates.extend(batch_sheet_updates) # Sammle für größeren Batch-Update + + # Sende Updates für DIESEN Batch SOFORT + if batch_sheet_updates: + debug_print(f" Sende Sheet-Update für {len(results_list)} Zeilen ({len(batch_sheet_updates)} Zellen)...") + success = sheet_handler.batch_update_cells(batch_sheet_updates) + if success: debug_print(f" Sheet-Update für Batch {batch_start_row}-{batch_end_row} erfolgreich.") + else: debug_print(f" FEHLER beim Sheet-Update für Batch {batch_start_row}-{batch_end_row}.") + else: + debug_print(f" Keine Sheet-Updates für Batch {batch_start_row}-{batch_end_row} vorbereitet.") tasks_for_processing_batch = [] # Batch leeren + debug_print(f"--- Verarbeitungs-Batch {batch_start_row}-{batch_end_row} abgeschlossen ---") + # Kurze Pause NACHDEM ein Batch komplett verarbeitet und geschrieben wurde + time.sleep(1) # Kurze Pause von 1 Sekunde - # --- Sheet Updates senden (wenn update_batch_row_limit erreicht) --- - if len(all_sheet_updates) >= update_batch_row_limit * 5: # *5 Updates pro Zeile - debug_print(f" Sende gesammelte Sheet-Updates ({len(all_sheet_updates)} Zellen)...") - success = sheet_handler.batch_update_cells(all_sheet_updates) - if success: debug_print(f" Sheet-Update bis Zeile {batch_end_row} erfolgreich.") - else: debug_print(f" FEHLER beim Sheet-Update bis Zeile {batch_end_row}.") - all_sheet_updates = [] + # --- time.sleep(Config.RETRY_DELAY) wurde HIER entfernt --- - # --- Finale Sheet Updates senden --- - if all_sheet_updates: - debug_print(f"Sende finale Sheet-Updates ({len(all_sheet_updates)} Zellen)...") - sheet_handler.batch_update_cells(all_sheet_updates) + # Kein finales Update senden mehr nötig, da nach jedem Batch gesendet wird debug_print(f"Brancheneinschätzung (Parallel Batch) abgeschlossen. {total_processed_count} Zeilen verarbeitet (inkl. Fehler), {total_error_count} Fehler, {total_skipped_count} Zeilen wg. Timestamp übersprungen.")