bugfix
This commit is contained in:
@@ -6302,378 +6302,214 @@ class DataProcessor:
|
||||
# ALLOWED_TARGET_BRANCHES (Block 6).
|
||||
# Nutzt die uebergeordnete sheet_handler Instanz (Block 14).
|
||||
def process_branch_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
|
||||
"""
|
||||
Batch-Prozess fuer Brancheneinschaetzung mit paralleler Verarbeitung ueber Threads.
|
||||
Prueft Timestamp AO, fuehrt evaluate_branche_chatgpt parallel aus (limitiert),
|
||||
setzt W, X, Y, AO + AP und sendet Sheet-Updates GEBUENDELT PRO VERARBEITUNGS-BATCH.
|
||||
|
||||
Args:
|
||||
start_sheet_row (int, optional): Die 1-basierte Startzeile im Sheet. Defaults to None (automatische Ermittlung basierend auf leeren AO).
|
||||
end_sheet_row (int, optional): Die 1-basierte Endzeile im Sheet. Defaults to None (bis Ende Sheet).
|
||||
limit (int, optional): Maximale Anzahl ZU VERARBEITENDER (nicht uebersprungener) Zeilen. Defaults to None (Unbegrenzt).
|
||||
"""
|
||||
# Verwenden Sie logger, da das Logging jetzt konfiguriert ist
|
||||
# Logge die Konfiguration des Batch-Laufs
|
||||
self.logger.info(f"Starte Brancheneinschaetzung (Parallel Batch W-Y, AO, AP). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") # <<< GEÄNDERT
|
||||
|
||||
self.logger.info(f"Starte Brancheneinschaetzung (Parallel Batch W-Y, AO, AP). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...")
|
||||
|
||||
# --- Daten laden und Startzeile ermitteln ---
|
||||
# Automatische Ermittlung der Startzeile, wenn nicht manuell gesetzt
|
||||
if start_sheet_row is None:
|
||||
self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren AO...") # <<< GEÄNDERT
|
||||
# Nutzt get_start_row_index des Sheet Handlers (Block 14). Prueft auf leeren AO (Block 1 Column Map).
|
||||
# Standardmaessig ab Zeile 7
|
||||
self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren AO...")
|
||||
start_data_index_no_header = self.sheet_handler.get_start_row_index(check_column_key="Timestamp letzte Pruefung", min_sheet_row=7)
|
||||
|
||||
# Wenn get_start_row_index -1 zurueckgibt (Fehler)
|
||||
if start_data_index_no_header == -1:
|
||||
self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") # <<< GEÄNDERT
|
||||
return # Beende die Methode
|
||||
|
||||
# Berechne die 1-basierte Sheet-Startzeile aus dem 0-basierten Daten-Index
|
||||
start_sheet_row = start_data_index_no_header + self.sheet_handler._header_rows + 1 # Block 14 SheetHandler Attribut
|
||||
self.logger.info(f"Automatisch ermittelte Startzeile (erste leere AO Zelle): {start_sheet_row}") # <<< GEÄNDERT
|
||||
self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.")
|
||||
return
|
||||
start_sheet_row = start_data_index_no_header + self.sheet_handler._header_rows + 1
|
||||
self.logger.info(f"Automatisch ermittelte Startzeile (erste leere AO Zelle): {start_sheet_row}")
|
||||
else:
|
||||
# Wenn start_sheet_row manuell gesetzt wurde, laden Sie die Daten trotzdem neu, um aktuell zu sein.
|
||||
# Der load_data Aufruf ist mit retry_on_failure dekoriert (Block 2).
|
||||
if not self.sheet_handler.load_data():
|
||||
self.logger.error("FEHLER beim Laden der Daten fuer process_branch_batch.") # <<< GEÄNDERT
|
||||
return # Beende die Methode, wenn das Laden fehlschlaegt
|
||||
self.logger.error("FEHLER beim Laden der Daten fuer process_branch_batch.")
|
||||
return
|
||||
|
||||
|
||||
# Holen Sie die gesamte Datenliste (inklusive Header) aus dem SheetHandler.
|
||||
all_data = self.sheet_handler.get_all_data_with_headers()
|
||||
# Annahme: header_rows ist als Attribut im SheetHandler verfuegbar (Block 14).
|
||||
header_rows = self.sheet_handler._header_rows
|
||||
total_sheet_rows = len(all_data) # Gesamtzahl der Zeilen im Sheet
|
||||
total_sheet_rows = len(all_data)
|
||||
|
||||
|
||||
# Berechne Endzeile, wenn nicht manuell gesetzt
|
||||
if end_sheet_row is None:
|
||||
end_sheet_row = total_sheet_rows # Bis zur letzten Zeile
|
||||
|
||||
# Logge den verarbeitungsbereich
|
||||
self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}. Gesamtzeilen im Sheet: {total_sheet_rows}") # <<< GEÄNDERT
|
||||
|
||||
# Pruefe, ob der Bereich gueltig ist (Start <= Ende und Start nicht ueber Gesamtzeilen)
|
||||
end_sheet_row = total_sheet_rows
|
||||
self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}. Gesamtzeilen im Sheet: {total_sheet_rows}")
|
||||
if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows:
|
||||
self.logger.info("Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.") # <<< GEÄNDERT
|
||||
return # Beende die Methode, wenn der Bereich leer ist
|
||||
|
||||
self.logger.info("Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.")
|
||||
return
|
||||
|
||||
# --- Indizes und Buchstaben ---
|
||||
# Stellen Sie sicher, dass alle benoetigten Spalten in COLUMN_MAP (Block 1) vorhanden sind
|
||||
required_keys = [
|
||||
"Timestamp letzte Pruefung", # AO - Pruefkriterium
|
||||
"CRM Branche", "CRM Beschreibung", "Wiki Branche", "Wiki Kategorien", # Daten fuer Prompt
|
||||
"Website Zusammenfassung", "Version", # Weitere Daten fuer Prompt / Update
|
||||
"Chat Vorschlag Branche", "Chat Konsistenz Branche", "Chat Begruendung Abweichung Branche" # Ergebnisspalten W, X, Y
|
||||
"Timestamp letzte Pruefung",
|
||||
"CRM Branche", "CRM Beschreibung", "Wiki Branche", "Wiki Kategorien",
|
||||
"Website Zusammenfassung", "Version",
|
||||
"Chat Vorschlag Branche", "Chat Branche Konfidenz", "Chat Konsistenz Branche", "Chat Begruendung Abweichung Branche"
|
||||
]
|
||||
# Erstellen Sie ein Dictionary mit Schluesseln und Indizes
|
||||
col_indices = {key: COLUMN_MAP.get(key) for key in required_keys}
|
||||
|
||||
# Pruefen Sie, ob alle benoetigten Schluessel in COLUMN_MAP gefunden wurden
|
||||
if None in col_indices.values():
|
||||
missing = [k for k, v in col_indices.items() if v is None]
|
||||
self.logger.critical(f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_branch_batch: {missing}. Breche ab.") # <<< GEÄNDERT
|
||||
return # Beende die Methode bei kritischem Fehler
|
||||
|
||||
# Ermitteln Sie die Spaltenbuchstaben fuer Updates (W, X, Y, AO, AP) (nutzt interne Helfer _get_col_letter Block 14)
|
||||
ts_ao_letter = self.sheet_handler._get_col_letter(col_indices["Timestamp letzte Pruefung"] + 1)
|
||||
version_col_letter = self.sheet_handler._get_col_letter(col_indices["Version"] + 1)
|
||||
branch_w_letter = self.sheet_handler._get_col_letter(col_indices["Chat Vorschlag Branche"] + 1)
|
||||
branch_x_letter = self.sheet_handler._get_col_letter(col_indices["Chat Konsistenz Branche"] + 1)
|
||||
branch_y_letter = self.sheet_handler._get_col_letter(col_indices["Chat Begruendung Abweichung Branche"] + 1)
|
||||
|
||||
self.logger.critical(f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_branch_batch: {missing}. Breche ab.")
|
||||
return
|
||||
|
||||
# --- Konfiguration fuer Parallelisierung ---
|
||||
# Holen Sie die Konfiguration aus Config (Block 1)
|
||||
MAX_BRANCH_WORKERS = getattr(Config, 'MAX_BRANCH_WORKERS', 10) # Threads fuer parallele Verarbeitung
|
||||
OPENAI_CONCURRENCY_LIMIT = getattr(Config, 'OPENAI_CONCURRENCY_LIMIT', 3) # Max. gleichzeitige OpenAI Calls
|
||||
openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT) # Erstellen Sie die Semaphore Instanz
|
||||
|
||||
# Holen Sie die Batch-Groesse fuer Verarbeitung (Threading) aus Config (Block 1)
|
||||
MAX_BRANCH_WORKERS = getattr(Config, 'MAX_BRANCH_WORKERS', 10)
|
||||
OPENAI_CONCURRENCY_LIMIT = getattr(Config, 'OPENAI_CONCURRENCY_LIMIT', 3)
|
||||
processing_batch_size = getattr(Config, 'PROCESSING_BRANCH_BATCH_SIZE', 20)
|
||||
# Holen Sie die Batch-Groesse fuer Sheet-Updates aus Config (Block 1)
|
||||
update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50) # Wird derzeit nicht verwendet, da wir pro Batch senden
|
||||
# update_batch_row_limit wird hier nicht mehr global für all_sheet_updates verwendet, da pro Batch gesendet wird.
|
||||
|
||||
# --- Verarbeitungsvariablen ---
|
||||
tasks_for_current_processing_batch = [] # Tasks fuer den aktuellen ThreadPoolExecutor-Batch
|
||||
|
||||
processed_count = 0
|
||||
skipped_count = 0
|
||||
|
||||
# --- Verarbeitung ---
|
||||
tasks_for_processing_batch = [] # Tasks fuer den aktuellen Batch (Liste von Dicts)
|
||||
rows_in_current_batch = [] # 1-basierte Zeilennummern im aktuellen Batch
|
||||
# Sheet Updates werden direkt nach Verarbeitung eines Batch geschrieben,
|
||||
# keine grosse gesammelte Liste wie bei Scraping/Summarization
|
||||
|
||||
|
||||
processed_count = 0 # Zaehlt Zeilen, die fuer die Verarbeitung in Frage kommen und in den Batch aufgenommen werden (im Rahmen des Limits).
|
||||
skipped_count = 0 # Zaehlt Zeilen, die uebersprungen wurden (wegen AO oder fehlender Daten).
|
||||
|
||||
|
||||
# Laden Sie das Zielschema, falls noch nicht geschehen (evaluate_branche_chatgpt benoetigt es Block 10)
|
||||
# load_target_schema (Block 6) befuellt die globale Variable ALLOWED_TARGET_BRANCHES.
|
||||
# Laden des Zielschemas (global, aber hier zur Sicherheit prüfen)
|
||||
global ALLOWED_TARGET_BRANCHES
|
||||
if not ALLOWED_TARGET_BRANCHES:
|
||||
# load_target_schema ist mit retry_on_failure dekoriert (Block 2).
|
||||
load_target_schema() # Versuche, das Schema zu laden
|
||||
|
||||
# Pruefe erneut, ob das Schema geladen wurde
|
||||
load_target_schema()
|
||||
if not ALLOWED_TARGET_BRANCHES:
|
||||
self.logger.critical("FEHLER: Ziel-Branchenschema konnte nach Ladeversuch nicht geladen werden. Branchenbewertung nicht moeglich. Breche Batch ab.") # <<< GEÄNDERT
|
||||
return # Beende die Methode
|
||||
self.logger.critical("FEHLER: Ziel-Branchenschema konnte nicht geladen werden. Branchenbewertung nicht moeglich. Breche Batch ab.")
|
||||
return
|
||||
|
||||
|
||||
# Iteriere ueber die Sheet-Zeilen im definierten Bereich (1-basierte Sheet-Zeilennummer)
|
||||
# Hauptschleife über die Zeilen
|
||||
for i in range(start_sheet_row, end_sheet_row + 1):
|
||||
row_index_in_list = i - 1 # 0-basierter Index in der all_data Liste
|
||||
# Pruefen Sie, ob das Ende des Sheets erreicht wurde
|
||||
if row_index_in_list >= total_sheet_rows: break # Ende des Sheets erreicht
|
||||
row_index_in_list = i - 1
|
||||
if row_index_in_list >= total_sheet_rows: break
|
||||
|
||||
|
||||
row = all_data[row_index_in_list] # Die Rohdaten fuer diese Zeile
|
||||
|
||||
|
||||
# Stellen Sie sicher, dass die Zeile nicht leer ist
|
||||
row = all_data[row_index_in_list]
|
||||
if not any(cell and isinstance(cell, str) and cell.strip() for cell in row):
|
||||
#self.logger.debug(f"Zeile {i}: Uebersprungen (Leere Zeile).") # Zu viel Laerm im Debug
|
||||
skipped_count += 1 # Zaehlen als uebersprungen
|
||||
continue # Springe zur naechsten Zeile
|
||||
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist ---
|
||||
# Kriterium: Timestamp letzte Pruefung (AO) ist leer.
|
||||
# ZUSAETZLICH: Pruefen, ob genuegend Quelldaten fuer die Evaluation vorhanden sind.
|
||||
# Mindestens 2 der folgenden Quellen muessen vorhanden sein:
|
||||
# CRM Branche ODER Beschreibung ODER Wiki Branche/Kategorien ODER Website Summary.
|
||||
|
||||
# Holen Sie den Wert aus Spalte AO (Timestamp letzte Pruefung) (nutzt interne Helfer _get_cell_value_safe)
|
||||
ao_value = self._get_cell_value_safe(row, "Timestamp letzte Pruefung").strip() # Block 1 Column Map
|
||||
# Pruefung basiert darauf, ob AO leer ist.
|
||||
ao_value = self._get_cell_value_safe(row, "Timestamp letzte Pruefung").strip()
|
||||
processing_needed_based_on_status = not ao_value
|
||||
|
||||
|
||||
# Wenn der Schritt laut Status nicht noetig ist, ueberspringen
|
||||
if not processing_needed_based_on_status:
|
||||
skipped_count += 1 # Zaehlen als uebersprungene Zeile
|
||||
continue # Springe zur naechsten Zeile
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
crm_branche_val = self._get_cell_value_safe(row, "CRM Branche").strip()
|
||||
crm_beschreibung_val = self._get_cell_value_safe(row, "CRM Beschreibung").strip()
|
||||
wiki_branche_val = self._get_cell_value_safe(row, "Wiki Branche").strip()
|
||||
wiki_kategorien_val = self._get_cell_value_safe(row, "Wiki Kategorien").strip()
|
||||
website_summary_val = self._get_cell_value_safe(row, "Website Zusammenfassung").strip()
|
||||
info_sources_count = sum(1 for val in [crm_branche_val, crm_beschreibung_val, wiki_branche_val, wiki_kategorien_val, website_summary_val]
|
||||
if val and isinstance(val, str) and val.strip() and val.strip().lower() != "k.a." and not val.strip().startswith("FEHLER"))
|
||||
if info_sources_count < 2:
|
||||
self.logger.debug(f"Zeile {i} (Branch Check): Uebersprungen (AO leer, aber nur {info_sources_count} Informationsquellen verfuegbar). Mindestens 2 benoetigt.")
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# Pruefe, ob ausreichend Daten vorhanden sind (mindestens 2 Quellen)
|
||||
# Nutzt interne Helfer _get_cell_value_safe
|
||||
crm_branche = self._get_cell_value_safe(row, "CRM Branche").strip() # Block 1 Column Map
|
||||
crm_beschreibung = self._get_cell_value_safe(row, "CRM Beschreibung").strip() # Block 1 Column Map
|
||||
wiki_branche = self._get_cell_value_safe(row, "Wiki Branche").strip() # Block 1 Column Map
|
||||
wiki_kategorien = self._get_cell_value_safe(row, "Wiki Kategorien").strip() # Block 1 Column Map
|
||||
website_summary = self._get_cell_value_safe(row, "Website Zusammenfassung").strip() # Block 1 Column Map
|
||||
|
||||
# Pruefe, ob die Werte vorhanden und nicht Standard "k.A." sind.
|
||||
info_sources_count = sum(1 for val in [crm_branche, crm_beschreibung, wiki_branche, wiki_kategorien, website_summary]
|
||||
if val and isinstance(val, str) and val.strip() and val.strip().lower() != "k.a." and not val.strip().startswith("FEHLER")) # Schliesse Fehlerwerte aus
|
||||
|
||||
# Wenn nicht genuegend Informationsquellen verfuegbar sind
|
||||
if info_sources_count < 2: # Mindestens 2 Info-Punkte sollten vorhanden sein (kann angepasst werden)
|
||||
self.logger.debug(f"Zeile {i} (Branch Check): Uebersprungen (AO leer, aber nur {info_sources_count} Informationsquellen verfuegbar). Mindestens 2 benoetigt.") # <<< GEÄNDERT
|
||||
skipped_count += 1 # Zaehlen als uebersprungene Zeile
|
||||
continue # Springe zur naechsten Zeile
|
||||
|
||||
|
||||
# --- Wenn Verarbeitung noetig und genuegend Daten da: Fuege zur Batch-Liste hinzu ---
|
||||
processed_count += 1 # Zaehle die Zeile, die fuer die Verarbeitung in Frage kommt (im Rahmen des Limits zaehlen)
|
||||
|
||||
# Pruefe das Limit fuer verarbeitete Zeilen
|
||||
if limit is not None and isinstance(limit, int) and limit > 0 and processed_count > limit:
|
||||
# Wenn das Limit erreicht ist und es ein positives Limit gibt
|
||||
self.logger.info(f"Verarbeitungslimit ({limit}) fuer process_branch_batch erreicht. Breche weitere Zeilenpruefung ab.") # <<< GEÄNDERT
|
||||
break # Brich die Schleife ab
|
||||
|
||||
|
||||
# Sammle die benoetigten Daten fuer den Branchen-Task (evaluate_branch_task denselben Block).
|
||||
# Diese Daten werden in einem Dictionary fuer den Batch gesammelt.
|
||||
tasks_for_processing_batch.append({
|
||||
"row_num": i, # Die 1-basierte Sheet-Zeilennummer
|
||||
"crm_branche": crm_branche, # Nutzt den Wert aus dem Sheet
|
||||
"beschreibung": crm_beschreibung, # Nutzt den Wert aus dem Sheet
|
||||
"wiki_branche": wiki_branche, # Nutzt den Wert aus dem Sheet
|
||||
"wiki_kategorien": wiki_kategorien, # Nutzt den Wert aus dem Sheet
|
||||
"website_summary": website_summary # Nutzt den Wert aus dem Sheet
|
||||
# --- Wenn Verarbeitung noetig: Zur Task-Liste hinzufügen ---
|
||||
# (Der processed_count wird erst erhöht, wenn der Task tatsächlich in einen Batch geht)
|
||||
|
||||
tasks_for_current_processing_batch.append({
|
||||
"row_num": i,
|
||||
"crm_branche": crm_branche_val,
|
||||
"beschreibung": crm_beschreibung_val,
|
||||
"wiki_branche": wiki_branche_val,
|
||||
"wiki_kategorien": wiki_kategorien_val,
|
||||
"website_summary": website_summary_val
|
||||
})
|
||||
# Fuege die Zeilennummer zur Liste der Zeilennummern im Batch hinzu
|
||||
rows_in_current_batch.append(i)
|
||||
|
||||
# --- Verarbeite den Batch, wenn voll oder am Ende des Limits/Bereichs ---
|
||||
# Das Limit `limit` bezieht sich auf die Anzahl der tatsächlich *verarbeiteten* (nicht übersprungenen) Tasks.
|
||||
# `processed_count` wird jetzt erst beim Start eines Batches relevant.
|
||||
|
||||
# Bedingung zum Starten eines Batches:
|
||||
# 1. Batch ist voll ODER
|
||||
# 2. Es ist die letzte Zeile im definierten Bereich (`i == end_sheet_row`) UND es gibt Tasks ODER
|
||||
# 3. Das `limit` ist erreicht (falls gesetzt) UND es gibt Tasks.
|
||||
|
||||
# Zähle, wie viele Tasks wir bisher verarbeitet hätten, wenn dieser Batch startet.
|
||||
# Dies ist `processed_count + len(tasks_for_current_processing_batch)`.
|
||||
|
||||
execute_batch_now = False
|
||||
if len(tasks_for_current_processing_batch) >= processing_batch_size:
|
||||
execute_batch_now = True
|
||||
elif i == end_sheet_row and tasks_for_current_processing_batch: # Letzte Zeile des Bereichs und es gibt Tasks
|
||||
execute_batch_now = True
|
||||
elif limit is not None and (processed_count + len(tasks_for_current_processing_batch)) >= limit and tasks_for_current_processing_batch:
|
||||
# Wenn das Limit erreicht wird durch die aktuellen Tasks im Batch
|
||||
# Kürze tasks_for_current_processing_batch, falls es das Limit überschreiten würde
|
||||
if (processed_count + len(tasks_for_current_processing_batch)) > limit:
|
||||
num_to_trim = (processed_count + len(tasks_for_current_processing_batch)) - limit
|
||||
# Entferne die überzähligen Tasks vom Ende der Liste
|
||||
tasks_for_current_processing_batch = tasks_for_current_processing_batch[:-num_to_trim]
|
||||
|
||||
if tasks_for_current_processing_batch: # Nur ausführen, wenn nach Trimmen noch Tasks da sind
|
||||
execute_batch_now = True
|
||||
|
||||
|
||||
# --- Verarbeite den Batch, wenn voll ---
|
||||
# Pruefe, ob die aktuelle Batch-Liste die definierte Groesse erreicht hat.
|
||||
# processing_batch_size wird aus Config geholt (Block 1).
|
||||
if len(tasks_for_processing_batch) >= processing_batch_size:
|
||||
# Logge den Start der Batch-Verarbeitung
|
||||
batch_start_row = tasks_for_processing_batch[0]['row_num']
|
||||
batch_end_row = tasks_for_processing_batch[-1]['row_num']
|
||||
self.logger.debug(f"\n--- Starte Branch-Evaluation Batch ({len(tasks_for_processing_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
|
||||
if execute_batch_now:
|
||||
batch_start_row_log = tasks_for_current_processing_batch[0]['row_num']
|
||||
batch_end_row_log = tasks_for_current_processing_batch[-1]['row_num']
|
||||
self.logger.debug(f"\n--- Starte Branch-Evaluation Batch ({len(tasks_for_current_processing_batch)} Tasks, Zeilen {batch_start_row_log}-{batch_end_row_log}) ---")
|
||||
|
||||
# Erhöhe processed_count um die Anzahl der Tasks in diesem Batch
|
||||
processed_count += len(tasks_for_current_processing_batch)
|
||||
|
||||
|
||||
results_list = [] # Liste zum Speichern der Ergebnisse fuer diesen Batch (Liste von Dicts)
|
||||
batch_error_count = 0 # Fehlerzaehler fuer diesen spezifischen Batch
|
||||
|
||||
self.logger.debug(f" Evaluiere {len(tasks_for_processing_batch)} Zeilen parallel (max {MAX_BRANCH_WORKERS} worker, {OPENAI_CONCURRENCY_LIMIT} OpenAI gleichzeitig)...") # <<< GEÄNDERT
|
||||
# Holen Sie die Parallelisierungskonfiguration aus Config (Block 1).
|
||||
MAX_BRANCH_WORKERS = getattr(Config, 'MAX_BRANCH_WORKERS', 10)
|
||||
OPENAI_CONCURRENCY_LIMIT = getattr(Config, 'OPENAI_CONCURRENCY_LIMIT', 3)
|
||||
# Erstellen Sie die Semaphore Instanz (wird von evaluate_branch_task benutzt).
|
||||
# threading.Semaphore muss hier innerhalb des Batch-Aufrufs erstellt werden.
|
||||
# --- Parallele Verarbeitung dieses Batches ---
|
||||
batch_results_data = [] # Ergebnisse dieses spezifischen Batches
|
||||
batch_error_count_this_batch = 0
|
||||
|
||||
openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT)
|
||||
|
||||
|
||||
# *** BEGINN PARALLELE VERARBEITUNG MIT THREADS ***
|
||||
# Verwende ThreadPoolExecutor fuer parallele Ausfuehrung der evaluate_branch_task.
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor:
|
||||
# Map tasks to futures. Ruft die INTERNE Worker-Funktion auf.
|
||||
# Uebergibt das task_data Dictionary und die Semaphore Instanz als Argumente.
|
||||
future_to_task = {executor.submit(self.evaluate_branch_task, task, openai_semaphore_branch): task for task in tasks_for_processing_batch}
|
||||
|
||||
|
||||
# Verarbeite die Ergebnisse, sobald sie fertig sind.
|
||||
future_to_task = {executor.submit(self.evaluate_branch_task, task, openai_semaphore_branch): task for task in tasks_for_current_processing_batch}
|
||||
for future in concurrent.futures.as_completed(future_to_task):
|
||||
task = future_to_task[future] # Holen Sie die urspruenglichen Task-Daten (Dict)
|
||||
task = future_to_task[future]
|
||||
try:
|
||||
# Holen Sie das Ergebnis vom Future. Wenn die Worker-Funktion eine Exception wirft, wird diese hier gefangen.
|
||||
result_data = future.result() # Ergebnis ist ein Dictionary {'row_num': ..., 'result': ..., 'error': ...}
|
||||
results_list.append(result_data) # Fuege das Ergebnis zur Liste hinzu
|
||||
# Pruefe, ob der Worker einen Fehler gemeldet hat (error Feld im Ergebnis)
|
||||
if result_data.get('error'):
|
||||
batch_error_count += 1 # Erhoehe den Fehlerzaehler fuer diesen Batch
|
||||
|
||||
result_data = future.result()
|
||||
batch_results_data.append(result_data)
|
||||
if result_data.get('error'):
|
||||
batch_error_count_this_batch += 1
|
||||
except Exception as exc:
|
||||
# Dieser Block faengt unerwartete Fehler ab, die waehrend der Future-Ergebnis-Abfrage auftreten.
|
||||
# Die meisten Fehler sollten von evaluate_branch_task oder seinen Helfern behandelt werden.
|
||||
row_num = task['row_num'] # Zeilennummer aus den Task-Daten
|
||||
err_msg = f"Unerwarteter Fehler bei Ergebnisabfrage Branch Task Zeile {row_num}: {type(exc).__name__} - {exc}"
|
||||
self.logger.error(err_msg) # <<< GEÄNDERT
|
||||
# Setze einen Standard-Fehler-Ergebniswert fuer diese Zeile
|
||||
results_list.append({"row_num": row_num, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg[:500]}, "error": err_msg}) # Kuerze Begruendung
|
||||
batch_error_count += 1 # Erhoehe den Fehlerzaehler
|
||||
row_num_exc = task['row_num']
|
||||
err_msg_exc = f"Unerwarteter Fehler bei Ergebnisabfrage Branch Task Zeile {row_num_exc}: {type(exc).__name__} - {exc}"
|
||||
self.logger.error(err_msg_exc)
|
||||
batch_results_data.append({"row_num": row_num_exc, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg_exc[:500]}, "error": err_msg_exc})
|
||||
batch_error_count_this_batch += 1
|
||||
|
||||
self.logger.debug(f" Branch-Evaluation fuer Batch beendet. {len(batch_results_data)} Ergebnisse erhalten ({batch_error_count_this_batch} Fehler in diesem Batch).")
|
||||
|
||||
|
||||
# *** ENDE PARALLELE VERARBEITUNG ***
|
||||
self.logger.debug(f" Branch-Evaluation fuer Batch beendet. {len(results_list)} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).") # <<< GEÄNDERT
|
||||
|
||||
|
||||
# Sheet Updates vorbereiten FÜR DIESEN BATCH.
|
||||
# Dies geschieht jetzt nach der parallelen Verarbeitung.
|
||||
if results_list:
|
||||
# --- Sheet Updates für diesen Batch vorbereiten und senden ---
|
||||
if batch_results_data:
|
||||
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
current_version = getattr(Config, 'VERSION', 'unknown')
|
||||
batch_sheet_updates = []
|
||||
sheet_updates_for_this_batch = []
|
||||
batch_results_data.sort(key=lambda x: x['row_num'])
|
||||
|
||||
results_list.sort(key=lambda x: x['row_num'])
|
||||
for res_data_item in batch_results_data:
|
||||
row_num_item = res_data_item['row_num']
|
||||
result_item = res_data_item['result']
|
||||
self.logger.debug(f" Zeile {row_num_item} (aus Batch): Ergebnis aus evaluate_branch_task: {result_item}")
|
||||
|
||||
for res_data in results_list: # <--- HIER IST DIE SCHLEIFE, DIE SIE IM SCREENSHOT ZEIGEN
|
||||
row_num = res_data['row_num']
|
||||
result = res_data['result']
|
||||
|
||||
self.logger.debug(f" Zeile {row_num} (aus Batch): Ergebnis aus evaluate_branch_task: {result}") # DEBUG
|
||||
|
||||
# Chat Vorschlag Branche (AH)
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num}',
|
||||
'values': [[result.get("branch", "FEHLER BRANCH")]]})
|
||||
# Chat Branche Konfidenz (AI) - NEU
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{row_num}',
|
||||
'values': [[result.get("confidence", "N/A CONF")]]}) # <<< HIER ANPASSEN
|
||||
# Chat Konsistenz Branche (AJ)
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num}',
|
||||
'values': [[result.get("consistency", "error CONS")]]})
|
||||
# Chat Begruendung Abweichung Branche (AK)
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{row_num}',
|
||||
'values': [[result.get("justification", "Keine Begr. JUST")]]})
|
||||
|
||||
# Timestamp letzte Pruefung (BC)
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{row_num}',
|
||||
'values': [[current_timestamp]]})
|
||||
# Version (BD)
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{row_num}',
|
||||
'values': [[current_version]]})
|
||||
|
||||
|
||||
# --- Sende Updates fuer DIESEN BATCH SOFORT ---
|
||||
# Sende die gesammelten Updates fuer diesen Batch.
|
||||
if batch_sheet_updates:
|
||||
self.logger.debug(f" Sende Sheet-Update fuer {len(results_list)} Zeilen ({len(batch_sheet_updates)} Zellen) dieses Batches...") # <<< GEÄNDERT
|
||||
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry.
|
||||
# Wenn es fehlschlaegt, wird es intern geloggt.
|
||||
success = self.sheet_handler.batch_update_cells(batch_sheet_updates)
|
||||
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num_item}',
|
||||
'values': [[result_item.get("branch", "FEHLER BRANCH")]]})
|
||||
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{row_num_item}',
|
||||
'values': [[result_item.get("confidence", "N/A CONF")]]})
|
||||
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num_item}',
|
||||
'values': [[result_item.get("consistency", "error CONS")]]})
|
||||
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{row_num_item}',
|
||||
'values': [[result_item.get("justification", "Keine Begr. JUST")]]})
|
||||
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{row_num_item}',
|
||||
'values': [[current_timestamp]]})
|
||||
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{row_num_item}',
|
||||
'values': [[current_version]]})
|
||||
|
||||
if sheet_updates_for_this_batch:
|
||||
self.logger.debug(f" Sende Sheet-Update fuer {len(batch_results_data)} Zeilen ({len(sheet_updates_for_this_batch)} Zellen) dieses Batches...")
|
||||
success = self.sheet_handler.batch_update_cells(sheet_updates_for_this_batch)
|
||||
if success:
|
||||
self.logger.info(f" Sheet-Update fuer Batch Zeilen {batch_start_row}-{batch_end_row} erfolgreich.") # <<< GEÄNDERT
|
||||
# Der Fehlerfall wird von batch_update_cells geloggt
|
||||
|
||||
# else: self.logger.debug(f" Keine Sheet-Updates fuer Batch Zeilen {batch_start_row}-{batch_end_row} vorbereitet.") # Zu viel Laerm im Debug
|
||||
|
||||
|
||||
# Leere den Batch fuer die naechste Iteration
|
||||
tasks_for_processing_batch = []
|
||||
rows_in_current_batch = []
|
||||
|
||||
# Pause NACHDEM ein Batch komplett verarbeitet und geschrieben wurde (nutzt Config Block 1).
|
||||
# Dies ist wichtig, um Rate Limits und Serverlast zu managen.
|
||||
pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.8 # Längere Pause, z.B. 80% der Retry-Wartezeit
|
||||
self.logger.debug(f"--- Batch Zeilen {batch_start_row}-{batch_end_row} abgeschlossen. Warte {pause_duration:.2f}s vor naechstem Batch ---") # <<< GEÄNDERT
|
||||
self.logger.info(f" Sheet-Update fuer Batch Zeilen {batch_start_row_log}-{batch_end_row_log} erfolgreich.")
|
||||
|
||||
tasks_for_current_processing_batch = [] # Batch-Liste leeren
|
||||
pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.8
|
||||
self.logger.debug(f"--- Batch Zeilen {batch_start_row_log}-{batch_end_row_log} abgeschlossen. Warte {pause_duration:.2f}s vor naechstem Batch ---")
|
||||
time.sleep(pause_duration)
|
||||
|
||||
# Wenn das Limit erreicht wurde, die Hauptschleife verlassen
|
||||
if limit is not None and processed_count >= limit:
|
||||
self.logger.info(f"Gesamtes Verarbeitungslimit ({limit}) fuer process_branch_batch erreicht. Beende.")
|
||||
break
|
||||
|
||||
# --- Verarbeitung des letzten unvollständigen Batches (falls Schleife nicht durch Limit beendet wurde) ---
|
||||
# Dieser Block ist jetzt durch die Logik `elif i == end_sheet_row and tasks_for_current_processing_batch:` abgedeckt.
|
||||
# Ein separater Block für den "finalen Batch" ist nicht mehr nötig, wenn das Limit korrekt gehandhabt wird.
|
||||
|
||||
# --- Verarbeitung des letzten unvollstaendigen Batches nach der Schleife ---
|
||||
# Wenn nach der Hauptschleife noch Tasks in der Batch-Liste sind.
|
||||
if tasks_for_processing_batch:
|
||||
# ... (Logik für den letzten Batch mit ThreadPoolExecutor) ...
|
||||
self.logger.debug(f" FINALER Branch Batch beendet. {len(results_list)} Ergebnisse erhalten ({batch_error_count} Fehler).")
|
||||
|
||||
# Sammle Sheet Updates (AH, AI, AJ, AK, BC, BD) fuer diesen finalen Batch.
|
||||
if results_list: # Beginn des Blocks, zu dem das problematische if gehören sollte
|
||||
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
current_version = getattr(Config, 'VERSION', 'unknown')
|
||||
batch_sheet_updates = []
|
||||
results_list.sort(key=lambda x: x['row_num'])
|
||||
for res_data in results_list: # Beginn der inneren Schleife
|
||||
row_num = res_data['row_num']
|
||||
result = res_data['result']
|
||||
|
||||
self.logger.debug(f" FINALER Batch Zeile {row_num}: Ergebnis aus evaluate_branch_task: {result}")
|
||||
|
||||
# Sammle Updates fuer AH, AI, AJ, AK, BC, BD
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num}',
|
||||
'values': [[result.get("branch", "FEHLER BRANCH")]]})
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{row_num}',
|
||||
'values': [[result.get("confidence", "N/A CONF")]]})
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num}',
|
||||
'values': [[result.get("consistency", "error CONS")]]})
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{row_num}',
|
||||
'values': [[result.get("justification", "Keine Begr. JUST")]]})
|
||||
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{row_num}',
|
||||
'values': [[current_timestamp]]})
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{row_num}',
|
||||
'values': [[current_version]]})
|
||||
# Ende der inneren for-Schleife (for res_data in results_list)
|
||||
|
||||
# Sende die gesammelten Updates fuer DIESEN finalen Batch.
|
||||
# DIESER BLOCK MUSS AUF DERSELBEN EBENE WIE DIE if results_list BEDINGUNG SEIN (oder eine Ebene tiefer, wenn er nur bei results_list ausgeführt werden soll)
|
||||
# Basierend auf der Logik, dass Updates nur gesendet werden, WENN es welche gibt, gehört es eine Ebene tiefer als if results_list.
|
||||
# Also auf der Ebene der batch_sheet_updates = [] Initialisierung.
|
||||
|
||||
# ---> KORREKTE EINRÜCKUNG FÜR DEN FOLGENDEN BLOCK <---
|
||||
# Dieser Block sollte auf derselben Ebene sein wie die Initialisierung von batch_sheet_updates
|
||||
# oder, wenn er nur ausgeführt werden soll, wenn results_list nicht leer ist,
|
||||
# dann innerhalb des `if results_list:` Blocks, aber NACH der `for`-Schleife.
|
||||
# Im Screenshot ist es direkt nach der for-Schleife, also innerhalb von `if results_list:`.
|
||||
|
||||
if batch_sheet_updates: # Diese Zeile muss dieselbe Einrückung haben wie z.B. "current_timestamp ="
|
||||
self.logger.debug(f" Sende FINALES Sheet-Update fuer {len(results_list)} Zeilen ({len(batch_sheet_updates)} Zellen)...")
|
||||
success = self.sheet_handler.batch_update_cells(batch_sheet_updates)
|
||||
if success:
|
||||
self.logger.info(f" FINALES Sheet-Update fuer Branch Batch erfolgreich.")
|
||||
# Der Fehlerfall wird von batch_update_cells geloggt
|
||||
# Ende des `if results_list:` Blocks
|
||||
# Ende des `if tasks_for_processing_batch:` Blocks (für den finalen Batch)
|
||||
|
||||
# Logge den Abschluss des Modus
|
||||
self.logger.info(f"Brancheneinschaetzung (Parallel Batch) abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen.")
|
||||
# Keine Pause nach diesem Modus noetig, da die naechste Aktion im Dispatcher (Block 34) folgt.
|
||||
self.logger.info(f"Brancheneinschaetzung (Parallel Batch) abgeschlossen. {processed_count} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.")
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
|
||||
Reference in New Issue
Block a user