This commit is contained in:
2025-05-11 21:13:31 +00:00
parent d3d4d15549
commit 660a9ddbd2

View File

@@ -6304,7 +6304,7 @@ class DataProcessor:
def process_branch_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
self.logger.info(f"Starte Brancheneinschaetzung (Parallel Batch). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...")
# --- Daten laden und Startzeile ermitteln --- (wie gehabt)
# --- Daten laden und Startzeile ermitteln ---
if start_sheet_row is None:
self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren Timestamp letzte Pruefung (BC)...")
start_data_index_no_header = self.sheet_handler.get_start_row_index(check_column_key="Timestamp letzte Pruefung", min_sheet_row=7)
@@ -6329,31 +6329,25 @@ class DataProcessor:
self.logger.info("Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.")
return
# --- Indizes und Buchstaben --- (wie gehabt)
# --- Indizes und Buchstaben ---
required_keys = [
"Timestamp letzte Pruefung", "CRM Branche", "CRM Beschreibung", "Wiki Branche",
"Wiki Kategorien", "Website Zusammenfassung", "Version", "Chat Vorschlag Branche",
"Chat Branche Konfidenz", "Chat Konsistenz Branche", "Chat Begruendung Abweichung Branche",
"CRM Name", "Finaler Umsatz (Wiki>CRM)", "Finaler Mitarbeiter (Wiki>CRM)" # Für Konsolidierung in _process_single_row (obwohl hier nicht direkt genutzt, aber für Vollständigkeit)
"CRM Name"
]
# ... (col_indices und Fehlerprüfung bleiben)
col_indices = {key: COLUMN_MAP.get(key) for key in required_keys}
if None in col_indices.values():
missing = [k for k, v in col_indices.items() if v is None]
self.logger.critical(f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_branch_batch: {missing}. Breche ab.")
return
# --- Konfiguration fuer Parallelisierung --- (wie gehabt)
MAX_BRANCH_WORKERS = getattr(Config, 'MAX_BRANCH_WORKERS', 10)
OPENAI_CONCURRENCY_LIMIT = getattr(Config, 'OPENAI_CONCURRENCY_LIMIT', 3)
processing_batch_size = getattr(Config, 'PROCESSING_BRANCH_BATCH_SIZE', 20)
# --- Verarbeitungsvariablen ---
tasks_to_submit_to_executor = [] # Sammelt Tasks, bis Limit oder Batch-Größe erreicht
total_tasks_collected_for_processing = 0 # Zählt alle Tasks, die die Kriterien erfüllen
processed_tasks_in_run = 0 # Zählt Tasks, die tatsächlich an den Executor übergeben wurden
tasks_for_current_batch = []
processed_tasks_count = 0 # Zählt Tasks, die tatsächlich verarbeitet wurden
skipped_count = 0
global ALLOWED_TARGET_BRANCHES
@@ -6363,10 +6357,66 @@ class DataProcessor:
self.logger.critical("FEHLER: Ziel-Branchenschema konnte nicht geladen werden. Breche Batch ab.")
return
# Funktion zum Verarbeiten eines einzelnen Batches (um Code-Duplikation zu reduzieren)
def _execute_and_write_batch(batch_tasks_to_run):
    """Run one batch of branch-evaluation tasks in parallel and write the results back to the sheet.

    NOTE(review): nested helper — relies on closure variables from the enclosing
    process_branch_batch scope (self, OPENAI_CONCURRENCY_LIMIT, MAX_BRANCH_WORKERS,
    COLUMN_MAP, Config) and mutates the enclosing processed_tasks_count.
    Sleeps briefly after each batch to ease API rate limits.
    """
    nonlocal processed_tasks_count  # running total owned by the enclosing method
    if not batch_tasks_to_run:
        return
    # First/last row numbers of this batch — used for log readability only.
    batch_start_log = batch_tasks_to_run[0]['row_num']
    batch_end_log = batch_tasks_to_run[-1]['row_num']
    self.logger.debug(f"\n--- Verarbeite Branch-Evaluation Batch ({len(batch_tasks_to_run)} Tasks, Zeilen {batch_start_log}-{batch_end_log}) ---")
    current_batch_results = []
    current_batch_errors = 0
    # Semaphore caps concurrent OpenAI calls below the thread-pool size.
    openai_sem = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT)
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor:
        future_map = {executor.submit(self.evaluate_branch_task, task, openai_sem): task for task in batch_tasks_to_run}
        for future in concurrent.futures.as_completed(future_map):
            task_info = future_map[future]
            try:
                res_data = future.result()
                current_batch_results.append(res_data)
                if res_data.get('error'): current_batch_errors +=1
            except Exception as exc_future:
                # Unexpected worker failure: record a placeholder result so the
                # row still receives an error entry in the sheet update below.
                self.logger.error(f"Exception im Future für Zeile {task_info['row_num']}: {exc_future}")
                current_batch_results.append({"row_num": task_info['row_num'], "result": {"branch": "FEHLER FUTURE", "consistency": "error_task", "justification": str(exc_future)[:100]}, "error": str(exc_future)})
                current_batch_errors += 1
    self.logger.debug(f" Batch ({batch_start_log}-{batch_end_log}) beendet. {len(current_batch_results)} Ergebnisse, {current_batch_errors} Fehler.")
    if current_batch_results:
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        ver = getattr(Config, 'VERSION', 'unknown')
        updates_this_batch = []
        # Sort by row number so sheet updates are emitted in row order.
        current_batch_results.sort(key=lambda x: x['row_num'])
        for item in current_batch_results:
            rn, res = item['row_num'], item['result']
            self.logger.debug(f" Zeile {rn} (Ergebnis): {res}")
            # One cell update per target column. The +1 presumably converts a
            # 0-based COLUMN_MAP index to the 1-based index expected by
            # _get_col_letter — TODO confirm against sheet_handler.
            updates_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{rn}', 'values': [[res.get("branch", "ERR BR")]]})
            updates_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{rn}', 'values': [[res.get("confidence", "N/A CO")]]})
            updates_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{rn}', 'values': [[res.get("consistency", "err CO")]]})
            updates_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{rn}', 'values': [[res.get("justification", "No JU")]]})
            # Timestamp marks the row as processed, so later runs skip it.
            updates_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{rn}', 'values': [[ts]]})
            updates_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{rn}', 'values': [[ver]]})
        if updates_this_batch:
            self.logger.debug(f" Sende Sheet-Update für {len(current_batch_results)} Zeilen dieses Batches...")
            s_upd = self.sheet_handler.batch_update_cells(updates_this_batch)
            if s_upd: self.logger.info(f" Sheet-Update für Batch Zeilen {batch_start_log}-{batch_end_log} erfolgreich.")
    processed_tasks_count += len(batch_tasks_to_run)  # count every submitted task as processed
    # Pause between batches (80% of the configured retry delay) to ease rate limits.
    pause_dur = getattr(Config, 'RETRY_DELAY', 5) * 0.8
    self.logger.debug(f"--- Batch ({batch_start_log}-{batch_end_log}) abgeschlossen. Warte {pause_dur:.2f}s ---")
    time.sleep(pause_dur)
