bugfix
This commit is contained in:
@@ -2505,21 +2505,14 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_
|
|||||||
if not all_data or len(all_data) <= 5: return
|
if not all_data or len(all_data) <= 5: return
|
||||||
header_rows = 5
|
header_rows = 5
|
||||||
|
|
||||||
# Indizes und Buchstaben holen
|
# --- Indizes und Buchstaben ---
|
||||||
timestamp_col_key = "Timestamp letzte Prüfung"
|
timestamp_col_key = "Timestamp letzte Prüfung"; timestamp_col_index = COLUMN_MAP.get(timestamp_col_key)
|
||||||
timestamp_col_index = COLUMN_MAP.get(timestamp_col_key)
|
branche_crm_idx = COLUMN_MAP.get("CRM Branche"); beschreibung_idx = COLUMN_MAP.get("CRM Beschreibung")
|
||||||
branche_crm_idx = COLUMN_MAP.get("CRM Branche")
|
branche_wiki_idx = COLUMN_MAP.get("Wiki Branche"); kategorien_wiki_idx = COLUMN_MAP.get("Wiki Kategorien")
|
||||||
beschreibung_idx = COLUMN_MAP.get("CRM Beschreibung")
|
summary_web_idx = COLUMN_MAP.get("Website Zusammenfassung"); version_col_idx = COLUMN_MAP.get("Version")
|
||||||
branche_wiki_idx = COLUMN_MAP.get("Wiki Branche")
|
branch_w_idx = COLUMN_MAP.get("Chat Vorschlag Branche"); branch_x_idx = COLUMN_MAP.get("Chat Konsistenz Branche")
|
||||||
kategorien_wiki_idx = COLUMN_MAP.get("Wiki Kategorien")
|
|
||||||
summary_web_idx = COLUMN_MAP.get("Website Zusammenfassung")
|
|
||||||
version_col_idx = COLUMN_MAP.get("Version")
|
|
||||||
branch_w_idx = COLUMN_MAP.get("Chat Vorschlag Branche")
|
|
||||||
branch_x_idx = COLUMN_MAP.get("Chat Konsistenz Branche")
|
|
||||||
branch_y_idx = COLUMN_MAP.get("Chat Begründung Abweichung Branche")
|
branch_y_idx = COLUMN_MAP.get("Chat Begründung Abweichung Branche")
|
||||||
required_indices = [timestamp_col_index, branche_crm_idx, beschreibung_idx, branche_wiki_idx,
|
required_indices = [timestamp_col_index, branche_crm_idx, beschreibung_idx, branche_wiki_idx, kategorien_wiki_idx, summary_web_idx, version_col_idx, branch_w_idx, branch_x_idx, branch_y_idx]
|
||||||
kategorien_wiki_idx, summary_web_idx, version_col_index, branch_w_idx,
|
|
||||||
branch_x_idx, branch_y_idx]
|
|
||||||
if None in required_indices: return debug_print(f"FEHLER: Indizes fehlen.")
|
if None in required_indices: return debug_print(f"FEHLER: Indizes fehlen.")
|
||||||
ts_col_letter = sheet_handler._get_col_letter(timestamp_col_index + 1)
|
ts_col_letter = sheet_handler._get_col_letter(timestamp_col_index + 1)
|
||||||
version_col_letter = sheet_handler._get_col_letter(version_col_idx + 1)
|
version_col_letter = sheet_handler._get_col_letter(version_col_idx + 1)
|
||||||
@@ -2527,34 +2520,30 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_
|
|||||||
branch_x_letter = sheet_handler._get_col_letter(branch_x_idx + 1)
|
branch_x_letter = sheet_handler._get_col_letter(branch_x_idx + 1)
|
||||||
branch_y_letter = sheet_handler._get_col_letter(branch_y_idx + 1)
|
branch_y_letter = sheet_handler._get_col_letter(branch_y_idx + 1)
|
||||||
|
|
||||||
# Konfiguration für Parallelisierung
|
# --- Konfiguration ---
|
||||||
MAX_BRANCH_WORKERS = Config.MAX_BRANCH_WORKERS
|
MAX_BRANCH_WORKERS = Config.MAX_BRANCH_WORKERS
|
||||||
OPENAI_CONCURRENCY_LIMIT = Config.OPENAI_CONCURRENCY_LIMIT # Hole aus Config
|
OPENAI_CONCURRENCY_LIMIT = Config.OPENAI_CONCURRENCY_LIMIT
|
||||||
openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT)
|
openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT)
|
||||||
PROCESSING_BRANCH_BATCH_SIZE = Config.PROCESSING_BATCH_SIZE
|
PROCESSING_BRANCH_BATCH_SIZE = Config.PROCESSING_BRANCH_BATCH_SIZE
|
||||||
|
update_batch_row_limit = Config.UPDATE_BATCH_ROW_LIMIT # Wird derzeit nicht verwendet, da wir pro Batch senden
|
||||||
|
|
||||||
# Worker Funktion (unverändert)
|
# --- Worker Funktion ---
|
||||||
def evaluate_branch_task(task_data):
|
def evaluate_branch_task(task_data):
|
||||||
row_num = task_data['row_num']
|
row_num = task_data['row_num']; result = {"branch": "k.A. (Fehler Task)", "consistency": "error", "justification": "Fehler in Worker-Task"}; error = None
|
||||||
result = {"branch": "k.A. (Fehler Task)", "consistency": "error", "justification": "Fehler in Worker-Task"}
|
|
||||||
error = None
|
|
||||||
try:
|
try:
|
||||||
with openai_semaphore_branch:
|
with openai_semaphore_branch:
|
||||||
time.sleep(0.3)
|
# debug_print(f" Task {row_num}: Warte auf Semaphore...") # Sehr detailliertes Logging
|
||||||
result = evaluate_branche_chatgpt(
|
# time.sleep(0.1) # Minimale Pause reduziert manchmal Race Conditions bei hoher Last
|
||||||
task_data['crm_branche'], task_data['beschreibung'], task_data['wiki_branche'],
|
# debug_print(f" Task {row_num}: Semaphore erhalten, starte evaluate_branche_chatgpt...")
|
||||||
task_data['wiki_kategorien'], task_data['website_summary']
|
result = evaluate_branche_chatgpt( task_data['crm_branche'], task_data['beschreibung'], task_data['wiki_branche'], task_data['wiki_kategorien'], task_data['website_summary'])
|
||||||
)
|
# debug_print(f" Task {row_num}: evaluate_branche_chatgpt beendet.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error = f"Fehler Brancheneval Zeile {row_num}: {e}"
|
error = f"Fehler bei Branchenevaluation Zeile {row_num}: {e}"; debug_print(error); result['justification'] = error[:500]; result['consistency'] = 'error_task'
|
||||||
debug_print(error); result['justification'] = error[:500]; result['consistency'] = 'error_task'
|
|
||||||
return {"row_num": row_num, "result": result, "error": error}
|
return {"row_num": row_num, "result": result, "error": error}
|
||||||
|
|
||||||
# Hauptverarbeitung
|
# --- Hauptverarbeitung ---
|
||||||
tasks_for_processing_batch = []
|
tasks_for_processing_batch = []
|
||||||
total_processed_count = 0
|
total_processed_count = 0; total_skipped_count = 0; total_error_count = 0
|
||||||
total_skipped_count = 0
|
|
||||||
total_error_count = 0
|
|
||||||
|
|
||||||
if not ALLOWED_TARGET_BRANCHES: load_target_schema();
|
if not ALLOWED_TARGET_BRANCHES: load_target_schema();
|
||||||
if not ALLOWED_TARGET_BRANCHES: return debug_print("FEHLER: Ziel-Schema nicht geladen.")
|
if not ALLOWED_TARGET_BRANCHES: return debug_print("FEHLER: Ziel-Schema nicht geladen.")
|
||||||
@@ -2566,48 +2555,37 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_
|
|||||||
|
|
||||||
# Timestamp-Prüfung (AO)
|
# Timestamp-Prüfung (AO)
|
||||||
should_skip = False
|
should_skip = False
|
||||||
if len(row) > timestamp_col_index and str(row[timestamp_col_index]).strip():
|
if len(row) > timestamp_col_index and str(row[timestamp_col_index]).strip(): should_skip = True
|
||||||
should_skip = True
|
if should_skip: total_skipped_count += 1; continue
|
||||||
|
|
||||||
if should_skip:
|
|
||||||
total_skipped_count += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Task sammeln
|
# Task sammeln
|
||||||
task_data = {
|
task_data = { "row_num": i, "crm_branche": row[branche_crm_idx] if len(row) > branche_crm_idx else "", "beschreibung": row[beschreibung_idx] if len(row) > beschreibung_idx else "", "wiki_branche": row[branche_wiki_idx] if len(row) > branche_wiki_idx else "", "wiki_kategorien": row[kategorien_wiki_idx] if len(row) > kategorien_wiki_idx else "", "website_summary": row[summary_web_idx] if len(row) > summary_web_idx else ""}
|
||||||
"row_num": i,
|
|
||||||
"crm_branche": row[branche_crm_idx] if len(row) > branche_crm_idx else "",
|
|
||||||
"beschreibung": row[beschreibung_idx] if len(row) > beschreibung_idx else "",
|
|
||||||
"wiki_branche": row[branche_wiki_idx] if len(row) > branche_wiki_idx else "",
|
|
||||||
"wiki_kategorien": row[kategorien_wiki_idx] if len(row) > kategorien_wiki_idx else "",
|
|
||||||
"website_summary": row[summary_web_idx] if len(row) > summary_web_idx else ""
|
|
||||||
}
|
|
||||||
tasks_for_processing_batch.append(task_data)
|
tasks_for_processing_batch.append(task_data)
|
||||||
|
|
||||||
# --- Verarbeitungs-Batch ausführen ---
|
# --- Verarbeitungs-Batch ausführen ---
|
||||||
if len(tasks_for_processing_batch) >= PROCESSING_BRANCH_BATCH_SIZE or i == end_row_index_in_sheet:
|
if len(tasks_for_processing_batch) >= PROCESSING_BRANCH_BATCH_SIZE or i == end_row_index_in_sheet:
|
||||||
if tasks_for_processing_batch:
|
if tasks_for_processing_batch:
|
||||||
batch_start_row = tasks_for_processing_batch[0]['row_num']
|
batch_start_row = tasks_for_processing_batch[0]['row_num']; batch_end_row = tasks_for_processing_batch[-1]['row_num']
|
||||||
batch_end_row = tasks_for_processing_batch[-1]['row_num']
|
|
||||||
batch_task_count = len(tasks_for_processing_batch)
|
batch_task_count = len(tasks_for_processing_batch)
|
||||||
debug_print(f"\n--- Starte Branch-Evaluation Batch ({batch_task_count} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---")
|
debug_print(f"\n--- Starte Branch-Evaluation Batch ({batch_task_count} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---")
|
||||||
|
|
||||||
results_list = []
|
results_list = []; batch_error_count = 0
|
||||||
batch_error_count = 0
|
|
||||||
debug_print(f" Evaluiere {batch_task_count} Zeilen parallel (max {MAX_BRANCH_WORKERS} worker, {OPENAI_CONCURRENCY_LIMIT} OpenAI gleichzeitig)...")
|
debug_print(f" Evaluiere {batch_task_count} Zeilen parallel (max {MAX_BRANCH_WORKERS} worker, {OPENAI_CONCURRENCY_LIMIT} OpenAI gleichzeitig)...")
|
||||||
|
# *** BEGINN PARALLELE VERARBEITUNG ***
|
||||||
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor:
|
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor:
|
||||||
future_to_task = {executor.submit(evaluate_branch_task, task): task for task in tasks_for_processing_batch}
|
future_to_task = {executor.submit(evaluate_branch_task, task): task for task in tasks_for_processing_batch}
|
||||||
|
# Warte auf Ergebnisse und sammle sie
|
||||||
for future in concurrent.futures.as_completed(future_to_task):
|
for future in concurrent.futures.as_completed(future_to_task):
|
||||||
task = future_to_task[future]
|
task = future_to_task[future]
|
||||||
try:
|
try: result_data = future.result(); results_list.append(result_data);
|
||||||
result_data = future.result()
|
|
||||||
results_list.append(result_data)
|
|
||||||
if result_data['error']: batch_error_count += 1; total_error_count +=1
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
row_num = task['row_num']; err_msg = f"Generischer Fehler Branch Task Zeile {row_num}: {exc}"; debug_print(err_msg)
|
row_num = task['row_num']; err_msg = f"Generischer Fehler Branch Task Zeile {row_num}: {exc}"; debug_print(err_msg)
|
||||||
results_list.append({"row_num": row_num, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg[:500]}, "error": err_msg})
|
results_list.append({"row_num": row_num, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg[:500]}, "error": err_msg})
|
||||||
batch_error_count += 1; total_error_count +=1
|
batch_error_count += 1; total_error_count +=1
|
||||||
|
# Zähle Fehler aus dem Ergebnis-Dict
|
||||||
|
if results_list[-1]['error']: batch_error_count += 1; total_error_count +=1
|
||||||
|
|
||||||
|
# *** ENDE PARALLELE VERARBEITUNG ***
|
||||||
current_batch_processed_count = len(results_list)
|
current_batch_processed_count = len(results_list)
|
||||||
total_processed_count += current_batch_processed_count
|
total_processed_count += current_batch_processed_count
|
||||||
debug_print(f" Branch-Evaluation für Batch beendet. {current_batch_processed_count} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).")
|
debug_print(f" Branch-Evaluation für Batch beendet. {current_batch_processed_count} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).")
|
||||||
@@ -2616,16 +2594,19 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_
|
|||||||
if results_list:
|
if results_list:
|
||||||
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
current_version = Config.VERSION
|
current_version = Config.VERSION
|
||||||
batch_sheet_updates = [] # Nur Updates für diesen Batch
|
batch_sheet_updates = []
|
||||||
|
# Sortiere Ergebnisse nach Zeilennummer für geordnetes Schreiben (optional)
|
||||||
|
results_list.sort(key=lambda x: x['row_num'])
|
||||||
for res_data in results_list:
|
for res_data in results_list:
|
||||||
row_num = res_data['row_num']
|
row_num = res_data['row_num']; result = res_data['result']
|
||||||
result = res_data['result']
|
# Logge das individuelle Ergebnis VOR dem Update
|
||||||
|
debug_print(f" Zeile {row_num}: Ergebnis -> Branch='{result.get('branch')}', Consistency='{result.get('consistency')}', Justification='{result.get('justification', '')[:50]}...'")
|
||||||
row_updates = [
|
row_updates = [
|
||||||
{'range': f'{branch_w_letter}{row_num}', 'values': [[result.get("branch", "Fehler")]]},
|
{'range': f'{branch_w_letter}{row_num}', 'values': [[result.get("branch", "Fehler")]]},
|
||||||
{'range': f'{branch_x_letter}{row_num}', 'values': [[result.get("consistency", "Fehler")]]},
|
{'range': f'{branch_x_letter}{row_num}', 'values': [[result.get("consistency", "Fehler")]]},
|
||||||
{'range': f'{branch_y_letter}{row_num}', 'values': [[result.get("justification", "Fehler")]]},
|
{'range': f'{branch_y_letter}{row_num}', 'values': [[result.get("justification", "Fehler")]]},
|
||||||
{'range': f'{ts_col_letter}{row_num}', 'values': [[current_timestamp]]}, # AO Timestamp
|
{'range': f'{ts_col_letter}{row_num}', 'values': [[current_timestamp]]},
|
||||||
{'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]} # AP Version
|
{'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}
|
||||||
]
|
]
|
||||||
batch_sheet_updates.extend(row_updates)
|
batch_sheet_updates.extend(row_updates)
|
||||||
|
|
||||||
@@ -2635,17 +2616,16 @@ def process_branch_batch(sheet_handler, start_row_index_in_sheet, end_row_index_
|
|||||||
success = sheet_handler.batch_update_cells(batch_sheet_updates)
|
success = sheet_handler.batch_update_cells(batch_sheet_updates)
|
||||||
if success: debug_print(f" Sheet-Update für Batch {batch_start_row}-{batch_end_row} erfolgreich.")
|
if success: debug_print(f" Sheet-Update für Batch {batch_start_row}-{batch_end_row} erfolgreich.")
|
||||||
else: debug_print(f" FEHLER beim Sheet-Update für Batch {batch_start_row}-{batch_end_row}.")
|
else: debug_print(f" FEHLER beim Sheet-Update für Batch {batch_start_row}-{batch_end_row}.")
|
||||||
else:
|
else: debug_print(f" Keine Sheet-Updates für Batch {batch_start_row}-{batch_end_row} vorbereitet.")
|
||||||
debug_print(f" Keine Sheet-Updates für Batch {batch_start_row}-{batch_end_row} vorbereitet.")
|
|
||||||
|
|
||||||
tasks_for_processing_batch = [] # Batch leeren
|
tasks_for_processing_batch = [] # Batch leeren
|
||||||
debug_print(f"--- Verarbeitungs-Batch {batch_start_row}-{batch_end_row} abgeschlossen ---")
|
debug_print(f"--- Verarbeitungs-Batch {batch_start_row}-{batch_end_row} abgeschlossen ---")
|
||||||
# Kurze Pause NACHDEM ein Batch komplett verarbeitet und geschrieben wurde
|
# Kurze Pause NACHDEM ein Batch komplett verarbeitet und geschrieben wurde
|
||||||
time.sleep(1) # Kurze Pause von 1 Sekunde
|
# debug_print(" Warte 1 Sekunde...") # Test-Log
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
# --- time.sleep(Config.RETRY_DELAY) wurde HIER entfernt ---
|
# !!! HIER DARF KEIN SLEEP STEHEN !!!
|
||||||
|
# time.sleep(Config.RETRY_DELAY) # <<< DIESE ZEILE MUSS WEG SEIN in deinem Code!
|
||||||
# Kein finales Update senden mehr nötig, da nach jedem Batch gesendet wird
|
|
||||||
|
|
||||||
debug_print(f"Brancheneinschätzung (Parallel Batch) abgeschlossen. {total_processed_count} Zeilen verarbeitet (inkl. Fehler), {total_error_count} Fehler, {total_skipped_count} Zeilen wg. Timestamp übersprungen.")
|
debug_print(f"Brancheneinschätzung (Parallel Batch) abgeschlossen. {total_processed_count} Zeilen verarbeitet (inkl. Fehler), {total_error_count} Fehler, {total_skipped_count} Zeilen wg. Timestamp übersprungen.")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user