diff --git a/brancheneinstufung.py b/brancheneinstufung.py index 81594919..943d3701 100644 --- a/brancheneinstufung.py +++ b/brancheneinstufung.py @@ -6334,7 +6334,8 @@ class DataProcessor: "Timestamp letzte Pruefung", "CRM Branche", "CRM Beschreibung", "Wiki Branche", "Wiki Kategorien", "Website Zusammenfassung", "Version", - "Chat Vorschlag Branche", "Chat Branche Konfidenz", "Chat Konsistenz Branche", "Chat Begruendung Abweichung Branche" + "Chat Vorschlag Branche", "Chat Branche Konfidenz", "Chat Konsistenz Branche", "Chat Begruendung Abweichung Branche", + "CRM Name" # Hinzugefügt für besseres Logging ] col_indices = {key: COLUMN_MAP.get(key) for key in required_keys} if None in col_indices.values(): @@ -6346,16 +6347,14 @@ class DataProcessor: MAX_BRANCH_WORKERS = getattr(Config, 'MAX_BRANCH_WORKERS', 10) OPENAI_CONCURRENCY_LIMIT = getattr(Config, 'OPENAI_CONCURRENCY_LIMIT', 3) processing_batch_size = getattr(Config, 'PROCESSING_BRANCH_BATCH_SIZE', 20) - # update_batch_row_limit wird hier nicht mehr global für all_sheet_updates verwendet, da pro Batch gesendet wird. # --- Verarbeitungsvariablen --- - tasks_for_current_processing_batch = [] # Tasks fuer den aktuellen ThreadPoolExecutor-Batch + tasks_for_current_processing_batch = [] processed_count = 0 skipped_count = 0 - # Laden des Zielschemas (global, aber hier zur Sicherheit prüfen) - global ALLOWED_TARGET_BRANCHES + global ALLOWED_TARGET_BRANCHES # Sicherstellen, dass wir auf die globale Variable zugreifen if not ALLOWED_TARGET_BRANCHES: load_target_schema() if not ALLOWED_TARGET_BRANCHES: @@ -6371,12 +6370,17 @@ class DataProcessor: if not any(cell and isinstance(cell, str) and cell.strip() for cell in row): skipped_count += 1 continue + + company_name_log = self._get_cell_value_safe(row, "CRM Name").strip() # Für Logging # --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist --- ao_value = self._get_cell_value_safe(row, "Timestamp letzte Pruefung").strip() processing_needed_based_on_status = not ao_value if not processing_needed_based_on_status: + # Detail-Log, warum übersprungen, wenn Timestamp gesetzt ist + # if i < start_sheet_row + 10 or i % 500 == 0 : # Logge nur für die ersten paar und dann seltener + # self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...): Uebersprungen (Timestamp letzte Pruefung '{ao_value}' bereits gesetzt).") skipped_count += 1 continue @@ -6385,64 +6389,90 @@ class DataProcessor: wiki_branche_val = self._get_cell_value_safe(row, "Wiki Branche").strip() wiki_kategorien_val = self._get_cell_value_safe(row, "Wiki Kategorien").strip() website_summary_val = self._get_cell_value_safe(row, "Website Zusammenfassung").strip() - info_sources_count = sum(1 for val in [crm_branche_val, crm_beschreibung_val, wiki_branche_val, wiki_kategorien_val, website_summary_val] - if val and isinstance(val, str) and val.strip() and val.strip().lower() != "k.a." and not val.strip().startswith("FEHLER")) + + # ======================= BEGINN DES ZU ERSETZENDEN/EINZUFÜGENDEN DEBUG-BLOCKS ======================= + self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...) - Rohwerte für Info-Quellen:") + self.logger.debug(f" CRM Branche (H): '{crm_branche_val}' (Typ: {type(crm_branche_val)})") + self.logger.debug(f" CRM Beschreibung (G): '{str(crm_beschreibung_val)[:50]}...' (Typ: {type(crm_beschreibung_val)})") + self.logger.debug(f" Wiki Branche (R): '{wiki_branche_val}' (Typ: {type(wiki_branche_val)})") + self.logger.debug(f" Wiki Kategorien (U): '{str(wiki_kategorien_val)[:50]}...' (Typ: {type(wiki_kategorien_val)})") + self.logger.debug(f" Website Zusammenfassung (AD): '{str(website_summary_val)[:50]}...' (Typ: {type(website_summary_val)})") + + sources_to_check = { + "CRM Branche": crm_branche_val, + "CRM Beschreibung": crm_beschreibung_val, + "Wiki Branche": wiki_branche_val, + "Wiki Kategorien": wiki_kategorien_val, + "Website Zusammenfassung": website_summary_val + } + + info_sources_count = 0 + counted_sources = [] + + for source_name, val in sources_to_check.items(): + is_valid_source = False + # Detailprüfung der Bedingungen + cond1 = bool(val) + cond2 = isinstance(val, str) + cond3 = False + cond4 = False + cond5 = False + if cond1 and cond2: + stripped_val = val.strip() + cond3 = bool(stripped_val) + if cond3: + lower_stripped_val = stripped_val.lower() + cond4 = lower_stripped_val != "k.a." + # Wichtig: "FEHLER" am Anfang des Strings, nicht nur enthalten + cond5 = not stripped_val.upper().startswith("FEHLER") + + is_valid_source = cond1 and cond2 and cond3 and cond4 and cond5 + + if is_valid_source: + info_sources_count += 1 + counted_sources.append(source_name) + + self.logger.debug(f" Prüfe Quelle '{source_name}': Wert='{str(val)[:30]}...', " + f"c1(val)? {cond1}, c2(is_str)? {cond2}, c3(stripped)? {cond3}, " + f"c4(!=k.a.)? {cond4}, c5(!FEHLER)? {cond5} -> Gültig? {is_valid_source}") + + self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...) - Gezählte valide Quellen: {info_sources_count} - {counted_sources}") + if info_sources_count < 2: - self.logger.debug(f"Zeile {i} (Branch Check): Uebersprungen (AO leer, aber nur {info_sources_count} Informationsquellen verfuegbar). Mindestens 2 benoetigt.") + self.logger.info(f"Zeile {i} ({company_name_log[:30]}...) (Branch Check): Uebersprungen (Timestamp BC leer, aber nur {info_sources_count} Informationsquellen verfuegbar: {counted_sources}). Mindestens 2 benoetigt.") skipped_count += 1 continue - - # --- Wenn Verarbeitung noetig: Zur Task-Liste hinzufügen --- - # (Der processed_count wird erst erhöht, wenn der Task tatsächlich in einen Batch geht) + # ======================= ENDE DES ZU ERSETZENDEN/EINZUFÜGENDEN DEBUG-BLOCKS ======================= tasks_for_current_processing_batch.append({ "row_num": i, - "crm_branche": crm_branche_val, + "crm_branche": crm_branche_val, "beschreibung": crm_beschreibung_val, "wiki_branche": wiki_branche_val, "wiki_kategorien": wiki_kategorien_val, "website_summary": website_summary_val }) - - # --- Verarbeite den Batch, wenn voll oder am Ende des Limits/Bereichs --- - # Das Limit `limit` bezieht sich auf die Anzahl der tatsächlich *verarbeiteten* (nicht übersprungenen) Tasks. - # `processed_count` wird jetzt erst beim Start eines Batches relevant. - - # Bedingung zum Starten eines Batches: - # 1. Batch ist voll ODER - # 2. Es ist die letzte Zeile im definierten Bereich (`i == end_sheet_row`) UND es gibt Tasks ODER - # 3. Das `limit` ist erreicht (falls gesetzt) UND es gibt Tasks. - - # Zähle, wie viele Tasks wir bisher verarbeitet hätten, wenn dieser Batch startet. - # Dies ist `processed_count + len(tasks_for_current_processing_batch)`. execute_batch_now = False if len(tasks_for_current_processing_batch) >= processing_batch_size: execute_batch_now = True - elif i == end_sheet_row and tasks_for_current_processing_batch: # Letzte Zeile des Bereichs und es gibt Tasks + elif i == end_sheet_row and tasks_for_current_processing_batch: execute_batch_now = True elif limit is not None and (processed_count + len(tasks_for_current_processing_batch)) >= limit and tasks_for_current_processing_batch: - # Wenn das Limit erreicht wird durch die aktuellen Tasks im Batch - # Kürze tasks_for_current_processing_batch, falls es das Limit überschreiten würde if (processed_count + len(tasks_for_current_processing_batch)) > limit: num_to_trim = (processed_count + len(tasks_for_current_processing_batch)) - limit - # Entferne die überzähligen Tasks vom Ende der Liste tasks_for_current_processing_batch = tasks_for_current_processing_batch[:-num_to_trim] - - if tasks_for_current_processing_batch: # Nur ausführen, wenn nach Trimmen noch Tasks da sind + if tasks_for_current_processing_batch: execute_batch_now = True - if execute_batch_now: batch_start_row_log = tasks_for_current_processing_batch[0]['row_num'] batch_end_row_log = tasks_for_current_processing_batch[-1]['row_num'] self.logger.debug(f"\n--- Starte Branch-Evaluation Batch ({len(tasks_for_current_processing_batch)} Tasks, Zeilen {batch_start_row_log}-{batch_end_row_log}) ---") - # Erhöhe processed_count um die Anzahl der Tasks in diesem Batch processed_count += len(tasks_for_current_processing_batch) - # --- Parallele Verarbeitung dieses Batches --- - batch_results_data = [] # Ergebnisse dieses spezifischen Batches + batch_results_data = [] batch_error_count_this_batch = 0 openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT) @@ -6464,7 +6494,6 @@ class DataProcessor: self.logger.debug(f" Branch-Evaluation fuer Batch beendet. {len(batch_results_data)} Ergebnisse erhalten ({batch_error_count_this_batch} Fehler in diesem Batch).") - # --- Sheet Updates für diesen Batch vorbereiten und senden --- if batch_results_data: current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") current_version = getattr(Config, 'VERSION', 'unknown') @@ -6495,20 +6524,15 @@ class DataProcessor: if success: self.logger.info(f" Sheet-Update fuer Batch Zeilen {batch_start_row_log}-{batch_end_row_log} erfolgreich.") - tasks_for_current_processing_batch = [] # Batch-Liste leeren + tasks_for_current_processing_batch = [] pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.8 self.logger.debug(f"--- Batch Zeilen {batch_start_row_log}-{batch_end_row_log} abgeschlossen. Warte {pause_duration:.2f}s vor naechstem Batch ---") time.sleep(pause_duration) - # Wenn das Limit erreicht wurde, die Hauptschleife verlassen if limit is not None and processed_count >= limit: self.logger.info(f"Gesamtes Verarbeitungslimit ({limit}) fuer process_branch_batch erreicht. Beende.") break - # --- Verarbeitung des letzten unvollständigen Batches (falls Schleife nicht durch Limit beendet wurde) --- - # Dieser Block ist jetzt durch die Logik `elif i == end_sheet_row and tasks_for_current_processing_batch:` abgedeckt. - # Ein separater Block für den "finalen Batch" ist nicht mehr nötig, wenn das Limit korrekt gehandhabt wird. - self.logger.info(f"Brancheneinschaetzung (Parallel Batch) abgeschlossen. {processed_count} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.")