def process_branch_batch(self, limit=None):
    """
    Batch-Prozess NUR für Branchen-Einschätzung (Spalten W-Y, AO).
    Findet Startzeile ab erster Zelle mit leerem AO.

    Collects rows whose timestamp column (AO) is empty, evaluates them in
    parallel batches via ``_evaluate_branch_task_worker`` (rate-limited by
    the global ``openai_semaphore_branch``), and writes suggestion,
    consistency, justification, timestamp and version back to the sheet.

    :param limit: Maximum number of sheet rows to process; ``None`` means
                  unlimited, ``0`` processes nothing.
    :returns: ``None`` (early returns reuse the ``return logging.xxx(...)``
              idiom already used in this class; ``logging`` calls return
              ``None``).
    """
    logging.info(
        f"Starte Branchen-Einschätzung Batch. "
        f"Limit: {limit if limit is not None else 'Unbegrenzt'}"
    )
    if not self.sheet_handler.load_data():
        return logging.error("FEHLER beim Laden der Daten.")

    all_data = self.sheet_handler.get_all_data_with_headers()
    header_rows = 5
    if not all_data or len(all_data) <= header_rows:
        return logging.warning("Keine Daten gefunden.")

    # Timestamp column (AO) marks rows that were already processed.
    timestamp_col_key = "Timestamp letzte Prüfung"
    timestamp_col_index = COLUMN_MAP.get(timestamp_col_key)
    if timestamp_col_index is None:
        return logging.critical(
            f"FEHLER: Schlüssel '{timestamp_col_key}' fehlt."
        )

    # Resolve all further required column indices up front.
    branche_crm_idx = COLUMN_MAP.get("CRM Branche")
    beschreibung_idx = COLUMN_MAP.get("CRM Beschreibung")
    branche_wiki_idx = COLUMN_MAP.get("Wiki Branche")
    kategorien_wiki_idx = COLUMN_MAP.get("Wiki Kategorien")
    summary_web_idx = COLUMN_MAP.get("Website Zusammenfassung")
    version_col_idx = COLUMN_MAP.get("Version")
    branch_w_idx = COLUMN_MAP.get("Chat Vorschlag Branche")
    branch_x_idx = COLUMN_MAP.get("Chat Konsistenz Branche")
    branch_y_idx = COLUMN_MAP.get("Chat Begründung Abweichung Branche")

    required_indices = [
        branche_crm_idx, beschreibung_idx, branche_wiki_idx,
        kategorien_wiki_idx, summary_web_idx, version_col_idx,
        branch_w_idx, branch_x_idx, branch_y_idx,
    ]
    if None in required_indices:
        return logging.critical("FEHLER: Benötigte Indizes fehlen.")

    # Column letters needed for the A1-style batch update ranges.
    ts_col_letter = self.sheet_handler._get_col_letter(timestamp_col_index + 1)
    version_col_letter = self.sheet_handler._get_col_letter(version_col_idx + 1)
    branch_w_letter = self.sheet_handler._get_col_letter(branch_w_idx + 1)
    branch_x_letter = self.sheet_handler._get_col_letter(branch_x_idx + 1)
    branch_y_letter = self.sheet_handler._get_col_letter(branch_y_idx + 1)

    # First data row whose timestamp cell is still empty.
    start_data_index = self.sheet_handler.get_start_row_index(
        check_column_key=timestamp_col_key,
        min_sheet_row=header_rows + 1,
    )
    if start_data_index == -1:
        return logging.error("FEHLER bei Startzeilensuche.")
    if start_data_index >= len(self.sheet_handler.get_data()):
        logging.info("Alle Zeilen mit Timestamp gefüllt. Nichts zu tun.")
        return

    # Determine the inclusive sheet-row range to process.
    start_sheet_row = start_data_index + header_rows + 1
    total_sheet_rows = len(all_data)
    end_sheet_row = total_sheet_rows
    if limit is not None and limit >= 0:
        end_sheet_row = min(start_sheet_row + limit - 1, total_sheet_rows)
        if limit == 0:
            logging.info("Limit 0.")
            return
    if start_sheet_row > end_sheet_row:
        logging.warning("Start nach Ende (Limit).")
        return

    logging.info(
        f"Verarbeite Sheet-Zeilen {start_sheet_row} bis {end_sheet_row} "
        f"für Branchen-Einschätzung (Batch)."
    )

    # Preparation for parallel processing.
    MAX_BRANCH_WORKERS = Config.MAX_BRANCH_WORKERS
    OPENAI_CONCURRENCY_LIMIT = Config.OPENAI_CONCURRENCY_LIMIT
    tasks_for_processing_batch = []
    processed_count = 0

    # Ensure the target schema is loaded before any evaluation starts.
    if not ALLOWED_TARGET_BRANCHES:
        load_target_schema()
    if not ALLOWED_TARGET_BRANCHES:
        return logging.critical(
            "FEHLER: Ziel-Schema nicht geladen. Branch Batch nicht möglich."
        )

    # Collect tasks row by row; flush a batch when it is full or at the end.
    for i in range(start_sheet_row, end_sheet_row + 1):
        row = all_data[i - 1]

        cell_val = row[timestamp_col_index] if len(row) > timestamp_col_index else None
        if cell_val and str(cell_val).strip():
            logging.debug(
                f"Zeile {i}: Timestamp '{ts_col_letter}' nicht leer, übersprungen."
            )
            continue

        tasks_for_processing_batch.append({
            "row_num": i,
            "crm_branche": self._get_cell_value(row, "CRM Branche"),
            "beschreibung": self._get_cell_value(row, "CRM Beschreibung"),
            "wiki_branche": self._get_cell_value(row, "Wiki Branche"),
            "wiki_kategorien": self._get_cell_value(row, "Wiki Kategorien"),
            "website_summary": self._get_cell_value(row, "Website Zusammenfassung"),
        })
        processed_count += 1

        if (len(tasks_for_processing_batch) >= Config.PROCESSING_BRANCH_BATCH_SIZE
                or i == end_sheet_row):
            batch_start_row = tasks_for_processing_batch[0]["row_num"]
            batch_end_row = tasks_for_processing_batch[-1]["row_num"]
            batch_task_count = len(tasks_for_processing_batch)

            logging.info(
                f"--- Starte Branch-Evaluation Batch "
                f"({batch_task_count} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---"
            )
            logging.info(
                f"Evaluiere parallel ({MAX_BRANCH_WORKERS} Worker, "
                f"{OPENAI_CONCURRENCY_LIMIT} OpenAI gleichzeitig)..."
            )

            results_list = []
            batch_error_cnt = 0

            # Fan out the evaluation; the worker takes the global semaphore
            # to cap concurrent OpenAI calls.
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=MAX_BRANCH_WORKERS
            ) as executor:
                future_to_task = {
                    executor.submit(
                        _evaluate_branch_task_worker,
                        task,
                        openai_semaphore_branch,
                    ): task for task in tasks_for_processing_batch
                }

                for future in concurrent.futures.as_completed(future_to_task):
                    task = future_to_task[future]
                    try:
                        result_data = future.result()
                        results_list.append(result_data)
                        if result_data.get("error"):
                            batch_error_cnt += 1
                    except Exception as exc:
                        # Worker crashed entirely: record a synthetic error
                        # result so the row still gets a visible marker.
                        row_num = task["row_num"]
                        err_msg = (
                            f"Generischer Fehler Branch Task Zeile {row_num}: {exc}"
                        )
                        logging.error(err_msg)
                        results_list.append({
                            "row_num": row_num,
                            "result": {
                                "branch": "FEHLER",
                                "consistency": "error_task",
                                "justification": err_msg[:500],
                            },
                            "error": err_msg,
                        })
                        batch_error_cnt += 1

            logging.info(
                f"Branch-Evaluation für Batch beendet: "
                f"{len(results_list)} Ergebnisse, "
                f"{batch_error_cnt} Fehler."
            )

            # Sort results and build the cell updates for this batch.
            if results_list:
                current_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                current_version = Config.VERSION
                results_list.sort(key=lambda x: x["row_num"])

                batch_sheet_updates = []
                for res in results_list:
                    rn = res["row_num"]
                    result = res["result"]
                    logging.debug(
                        f"Zeile {rn}: Branch='{result.get('branch')}', "
                        f"Consistency='{result.get('consistency')}', "
                        f"Justification='{result.get('justification','')[:50]}...'"
                    )
                    batch_sheet_updates.extend([
                        {
                            "range": f"{branch_w_letter}{rn}",
                            "values": [[result.get("branch", "Fehler")]],
                        },
                        {
                            "range": f"{branch_x_letter}{rn}",
                            "values": [[result.get("consistency", "Fehler")]],
                        },
                        {
                            "range": f"{branch_y_letter}{rn}",
                            "values": [[result.get("justification", "Fehler")]],
                        },
                        {
                            "range": f"{ts_col_letter}{rn}",
                            "values": [[current_ts]],
                        },
                        {
                            "range": f"{version_col_letter}{rn}",
                            "values": [[current_version]],
                        },
                    ])

                # Push this batch's updates immediately.
                if batch_sheet_updates:
                    logging.info(
                        f"Sende Sheet-Update für {len(results_list)} Zeilen "
                        f"({len(batch_sheet_updates)} Zellen)..."
                    )
                    success = self.sheet_handler.batch_update_cells(batch_sheet_updates)
                    if success:
                        logging.info("Sheet-Update erfolgreich.")
                    else:
                        logging.error("FEHLER beim Sheet-Update.")

            # Reset for the next batch and back off briefly.
            # NOTE(review): the previous version also kept an
            # `all_sheet_updates` accumulator that was never filled, so the
            # "final push" guarded by it was dead code — removed here.
            tasks_for_processing_batch = []
            time.sleep(Config.RETRY_DELAY)

    logging.info(
        f"Branchen-Einschätzung Batch abgeschlossen. "
        f"{processed_count} Tasks erstellt."
    )

# --- Utility methods (invoked from run_user_interface) ---
# These methods each perform one specific task and often operate over the whole sheet