This commit is contained in:
2025-05-11 20:18:02 +00:00
parent 1677ab058f
commit e515ce0fc2

View File

@@ -6559,31 +6559,37 @@ class DataProcessor:
# Sheet Updates vorbereiten FÜR DIESEN BATCH.
# Dies geschieht jetzt nach der parallelen Verarbeitung.
if results_list:
# Aktueller Zeitstempel und Version fuer die Updates
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut
batch_sheet_updates = [] # Updates fuer DIESEN spezifischen Batch von Zeilen
current_version = getattr(Config, 'VERSION', 'unknown')
batch_sheet_updates = []
# Sortiere Ergebnisse nach Zeilennummer fuer geordnetes Schreiben (optional, aber gut)
results_list.sort(key=lambda x: x['row_num'])
# Iteriere ueber die Ergebnisse dieses Batches
for res_data in results_list:
row_num = res_data['row_num'] # 1-basierte Zeilennummer
result = res_data['result'] # Das Ergebnis-Dictionary von evaluate_branch_task
for res_data in results_list: # <--- HIER IST DIE SCHLEIFE, DIE SIE IM SCREENSHOT ZEIGEN
row_num = res_data['row_num']
result = res_data['result']
self.logger.debug(f" Zeile {row_num} (aus Batch): Ergebnis aus evaluate_branch_task: {result}") # DEBUG
# Logge das individuelle Ergebnis VOR dem Update
# self.logger.debug(f" Zeile {row_num}: Ergebnis -> Branch='{result.get('branch')}', Consistency='{result.get('consistency')}', Justification='{result.get('justification', '')[:50]}...'") # Zu viel Laerm (gekuerzt)
# Sammle Updates fuer W, X, Y, AO, AP (nutzt interne Helfer _get_col_letter Block 14)
# Stellen Sie sicher, dass die Schluessel im Ergebnis-Dict vorhanden sind, Fallback auf Standard-Fehlerwerte.
batch_sheet_updates.append({'range': f'{branch_w_letter}{row_num}', 'values': [[result.get("branch", "FEHLER")]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{branch_x_letter}{row_num}', 'values': [[result.get("consistency", "error")]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{branch_y_letter}{row_num}', 'values': [[result.get("justification", "Keine Begruendung")]]}) # Block 1 Column Map
# Setze AO Timestamp fuer diese Zeile
batch_sheet_updates.append({'range': f'{ts_ao_letter}{row_num}', 'values': [[current_timestamp]]}) # Block 1 Column Map
# Setze AP Version fuer diese Zeile
batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map
# Chat Vorschlag Branche (AH)
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num}',
'values': [[result.get("branch", "FEHLER BRANCH")]]})
# Chat Branche Konfidenz (AI) - NEU
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{row_num}',
'values': [[result.get("confidence", "N/A CONF")]]}) # <<< HIER ANPASSEN
# Chat Konsistenz Branche (AJ)
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num}',
'values': [[result.get("consistency", "error CONS")]]})
# Chat Begruendung Abweichung Branche (AK)
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{row_num}',
'values': [[result.get("justification", "Keine Begr. JUST")]]})
# Timestamp letzte Pruefung (BC)
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{row_num}',
'values': [[current_timestamp]]})
# Version (BD)
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{row_num}',
'values': [[current_version]]})
# --- Sende Updates fuer DIESEN BATCH SOFORT ---
@@ -6614,59 +6620,35 @@ class DataProcessor:
# --- Verarbeitung des letzten unvollstaendigen Batches nach der Schleife ---
# Wenn nach der Hauptschleife noch Tasks in der Batch-Liste sind.
if tasks_for_processing_batch:
# Logge den Start des finalen Batches
batch_start_row = tasks_for_processing_batch[0]['row_num']
batch_end_row = tasks_for_processing_batch[-1]['row_num']
self.logger.debug(f"\n--- Starte FINALEN Branch-Evaluation Batch ({len(tasks_for_processing_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
# ... (Logik für den letzten Batch mit ThreadPoolExecutor) ...
self.logger.debug(f" FINALER Branch Batch beendet. {len(results_list)} Ergebnisse erhalten ({batch_error_count} Fehler).")
# Sammle Sheet Updates (W, X, Y, AO, AP) fuer diesen finalen Batch.
if results_list:
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_version = getattr(Config, 'VERSION', 'unknown')
batch_sheet_updates = []
results_list.sort(key=lambda x: x['row_num'])
for res_data in results_list: # <--- HIER IST DIE ZWEITE SCHLEIFE
row_num = res_data['row_num']
result = res_data['result']
self.logger.debug(f" FINALER Batch Zeile {row_num}: Ergebnis aus evaluate_branch_task: {result}") # DEBUG
results_list = [] # Liste zum Speichern der Ergebnisse fuer diesen finalen Batch
batch_error_count = 0 # Fehlerzaehler
self.logger.debug(f" Evaluiere {len(tasks_for_processing_batch)} Zeilen parallel (max {MAX_BRANCH_WORKERS} worker, {OPENAI_CONCURRENCY_LIMIT} OpenAI gleichzeitig)...") # <<< GEÄNDERT
# Erstellen Sie die Semaphore Instanz fuer den finalen Batch.
openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT)
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor:
# Map tasks to futures. Ruft die INTERNE Worker-Funktion auf und uebergibt die Semaphore.
future_to_task = {executor.submit(self.evaluate_branch_task, task, openai_semaphore_branch): task for task in tasks_for_processing_batch}
# Verarbeite die Ergebnisse
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future] # Holen Sie die urspruenglichen Task-Daten
try:
result_data = future.result() # Holen Sie das Ergebnis
results_list.append(result_data) # Fuege das Ergebnis zur Liste hinzu
# Pruefe, ob der Worker einen Fehler gemeldet hat
if result_data.get('error'): batch_error_count += 1
except Exception as exc:
# Faengt unerwartete Fehler bei der Ergebnisabfrage ab
row_num = task['row_num']
err_msg = f"Unerwarteter Fehler bei Ergebnisabfrage Branch Task Zeile {row_num}: {type(exc).__name__} - {exc}"
self.logger.error(err_msg) # <<< GEÄNDERT
# Setze einen Standard-Fehler-Ergebniswert
results_list.append({"row_num": row_num, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg[:500]}, "error": err_msg}) # Kuerze Begruendung
batch_error_count += 1
self.logger.debug(f" FINALER Branch Batch beendet. {len(results_list)} Ergebnisse erhalten ({batch_error_count} Fehler).") # <<< GEÄNDERT
# Sammle Sheet Updates (W, X, Y, AO, AP) fuer diesen finalen Batch.
if results_list:
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut
batch_sheet_updates = [] # Updates fuer diesen spezifischen Batch
results_list.sort(key=lambda x: x['row_num']) # Sortiere Ergebnisse nach Zeilennummer
for res_data in results_list:
row_num = res_data['row_num']
result = res_data['result'] # Ergebnis-Dictionary
# Fuege Updates fuer W, X, Y, AO, AP hinzu (nutzt interne Helfer)
batch_sheet_updates.append({'range': f'{branch_w_letter}{row_num}', 'values': [[result.get("branch", "FEHLER")]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{branch_x_letter}{row_num}', 'values': [[result.get("consistency", "error")]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{branch_y_letter}{row_num}', 'values': [[result.get("justification", "Keine Begruendung")]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{ts_ao_letter}{row_num}', 'values': [[current_timestamp]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map
# Sammle Updates fuer AH, AI, AJ, AK, BC, BD
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num}',
'values': [[result.get("branch", "FEHLER BRANCH")]]})
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{row_num}',
'values': [[result.get("confidence", "N/A CONF")]]}) # <<< HIER ANPASSEN
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num}',
'values': [[result.get("consistency", "error CONS")]]})
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{row_num}',
'values': [[result.get("justification", "Keine Begr. JUST")]]})
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{row_num}',
'values': [[current_timestamp]]})
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{row_num}',
'values': [[current_version]]})
# Fuege diese Updates zur globalen Liste hinzu (wird dann nur noch einmal gesendet)
# all_sheet_updates.extend(batch_sheet_updates) # Nicht hier sammeln, direkt senden