diff --git a/brancheneinstufung.py b/brancheneinstufung.py index 660d33ab..065dbcb2 100644 --- a/brancheneinstufung.py +++ b/brancheneinstufung.py @@ -2505,21 +2505,14 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ if not all_data or len(all_data) <= 5: return header_rows = 5 - # Indizes und Buchstaben holen - timestamp_col_key = "Timestamp letzte Prüfung" - timestamp_col_index = COLUMN_MAP.get(timestamp_col_key) - branche_crm_idx = COLUMN_MAP.get("CRM Branche") - beschreibung_idx = COLUMN_MAP.get("CRM Beschreibung") - branche_wiki_idx = COLUMN_MAP.get("Wiki Branche") - kategorien_wiki_idx = COLUMN_MAP.get("Wiki Kategorien") - summary_web_idx = COLUMN_MAP.get("Website Zusammenfassung") - version_col_idx = COLUMN_MAP.get("Version") - branch_w_idx = COLUMN_MAP.get("Chat Vorschlag Branche") - branch_x_idx = COLUMN_MAP.get("Chat Konsistenz Branche") + # --- Indizes und Buchstaben --- + timestamp_col_key = "Timestamp letzte Prüfung"; timestamp_col_index = COLUMN_MAP.get(timestamp_col_key) + branche_crm_idx = COLUMN_MAP.get("CRM Branche"); beschreibung_idx = COLUMN_MAP.get("CRM Beschreibung") + branche_wiki_idx = COLUMN_MAP.get("Wiki Branche"); kategorien_wiki_idx = COLUMN_MAP.get("Wiki Kategorien") + summary_web_idx = COLUMN_MAP.get("Website Zusammenfassung"); version_col_idx = COLUMN_MAP.get("Version") + branch_w_idx = COLUMN_MAP.get("Chat Vorschlag Branche"); branch_x_idx = COLUMN_MAP.get("Chat Konsistenz Branche") branch_y_idx = COLUMN_MAP.get("Chat Begründung Abweichung Branche") - required_indices = [timestamp_col_index, branche_crm_idx, beschreibung_idx, branche_wiki_idx, - kategorien_wiki_idx, summary_web_idx, version_col_index, branch_w_idx, - branch_x_idx, branch_y_idx] + required_indices = [timestamp_col_index, branche_crm_idx, beschreibung_idx, branche_wiki_idx, kategorien_wiki_idx, summary_web_idx, version_col_idx, branch_w_idx, branch_x_idx, branch_y_idx] if None in required_indices: return debug_print(f"FEHLER: Indizes fehlen.") ts_col_letter = sheet_handler._get_col_letter(timestamp_col_index + 1) version_col_letter = sheet_handler._get_col_letter(version_col_idx + 1) @@ -2527,34 +2520,30 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ branch_x_letter = sheet_handler._get_col_letter(branch_x_idx + 1) branch_y_letter = sheet_handler._get_col_letter(branch_y_idx + 1) - # Konfiguration für Parallelisierung + # --- Konfiguration --- MAX_BRANCH_WORKERS = Config.MAX_BRANCH_WORKERS - OPENAI_CONCURRENCY_LIMIT = Config.OPENAI_CONCURRENCY_LIMIT # Hole aus Config + OPENAI_CONCURRENCY_LIMIT = Config.OPENAI_CONCURRENCY_LIMIT openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT) - PROCESSING_BRANCH_BATCH_SIZE = Config.PROCESSING_BATCH_SIZE + PROCESSING_BRANCH_BATCH_SIZE = Config.PROCESSING_BRANCH_BATCH_SIZE + update_batch_row_limit = Config.UPDATE_BATCH_ROW_LIMIT # Wird derzeit nicht verwendet, da wir pro Batch senden - # Worker Funktion (unverändert) + # --- Worker Funktion --- def evaluate_branch_task(task_data): - row_num = task_data['row_num'] - result = {"branch": "k.A. (Fehler Task)", "consistency": "error", "justification": "Fehler in Worker-Task"} - error = None + row_num = task_data['row_num']; result = {"branch": "k.A. (Fehler Task)", "consistency": "error", "justification": "Fehler in Worker-Task"}; error = None try: with openai_semaphore_branch: - time.sleep(0.3) - result = evaluate_branche_chatgpt( - task_data['crm_branche'], task_data['beschreibung'], task_data['wiki_branche'], - task_data['wiki_kategorien'], task_data['website_summary'] - ) + # debug_print(f" Task {row_num}: Warte auf Semaphore...") # Sehr detailliertes Logging + # time.sleep(0.1) # Minimale Pause reduziert manchmal Race Conditions bei hoher Last + # debug_print(f" Task {row_num}: Semaphore erhalten, starte evaluate_branche_chatgpt...") + result = evaluate_branche_chatgpt( task_data['crm_branche'], task_data['beschreibung'], task_data['wiki_branche'], task_data['wiki_kategorien'], task_data['website_summary']) + # debug_print(f" Task {row_num}: evaluate_branche_chatgpt beendet.") except Exception as e: - error = f"Fehler Brancheneval Zeile {row_num}: {e}" - debug_print(error); result['justification'] = error[:500]; result['consistency'] = 'error_task' + error = f"Fehler bei Branchenevaluation Zeile {row_num}: {e}"; debug_print(error); result['justification'] = error[:500]; result['consistency'] = 'error_task' return {"row_num": row_num, "result": result, "error": error} - # Hauptverarbeitung + # --- Hauptverarbeitung --- tasks_for_processing_batch = [] - total_processed_count = 0 - total_skipped_count = 0 - total_error_count = 0 + total_processed_count = 0; total_skipped_count = 0; total_error_count = 0 if not ALLOWED_TARGET_BRANCHES: load_target_schema(); if not ALLOWED_TARGET_BRANCHES: return debug_print("FEHLER: Ziel-Schema nicht geladen.") @@ -2566,48 +2555,37 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ # Timestamp-Prüfung (AO) should_skip = False - if len(row) > timestamp_col_index and str(row[timestamp_col_index]).strip(): - should_skip = True - - if should_skip: - total_skipped_count += 1 - continue + if len(row) > timestamp_col_index and str(row[timestamp_col_index]).strip(): should_skip = True + if should_skip: total_skipped_count += 1; continue # Task sammeln - task_data = { - "row_num": i, - "crm_branche": row[branche_crm_idx] if len(row) > branche_crm_idx else "", - "beschreibung": row[beschreibung_idx] if len(row) > beschreibung_idx else "", - "wiki_branche": row[branche_wiki_idx] if len(row) > branche_wiki_idx else "", - "wiki_kategorien": row[kategorien_wiki_idx] if len(row) > kategorien_wiki_idx else "", - "website_summary": row[summary_web_idx] if len(row) > summary_web_idx else "" - } + task_data = { "row_num": i, "crm_branche": row[branche_crm_idx] if len(row) > branche_crm_idx else "", "beschreibung": row[beschreibung_idx] if len(row) > beschreibung_idx else "", "wiki_branche": row[branche_wiki_idx] if len(row) > branche_wiki_idx else "", "wiki_kategorien": row[kategorien_wiki_idx] if len(row) > kategorien_wiki_idx else "", "website_summary": row[summary_web_idx] if len(row) > summary_web_idx else ""} tasks_for_processing_batch.append(task_data) # --- Verarbeitungs-Batch ausführen --- if len(tasks_for_processing_batch) >= PROCESSING_BRANCH_BATCH_SIZE or i == end_row_index_in_sheet: if tasks_for_processing_batch: - batch_start_row = tasks_for_processing_batch[0]['row_num'] - batch_end_row = tasks_for_processing_batch[-1]['row_num'] + batch_start_row = tasks_for_processing_batch[0]['row_num']; batch_end_row = tasks_for_processing_batch[-1]['row_num'] batch_task_count = len(tasks_for_processing_batch) debug_print(f"\n--- Starte Branch-Evaluation Batch ({batch_task_count} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") - results_list = [] - batch_error_count = 0 + results_list = []; batch_error_count = 0 debug_print(f" Evaluiere {batch_task_count} Zeilen parallel (max {MAX_BRANCH_WORKERS} worker, {OPENAI_CONCURRENCY_LIMIT} OpenAI gleichzeitig)...") + # *** BEGINN PARALLELE VERARBEITUNG *** with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor: future_to_task = {executor.submit(evaluate_branch_task, task): task for task in tasks_for_processing_batch} + # Warte auf Ergebnisse und sammle sie for future in concurrent.futures.as_completed(future_to_task): task = future_to_task[future] - try: - result_data = future.result() - results_list.append(result_data) - if result_data['error']: batch_error_count += 1; total_error_count +=1 + try: result_data = future.result(); results_list.append(result_data); except Exception as exc: row_num = task['row_num']; err_msg = f"Generischer Fehler Branch Task Zeile {row_num}: {exc}"; debug_print(err_msg) results_list.append({"row_num": row_num, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg[:500]}, "error": err_msg}) batch_error_count += 1; total_error_count +=1 + # Zähle Fehler aus dem Ergebnis-Dict + if results_list[-1]['error']: batch_error_count += 1; total_error_count +=1 + # *** ENDE PARALLELE VERARBEITUNG *** current_batch_processed_count = len(results_list) total_processed_count += current_batch_processed_count debug_print(f" Branch-Evaluation für Batch beendet. {current_batch_processed_count} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).") @@ -2616,16 +2594,19 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ if results_list: current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") current_version = Config.VERSION - batch_sheet_updates = [] # Nur Updates für diesen Batch + batch_sheet_updates = [] + # Sortiere Ergebnisse nach Zeilennummer für geordnetes Schreiben (optional) + results_list.sort(key=lambda x: x['row_num']) for res_data in results_list: - row_num = res_data['row_num'] - result = res_data['result'] + row_num = res_data['row_num']; result = res_data['result'] + # Logge das individuelle Ergebnis VOR dem Update + debug_print(f" Zeile {row_num}: Ergebnis -> Branch='{result.get('branch')}', Consistency='{result.get('consistency')}', Justification='{result.get('justification', '')[:50]}...'") row_updates = [ {'range': f'{branch_w_letter}{row_num}', 'values': [[result.get("branch", "Fehler")]]}, {'range': f'{branch_x_letter}{row_num}', 'values': [[result.get("consistency", "Fehler")]]}, {'range': f'{branch_y_letter}{row_num}', 'values': [[result.get("justification", "Fehler")]]}, - {'range': f'{ts_col_letter}{row_num}', 'values': [[current_timestamp]]}, # AO Timestamp - {'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]} # AP Version + {'range': f'{ts_col_letter}{row_num}', 'values': [[current_timestamp]]}, + {'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]} ] batch_sheet_updates.extend(row_updates) @@ -2635,17 +2616,16 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ success = sheet_handler.batch_update_cells(batch_sheet_updates) if success: debug_print(f" Sheet-Update für Batch {batch_start_row}-{batch_end_row} erfolgreich.") else: debug_print(f" FEHLER beim Sheet-Update für Batch {batch_start_row}-{batch_end_row}.") - else: - debug_print(f" Keine Sheet-Updates für Batch {batch_start_row}-{batch_end_row} vorbereitet.") + else: debug_print(f" Keine Sheet-Updates für Batch {batch_start_row}-{batch_end_row} vorbereitet.") tasks_for_processing_batch = [] # Batch leeren debug_print(f"--- Verarbeitungs-Batch {batch_start_row}-{batch_end_row} abgeschlossen ---") # Kurze Pause NACHDEM ein Batch komplett verarbeitet und geschrieben wurde - time.sleep(1) # Kurze Pause von 1 Sekunde + # debug_print(" Warte 1 Sekunde...") # Test-Log + time.sleep(1) - # --- time.sleep(Config.RETRY_DELAY) wurde HIER entfernt --- - - # Kein finales Update senden mehr nötig, da nach jedem Batch gesendet wird + # !!! HIER DARF KEIN SLEEP STEHEN !!! + # time.sleep(Config.RETRY_DELAY) # <<< DIESE ZEILE MUSS WEG SEIN in deinem Code! debug_print(f"Brancheneinschätzung (Parallel Batch) abgeschlossen. {total_processed_count} Zeilen verarbeitet (inkl. Fehler), {total_error_count} Fehler, {total_skipped_count} Zeilen wg. Timestamp übersprungen.")