This commit is contained in:
2025-04-24 18:04:30 +00:00
parent 706523a77d
commit 2f293d2302

View File

@@ -4465,84 +4465,254 @@ class DataProcessor:
f"{processed_count} Tasks erstellt."
)
# process_branch_batch method
# Copy the logic from your global process_branch_batch function here and adapt it to self.
# It needs access to evaluate_branche_chatgpt (global) and openai_semaphore_branch (global?).
# The semaphore should rather be an instance variable, or be passed to the worker.
# For now we keep the semaphore global and pass it in.
def process_branch_batch(self, limit=None):
    """
    Batch process ONLY for the industry assessment (columns W-Y, AO).

    Finds the start row at the first cell with an empty timestamp column
    ("Timestamp letzte Prüfung", column AO), evaluates the pending rows in
    parallel batches via the global ``_evaluate_branch_task_worker`` and
    writes the results (suggested branch, consistency flag, justification,
    timestamp, version) back to the sheet in batched cell updates.

    Args:
        limit: Optional maximum number of sheet rows to scan starting at
            the first unprocessed row. ``None`` means unlimited; ``0`` is
            a no-op.

    Returns:
        None. Progress and errors are reported via ``logging`` only
        (the ``return logging.…`` idiom returns ``None``).
    """
    logging.info(
        f"Starte Branchen-Einschätzung Batch. "
        f"Limit: {limit if limit is not None else 'Unbegrenzt'}"
    )
    if not self.sheet_handler.load_data():
        return logging.error("FEHLER beim Laden der Daten.")
    all_data = self.sheet_handler.get_all_data_with_headers()
    header_rows = 5
    if not all_data or len(all_data) <= header_rows:
        return logging.warning("Keine Daten gefunden.")
    # Resolve the timestamp column first: it doubles as the
    # "row already processed" marker.
    timestamp_col_key = "Timestamp letzte Prüfung"
    timestamp_col_index = COLUMN_MAP.get(timestamp_col_key)
    if timestamp_col_index is None:
        return logging.critical(
            f"FEHLER: Schlüssel '{timestamp_col_key}' fehlt."
        )
    # Resolve the remaining input and output column indices.
    branche_crm_idx = COLUMN_MAP.get("CRM Branche")
    beschreibung_idx = COLUMN_MAP.get("CRM Beschreibung")
    branche_wiki_idx = COLUMN_MAP.get("Wiki Branche")
    kategorien_wiki_idx = COLUMN_MAP.get("Wiki Kategorien")
    summary_web_idx = COLUMN_MAP.get("Website Zusammenfassung")
    version_col_idx = COLUMN_MAP.get("Version")
    branch_w_idx = COLUMN_MAP.get("Chat Vorschlag Branche")
    branch_x_idx = COLUMN_MAP.get("Chat Konsistenz Branche")
    branch_y_idx = COLUMN_MAP.get("Chat Begründung Abweichung Branche")
    required_indices = [
        branche_crm_idx, beschreibung_idx, branche_wiki_idx,
        kategorien_wiki_idx, summary_web_idx, version_col_idx,
        branch_w_idx, branch_x_idx, branch_y_idx,
    ]
    if None in required_indices:
        return logging.critical("FEHLER: Benötigte Indizes fehlen.")
    # A1-notation column letters for the batch-update ranges.
    ts_col_letter = self.sheet_handler._get_col_letter(timestamp_col_index + 1)
    version_col_letter = self.sheet_handler._get_col_letter(version_col_idx + 1)
    branch_w_letter = self.sheet_handler._get_col_letter(branch_w_idx + 1)
    branch_x_letter = self.sheet_handler._get_col_letter(branch_x_idx + 1)
    branch_y_letter = self.sheet_handler._get_col_letter(branch_y_idx + 1)
    # Find the first data row whose timestamp cell is still empty.
    start_data_index = self.sheet_handler.get_start_row_index(
        check_column_key=timestamp_col_key,
        min_sheet_row=header_rows + 1,
    )
    if start_data_index == -1:
        return logging.error("FEHLER bei Startzeilensuche.")
    if start_data_index >= len(self.sheet_handler.get_data()):
        logging.info("Alle Zeilen mit Timestamp gefüllt. Nichts zu tun.")
        return
    # Determine the inclusive sheet-row range [start, end] to process.
    start_sheet_row = start_data_index + header_rows + 1
    total_sheet_rows = len(all_data)
    end_sheet_row = total_sheet_rows
    if limit is not None and limit >= 0:
        if limit == 0:
            logging.info("Limit 0.")
            return
        end_sheet_row = min(start_sheet_row + limit - 1, total_sheet_rows)
    if start_sheet_row > end_sheet_row:
        logging.warning("Start nach Ende (Limit).")
        return
    logging.info(
        f"Verarbeite Sheet-Zeilen {start_sheet_row} bis {end_sheet_row} "
        f"für Branchen-Einschätzung (Batch)."
    )
    # Make sure the target schema (allowed branch values) is loaded
    # before any evaluation is attempted.
    if not ALLOWED_TARGET_BRANCHES:
        load_target_schema()
    if not ALLOWED_TARGET_BRANCHES:
        return logging.critical(
            "FEHLER: Ziel-Schema nicht geladen. Branch Batch nicht möglich."
        )
    max_workers = Config.MAX_BRANCH_WORKERS
    concurrency_limit = Config.OPENAI_CONCURRENCY_LIMIT
    tasks_for_processing_batch = []
    processed_count = 0
    for i in range(start_sheet_row, end_sheet_row + 1):
        row = all_data[i - 1]
        # A row shorter than the timestamp column counts as "empty
        # timestamp" and is therefore processed, not skipped.
        cell_val = row[timestamp_col_index] if len(row) > timestamp_col_index else None
        if cell_val and str(cell_val).strip():
            logging.debug(
                f"Zeile {i}: Timestamp '{ts_col_letter}' nicht leer, übersprungen."
            )
        else:
            tasks_for_processing_batch.append({
                "row_num": i,
                "crm_branche": self._get_cell_value(row, "CRM Branche"),
                "beschreibung": self._get_cell_value(row, "CRM Beschreibung"),
                "wiki_branche": self._get_cell_value(row, "Wiki Branche"),
                "wiki_kategorien": self._get_cell_value(row, "Wiki Kategorien"),
                "website_summary": self._get_cell_value(row, "Website Zusammenfassung"),
            })
            processed_count += 1
        # Flush when the batch is full or the last row is reached.
        # BUGFIX: the previous version `continue`d past this check for
        # skipped rows, so a partially-filled trailing batch was never
        # evaluated when the final rows were skipped; it also indexed
        # tasks[0] without guarding against an empty batch.
        if tasks_for_processing_batch and (
            len(tasks_for_processing_batch) >= Config.PROCESSING_BRANCH_BATCH_SIZE
            or i == end_sheet_row
        ):
            self._run_branch_evaluation_batch(
                tasks_for_processing_batch,
                max_workers,
                concurrency_limit,
                branch_w_letter, branch_x_letter, branch_y_letter,
                ts_col_letter, version_col_letter,
            )
            tasks_for_processing_batch = []
            # Brief pause between batches to stay under API rate limits.
            time.sleep(Config.RETRY_DELAY)
    logging.info(
        f"Branchen-Einschätzung Batch abgeschlossen. "
        f"{processed_count} Tasks erstellt."
    )

