This commit is contained in:
2025-05-11 21:08:02 +00:00
parent 05ed038862
commit d3d4d15549

View File

@@ -6302,17 +6302,17 @@ class DataProcessor:
# ALLOWED_TARGET_BRANCHES (Block 6).
# Nutzt die uebergeordnete sheet_handler Instanz (Block 14).
def process_branch_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
self.logger.info(f"Starte Brancheneinschaetzung (Parallel Batch W-Y, AO, AP). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...")
self.logger.info(f"Starte Brancheneinschaetzung (Parallel Batch). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...")
# --- Daten laden und Startzeile ermitteln ---
# --- Daten laden und Startzeile ermitteln --- (wie gehabt)
if start_sheet_row is None:
self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren AO...")
self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren Timestamp letzte Pruefung (BC)...")
start_data_index_no_header = self.sheet_handler.get_start_row_index(check_column_key="Timestamp letzte Pruefung", min_sheet_row=7)
if start_data_index_no_header == -1:
self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.")
return
start_sheet_row = start_data_index_no_header + self.sheet_handler._header_rows + 1
self.logger.info(f"Automatisch ermittelte Startzeile (erste leere AO Zelle): {start_sheet_row}")
self.logger.info(f"Automatisch ermittelte Startzeile (erste leere BC Zelle): {start_sheet_row}")
else:
if not self.sheet_handler.load_data():
self.logger.error("FEHLER beim Laden der Daten fuer process_branch_batch.")
@@ -6329,40 +6329,46 @@ class DataProcessor:
self.logger.info("Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.")
return
# --- Indizes und Buchstaben ---
# --- Indizes und Buchstaben --- (wie gehabt)
required_keys = [
"Timestamp letzte Pruefung",
"CRM Branche", "CRM Beschreibung", "Wiki Branche", "Wiki Kategorien",
"Website Zusammenfassung", "Version",
"Chat Vorschlag Branche", "Chat Branche Konfidenz", "Chat Konsistenz Branche", "Chat Begruendung Abweichung Branche",
"CRM Name" # Hinzugefügt für besseres Logging
"Timestamp letzte Pruefung", "CRM Branche", "CRM Beschreibung", "Wiki Branche",
"Wiki Kategorien", "Website Zusammenfassung", "Version", "Chat Vorschlag Branche",
"Chat Branche Konfidenz", "Chat Konsistenz Branche", "Chat Begruendung Abweichung Branche",
"CRM Name", "Finaler Umsatz (Wiki>CRM)", "Finaler Mitarbeiter (Wiki>CRM)" # Für Konsolidierung in _process_single_row (obwohl hier nicht direkt genutzt, aber für Vollständigkeit)
]
# ... (col_indices und Fehlerprüfung bleiben)
col_indices = {key: COLUMN_MAP.get(key) for key in required_keys}
if None in col_indices.values():
missing = [k for k, v in col_indices.items() if v is None]
self.logger.critical(f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_branch_batch: {missing}. Breche ab.")
return
# --- Konfiguration fuer Parallelisierung ---
# --- Konfiguration fuer Parallelisierung --- (wie gehabt)
MAX_BRANCH_WORKERS = getattr(Config, 'MAX_BRANCH_WORKERS', 10)
OPENAI_CONCURRENCY_LIMIT = getattr(Config, 'OPENAI_CONCURRENCY_LIMIT', 3)
processing_batch_size = getattr(Config, 'PROCESSING_BRANCH_BATCH_SIZE', 20)
# --- Verarbeitungsvariablen ---
tasks_for_current_processing_batch = []
tasks_to_submit_to_executor = [] # Sammelt Tasks, bis Limit oder Batch-Größe erreicht
processed_count = 0
total_tasks_collected_for_processing = 0 # Zählt alle Tasks, die die Kriterien erfüllen
processed_tasks_in_run = 0 # Zählt Tasks, die tatsächlich an den Executor übergeben wurden
skipped_count = 0
global ALLOWED_TARGET_BRANCHES # Sicherstellen, dass wir auf die globale Variable zugreifen
global ALLOWED_TARGET_BRANCHES
if not ALLOWED_TARGET_BRANCHES:
load_target_schema()
if not ALLOWED_TARGET_BRANCHES:
self.logger.critical("FEHLER: Ziel-Branchenschema konnte nicht geladen werden. Branchenbewertung nicht moeglich. Breche Batch ab.")
self.logger.critical("FEHLER: Ziel-Branchenschema konnte nicht geladen werden. Breche Batch ab.")
return
# Hauptschleife über die Zeilen
for i in range(start_sheet_row, end_sheet_row + 1):
if limit is not None and total_tasks_collected_for_processing >= limit:
self.logger.info(f"Sammel-Limit ({limit}) für Branch-Tasks erreicht. Stoppe weitere Zeilenprüfung.")
break
row_index_in_list = i - 1
if row_index_in_list >= total_sheet_rows: break
@@ -6371,169 +6377,130 @@ class DataProcessor:
skipped_count += 1
continue
company_name_log = self._get_cell_value_safe(row, "CRM Name").strip() # Für Logging
company_name_log = self._get_cell_value_safe(row, "CRM Name").strip()
# --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist ---
ao_value = self._get_cell_value_safe(row, "Timestamp letzte Pruefung").strip()
processing_needed_based_on_status = not ao_value
if not processing_needed_based_on_status:
# Detail-Log, warum übersprungen, wenn Timestamp gesetzt ist
# if i < start_sheet_row + 10 or i % 500 == 0 : # Logge nur für die ersten paar und dann seltener
# self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...): Uebersprungen (Timestamp letzte Pruefung '{ao_value}' bereits gesetzt).")
skipped_count += 1
continue
# --- DEBUG BLOCK für info_sources_count (wie im vorherigen Vorschlag) ---
crm_branche_val = self._get_cell_value_safe(row, "CRM Branche").strip()
# ... (Rest der _val Variablen und der detaillierte Debug-Block für info_sources_count)
crm_beschreibung_val = self._get_cell_value_safe(row, "CRM Beschreibung").strip()
wiki_branche_val = self._get_cell_value_safe(row, "Wiki Branche").strip()
wiki_kategorien_val = self._get_cell_value_safe(row, "Wiki Kategorien").strip()
website_summary_val = self._get_cell_value_safe(row, "Website Zusammenfassung").strip()
# ======================= BEGINN DES ZU ERSETZENDEN/EINZUFÜGENDEN DEBUG-BLOCKS =======================
self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...) - Rohwerte für Info-Quellen:")
self.logger.debug(f" CRM Branche (H): '{crm_branche_val}' (Typ: {type(crm_branche_val)})")
self.logger.debug(f" CRM Beschreibung (G): '{str(crm_beschreibung_val)[:50]}...' (Typ: {type(crm_beschreibung_val)})")
self.logger.debug(f" Wiki Branche (R): '{wiki_branche_val}' (Typ: {type(wiki_branche_val)})")
self.logger.debug(f" Wiki Kategorien (U): '{str(wiki_kategorien_val)[:50]}...' (Typ: {type(wiki_kategorien_val)})")
self.logger.debug(f" Website Zusammenfassung (AD): '{str(website_summary_val)[:50]}...' (Typ: {type(website_summary_val)})")
# ... (kompletter detaillierter Debug-Block für info_sources_count hier einfügen) ...
sources_to_check = {
"CRM Branche": crm_branche_val,
"CRM Beschreibung": crm_beschreibung_val,
"Wiki Branche": wiki_branche_val,
"Wiki Kategorien": wiki_kategorien_val,
"CRM Branche": crm_branche_val, "CRM Beschreibung": crm_beschreibung_val,
"Wiki Branche": wiki_branche_val, "Wiki Kategorien": wiki_kategorien_val,
"Website Zusammenfassung": website_summary_val
}
info_sources_count = 0
counted_sources = []
info_sources_count = 0; counted_sources = []
for source_name, val in sources_to_check.items():
is_valid_source = False
# Detailprüfung der Bedingungen
cond1 = bool(val)
cond2 = isinstance(val, str)
cond3 = False
cond4 = False
cond5 = False
cond1 = bool(val); cond2 = isinstance(val, str); cond3 = False; cond4 = False; cond5 = False
if cond1 and cond2:
stripped_val = val.strip()
cond3 = bool(stripped_val)
if cond3:
lower_stripped_val = stripped_val.lower()
cond4 = lower_stripped_val != "k.a."
# Wichtig: "FEHLER" am Anfang des Strings, nicht nur enthalten
cond5 = not stripped_val.upper().startswith("FEHLER")
stripped_val = val.strip(); cond3 = bool(stripped_val)
if cond3: lower_stripped_val = stripped_val.lower(); cond4 = lower_stripped_val != "k.a."; cond5 = not stripped_val.upper().startswith("FEHLER")
is_valid_source = cond1 and cond2 and cond3 and cond4 and cond5
if is_valid_source:
info_sources_count += 1
counted_sources.append(source_name)
self.logger.debug(f" Prüfe Quelle '{source_name}': Wert='{str(val)[:30]}...', "
f"c1(val)? {cond1}, c2(is_str)? {cond2}, c3(stripped)? {cond3}, "
f"c4(!=k.a.)? {cond4}, c5(!FEHLER)? {cond5} -> Gültig? {is_valid_source}")
if is_valid_source: info_sources_count += 1; counted_sources.append(source_name)
self.logger.debug(f" Prüfe Quelle '{source_name}': Wert='{str(val)[:30]}...', c1?{cond1}, c2?{cond2}, c3?{cond3}, c4?{cond4}, c5?{cond5} -> Gültig? {is_valid_source}")
self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...) - Gezählte valide Quellen: {info_sources_count} - {counted_sources}")
if info_sources_count < 2:
self.logger.info(f"Zeile {i} ({company_name_log[:30]}...) (Branch Check): Uebersprungen (Timestamp BC leer, aber nur {info_sources_count} Informationsquellen verfuegbar: {counted_sources}). Mindestens 2 benoetigt.")
skipped_count += 1
continue
# ======================= ENDE DES ZU ERSETZENDEN/EINZUFÜGENDEN DEBUG-BLOCKS =======================
# --- ENDE DEBUG BLOCK ---
tasks_for_current_processing_batch.append({
"row_num": i,
"crm_branche": crm_branche_val,
"beschreibung": crm_beschreibung_val,
"wiki_branche": wiki_branche_val,
"wiki_kategorien": wiki_kategorien_val,
# Task zur Liste hinzufügen, wenn alle Kriterien erfüllt sind
tasks_to_submit_to_executor.append({
"row_num": i, "crm_branche": crm_branche_val, "beschreibung": crm_beschreibung_val,
"wiki_branche": wiki_branche_val, "wiki_kategorien": wiki_kategorien_val,
"website_summary": website_summary_val
})
total_tasks_collected_for_processing += 1
# Ende der Hauptschleife for i in range...
# --- Jetzt die gesammelten Tasks in Batches verarbeiten ---
num_tasks_to_process = len(tasks_to_submit_to_executor)
self.logger.info(f"Nach Prüfung aller Zeilen im Bereich: {num_tasks_to_process} Tasks für Branch-Evaluation gesammelt.")
for batch_start_index in range(0, num_tasks_to_process, processing_batch_size):
if limit is not None and processed_tasks_in_run >= limit:
self.logger.info(f"Verarbeitungslimit ({limit}) vor Start des nächsten Batches erreicht.")
break # Verlasse die Batch-Verarbeitungsschleife
# Aktuellen Batch extrahieren
current_batch_tasks = tasks_to_submit_to_executor[batch_start_index : batch_start_index + processing_batch_size]
execute_batch_now = False
if len(tasks_for_current_processing_batch) >= processing_batch_size:
execute_batch_now = True
elif i == end_sheet_row and tasks_for_current_processing_batch:
execute_batch_now = True
elif limit is not None and (processed_count + len(tasks_for_current_processing_batch)) >= limit and tasks_for_current_processing_batch:
if (processed_count + len(tasks_for_current_processing_batch)) > limit:
num_to_trim = (processed_count + len(tasks_for_current_processing_batch)) - limit
tasks_for_current_processing_batch = tasks_for_current_processing_batch[:-num_to_trim]
if tasks_for_current_processing_batch:
execute_batch_now = True
# Überprüfen, ob der aktuelle Batch das Limit überschreiten würde
if limit is not None and (processed_tasks_in_run + len(current_batch_tasks)) > limit:
can_take_in_batch = limit - processed_tasks_in_run
if can_take_in_batch <= 0: # Sollte nicht passieren, wenn Limit-Check oben greift
break
current_batch_tasks = current_batch_tasks[:can_take_in_batch]
if execute_batch_now:
batch_start_row_log = tasks_for_current_processing_batch[0]['row_num']
batch_end_row_log = tasks_for_current_processing_batch[-1]['row_num']
self.logger.debug(f"\n--- Starte Branch-Evaluation Batch ({len(tasks_for_current_processing_batch)} Tasks, Zeilen {batch_start_row_log}-{batch_end_row_log}) ---")
processed_count += len(tasks_for_current_processing_batch)
if not current_batch_tasks: # Sollte nicht passieren, wenn Limit-Check oben greift
continue
batch_results_data = []
batch_error_count_this_batch = 0
openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT)
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor:
future_to_task = {executor.submit(self.evaluate_branch_task, task, openai_semaphore_branch): task for task in tasks_for_current_processing_batch}
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future]
try:
result_data = future.result()
batch_results_data.append(result_data)
if result_data.get('error'):
batch_error_count_this_batch += 1
except Exception as exc:
row_num_exc = task['row_num']
err_msg_exc = f"Unerwarteter Fehler bei Ergebnisabfrage Branch Task Zeile {row_num_exc}: {type(exc).__name__} - {exc}"
self.logger.error(err_msg_exc)
batch_results_data.append({"row_num": row_num_exc, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg_exc[:500]}, "error": err_msg_exc})
batch_error_count_this_batch += 1
self.logger.debug(f" Branch-Evaluation fuer Batch beendet. {len(batch_results_data)} Ergebnisse erhalten ({batch_error_count_this_batch} Fehler in diesem Batch).")
batch_start_row_log = current_batch_tasks[0]['row_num']
batch_end_row_log = current_batch_tasks[-1]['row_num']
self.logger.debug(f"\n--- Starte Branch-Evaluation Batch ({len(current_batch_tasks)} Tasks, Zeilen {batch_start_row_log}-{batch_end_row_log}) ---")
processed_tasks_in_run += len(current_batch_tasks)
if batch_results_data:
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_version = getattr(Config, 'VERSION', 'unknown')
sheet_updates_for_this_batch = []
batch_results_data.sort(key=lambda x: x['row_num'])
batch_results_data = []
batch_error_count_this_batch = 0
openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT)
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor:
future_to_task = {executor.submit(self.evaluate_branch_task, task, openai_semaphore_branch): task for task in current_batch_tasks}
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future]
try:
result_data = future.result()
batch_results_data.append(result_data)
if result_data.get('error'): batch_error_count_this_batch += 1
except Exception as exc:
row_num_exc = task['row_num']
err_msg_exc = f"Unerwarteter Fehler bei Ergebnisabfrage Branch Task Zeile {row_num_exc}: {type(exc).__name__} - {exc}"
self.logger.error(err_msg_exc)
batch_results_data.append({"row_num": row_num_exc, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg_exc[:500]}, "error": err_msg_exc})
batch_error_count_this_batch += 1
self.logger.debug(f" Branch-Evaluation fuer Batch beendet. {len(batch_results_data)} Ergebnisse erhalten ({batch_error_count_this_batch} Fehler in diesem Batch).")
for res_data_item in batch_results_data:
row_num_item = res_data_item['row_num']
result_item = res_data_item['result']
self.logger.debug(f" Zeile {row_num_item} (aus Batch): Ergebnis aus evaluate_branch_task: {result_item}")
if batch_results_data:
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_version = getattr(Config, 'VERSION', 'unknown')
sheet_updates_for_this_batch = []
batch_results_data.sort(key=lambda x: x['row_num'])
for res_data_item in batch_results_data:
row_num_item = res_data_item['row_num']; result_item = res_data_item['result']
self.logger.debug(f" Zeile {row_num_item} (aus Batch): Ergebnis aus evaluate_branch_task: {result_item}")
# ... (alle appends für sheet_updates_for_this_batch wie im vorherigen korrekten Vorschlag)
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num_item}', 'values': [[result_item.get("branch", "FEHLER BRANCH")]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{row_num_item}', 'values': [[result_item.get("confidence", "N/A CONF")]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num_item}', 'values': [[result_item.get("consistency", "error CONS")]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{row_num_item}', 'values': [[result_item.get("justification", "Keine Begr. JUST")]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{row_num_item}', 'values': [[current_timestamp]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{row_num_item}', 'values': [[current_version]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num_item}',
'values': [[result_item.get("branch", "FEHLER BRANCH")]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{row_num_item}',
'values': [[result_item.get("confidence", "N/A CONF")]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num_item}',
'values': [[result_item.get("consistency", "error CONS")]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{row_num_item}',
'values': [[result_item.get("justification", "Keine Begr. JUST")]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{row_num_item}',
'values': [[current_timestamp]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{row_num_item}',
'values': [[current_version]]})
if sheet_updates_for_this_batch:
self.logger.debug(f" Sende Sheet-Update fuer {len(batch_results_data)} Zeilen ({len(sheet_updates_for_this_batch)} Zellen) dieses Batches...")
success = self.sheet_handler.batch_update_cells(sheet_updates_for_this_batch)
if success:
self.logger.info(f" Sheet-Update fuer Batch Zeilen {batch_start_row_log}-{batch_end_row_log} erfolgreich.")
tasks_for_current_processing_batch = []
pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.8
self.logger.debug(f"--- Batch Zeilen {batch_start_row_log}-{batch_end_row_log} abgeschlossen. Warte {pause_duration:.2f}s vor naechstem Batch ---")
time.sleep(pause_duration)
if limit is not None and processed_count >= limit:
self.logger.info(f"Gesamtes Verarbeitungslimit ({limit}) fuer process_branch_batch erreicht. Beende.")
break
if sheet_updates_for_this_batch:
self.logger.debug(f" Sende Sheet-Update fuer {len(batch_results_data)} Zeilen ({len(sheet_updates_for_this_batch)} Zellen) dieses Batches...")
success = self.sheet_handler.batch_update_cells(sheet_updates_for_this_batch)
if success: self.logger.info(f" Sheet-Update fuer Batch Zeilen {batch_start_row_log}-{batch_end_row_log} erfolgreich.")
pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.8
self.logger.debug(f"--- Batch Zeilen {batch_start_row_log}-{batch_end_row_log} abgeschlossen. Warte {pause_duration:.2f}s vor naechstem Batch ---")
time.sleep(pause_duration)
self.logger.info(f"Brancheneinschaetzung (Parallel Batch) abgeschlossen. {processed_count} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.")
self.logger.info(f"Brancheneinschaetzung (Parallel Batch) abgeschlossen. {processed_tasks_in_run} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.")
# ==============================================================================