bugfix
This commit is contained in:
@@ -2494,48 +2494,87 @@ def process_website_summarization_batch(sheet_handler, start_row_index_in_sheet,
|
||||
# Komplette Funktion process_branch_batch (MIT Korrektur und Prüfung auf AO)
|
||||
def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_in_sheet):
|
||||
"""
|
||||
Batch-Prozess für Brancheneinschätzung. Lädt Daten neu, prüft Timestamp AO,
|
||||
liest vorhandene Zusammenfassung (AS) und setzt AO + AP für bearbeitete Zeilen.
|
||||
Batch-Prozess für Brancheneinschätzung mit paralleler Verarbeitung via Threads.
|
||||
Prüft Timestamp AO, führt evaluate_branche_chatgpt parallel aus (limitiert),
|
||||
setzt W, X, Y, AO + AP und sendet Sheet-Updates gebündelt.
|
||||
"""
|
||||
debug_print(f"Starte Brancheneinschätzung (Batch) für Zeilen {start_row_index_in_sheet} bis {end_row_index_in_sheet}...")
|
||||
debug_print(f"Starte Brancheneinschätzung (Parallel Batch) für Zeilen {start_row_index_in_sheet} bis {end_row_index_in_sheet}...")
|
||||
|
||||
# --- Lade Daten ---
|
||||
if not sheet_handler.load_data(): return
|
||||
all_data = sheet_handler.get_all_data_with_headers()
|
||||
if not all_data or len(all_data) <= 5: return
|
||||
sheet = sheet_handler.sheet
|
||||
header_rows = 5
|
||||
|
||||
# Indizes holen
|
||||
# --- Indizes und Buchstaben ---
|
||||
timestamp_col_key = "Timestamp letzte Prüfung"
|
||||
timestamp_col_index = COLUMN_MAP.get(timestamp_col_key)
|
||||
branche_crm_idx = COLUMN_MAP.get("CRM Branche")
|
||||
beschreibung_idx = COLUMN_MAP.get("CRM Beschreibung")
|
||||
branche_wiki_idx = COLUMN_MAP.get("Wiki Branche")
|
||||
kategorien_wiki_idx = COLUMN_MAP.get("Wiki Kategorien")
|
||||
summary_web_idx = COLUMN_MAP.get("Website Zusammenfassung") # Index für AS
|
||||
summary_web_idx = COLUMN_MAP.get("Website Zusammenfassung")
|
||||
version_col_idx = COLUMN_MAP.get("Version")
|
||||
branch_w_idx = COLUMN_MAP.get("Chat Vorschlag Branche")
|
||||
branch_x_idx = COLUMN_MAP.get("Chat Konsistenz Branche")
|
||||
branch_y_idx = COLUMN_MAP.get("Chat Begründung Abweichung Branche")
|
||||
|
||||
required_indices = [timestamp_col_index, branche_crm_idx, beschreibung_idx, branche_wiki_idx,
|
||||
kategorien_wiki_idx, summary_web_idx, version_col_index, branch_w_idx,
|
||||
branch_x_idx, branch_y_idx]
|
||||
if None in required_indices:
|
||||
debug_print(f"FEHLER: Mindestens ein benötigter Spaltenindex für process_branch_batch fehlt in COLUMN_MAP.")
|
||||
return
|
||||
|
||||
if None in required_indices: return debug_print(f"FEHLER: Indizes für process_branch_batch fehlen.")
|
||||
ts_col_letter = sheet_handler._get_col_letter(timestamp_col_index + 1)
|
||||
version_col_letter = sheet_handler._get_col_letter(version_col_index + 1)
|
||||
branch_w_letter = sheet_handler._get_col_letter(branch_w_idx + 1)
|
||||
branch_x_letter = sheet_handler._get_col_letter(branch_x_idx + 1)
|
||||
branch_y_letter = sheet_handler._get_col_letter(branch_y_idx + 1)
|
||||
|
||||
processed_count = 0
|
||||
skipped_count = 0
|
||||
# --- Konfiguration für Parallelisierung ---
|
||||
MAX_BRANCH_WORKERS = 10 # Wie viele Threads für parallele Bewertung?
|
||||
# Limit für gleichzeitige OpenAI Calls (wichtig wegen Rate Limits!)
|
||||
OPENAI_CONCURRENCY_LIMIT = 5
|
||||
openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT)
|
||||
# Batch-Größe für das Sammeln von Aufgaben, bevor der ThreadPool gestartet wird
|
||||
PROCESSING_BRANCH_BATCH_SIZE = Config.PROCESSING_BATCH_SIZE # z.B. 20 aus Config nehmen
|
||||
# Batch-Größe für das Senden von Sheet-Updates
|
||||
update_batch_row_limit = Config.UPDATE_BATCH_ROW_LIMIT # z.B. 50 aus Config
|
||||
|
||||
if not ALLOWED_TARGET_BRANCHES:
|
||||
load_target_schema()
|
||||
if not ALLOWED_TARGET_BRANCHES: return
|
||||
# --- Worker Funktion ---
|
||||
def evaluate_branch_task(task_data):
|
||||
row_num = task_data['row_num']
|
||||
result = {"branch": "k.A. (Fehler Task)", "consistency": "error", "justification": "Fehler in Worker-Task"}
|
||||
error = None
|
||||
try:
|
||||
# Limitiere OpenAI Calls mit Semaphore
|
||||
with openai_semaphore_branch:
|
||||
# debug_print(f"Task Zeile {row_num}: Starte evaluate_branche_chatgpt...") # Kann sehr laut sein
|
||||
# Füge kleine Pause hinzu, um Limits zu respektieren
|
||||
time.sleep(0.3) # Kleine Pause vor jedem Call
|
||||
result = evaluate_branche_chatgpt(
|
||||
task_data['crm_branche'],
|
||||
task_data['beschreibung'],
|
||||
task_data['wiki_branche'],
|
||||
task_data['wiki_kategorien'],
|
||||
task_data['website_summary']
|
||||
)
|
||||
# debug_print(f"Task Zeile {row_num}: evaluate_branche_chatgpt beendet.")
|
||||
except Exception as e:
|
||||
error = f"Fehler bei Branchenevaluation Zeile {row_num}: {e}"
|
||||
debug_print(error)
|
||||
# Füge Fehlerinfo zum Ergebnis hinzu, falls evaluate_branche_chatgpt selbst keinen Fehler liefert
|
||||
result['justification'] = error[:500] # Schreibe Fehler in Begründung (gekürzt)
|
||||
result['consistency'] = 'error_task'
|
||||
return {"row_num": row_num, "result": result, "error": error}
|
||||
|
||||
# --- Hauptverarbeitung ---
|
||||
tasks_for_processing_batch = []
|
||||
all_sheet_updates = []
|
||||
total_processed_count = 0
|
||||
total_skipped_count = 0
|
||||
total_error_count = 0
|
||||
|
||||
# Stelle sicher, dass Branchenschema geladen ist
|
||||
if not ALLOWED_TARGET_BRANCHES: load_target_schema();
|
||||
if not ALLOWED_TARGET_BRANCHES: return debug_print("FEHLER: Ziel-Schema nicht geladen. Abbruch Branch-Batch.")
|
||||
|
||||
for i in range(start_row_index_in_sheet, end_row_index_in_sheet + 1):
|
||||
row_index_in_list = i - 1
|
||||
@@ -2548,37 +2587,86 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_
|
||||
should_skip = True
|
||||
|
||||
if should_skip:
|
||||
skipped_count += 1
|
||||
total_skipped_count += 1
|
||||
continue
|
||||
|
||||
# Daten holen (inkl. AS, das von einem anderen Prozess geschrieben wurde)
|
||||
crm_branche = row[branche_crm_idx] if len(row) > branche_crm_idx else ""
|
||||
beschreibung = row[beschreibung_idx] if len(row) > beschreibung_idx else ""
|
||||
wiki_branche = row[branche_wiki_idx] if len(row) > branche_wiki_idx else ""
|
||||
wiki_kategorien = row[kategorien_wiki_idx] if len(row) > kategorien_wiki_idx else ""
|
||||
website_summary = row[summary_web_idx] if len(row) > summary_web_idx else "k.A." # Lese AS
|
||||
# Daten für den Task sammeln
|
||||
task_data = {
|
||||
"row_num": i,
|
||||
"crm_branche": row[branche_crm_idx] if len(row) > branche_crm_idx else "",
|
||||
"beschreibung": row[beschreibung_idx] if len(row) > beschreibung_idx else "",
|
||||
"wiki_branche": row[branche_wiki_idx] if len(row) > branche_wiki_idx else "",
|
||||
"wiki_kategorien": row[kategorien_wiki_idx] if len(row) > kategorien_wiki_idx else "",
|
||||
"website_summary": row[summary_web_idx] if len(row) > summary_web_idx else ""
|
||||
}
|
||||
tasks_for_processing_batch.append(task_data)
|
||||
|
||||
result = evaluate_branche_chatgpt(crm_branche, beschreibung, wiki_branche, wiki_kategorien, website_summary)
|
||||
processed_count += 1
|
||||
# --- Verarbeitungs-Batch ausführen, wenn voll oder letzte Zeile ---
|
||||
if len(tasks_for_processing_batch) >= PROCESSING_BRANCH_BATCH_SIZE or i == end_row_index_in_sheet:
|
||||
if tasks_for_processing_batch:
|
||||
batch_start_row = tasks_for_processing_batch[0]['row_num']
|
||||
batch_end_row = tasks_for_processing_batch[-1]['row_num']
|
||||
batch_task_count = len(tasks_for_processing_batch)
|
||||
debug_print(f"\n--- Starte Branch-Evaluation Batch ({batch_task_count} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---")
|
||||
|
||||
updates = []
|
||||
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
current_version = Config.VERSION
|
||||
# --- Parallele Ausführung ---
|
||||
results_list = [] # Liste der Ergebnis-Dicts
|
||||
batch_error_count = 0
|
||||
debug_print(f" Evaluiere {batch_task_count} Zeilen parallel (max {MAX_BRANCH_WORKERS} worker, {OPENAI_CONCURRENCY_LIMIT} OpenAI gleichzeitig)...")
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor:
|
||||
future_to_task = {executor.submit(evaluate_branch_task, task): task for task in tasks_for_processing_batch}
|
||||
for future in concurrent.futures.as_completed(future_to_task):
|
||||
task = future_to_task[future]
|
||||
try:
|
||||
result_data = future.result()
|
||||
results_list.append(result_data) # Füge komplettes Ergebnis hinzu
|
||||
if result_data['error']: batch_error_count += 1; total_error_count +=1
|
||||
except Exception as exc:
|
||||
row_num = task['row_num']
|
||||
err_msg = f"Generischer Fehler Branch Task Zeile {row_num}: {exc}"
|
||||
debug_print(err_msg)
|
||||
# Füge Fehler-Ergebnis hinzu
|
||||
results_list.append({"row_num": row_num, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg[:500]}, "error": err_msg})
|
||||
batch_error_count += 1; total_error_count +=1
|
||||
|
||||
updates.append({'range': f'{branch_w_letter}{i}', 'values': [[result.get("branch", "Fehler")]]})
|
||||
updates.append({'range': f'{branch_x_letter}{i}', 'values': [[result.get("consistency", "Fehler")]]})
|
||||
updates.append({'range': f'{branch_y_letter}{i}', 'values': [[result.get("justification", "Fehler")]]})
|
||||
updates.append({'range': f'{ts_col_letter}{i}', 'values': [[current_timestamp]]}) # AO Timestamp
|
||||
updates.append({'range': f'{version_col_letter}{i}', 'values': [[current_version]]}) # AP Version
|
||||
current_batch_processed_count = len(results_list)
|
||||
total_processed_count += current_batch_processed_count
|
||||
debug_print(f" Branch-Evaluation für Batch beendet. {current_batch_processed_count} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).")
|
||||
|
||||
if updates:
|
||||
success = sheet_handler.batch_update_cells(updates)
|
||||
if success: debug_print(f"Zeile {i}: Branch-Einschätzung erfolgreich aktualisiert.")
|
||||
else: debug_print(f"FEHLER beim Schreiben der Branch-Updates für Zeile {i}.")
|
||||
# --- Sheet Updates vorbereiten ---
|
||||
if results_list:
|
||||
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
current_version = Config.VERSION
|
||||
batch_sheet_updates = []
|
||||
for res_data in results_list:
|
||||
row_num = res_data['row_num']
|
||||
result = res_data['result']
|
||||
row_updates = [
|
||||
{'range': f'{branch_w_letter}{row_num}', 'values': [[result.get("branch", "Fehler")]]},
|
||||
{'range': f'{branch_x_letter}{row_num}', 'values': [[result.get("consistency", "Fehler")]]},
|
||||
{'range': f'{branch_y_letter}{row_num}', 'values': [[result.get("justification", "Fehler")]]},
|
||||
{'range': f'{ts_col_letter}{row_num}', 'values': [[current_timestamp]]}, # AO Timestamp
|
||||
{'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]} # AP Version
|
||||
]
|
||||
batch_sheet_updates.extend(row_updates)
|
||||
all_sheet_updates.extend(batch_sheet_updates) # Sammle für größeren Batch-Update
|
||||
|
||||
time.sleep(Config.RETRY_DELAY)
|
||||
tasks_for_processing_batch = [] # Batch leeren
|
||||
|
||||
debug_print(f"Brancheneinschätzung (Batch) abgeschlossen. {processed_count} Zeilen eingeschätzt, {skipped_count} Zeilen wg. Timestamp übersprungen.")
|
||||
# --- Sheet Updates senden (wenn update_batch_row_limit erreicht) ---
|
||||
if len(all_sheet_updates) >= update_batch_row_limit * 5: # *5 Updates pro Zeile
|
||||
debug_print(f" Sende gesammelte Sheet-Updates ({len(all_sheet_updates)} Zellen)...")
|
||||
success = sheet_handler.batch_update_cells(all_sheet_updates)
|
||||
if success: debug_print(f" Sheet-Update bis Zeile {batch_end_row} erfolgreich.")
|
||||
else: debug_print(f" FEHLER beim Sheet-Update bis Zeile {batch_end_row}.")
|
||||
all_sheet_updates = []
|
||||
|
||||
# --- Finale Sheet Updates senden ---
|
||||
if all_sheet_updates:
|
||||
debug_print(f"Sende finale Sheet-Updates ({len(all_sheet_updates)} Zellen)...")
|
||||
sheet_handler.batch_update_cells(all_sheet_updates)
|
||||
|
||||
debug_print(f"Brancheneinschätzung (Parallel Batch) abgeschlossen. {total_processed_count} Zeilen verarbeitet (inkl. Fehler), {total_error_count} Fehler, {total_skipped_count} Zeilen wg. Timestamp übersprungen.")
|
||||
|
||||
|
||||
def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_in_sheet):
|
||||
|
||||
Reference in New Issue
Block a user