diff --git a/brancheneinstufung.py b/brancheneinstufung.py index 48edf900..6af4c3e9 100644 --- a/brancheneinstufung.py +++ b/brancheneinstufung.py @@ -6302,17 +6302,17 @@ class DataProcessor: # ALLOWED_TARGET_BRANCHES (Block 6). # Nutzt die uebergeordnete sheet_handler Instanz (Block 14). def process_branch_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None): - self.logger.info(f"Starte Brancheneinschaetzung (Parallel Batch W-Y, AO, AP). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") + self.logger.info(f"Starte Brancheneinschaetzung (Parallel Batch). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") - # --- Daten laden und Startzeile ermitteln --- + # --- Daten laden und Startzeile ermitteln --- (wie gehabt) if start_sheet_row is None: - self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren AO...") + self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren Timestamp letzte Pruefung (BC)...") start_data_index_no_header = self.sheet_handler.get_start_row_index(check_column_key="Timestamp letzte Pruefung", min_sheet_row=7) if start_data_index_no_header == -1: self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") return start_sheet_row = start_data_index_no_header + self.sheet_handler._header_rows + 1 - self.logger.info(f"Automatisch ermittelte Startzeile (erste leere AO Zelle): {start_sheet_row}") + self.logger.info(f"Automatisch ermittelte Startzeile (erste leere BC Zelle): {start_sheet_row}") else: if not self.sheet_handler.load_data(): self.logger.error("FEHLER beim Laden der Daten fuer process_branch_batch.") @@ -6329,40 +6329,46 @@ class DataProcessor: self.logger.info("Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.") return - # --- Indizes und Buchstaben --- + # --- Indizes und Buchstaben --- (wie gehabt) required_keys = [ - "Timestamp letzte Pruefung", - "CRM Branche", "CRM Beschreibung", "Wiki Branche", "Wiki Kategorien", - "Website Zusammenfassung", "Version", - "Chat Vorschlag Branche", "Chat Branche Konfidenz", "Chat Konsistenz Branche", "Chat Begruendung Abweichung Branche", - "CRM Name" # Hinzugefügt für besseres Logging + "Timestamp letzte Pruefung", "CRM Branche", "CRM Beschreibung", "Wiki Branche", + "Wiki Kategorien", "Website Zusammenfassung", "Version", "Chat Vorschlag Branche", + "Chat Branche Konfidenz", "Chat Konsistenz Branche", "Chat Begruendung Abweichung Branche", + "CRM Name", "Finaler Umsatz (Wiki>CRM)", "Finaler Mitarbeiter (Wiki>CRM)" # Für Konsolidierung in _process_single_row (obwohl hier nicht direkt genutzt, aber für Vollständigkeit) ] + # ... (col_indices und Fehlerprüfung bleiben) col_indices = {key: COLUMN_MAP.get(key) for key in required_keys} if None in col_indices.values(): missing = [k for k, v in col_indices.items() if v is None] self.logger.critical(f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_branch_batch: {missing}. Breche ab.") return - # --- Konfiguration fuer Parallelisierung --- + + # --- Konfiguration fuer Parallelisierung --- (wie gehabt) MAX_BRANCH_WORKERS = getattr(Config, 'MAX_BRANCH_WORKERS', 10) OPENAI_CONCURRENCY_LIMIT = getattr(Config, 'OPENAI_CONCURRENCY_LIMIT', 3) processing_batch_size = getattr(Config, 'PROCESSING_BRANCH_BATCH_SIZE', 20) # --- Verarbeitungsvariablen --- - tasks_for_current_processing_batch = [] + tasks_to_submit_to_executor = [] # Sammelt Tasks, bis Limit oder Batch-Größe erreicht - processed_count = 0 + total_tasks_collected_for_processing = 0 # Zählt alle Tasks, die die Kriterien erfüllen + processed_tasks_in_run = 0 # Zählt Tasks, die tatsächlich an den Executor übergeben wurden skipped_count = 0 - global ALLOWED_TARGET_BRANCHES # Sicherstellen, dass wir auf die globale Variable zugreifen + global ALLOWED_TARGET_BRANCHES if not ALLOWED_TARGET_BRANCHES: load_target_schema() if not ALLOWED_TARGET_BRANCHES: - self.logger.critical("FEHLER: Ziel-Branchenschema konnte nicht geladen werden. Branchenbewertung nicht moeglich. Breche Batch ab.") + self.logger.critical("FEHLER: Ziel-Branchenschema konnte nicht geladen werden. Breche Batch ab.") return # Hauptschleife über die Zeilen for i in range(start_sheet_row, end_sheet_row + 1): + if limit is not None and total_tasks_collected_for_processing >= limit: + self.logger.info(f"Sammel-Limit ({limit}) für Branch-Tasks erreicht. Stoppe weitere Zeilenprüfung.") + break + row_index_in_list = i - 1 if row_index_in_list >= total_sheet_rows: break @@ -6371,169 +6377,130 @@ class DataProcessor: skipped_count += 1 continue - company_name_log = self._get_cell_value_safe(row, "CRM Name").strip() # Für Logging + company_name_log = self._get_cell_value_safe(row, "CRM Name").strip() - # --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist --- ao_value = self._get_cell_value_safe(row, "Timestamp letzte Pruefung").strip() processing_needed_based_on_status = not ao_value if not processing_needed_based_on_status: - # Detail-Log, warum übersprungen, wenn Timestamp gesetzt ist - # if i < start_sheet_row + 10 or i % 500 == 0 : # Logge nur für die ersten paar und dann seltener - # self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...): Uebersprungen (Timestamp letzte Pruefung '{ao_value}' bereits gesetzt).") skipped_count += 1 continue + # --- DEBUG BLOCK für info_sources_count (wie im vorherigen Vorschlag) --- crm_branche_val = self._get_cell_value_safe(row, "CRM Branche").strip() + # ... (Rest der _val Variablen und der detaillierte Debug-Block für info_sources_count) crm_beschreibung_val = self._get_cell_value_safe(row, "CRM Beschreibung").strip() wiki_branche_val = self._get_cell_value_safe(row, "Wiki Branche").strip() wiki_kategorien_val = self._get_cell_value_safe(row, "Wiki Kategorien").strip() website_summary_val = self._get_cell_value_safe(row, "Website Zusammenfassung").strip() - # ======================= BEGINN DES ZU ERSETZENDEN/EINZUFÜGENDEN DEBUG-BLOCKS ======================= self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...) - Rohwerte für Info-Quellen:") - self.logger.debug(f" CRM Branche (H): '{crm_branche_val}' (Typ: {type(crm_branche_val)})") - self.logger.debug(f" CRM Beschreibung (G): '{str(crm_beschreibung_val)[:50]}...' (Typ: {type(crm_beschreibung_val)})") - self.logger.debug(f" Wiki Branche (R): '{wiki_branche_val}' (Typ: {type(wiki_branche_val)})") - self.logger.debug(f" Wiki Kategorien (U): '{str(wiki_kategorien_val)[:50]}...' (Typ: {type(wiki_kategorien_val)})") - self.logger.debug(f" Website Zusammenfassung (AD): '{str(website_summary_val)[:50]}...' (Typ: {type(website_summary_val)})") - + # ... (kompletter detaillierter Debug-Block für info_sources_count hier einfügen) ... sources_to_check = { - "CRM Branche": crm_branche_val, - "CRM Beschreibung": crm_beschreibung_val, - "Wiki Branche": wiki_branche_val, - "Wiki Kategorien": wiki_kategorien_val, + "CRM Branche": crm_branche_val, "CRM Beschreibung": crm_beschreibung_val, + "Wiki Branche": wiki_branche_val, "Wiki Kategorien": wiki_kategorien_val, "Website Zusammenfassung": website_summary_val } - - info_sources_count = 0 - counted_sources = [] - + info_sources_count = 0; counted_sources = [] for source_name, val in sources_to_check.items(): - is_valid_source = False - # Detailprüfung der Bedingungen - cond1 = bool(val) - cond2 = isinstance(val, str) - cond3 = False - cond4 = False - cond5 = False + cond1 = bool(val); cond2 = isinstance(val, str); cond3 = False; cond4 = False; cond5 = False if cond1 and cond2: - stripped_val = val.strip() - cond3 = bool(stripped_val) - if cond3: - lower_stripped_val = stripped_val.lower() - cond4 = lower_stripped_val != "k.a." - # Wichtig: "FEHLER" am Anfang des Strings, nicht nur enthalten - cond5 = not stripped_val.upper().startswith("FEHLER") - + stripped_val = val.strip(); cond3 = bool(stripped_val) + if cond3: lower_stripped_val = stripped_val.lower(); cond4 = lower_stripped_val != "k.a."; cond5 = not stripped_val.upper().startswith("FEHLER") is_valid_source = cond1 and cond2 and cond3 and cond4 and cond5 - - if is_valid_source: - info_sources_count += 1 - counted_sources.append(source_name) - - self.logger.debug(f" Prüfe Quelle '{source_name}': Wert='{str(val)[:30]}...', " - f"c1(val)? {cond1}, c2(is_str)? {cond2}, c3(stripped)? {cond3}, " - f"c4(!=k.a.)? {cond4}, c5(!FEHLER)? {cond5} -> Gültig? {is_valid_source}") - + if is_valid_source: info_sources_count += 1; counted_sources.append(source_name) + self.logger.debug(f" Prüfe Quelle '{source_name}': Wert='{str(val)[:30]}...', c1?{cond1}, c2?{cond2}, c3?{cond3}, c4?{cond4}, c5?{cond5} -> Gültig? {is_valid_source}") self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...) - Gezählte valide Quellen: {info_sources_count} - {counted_sources}") - + if info_sources_count < 2: self.logger.info(f"Zeile {i} ({company_name_log[:30]}...) (Branch Check): Uebersprungen (Timestamp BC leer, aber nur {info_sources_count} Informationsquellen verfuegbar: {counted_sources}). Mindestens 2 benoetigt.") skipped_count += 1 continue - # ======================= ENDE DES ZU ERSETZENDEN/EINZUFÜGENDEN DEBUG-BLOCKS ======================= + # --- ENDE DEBUG BLOCK --- - tasks_for_current_processing_batch.append({ - "row_num": i, - "crm_branche": crm_branche_val, - "beschreibung": crm_beschreibung_val, - "wiki_branche": wiki_branche_val, - "wiki_kategorien": wiki_kategorien_val, + # Task zur Liste hinzufügen, wenn alle Kriterien erfüllt sind + tasks_to_submit_to_executor.append({ + "row_num": i, "crm_branche": crm_branche_val, "beschreibung": crm_beschreibung_val, + "wiki_branche": wiki_branche_val, "wiki_kategorien": wiki_kategorien_val, "website_summary": website_summary_val }) + total_tasks_collected_for_processing += 1 + # Ende der Hauptschleife for i in range... + + # --- Jetzt die gesammelten Tasks in Batches verarbeiten --- + num_tasks_to_process = len(tasks_to_submit_to_executor) + self.logger.info(f"Nach Prüfung aller Zeilen im Bereich: {num_tasks_to_process} Tasks für Branch-Evaluation gesammelt.") + + for batch_start_index in range(0, num_tasks_to_process, processing_batch_size): + if limit is not None and processed_tasks_in_run >= limit: + self.logger.info(f"Verarbeitungslimit ({limit}) vor Start des nächsten Batches erreicht.") + break # Verlasse die Batch-Verarbeitungsschleife + + # Aktuellen Batch extrahieren + current_batch_tasks = tasks_to_submit_to_executor[batch_start_index : batch_start_index + processing_batch_size] - execute_batch_now = False - if len(tasks_for_current_processing_batch) >= processing_batch_size: - execute_batch_now = True - elif i == end_sheet_row and tasks_for_current_processing_batch: - execute_batch_now = True - elif limit is not None and (processed_count + len(tasks_for_current_processing_batch)) >= limit and tasks_for_current_processing_batch: - if (processed_count + len(tasks_for_current_processing_batch)) > limit: - num_to_trim = (processed_count + len(tasks_for_current_processing_batch)) - limit - tasks_for_current_processing_batch = tasks_for_current_processing_batch[:-num_to_trim] - if tasks_for_current_processing_batch: - execute_batch_now = True + # Überprüfen, ob der aktuelle Batch das Limit überschreiten würde + if limit is not None and (processed_tasks_in_run + len(current_batch_tasks)) > limit: + can_take_in_batch = limit - processed_tasks_in_run + if can_take_in_batch <= 0: # Sollte nicht passieren, wenn Limit-Check oben greift + break + current_batch_tasks = current_batch_tasks[:can_take_in_batch] - if execute_batch_now: - batch_start_row_log = tasks_for_current_processing_batch[0]['row_num'] - batch_end_row_log = tasks_for_current_processing_batch[-1]['row_num'] - self.logger.debug(f"\n--- Starte Branch-Evaluation Batch ({len(tasks_for_current_processing_batch)} Tasks, Zeilen {batch_start_row_log}-{batch_end_row_log}) ---") - - processed_count += len(tasks_for_current_processing_batch) + if not current_batch_tasks: # Sollte nicht passieren, wenn Limit-Check oben greift + continue - batch_results_data = [] - batch_error_count_this_batch = 0 - - openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT) - with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor: - future_to_task = {executor.submit(self.evaluate_branch_task, task, openai_semaphore_branch): task for task in tasks_for_current_processing_batch} - for future in concurrent.futures.as_completed(future_to_task): - task = future_to_task[future] - try: - result_data = future.result() - batch_results_data.append(result_data) - if result_data.get('error'): - batch_error_count_this_batch += 1 - except Exception as exc: - row_num_exc = task['row_num'] - err_msg_exc = f"Unerwarteter Fehler bei Ergebnisabfrage Branch Task Zeile {row_num_exc}: {type(exc).__name__} - {exc}" - self.logger.error(err_msg_exc) - batch_results_data.append({"row_num": row_num_exc, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg_exc[:500]}, "error": err_msg_exc}) - batch_error_count_this_batch += 1 - - self.logger.debug(f" Branch-Evaluation fuer Batch beendet. {len(batch_results_data)} Ergebnisse erhalten ({batch_error_count_this_batch} Fehler in diesem Batch).") + batch_start_row_log = current_batch_tasks[0]['row_num'] + batch_end_row_log = current_batch_tasks[-1]['row_num'] + self.logger.debug(f"\n--- Starte Branch-Evaluation Batch ({len(current_batch_tasks)} Tasks, Zeilen {batch_start_row_log}-{batch_end_row_log}) ---") + + processed_tasks_in_run += len(current_batch_tasks) - if batch_results_data: - current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - current_version = getattr(Config, 'VERSION', 'unknown') - sheet_updates_for_this_batch = [] - batch_results_data.sort(key=lambda x: x['row_num']) + batch_results_data = [] + batch_error_count_this_batch = 0 + openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT) + with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor: + future_to_task = {executor.submit(self.evaluate_branch_task, task, openai_semaphore_branch): task for task in current_batch_tasks} + for future in concurrent.futures.as_completed(future_to_task): + task = future_to_task[future] + try: + result_data = future.result() + batch_results_data.append(result_data) + if result_data.get('error'): batch_error_count_this_batch += 1 + except Exception as exc: + row_num_exc = task['row_num'] + err_msg_exc = f"Unerwarteter Fehler bei Ergebnisabfrage Branch Task Zeile {row_num_exc}: {type(exc).__name__} - {exc}" + self.logger.error(err_msg_exc) + batch_results_data.append({"row_num": row_num_exc, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg_exc[:500]}, "error": err_msg_exc}) + batch_error_count_this_batch += 1 + + self.logger.debug(f" Branch-Evaluation fuer Batch beendet. {len(batch_results_data)} Ergebnisse erhalten ({batch_error_count_this_batch} Fehler in diesem Batch).") - for res_data_item in batch_results_data: - row_num_item = res_data_item['row_num'] - result_item = res_data_item['result'] - self.logger.debug(f" Zeile {row_num_item} (aus Batch): Ergebnis aus evaluate_branch_task: {result_item}") + if batch_results_data: + current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + current_version = getattr(Config, 'VERSION', 'unknown') + sheet_updates_for_this_batch = [] + batch_results_data.sort(key=lambda x: x['row_num']) + for res_data_item in batch_results_data: + row_num_item = res_data_item['row_num']; result_item = res_data_item['result'] + self.logger.debug(f" Zeile {row_num_item} (aus Batch): Ergebnis aus evaluate_branch_task: {result_item}") + # ... (alle appends für sheet_updates_for_this_batch wie im vorherigen korrekten Vorschlag) + sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num_item}', 'values': [[result_item.get("branch", "FEHLER BRANCH")]]}) + sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{row_num_item}', 'values': [[result_item.get("confidence", "N/A CONF")]]}) + sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num_item}', 'values': [[result_item.get("consistency", "error CONS")]]}) + sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{row_num_item}', 'values': [[result_item.get("justification", "Keine Begr. JUST")]]}) + sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{row_num_item}', 'values': [[current_timestamp]]}) + sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{row_num_item}', 'values': [[current_version]]}) - sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num_item}', - 'values': [[result_item.get("branch", "FEHLER BRANCH")]]}) - sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{row_num_item}', - 'values': [[result_item.get("confidence", "N/A CONF")]]}) - sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num_item}', - 'values': [[result_item.get("consistency", "error CONS")]]}) - sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{row_num_item}', - 'values': [[result_item.get("justification", "Keine Begr. JUST")]]}) - sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{row_num_item}', - 'values': [[current_timestamp]]}) - sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{row_num_item}', - 'values': [[current_version]]}) - - if sheet_updates_for_this_batch: - self.logger.debug(f" Sende Sheet-Update fuer {len(batch_results_data)} Zeilen ({len(sheet_updates_for_this_batch)} Zellen) dieses Batches...") - success = self.sheet_handler.batch_update_cells(sheet_updates_for_this_batch) - if success: - self.logger.info(f" Sheet-Update fuer Batch Zeilen {batch_start_row_log}-{batch_end_row_log} erfolgreich.") - - tasks_for_current_processing_batch = [] - pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.8 - self.logger.debug(f"--- Batch Zeilen {batch_start_row_log}-{batch_end_row_log} abgeschlossen. Warte {pause_duration:.2f}s vor naechstem Batch ---") - time.sleep(pause_duration) - - if limit is not None and processed_count >= limit: - self.logger.info(f"Gesamtes Verarbeitungslimit ({limit}) fuer process_branch_batch erreicht. Beende.") - break + if sheet_updates_for_this_batch: + self.logger.debug(f" Sende Sheet-Update fuer {len(batch_results_data)} Zeilen ({len(sheet_updates_for_this_batch)} Zellen) dieses Batches...") + success = self.sheet_handler.batch_update_cells(sheet_updates_for_this_batch) + if success: self.logger.info(f" Sheet-Update fuer Batch Zeilen {batch_start_row_log}-{batch_end_row_log} erfolgreich.") + + pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.8 + self.logger.debug(f"--- Batch Zeilen {batch_start_row_log}-{batch_end_row_log} abgeschlossen. Warte {pause_duration:.2f}s vor naechstem Batch ---") + time.sleep(pause_duration) - self.logger.info(f"Brancheneinschaetzung (Parallel Batch) abgeschlossen. {processed_count} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.") + self.logger.info(f"Brancheneinschaetzung (Parallel Batch) abgeschlossen. {processed_tasks_in_run} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.") # ==============================================================================