def _run_branch_evaluation_batch(self, tasks, max_workers, concurrency_limit,
                                 branch_w_letter, branch_x_letter,
                                 branch_y_letter, ts_col_letter,
                                 version_col_letter):
    """
    Evaluate one collected batch of row tasks in parallel and write the
    results back to the sheet as a single batched cell update.

    Args:
        tasks: Non-empty list of task dicts as built by
            ``process_branch_batch`` (each has at least ``row_num``).
        max_workers: Thread-pool size for the evaluation workers.
        concurrency_limit: OpenAI concurrency limit (logging only; the
            actual throttle is the global ``openai_semaphore_branch``).
        branch_w_letter, branch_x_letter, branch_y_letter: Column letters
            for branch suggestion, consistency flag and justification.
        ts_col_letter, version_col_letter: Column letters for the
            processing timestamp and script version.
    """
    batch_start_row = tasks[0]["row_num"]
    batch_end_row = tasks[-1]["row_num"]
    logging.info(
        f"--- Starte Branch-Evaluation Batch "
        f"({len(tasks)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---"
    )
    logging.info(
        f"Evaluiere parallel ({max_workers} Worker, "
        f"{concurrency_limit} OpenAI gleichzeitig)..."
    )
    results_list = []
    batch_error_cnt = 0
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=max_workers
    ) as executor:
        # The module-global openai_semaphore_branch throttles concurrent
        # OpenAI calls across all worker threads.
        future_to_task = {
            executor.submit(
                _evaluate_branch_task_worker, task, openai_semaphore_branch
            ): task
            for task in tasks
        }
        for future in concurrent.futures.as_completed(future_to_task):
            task = future_to_task[future]
            try:
                result_data = future.result()
                results_list.append(result_data)
                if result_data.get("error"):
                    batch_error_cnt += 1
            except Exception as exc:
                # Worker crashed entirely: record a synthetic error result
                # so the row still gets marked in the sheet.
                row_num = task["row_num"]
                err_msg = (
                    f"Generischer Fehler Branch Task Zeile {row_num}: {exc}"
                )
                logging.error(err_msg)
                results_list.append({
                    "row_num": row_num,
                    "result": {
                        "branch": "FEHLER",
                        "consistency": "error_task",
                        "justification": err_msg[:500],
                    },
                    "error": err_msg,
                })
                batch_error_cnt += 1
    logging.info(
        f"Branch-Evaluation für Batch beendet: "
        f"{len(results_list)} Ergebnisse, "
        f"{batch_error_cnt} Fehler."
    )
    if not results_list:
        return
    current_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    current_version = Config.VERSION
    # Sort by row so the update ranges are emitted in sheet order.
    results_list.sort(key=lambda x: x["row_num"])
    batch_sheet_updates = []
    for res in results_list:
        rn = res["row_num"]
        result = res["result"]
        logging.debug(
            f"Zeile {rn}: Branch='{result.get('branch')}', "
            f"Consistency='{result.get('consistency')}', "
            f"Justification='{result.get('justification','')[:50]}...'"
        )
        batch_sheet_updates.extend([
            {"range": f"{branch_w_letter}{rn}",
             "values": [[result.get("branch", "Fehler")]]},
            {"range": f"{branch_x_letter}{rn}",
             "values": [[result.get("consistency", "Fehler")]]},
            {"range": f"{branch_y_letter}{rn}",
             "values": [[result.get("justification", "Fehler")]]},
            {"range": f"{ts_col_letter}{rn}",
             "values": [[current_ts]]},
            {"range": f"{version_col_letter}{rn}",
             "values": [[current_version]]},
        ])
    if batch_sheet_updates:
        logging.info(
            f"Sende Sheet-Update für {len(results_list)} Zeilen "
            f"({len(batch_sheet_updates)} Zellen)..."
        )
        if self.sheet_handler.batch_update_cells(batch_sheet_updates):
            logging.info("Sheet-Update erfolgreich.")
        else:
            logging.error("FEHLER beim Sheet-Update.")
# --- Utility methods (called by run_user_interface) ---
# These methods each perform one specific task and often operate over the entire sheet