diff --git a/brancheneinstufung.py b/brancheneinstufung.py index 05f00873..d87a80cf 100644 --- a/brancheneinstufung.py +++ b/brancheneinstufung.py @@ -2494,48 +2494,87 @@ def process_website_summarization_batch(sheet_handler, start_row_index_in_sheet, # Komplette Funktion process_branch_batch (MIT Korrektur und Prüfung auf AO) def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_in_sheet): """ - Batch-Prozess für Brancheneinschätzung. Lädt Daten neu, prüft Timestamp AO, - liest vorhandene Zusammenfassung (AS) und setzt AO + AP für bearbeitete Zeilen. + Batch-Prozess für Brancheneinschätzung mit paralleler Verarbeitung via Threads. + Prüft Timestamp AO, führt evaluate_branche_chatgpt parallel aus (limitiert), + setzt W, X, Y, AO + AP und sendet Sheet-Updates gebündelt. """ - debug_print(f"Starte Brancheneinschätzung (Batch) für Zeilen {start_row_index_in_sheet} bis {end_row_index_in_sheet}...") + debug_print(f"Starte Brancheneinschätzung (Parallel Batch) für Zeilen {start_row_index_in_sheet} bis {end_row_index_in_sheet}...") + # --- Lade Daten --- if not sheet_handler.load_data(): return all_data = sheet_handler.get_all_data_with_headers() if not all_data or len(all_data) <= 5: return - sheet = sheet_handler.sheet + header_rows = 5 - # Indizes holen + # --- Indizes und Buchstaben --- timestamp_col_key = "Timestamp letzte Prüfung" timestamp_col_index = COLUMN_MAP.get(timestamp_col_key) branche_crm_idx = COLUMN_MAP.get("CRM Branche") beschreibung_idx = COLUMN_MAP.get("CRM Beschreibung") branche_wiki_idx = COLUMN_MAP.get("Wiki Branche") kategorien_wiki_idx = COLUMN_MAP.get("Wiki Kategorien") - summary_web_idx = COLUMN_MAP.get("Website Zusammenfassung") # Index für AS + summary_web_idx = COLUMN_MAP.get("Website Zusammenfassung") version_col_idx = COLUMN_MAP.get("Version") branch_w_idx = COLUMN_MAP.get("Chat Vorschlag Branche") branch_x_idx = COLUMN_MAP.get("Chat Konsistenz Branche") branch_y_idx = COLUMN_MAP.get("Chat Begründung Abweichung 
Branche") - required_indices = [timestamp_col_index, branche_crm_idx, beschreibung_idx, branche_wiki_idx, kategorien_wiki_idx, summary_web_idx, version_col_index, branch_w_idx, branch_x_idx, branch_y_idx] - if None in required_indices: - debug_print(f"FEHLER: Mindestens ein benötigter Spaltenindex für process_branch_batch fehlt in COLUMN_MAP.") - return - + if None in (timestamp_col_index, branche_crm_idx, beschreibung_idx, branche_wiki_idx, kategorien_wiki_idx, summary_web_idx, version_col_idx, branch_w_idx, branch_x_idx, branch_y_idx): return debug_print(f"FEHLER: Indizes für process_branch_batch fehlen.") ts_col_letter = sheet_handler._get_col_letter(timestamp_col_index + 1) version_col_letter = sheet_handler._get_col_letter(version_col_index + 1) branch_w_letter = sheet_handler._get_col_letter(branch_w_idx + 1) branch_x_letter = sheet_handler._get_col_letter(branch_x_idx + 1) branch_y_letter = sheet_handler._get_col_letter(branch_y_idx + 1) - processed_count = 0 - skipped_count = 0 + # --- Konfiguration für Parallelisierung --- + MAX_BRANCH_WORKERS = 10 # Wie viele Threads für parallele Bewertung? + # Limit für gleichzeitige OpenAI Calls (wichtig wegen Rate Limits!) + OPENAI_CONCURRENCY_LIMIT = 5 + openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT) + # Batch-Größe für das Sammeln von Aufgaben, bevor der ThreadPool gestartet wird + PROCESSING_BRANCH_BATCH_SIZE = Config.PROCESSING_BATCH_SIZE # z.B. 20 aus Config nehmen + # Batch-Größe für das Senden von Sheet-Updates + update_batch_row_limit = Config.UPDATE_BATCH_ROW_LIMIT # z.B. 50 aus Config - if not ALLOWED_TARGET_BRANCHES: - load_target_schema() - if not ALLOWED_TARGET_BRANCHES: return + # --- Worker Funktion --- + def evaluate_branch_task(task_data): + row_num = task_data['row_num'] + result = {"branch": "k.A. 
(Fehler Task)", "consistency": "error", "justification": "Fehler in Worker-Task"} + error = None + try: + # Limitiere OpenAI Calls mit Semaphore + with openai_semaphore_branch: + # debug_print(f"Task Zeile {row_num}: Starte evaluate_branche_chatgpt...") # Kann sehr laut sein + # Füge kleine Pause hinzu, um Limits zu respektieren + time.sleep(0.3) # Kleine Pause vor jedem Call + result = evaluate_branche_chatgpt( + task_data['crm_branche'], + task_data['beschreibung'], + task_data['wiki_branche'], + task_data['wiki_kategorien'], + task_data['website_summary'] + ) + # debug_print(f"Task Zeile {row_num}: evaluate_branche_chatgpt beendet.") + except Exception as e: + error = f"Fehler bei Branchenevaluation Zeile {row_num}: {e}" + debug_print(error) + # Füge Fehlerinfo zum Ergebnis hinzu, falls evaluate_branche_chatgpt selbst keinen Fehler liefert + result['justification'] = error[:500] # Schreibe Fehler in Begründung (gekürzt) + result['consistency'] = 'error_task' + return {"row_num": row_num, "result": result, "error": error} + + # --- Hauptverarbeitung --- + tasks_for_processing_batch = [] + all_sheet_updates = [] + total_processed_count = 0 + total_skipped_count = 0 + total_error_count = 0 + + # Stelle sicher, dass Branchenschema geladen ist + if not ALLOWED_TARGET_BRANCHES: load_target_schema(); + if not ALLOWED_TARGET_BRANCHES: return debug_print("FEHLER: Ziel-Schema nicht geladen. Abbruch Branch-Batch.") for i in range(start_row_index_in_sheet, end_row_index_in_sheet + 1): row_index_in_list = i - 1 @@ -2548,37 +2587,86 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_ should_skip = True if should_skip: - skipped_count += 1 + total_skipped_count += 1 continue - # Daten holen (inkl. 
AS, das von einem anderen Prozess geschrieben wurde) - crm_branche = row[branche_crm_idx] if len(row) > branche_crm_idx else "" - beschreibung = row[beschreibung_idx] if len(row) > beschreibung_idx else "" - wiki_branche = row[branche_wiki_idx] if len(row) > branche_wiki_idx else "" - wiki_kategorien = row[kategorien_wiki_idx] if len(row) > kategorien_wiki_idx else "" - website_summary = row[summary_web_idx] if len(row) > summary_web_idx else "k.A." # Lese AS + # Daten für den Task sammeln + task_data = { + "row_num": i, + "crm_branche": row[branche_crm_idx] if len(row) > branche_crm_idx else "", + "beschreibung": row[beschreibung_idx] if len(row) > beschreibung_idx else "", + "wiki_branche": row[branche_wiki_idx] if len(row) > branche_wiki_idx else "", + "wiki_kategorien": row[kategorien_wiki_idx] if len(row) > kategorien_wiki_idx else "", + "website_summary": row[summary_web_idx] if len(row) > summary_web_idx else "" + } + tasks_for_processing_batch.append(task_data) - result = evaluate_branche_chatgpt(crm_branche, beschreibung, wiki_branche, wiki_kategorien, website_summary) - processed_count += 1 + # --- Verarbeitungs-Batch ausführen, wenn voll oder letzte Zeile --- + if len(tasks_for_processing_batch) >= PROCESSING_BRANCH_BATCH_SIZE or i == end_row_index_in_sheet: + if tasks_for_processing_batch: + batch_start_row = tasks_for_processing_batch[0]['row_num'] + batch_end_row = tasks_for_processing_batch[-1]['row_num'] + batch_task_count = len(tasks_for_processing_batch) + debug_print(f"\n--- Starte Branch-Evaluation Batch ({batch_task_count} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") - updates = [] - current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - current_version = Config.VERSION + # --- Parallele Ausführung --- + results_list = [] # Liste der Ergebnis-Dicts + batch_error_count = 0 + debug_print(f" Evaluiere {batch_task_count} Zeilen parallel (max {MAX_BRANCH_WORKERS} worker, {OPENAI_CONCURRENCY_LIMIT} OpenAI gleichzeitig)...") + 
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor: + future_to_task = {executor.submit(evaluate_branch_task, task): task for task in tasks_for_processing_batch} + for future in concurrent.futures.as_completed(future_to_task): + task = future_to_task[future] + try: + result_data = future.result() + results_list.append(result_data) # Füge komplettes Ergebnis hinzu + if result_data['error']: batch_error_count += 1; total_error_count +=1 + except Exception as exc: + row_num = task['row_num'] + err_msg = f"Generischer Fehler Branch Task Zeile {row_num}: {exc}" + debug_print(err_msg) + # Füge Fehler-Ergebnis hinzu + results_list.append({"row_num": row_num, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg[:500]}, "error": err_msg}) + batch_error_count += 1; total_error_count +=1 - updates.append({'range': f'{branch_w_letter}{i}', 'values': [[result.get("branch", "Fehler")]]}) - updates.append({'range': f'{branch_x_letter}{i}', 'values': [[result.get("consistency", "Fehler")]]}) - updates.append({'range': f'{branch_y_letter}{i}', 'values': [[result.get("justification", "Fehler")]]}) - updates.append({'range': f'{ts_col_letter}{i}', 'values': [[current_timestamp]]}) # AO Timestamp - updates.append({'range': f'{version_col_letter}{i}', 'values': [[current_version]]}) # AP Version + current_batch_processed_count = len(results_list) + total_processed_count += current_batch_processed_count + debug_print(f" Branch-Evaluation für Batch beendet. 
{current_batch_processed_count} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).") - if updates: - success = sheet_handler.batch_update_cells(updates) - if success: debug_print(f"Zeile {i}: Branch-Einschätzung erfolgreich aktualisiert.") - else: debug_print(f"FEHLER beim Schreiben der Branch-Updates für Zeile {i}.") + # --- Sheet Updates vorbereiten --- + if results_list: + current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + current_version = Config.VERSION + batch_sheet_updates = [] + for res_data in results_list: + row_num = res_data['row_num'] + result = res_data['result'] + row_updates = [ + {'range': f'{branch_w_letter}{row_num}', 'values': [[result.get("branch", "Fehler")]]}, + {'range': f'{branch_x_letter}{row_num}', 'values': [[result.get("consistency", "Fehler")]]}, + {'range': f'{branch_y_letter}{row_num}', 'values': [[result.get("justification", "Fehler")]]}, + {'range': f'{ts_col_letter}{row_num}', 'values': [[current_timestamp]]}, # AO Timestamp + {'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]} # AP Version + ] + batch_sheet_updates.extend(row_updates) + all_sheet_updates.extend(batch_sheet_updates) # Sammle für größeren Batch-Update - time.sleep(Config.RETRY_DELAY) + tasks_for_processing_batch = [] # Batch leeren - debug_print(f"Brancheneinschätzung (Batch) abgeschlossen. {processed_count} Zeilen eingeschätzt, {skipped_count} Zeilen wg. 
Timestamp übersprungen.") + # --- Sheet Updates senden (wenn update_batch_row_limit erreicht) --- + if len(all_sheet_updates) >= update_batch_row_limit * 5: # *5 Updates pro Zeile + debug_print(f" Sende gesammelte Sheet-Updates ({len(all_sheet_updates)} Zellen)...") + success = sheet_handler.batch_update_cells(all_sheet_updates) + if success: debug_print(f" Sheet-Update bis Zeile {batch_end_row} erfolgreich.") + else: debug_print(f" FEHLER beim Sheet-Update bis Zeile {batch_end_row}.") + all_sheet_updates = [] + + # --- Finale Sheet Updates senden --- + if all_sheet_updates: + debug_print(f"Sende finale Sheet-Updates ({len(all_sheet_updates)} Zellen)...") + sheet_handler.batch_update_cells(all_sheet_updates) + + debug_print(f"Brancheneinschätzung (Parallel Batch) abgeschlossen. {total_processed_count} Zeilen verarbeitet (inkl. Fehler), {total_error_count} Fehler, {total_skipped_count} Zeilen wg. Timestamp übersprungen.") def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_in_sheet):