# End of helper _execute_and_write_batch
# Hauptschleife über die Zeilen
for i in range(start_sheet_row, end_sheet_row + 1):
if limit is not None and total_tasks_collected_for_processing >= limit:
self.logger.info(f"Sammel-Limit ({limit}) für Branch-Tasks erreicht. Stoppe weitere Zeilenprüfung.")
if limit is not None and processed_tasks_count >= limit: # Prüfe Limit für *tatsächlich verarbeitete* Tasks
self.logger.info(f"Verarbeitungslimit ({limit}) erreicht. Stoppe weitere Zeilenprüfung.")
break
row_index_in_list = i - 1
@@ -6378,29 +6428,18 @@ class DataProcessor:
continue
company_name_log = self._get_cell_value_safe(row, "CRM Name").strip()
ao_value = self._get_cell_value_safe(row, "Timestamp letzte Pruefung").strip()
processing_needed_based_on_status = not ao_value
if not processing_needed_based_on_status:
if ao_value: # Wenn Timestamp gesetzt ist, überspringen
skipped_count += 1
continue
# --- DEBUG BLOCK für info_sources_count (wie im vorherigen Vorschlag) ---
crm_branche_val = self._get_cell_value_safe(row, "CRM Branche").strip()
# ... (Rest der _val Variablen und der detaillierte Debug-Block für info_sources_count)
crm_beschreibung_val = self._get_cell_value_safe(row, "CRM Beschreibung").strip()
wiki_branche_val = self._get_cell_value_safe(row, "Wiki Branche").strip()
wiki_kategorien_val = self._get_cell_value_safe(row, "Wiki Kategorien").strip()
# --- DEBUG BLOCK für info_sources_count (wie gehabt) ---
crm_branche_val = self._get_cell_value_safe(row, "CRM Branche").strip(); crm_beschreibung_val = self._get_cell_value_safe(row, "CRM Beschreibung").strip()
wiki_branche_val = self._get_cell_value_safe(row, "Wiki Branche").strip(); wiki_kategorien_val = self._get_cell_value_safe(row, "Wiki Kategorien").strip()
website_summary_val = self._get_cell_value_safe(row, "Website Zusammenfassung").strip()
self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...) - Rohwerte für Info-Quellen:")
# ... (kompletter detaillierter Debug-Block für info_sources_count hier einfügen) ...
sources_to_check = {
"CRM Branche": crm_branche_val, "CRM Beschreibung": crm_beschreibung_val,
"Wiki Branche": wiki_branche_val, "Wiki Kategorien": wiki_kategorien_val,
"Website Zusammenfassung": website_summary_val
}
self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...) - Rohwerte für Info-Quellen:")
sources_to_check = {"CRM Branche": crm_branche_val, "CRM Beschreibung": crm_beschreibung_val, "Wiki Branche": wiki_branche_val, "Wiki Kategorien": wiki_kategorien_val, "Website Zusammenfassung": website_summary_val}
info_sources_count = 0; counted_sources = []
for source_name, val in sources_to_check.items():
cond1 = bool(val); cond2 = isinstance(val, str); cond3 = False; cond4 = False; cond5 = False
@@ -6411,96 +6450,43 @@ class DataProcessor:
if is_valid_source: info_sources_count += 1; counted_sources.append(source_name)
self.logger.debug(f" Prüfe Quelle '{source_name}': Wert='{str(val)[:30]}...', c1?{cond1}, c2?{cond2}, c3?{cond3}, c4?{cond4}, c5?{cond5} -> Gültig? {is_valid_source}")
self.logger.debug(f"Zeile {i} ({company_name_log[:30]}...) - Gezählte valide Quellen: {info_sources_count} - {counted_sources}")
if info_sources_count < 2:
self.logger.info(f"Zeile {i} ({company_name_log[:30]}...) (Branch Check): Uebersprungen (Timestamp BC leer, aber nur {info_sources_count} Informationsquellen verfuegbar: {counted_sources}). Mindestens 2 benoetigt.")
skipped_count += 1
continue
# --- ENDE DEBUG BLOCK ---
# Task zur Liste hinzufügen, wenn alle Kriterien erfüllt sind
tasks_to_submit_to_executor.append({
"row_num": i, "crm_branche": crm_branche_val, "beschreibung": crm_beschreibung_val,
"wiki_branche": wiki_branche_val, "wiki_kategorien": wiki_kategorien_val,
"website_summary": website_summary_val
})
total_tasks_collected_for_processing += 1
# Ende der Hauptschleife for i in range...
# Task zur Liste hinzufügen, wenn alle Kriterien erfüllt sind UND Limit noch nicht erreicht
if limit is None or (processed_tasks_count + len(tasks_for_current_batch)) < limit:
tasks_for_current_batch.append({
"row_num": i, "crm_branche": crm_branche_val, "beschreibung": crm_beschreibung_val,
"wiki_branche": wiki_branche_val, "wiki_kategorien": wiki_kategorien_val,
"website_summary": website_summary_val
})
elif limit is not None and (processed_tasks_count + len(tasks_for_current_batch)) >= limit :
# Wenn das Hinzufügen dieses Tasks das Limit erreichen oder überschreiten würde,
# füge ihn noch hinzu (wird im nächsten Batch-Check gekürzt) und beende dann die Schleife
tasks_for_current_batch.append({
"row_num": i, "crm_branche": crm_branche_val, "beschreibung": crm_beschreibung_val,
"wiki_branche": wiki_branche_val, "wiki_kategorien": wiki_kategorien_val,
"website_summary": website_summary_val
})
self.logger.info(f"Zeile {i} wurde als letzter Task vor Erreichen des Limits ({limit}) gesammelt.")
# Die execute_and_write_batch Logik wird getriggert, wenn die Schleife endet oder der Batch voll ist.
# --- Jetzt die gesammelten Tasks in Batches verarbeiten ---
num_tasks_to_process = len(tasks_to_submit_to_executor)
self.logger.info(f"Nach Prüfung aller Zeilen im Bereich: {num_tasks_to_process} Tasks für Branch-Evaluation gesammelt.")
for batch_start_index in range(0, num_tasks_to_process, processing_batch_size):
if limit is not None and processed_tasks_in_run >= limit:
self.logger.info(f"Verarbeitungslimit ({limit}) vor Start des nächsten Batches erreicht.")
break # Verlasse die Batch-Verarbeitungsschleife
# Aktuellen Batch extrahieren
current_batch_tasks = tasks_to_submit_to_executor[batch_start_index : batch_start_index + processing_batch_size]
# Überprüfen, ob der aktuelle Batch das Limit überschreiten würde
if limit is not None and (processed_tasks_in_run + len(current_batch_tasks)) > limit:
can_take_in_batch = limit - processed_tasks_in_run
if can_take_in_batch <= 0: # Sollte nicht passieren, wenn Limit-Check oben greift
break
current_batch_tasks = current_batch_tasks[:can_take_in_batch]
if not current_batch_tasks: # Sollte nicht passieren, wenn Limit-Check oben greift
continue
batch_start_row_log = current_batch_tasks[0]['row_num']
batch_end_row_log = current_batch_tasks[-1]['row_num']
self.logger.debug(f"\n--- Starte Branch-Evaluation Batch ({len(current_batch_tasks)} Tasks, Zeilen {batch_start_row_log}-{batch_end_row_log}) ---")
processed_tasks_in_run += len(current_batch_tasks)
batch_results_data = []
batch_error_count_this_batch = 0
openai_semaphore_branch = threading.Semaphore(OPENAI_CONCURRENCY_LIMIT)
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_BRANCH_WORKERS) as executor:
future_to_task = {executor.submit(self.evaluate_branch_task, task, openai_semaphore_branch): task for task in current_batch_tasks}
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future]
try:
result_data = future.result()
batch_results_data.append(result_data)
if result_data.get('error'): batch_error_count_this_batch += 1
except Exception as exc:
row_num_exc = task['row_num']
err_msg_exc = f"Unerwarteter Fehler bei Ergebnisabfrage Branch Task Zeile {row_num_exc}: {type(exc).__name__} - {exc}"
self.logger.error(err_msg_exc)
batch_results_data.append({"row_num": row_num_exc, "result": {"branch": "FEHLER", "consistency": "error_task", "justification": err_msg_exc[:500]}, "error": err_msg_exc})
batch_error_count_this_batch += 1
self.logger.debug(f" Branch-Evaluation fuer Batch beendet. {len(batch_results_data)} Ergebnisse erhalten ({batch_error_count_this_batch} Fehler in diesem Batch).")
if batch_results_data:
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_version = getattr(Config, 'VERSION', 'unknown')
sheet_updates_for_this_batch = []
batch_results_data.sort(key=lambda x: x['row_num'])
for res_data_item in batch_results_data:
row_num_item = res_data_item['row_num']; result_item = res_data_item['result']
self.logger.debug(f" Zeile {row_num_item} (aus Batch): Ergebnis aus evaluate_branch_task: {result_item}")
# ... (alle appends für sheet_updates_for_this_batch wie im vorherigen korrekten Vorschlag)
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num_item}', 'values': [[result_item.get("branch", "FEHLER BRANCH")]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{row_num_item}', 'values': [[result_item.get("confidence", "N/A CONF")]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num_item}', 'values': [[result_item.get("consistency", "error CONS")]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{row_num_item}', 'values': [[result_item.get("justification", "Keine Begr. JUST")]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{row_num_item}', 'values': [[current_timestamp]]})
sheet_updates_for_this_batch.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{row_num_item}', 'values': [[current_version]]})
if sheet_updates_for_this_batch:
self.logger.debug(f" Sende Sheet-Update fuer {len(batch_results_data)} Zeilen ({len(sheet_updates_for_this_batch)} Zellen) dieses Batches...")
success = self.sheet_handler.batch_update_cells(sheet_updates_for_this_batch)
if success: self.logger.info(f" Sheet-Update fuer Batch Zeilen {batch_start_row_log}-{batch_end_row_log} erfolgreich.")
pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.8
self.logger.debug(f"--- Batch Zeilen {batch_start_row_log}-{batch_end_row_log} abgeschlossen. Warte {pause_duration:.2f}s vor naechstem Batch ---")
time.sleep(pause_duration)
# Batch ausführen, wenn voll ODER es die letzte Zeile ist
if len(tasks_for_current_batch) >= processing_batch_size or i == end_sheet_row:
_execute_and_write_batch(tasks_for_current_batch)
tasks_for_current_batch = [] # Batch-Liste für den nächsten Durchlauf leeren
self.logger.info(f"Brancheneinschaetzung (Parallel Batch) abgeschlossen. {processed_tasks_in_run} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.")
# Sicherstellen, dass ein eventuell nicht voller letzter Batch auch noch verarbeitet wird,
# falls die Schleife durch das Limit beendet wurde und noch Tasks übrig sind.
if tasks_for_current_batch:
self.logger.debug(f"Verarbeite verbleibenden Rest-Batch von {len(tasks_for_current_batch)} Tasks...")
_execute_and_write_batch(tasks_for_current_batch)
self.logger.info(f"Brancheneinschaetzung (Parallel Batch) abgeschlossen. {processed_tasks_count} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.")
# ==============================================================================