This commit is contained in:
2025-05-11 20:42:53 +00:00
parent 3f9e9ba8ef
commit 2646485f7e

View File

@@ -6334,7 +6334,8 @@ class DataProcessor:
"Timestamp letzte Pruefung",
"CRM Branche", "CRM Beschreibung", "Wiki Branche", "Wiki Kategorien",
"Website Zusammenfassung", "Version",
"Chat Vorschlag Branche", "Chat Branche Konfidenz", "Chat Konsistenz Branche", "Chat Begruendung Abweichung Branche"
"Chat Vorschlag Branche", "Chat Branche Konfidenz", "Chat Konsistenz Branche", "Chat Begruendung Abweichung Branche",
"CRM Name" # Hinzugefügt für besseres Logging
]
col_indices = {key: COLUMN_MAP.get(key) for key in required_keys}
if None in col_indices.values():
@@ -6346,16 +6347,14 @@ class DataProcessor:
MAX_BRANCH_WORKERS = getattr(Config, 'MAX_BRANCH_WORKERS', 10)
OPENAI_CONCURRENCY_LIMIT = getattr(Config, 'OPENAI_CONCURRENCY_LIMIT', 3)
processing_batch_size = getattr(Config, 'PROCESSING_BRANCH_BATCH_SIZE', 20)
# update_batch_row_limit wird hier nicht mehr global für all_sheet_updates verwendet, da pro Batch gesendet wird.
# --- Verarbeitungsvariablen ---
tasks_for_current_processing_batch = [] # Tasks fuer den aktuellen ThreadPoolExecutor-Batch
tasks_for_current_processing_batch = []
processed_count = 0
skipped_count = 0
# Laden des Zielschemas (global, aber hier zur Sicherheit prüfen)
global ALLOWED_TARGET_BRANCHES
global ALLOWED_TARGET_BRANCHES # Sicherstellen, dass wir auf die globale Variable zugreifen
if not ALLOWED_TARGET_BRANCHES:
load_target_schema()
if not ALLOWED_TARGET_BRANCHES:
@@ -6371,12 +6370,17 @@ class DataProcessor:
if not any(cell and isinstance(cell, str) and cell.strip() for cell in row):
skipped_count += 1
continue
company_name_log = self._get_cell_value_safe(row, "CRM Name").strip() # Für Logging
# --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist ---
ao_value = self._get_cell_value_safe(row, "Timestamp letzte Pruefung").strip()
processing_needed_based_on_status = not ao_value
if not processing_needed_based_on_status:
# Detail-Log, warum übersprungen, wenn Timestamp gesetzt ist
# if i < start_sheet_row + 10 or i % 500 == 0 : # Logge nur für die ersten paar und dann seltener
# self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...): Uebersprungen (Timestamp letzte Pruefung '{ao_value}' bereits gesetzt).")
skipped_count += 1
continue
@@ -6385,64 +6389,90 @@ class DataProcessor:
wiki_branche_val = self._get_cell_value_safe(row, "Wiki Branche").strip()
wiki_kategorien_val = self._get_cell_value_safe(row, "Wiki Kategorien").strip()
website_summary_val = self._get_cell_value_safe(row, "Website Zusammenfassung").strip()
info_sources_count = sum(1 for val in [crm_branche_val, crm_beschreibung_val, wiki_branche_val, wiki_kategorien_val, website_summary_val]
if val and isinstance(val, str) and val.strip() and val.strip().lower() != "k.a." and not val.strip().startswith("FEHLER"))
# ======================= BEGINN DES ZU ERSETZENDEN/EINZUFÜGENDEN DEBUG-BLOCKS =======================
self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...) - Rohwerte für Info-Quellen:")
self.logger.debug(f" CRM Branche (H): '{crm_branche_val}' (Typ: {type(crm_branche_val)})")
self.logger.debug(f" CRM Beschreibung (G): '{str(crm_beschreibung_val)[:50]}...' (Typ: {type(crm_beschreibung_val)})")
self.logger.debug(f" Wiki Branche (R): '{wiki_branche_val}' (Typ: {type(wiki_branche_val)})")
self.logger.debug(f" Wiki Kategorien (U): '{str(wiki_kategorien_val)[:50]}...' (Typ: {type(wiki_kategorien_val)})")
self.logger.debug(f" Website Zusammenfassung (AD): '{str(website_summary_val)[:50]}...' (Typ: {type(website_summary_val)})")
sources_to_check = {
"CRM Branche": crm_branche_val,
"CRM Beschreibung": crm_beschreibung_val,
"Wiki Branche": wiki_branche_val,
"Wiki Kategorien": wiki_kategorien_val,
"Website Zusammenfassung": website_summary_val
}
info_sources_count = 0
counted_sources = []
for source_name, val in sources_to_check.items():
is_valid_source = False
# Detailprüfung der Bedingungen
cond1 = bool(val)
cond2 = isinstance(val, str)
cond3 = False
cond4 = False
cond5 = False
if cond1 and cond2:
stripped_val = val.strip()
cond3 = bool(stripped_val)
if cond3:
lower_stripped_val = stripped_val.lower()
cond4 = lower_stripped_val != "k.a."
# Wichtig: "FEHLER" am Anfang des Strings, nicht nur enthalten
cond5 = not stripped_val.upper().startswith("FEHLER")
is_valid_source = cond1 and cond2 and cond3 and cond4 and cond5
if is_valid_source:
info_sources_count += 1
counted_sources.append(source_name)
self.logger.debug(f" Prüfe Quelle '{source_name}': Wert='{str(val)[:30]}...', "
f"c1(val)? {cond1}, c2(is_str)? {cond2}, c3(stripped)? {cond3}, "
f"c4(!=k.a.)? {cond4}, c5(!FEHLER)? {cond5} -> Gültig? {is_valid_source}")
self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...) - Gezählte valide Quellen: {info_sources_count} - {counted_sources}")
if info_sources_count < 2:
self.logger.debug(f"Zeile {i} (Branch Check): Uebersprungen (AO leer, aber nur {info_sources_count} Informationsquellen verfuegbar). Mindestens 2 benoetigt.")
self.logger.info(f"Zeile {i} ({company_name_log[:30]}...) (Branch Check): Uebersprungen (Timestamp BC leer, aber nur {info_sources_count} Informationsquellen verfuegbar: {counted_sources}). Mindestens 2 benoetigt.")
skipped_count += 1
continue
# --- Wenn Verarbeitung noetig: Zur Task-Liste hinzufügen ---
# (Der processed_count wird erst erhöht, wenn der Task tatsächlich in einen Batch geht)
# ======================= ENDE DES ZU ERSETZENDEN/EINZUFÜGENDEN DEBUG-BLOCKS =======================
tasks_for_current_processing_batch.append({
"row_num": i,
"crm_branche": crm_branche_val,
"crm_branche": crm_branche_val,
"beschreibung": crm_beschreibung_val,
"wiki_branche": wiki_branche_val,
"wiki_kategorien": wiki_kategorien_val,
"website_summary": website_summary_val
})
# --- Verarbeite den Batch, wenn voll oder am Ende des Limits/Bereichs ---
# Das Limit `limit` bezieht sich auf die Anzahl der tatsächlich *verarbeiteten* (nicht übersprungenen) Tasks.
# `processed_count` wird jetzt erst beim Start eines Batches relevant.
# Bedingung zum Starten eines Batches:
# 1. Batch ist voll ODER
# 2. Es ist die letzte Zeile im definierten Bereich (`i == end_sheet_row`) UND es gibt Tasks ODER
# 3. Das `limit` ist erreicht (falls gesetzt) UND es gibt Tasks.
# Zähle, wie viele Tasks wir bisher verarbeitet hätten, wenn dieser Batch startet.
# Dies ist `processed_count + len(tasks_for_current_processing_batch)`.
execute_batch_now = False
if len(tasks_for_current_processing_batch) >= processing_batch_size:
execute_batch_now = True
elif i == end_sheet_row and tasks_for_current_processing_batch: # Letzte Zeile des Bereichs und es gibt Tasks
elif i == end_sheet_row and tasks_for_current_processing_batch:
execute_batch_now = True
elif limit is not None and (processed_count + len(tasks_for_current_processing_batch)) >= limit and tasks_for_current_processing_batch:
# Wenn das Limit erreicht wird durch die aktuellen Tasks im Batch
# Kürze tasks_for_current_processing_batch, falls es das Limit überschreiten würde
if (processed_count + len(tasks_for_current_processing_batch)) > limit:
num_to_trim = (processed_count + len(tasks_for_current_processing_batch)) - limit
# Entferne die überzähligen Tasks vom Ende der Liste
tasks_for_current_processing_batch = tasks_for_current_processing_batch[:-num_to_trim]
if tasks_for_current_processing_batch: # Nur ausführen, wenn nach Trimmen noch Tasks da sind
if tasks_for_current_processing_batch:
execute_batch_now = True
if execute_batch_now:
batch_start_row_log = tasks_for_current_processing_batch[0]['row_num']
batch_end_row_log = tasks_for_current_processing_batch[-1]['row_num']
self.logger.debug(f"\n--- Starte Branch-Evaluation Batch ({len(tasks_for_current_processing_batch)} Tasks, Zeilen {batch_start_row_log}-{batch_end_row_log}) ---")
# Erhöhe processed_count um die Anzahl der Tasks in diesem Batch
processed_count += len(tasks_for_current_processing_batch)
# --- Parallele Verarbeitung dieses Batches ---
batch_results_data = [] # Ergebnisse dieses spezifischen Batches
batch_results_data = []
batch_error_count_this_batch = 0
openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT)
@@ -6464,7 +6494,6 @@ class DataProcessor:
self.logger.debug(f" Branch-Evaluation fuer Batch beendet. {len(batch_results_data)} Ergebnisse erhalten ({batch_error_count_this_batch} Fehler in diesem Batch).")
# --- Sheet Updates für diesen Batch vorbereiten und senden ---
if batch_results_data:
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_version = getattr(Config, 'VERSION', 'unknown')
@@ -6495,20 +6524,15 @@ class DataProcessor:
if success:
self.logger.info(f" Sheet-Update fuer Batch Zeilen {batch_start_row_log}-{batch_end_row_log} erfolgreich.")
tasks_for_current_processing_batch = [] # Batch-Liste leeren
tasks_for_current_processing_batch = []
pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.8
self.logger.debug(f"--- Batch Zeilen {batch_start_row_log}-{batch_end_row_log} abgeschlossen. Warte {pause_duration:.2f}s vor naechstem Batch ---")
time.sleep(pause_duration)
# Wenn das Limit erreicht wurde, die Hauptschleife verlassen
if limit is not None and processed_count >= limit:
self.logger.info(f"Gesamtes Verarbeitungslimit ({limit}) fuer process_branch_batch erreicht. Beende.")
break
# --- Verarbeitung des letzten unvollständigen Batches (falls Schleife nicht durch Limit beendet wurde) ---
# Dieser Block ist jetzt durch die Logik `elif i == end_sheet_row and tasks_for_current_processing_batch:` abgedeckt.
# Ein separater Block für den "finalen Batch" ist nicht mehr nötig, wenn das Limit korrekt gehandhabt wird.
self.logger.info(f"Brancheneinschaetzung (Parallel Batch) abgeschlossen. {processed_count} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.")