data_processor.py aktualisiert

This commit is contained in:
2025-07-20 12:37:45 +00:00
parent 5e50c33b62
commit 5c6d2581a6

View File

@@ -17,6 +17,7 @@ import threading
import pickle import pickle
import json import json
import os import os
import re
from datetime import datetime from datetime import datetime
import pandas as pd import pandas as pd
@@ -1008,6 +1009,45 @@ class DataProcessor:
# Das `result` Dictionary wird mit den initialen Fehlerwerten zurückgegeben. # Das `result` Dictionary wird mit den initialen Fehlerwerten zurückgegeben.
return result return result
def _summarize_task_batch(self, task_info):
"""
Robuste Worker-Funktion für die parallele Website-Zusammenfassung.
Wird vom ThreadPoolExecutor in `process_summarize_website` aufgerufen.
Gibt IMMER ein strukturiertes Dictionary zurück.
"""
row_num = task_info['row_num']
raw_text = task_info['raw_text']
company_name = task_info.get('company_name', 'einem Unternehmen')
self.logger.debug(f" -> Batch-Summarize-Task gestartet für Zeile {row_num}...")
result = {
'row_num': row_num,
'summary': 'k.A. (Fehler im Task)',
'error': True,
'status_message': 'Unbekannter Task-Fehler'
}
try:
# Ruft die gehärtete Single-Item-Funktion aus helpers.py auf
summary_text = summarize_website_content(raw_text, company_name)
if summary_text and not summary_text.lower().startswith('k.a.'):
result['summary'] = summary_text
result['error'] = False
result['status_message'] = 'Erfolgreich zusammengefasst'
else:
result['summary'] = summary_text # Fehlergrund übernehmen
result['error'] = True
result['status_message'] = 'Zusammenfassung fehlgeschlagen oder Text zu kurz'
return result
except Exception as e:
self.logger.error(f" -> Kritischer Fehler im Worker-Task `_summarize_task_batch` für Zeile {row_num}: {e}")
result['summary'] = f"FEHLER (API-Fehler bei Zusammenfassung)"
result['status_message'] = f"Kritischer Task-Fehler: {type(e).__name__}"
return result
def process_rows_sequentially( def process_rows_sequentially(
@@ -2195,437 +2235,140 @@ class DataProcessor:
f"Wikipedia-Verifizierungs-Batch abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen ({skipped_no_wiki_url} wegen fehlender M-URL).") # <<< GEÄNDERT f"Wikipedia-Verifizierungs-Batch abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen ({skipped_no_wiki_url} wegen fehlender M-URL).") # <<< GEÄNDERT
def process_summarization_batch( def process_summarize_website(self, start_sheet_row=None, end_sheet_row=None, limit=None):
self,
start_sheet_row=None,
end_sheet_row=None,
limit=None):
""" """
Batch-Prozess NUR fuer Website-Zusammenfassung. Batch-Prozess NUR für Website-Zusammenfassung.
Nutzt einen ThreadPoolExecutor für echte parallele Verarbeitung und verbesserte Stabilität.
""" """
self.logger.info( self.logger.info(f"Starte Website-Zusammenfassung (Parallel Batch, v2.0.3). Bereich: {start_sheet_row or 'Start'}-{end_sheet_row or 'Ende'}, Limit: {limit or 'Unbegrenzt'}")
f"Starte Website-Zusammenfassung (Batch). Bereich: {start_sheet_row if start_sheet_row else 'Start'}-{end_sheet_row if end_sheet_row else 'Ende'}, Limit: {limit if limit else 'Unbegrenzt'}")
# --- 1. Daten laden und Startzeile ermitteln ---
if start_sheet_row is None: if start_sheet_row is None:
start_data_index = self.sheet_handler.get_start_row_index( self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren 'Website Zusammenfassung'...")
check_column_key="Website Zusammenfassung") start_data_idx = self.sheet_handler.get_start_row_index(check_column_key="Website Zusammenfassung")
if start_data_index == -1: if start_data_idx == -1:
self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.")
return return
start_sheet_row = start_data_index + self.sheet_handler._header_rows + 1 start_sheet_row = start_data_idx + self.sheet_handler._header_rows + 1
else: self.logger.info(f"Automatisch ermittelte Startzeile: {start_sheet_row}")
if not self.sheet_handler.load_data(): if not self.sheet_handler.load_data():
self.logger.error("FEHLER beim Laden der Daten für Summarization-Batch.")
return return
"""
Batch-Prozess NUR fuer Website-Zusammenfassung (AS).
Laedt Daten neu, prueft, ob Rohtext (AR) vorhanden und Zusammenfassung (AS) fehlt.
Fasst Rohtexte im Batch ueber OpenAI zusammen und setzt AS + AP.
Args:
start_sheet_row (int, optional): Die 1-basierte Startzeile im Sheet. Defaults to None (automatische Ermittlung basierend auf leeren AS).
end_sheet_row (int, optional): Die 1-basierte Endzeile im Sheet. Defaults to None (bis Ende Sheet).
limit (int, optional): Maximale Anzahl ZU VERARBEITENDER (nicht uebersprungener) Zeilen. Defaults to None (Unbegrenzt).
"""
# Verwenden Sie logger, da das Logging jetzt konfiguriert ist
# Logge die Konfiguration des Batch-Laufs
self.logger.info(
f"Starte Website-Zusammenfassung (Batch AS, AP). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") # <<< GEÄNDERT
# --- Daten laden und Startzeile ermitteln ---
# Automatische Ermittlung der Startzeile, wenn nicht manuell gesetzt
if start_sheet_row is None:
self.logger.info(
"Automatische Ermittlung der Startzeile basierend auf leeren AS...") # <<< GEÄNDERT
# Nutzt get_start_row_index des Sheet Handlers (Block 14). Prueft auf leeren AS (Block 1 Column Map).
# Standardmaessig ab Zeile 7
start_data_index_no_header = self.sheet_handler.get_start_row_index(
check_column_key="Website Zusammenfassung", min_sheet_row=7)
# Wenn get_start_row_index -1 zurueckgibt (Fehler)
if start_data_index_no_header == -1:
self.logger.error(
"FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") # <<< GEÄNDERT
return # Beende die Methode
# Berechne die 1-basierte Sheet-Startzeile aus dem 0-basierten
# Daten-Index
start_sheet_row = start_data_index_no_header + \
self.sheet_handler._header_rows + 1 # Block 14 SheetHandler Attribut
self.logger.info(
f"Automatisch ermittelte Startzeile (erste leere AS Zelle): {start_sheet_row}") # <<< GEÄNDERT
else:
# Wenn start_sheet_row manuell gesetzt wurde, laden Sie die Daten trotzdem neu, um aktuell zu sein.
# Der load_data Aufruf ist mit retry_on_failure dekoriert (Block
# 2).
if not self.sheet_handler.load_data():
self.logger.error(
"FEHLER beim Laden der Daten fuer process_summarization_batch.") # <<< GEÄNDERT
return # Beende die Methode, wenn das Laden fehlschlaegt
# Holen Sie die gesamte Datenliste (inklusive Header) aus dem
# SheetHandler.
all_data = self.sheet_handler.get_all_data_with_headers() all_data = self.sheet_handler.get_all_data_with_headers()
# Annahme: header_rows ist als Attribut im SheetHandler verfuegbar
# (Block 14).
header_rows = self.sheet_handler._header_rows header_rows = self.sheet_handler._header_rows
total_sheet_rows = len(all_data) # Gesamtzahl der Zeilen im Sheet total_sheet_rows = len(all_data)
effective_end_row = end_sheet_row if end_sheet_row is not None else total_sheet_rows
# Berechne Endzeile, wenn nicht manuell gesetzt self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {effective_end_row}.")
if end_sheet_row is None: if start_sheet_row > effective_end_row:
end_sheet_row = total_sheet_rows # Bis zur letzten Zeile self.logger.info("Start liegt nach dem Ende. Keine Zeilen zu verarbeiten.")
return
# Logge den verarbeitungsbereich # --- 2. Spalten-Indizes und Buchstaben vorbereiten ---
self.logger.info( summary_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Website Zusammenfassung") + 1)
f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}. Gesamtzeilen im Sheet: {total_sheet_rows}") # <<< GEÄNDERT version_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Version") + 1)
# Wir benötigen auch einen Timestamp für die Zusammenfassung. Da keiner existiert, nutzen wir den Scrape-Timestamp neu.
timestamp_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)
# Pruefe, ob der Bereich gueltig ist (Start <= Ende und Start nicht
# ueber Gesamtzeilen)
if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows:
self.logger.info(
"Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.") # <<< GEÄNDERT
return # Beende die Methode, wenn der Bereich leer ist
# --- Indizes und Buchstaben --- # --- 3. Tasks sammeln ---
# Stellen Sie sicher, dass alle benoetigten Spalten in COLUMN_MAP openai_batch_size = getattr(Config, 'OPENAI_BATCH_SIZE_LIMIT', 10) # Kann höher sein, da parallel
# (Block 1) vorhanden sind max_openai_workers = getattr(Config, 'MAX_SCRAPING_WORKERS', 10) # Gleiche Worker-Anzahl wie beim Scraping
required_keys = [
"Website Rohtext", "Website Zusammenfassung", "Version", "CRM Name" # AR, AS, AP, B
]
# Erstellen Sie ein Dictionary mit Schluesseln und Indizes
col_indices = {key: COLUMN_MAP.get(key) for key in required_keys}
# Pruefen Sie, ob alle benoetigten Schluessel in COLUMN_MAP gefunden
# wurden
if None in col_indices.values():
missing = [k for k, v in col_indices.items() if v is None]
self.logger.critical(
f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_summarization_batch: {missing}. Breche ab.") # <<< GEÄNDERT
return # Beende die Methode bei kritischem Fehler
# Ermitteln Sie die Indizes und Buchstaben fuer Updates (AS, AP)
rohtext_col_idx = col_indices["Website Rohtext"]
summary_col_idx = col_indices["Website Zusammenfassung"]
version_col_idx = col_indices["Version"]
name_col_idx = col_indices["CRM Name"] # Benoetigt fuer Logging
summary_col_letter = self.sheet_handler._get_col_letter(
summary_col_idx + 1) # Block 14 _get_col_letter
version_col_letter = self.sheet_handler._get_col_letter(
version_col_idx + 1)
# --- Verarbeitung ---
# Holen Sie die Batch-Groesse fuer OpenAI-Aufrufe aus Config (Block 1)
openai_batch_size = getattr(Config, 'OPENAI_BATCH_SIZE_LIMIT', 4)
# Holen Sie die Batch-Groesse fuer Sheet-Updates aus Config (Block 1)
update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50) update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50)
# Tasks fuer den aktuellen OpenAI Batch (Liste von Dicts) tasks_for_processing_batch = []
tasks_for_openai_batch = []
# 1-basierte Zeilennummern im aktuellen OpenAI Batch
rows_in_current_openai_batch = []
# Gesammelte Updates fuer Batch-Schreiben ins Sheet (Liste von Dicts)
all_sheet_updates = [] all_sheet_updates = []
# Zaehlt Zeilen, die fuer die Verarbeitung in Frage kommen und in den
# Batch aufgenommen werden (im Rahmen des Limits).
processed_count = 0 processed_count = 0
# Zaehlt Zeilen, die uebersprungen wurden (wegen fehlendem Rohtext oder
# vorhandener Zusammenfassung).
skipped_count = 0 skipped_count = 0
# Iteriere ueber die Sheet-Zeilen im definierten Bereich (1-basierte for i in range(start_sheet_row, effective_end_row + 1):
# Sheet-Zeilennummer) row_index_in_list = i - 1
for i in range(start_sheet_row, end_sheet_row + 1): if row_index_in_list >= total_sheet_rows: break
row_index_in_list = i - 1 # 0-basierter Index in der all_data Liste
# Pruefen Sie, ob das Ende des Sheets erreicht wurde
if row_index_in_list >= total_sheet_rows:
break # Ende des Sheets erreicht
row = all_data[row_index_in_list] # Die Rohdaten fuer diese Zeile row = all_data[row_index_in_list]
if not any(cell and str(cell).strip() for cell in row):
skipped_count += 1
continue
# Stellen Sie sicher, dass die Zeile nicht leer ist # Kriterium: Zusammenfassung ist leer/default UND Rohtext ist valide
if not any(cell and isinstance(cell, str) and cell.strip() summary_value = self._get_cell_value_safe(row, "Website Zusammenfassung")
for cell in row): summary_is_empty_or_default = not summary_value or str(summary_value).strip().lower() in ["k.a.", "k.a. (keine zusammenfassung erhalten)"]
# self.logger.debug(f"Zeile {i}: Uebersprungen (Leere Zeile).")
# # Zu viel Laerm im Debug
skipped_count += 1 # Zaehlen als uebersprungen
continue # Springe zur naechsten Zeile
# --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist --- raw_text = self._get_cell_value_safe(row, "Website Rohtext")
# Kriterium: Website Rohtext (AR) ist vorhanden und gueltig (nicht k.A. oder Fehlerwerte). raw_text_is_valid = raw_text and isinstance(raw_text, str) and len(raw_text) > 100 and not str(raw_text).strip().lower().startswith('k.a.')
# UND Website Zusammenfassung (AS) ist leer oder ein
# Standard-Fehlerwert.
# Holen Sie den Wert aus Spalte AR (Website Rohtext) (nutzt interne if summary_is_empty_or_default and raw_text_is_valid:
# Helfer _get_cell_value_safe) if limit is not None and processed_count >= limit:
raw_text = self._get_cell_value_safe( self.logger.info(f"Verarbeitungslimit ({limit}) erreicht.")
row, "Website Rohtext") # Block 1 Column Map break
# Pruefen Sie, ob AR gefuellt und gueltig ist.
raw_text_is_valid = raw_text and isinstance(
raw_text, str) and str(raw_text).strip().lower() not in [
"k.a.", "k.a. (nur cookie-banner erkannt)", "k.a. (fehler)"]
# Holen Sie den Wert aus Spalte AS (Website Zusammenfassung) (nutzt company_name = self._get_cell_value_safe(row, "CRM Name")
# interne Helfer _get_cell_value_safe) tasks_for_processing_batch.append({"row_num": i, "raw_text": raw_text, "company_name": company_name})
summary_value = self._get_cell_value_safe(
row, "Website Zusammenfassung") # Block 1 Column Map
# Pruefen Sie, ob AS leer ist oder einen Standard-Fehlerwert
# enthaelt.
summary_is_empty_or_default = not summary_value or (
isinstance(
summary_value,
str) and str(summary_value).strip().lower() in [
"k.a.",
"k.a. (keine zusammenfassung erhalten)"])
# Verarbeitung ist noetig, wenn AR gueltig ist UND AS leer/default
# ist.
processing_needed_for_row = raw_text_is_valid and summary_is_empty_or_default
# Loggen der Pruefergebnisse fuer diese Zeile auf Debug-Level
log_check = (
i < start_sheet_row +
5) or (
i %
100 == 0) or (processing_needed_for_row)
if log_check:
company_name = self._get_cell_value_safe(
row, "CRM Name").strip() # Block 1 Column Map
self.logger.debug(
f"Zeile {i} ({company_name[:50]}... Website Summarization Check): AR gueltig? {raw_text_is_valid} (len={len(str(raw_text))}), AS leer/default? {summary_is_empty_or_default}. Benötigt Verarbeitung? {processing_needed_for_row}") # <<< GEÄNDERT
# Wenn die Verarbeitung fuer diese Zeile nicht noetig ist
if not processing_needed_for_row:
skipped_count += 1 # Zaehlen als uebersprungene Zeile
continue # Springe zur naechsten Zeile
# --- Wenn Verarbeitung noetig: Fuege zur Batch-Liste fuer OpenAI hinzu ---
# Zaehle die Zeile, die fuer die Verarbeitung in Frage kommt (im
# Rahmen des Limits zaehlen)
processed_count += 1 processed_count += 1
else:
skipped_count += 1
# Pruefe das Limit fuer verarbeitete Zeilen # --- 4. Batch-Verarbeitung auslösen ---
if limit is not None and isinstance( if len(tasks_for_processing_batch) >= openai_batch_size or (i == effective_end_row and tasks_for_processing_batch):
limit, int) and limit > 0 and processed_count > limit: self.logger.info(f"--- Starte Website-Summarization Batch für {len(tasks_for_processing_batch)} Tasks (max. {max_openai_workers} Worker) ---")
# Wenn das Limit erreicht ist und es ein positives Limit gibt
self.logger.info(
f"Verarbeitungslimit ({limit}) fuer process_summarization_batch erreicht. Breche weitere Zeilenpruefung ab.") # <<< GEÄNDERT
break # Brich die Schleife ab
# Fuege die benoetigten Daten fuer den OpenAI Batch hinzu summarization_results = {}
# (Zeilennummer und Rohtext) batch_error_count = 0
tasks_for_openai_batch.append({'row_num': i, 'raw_text': raw_text})
# Fuege die Zeilennummer zur Liste der Zeilennummern im Batch hinzu
rows_in_current_openai_batch.append(i)
# --- Verarbeite den Batch, wenn voll --- with ThreadPoolExecutor(max_workers=max_openai_workers) as executor:
# Pruefe, ob die aktuelle Batch-Liste die definierte Groesse erreicht hat. future_to_task = {executor.submit(self._summarize_task_batch, task): task for task in tasks_for_processing_batch}
# openai_batch_size wird aus Config geholt (Block 1).
if len(tasks_for_openai_batch) >= openai_batch_size:
# Logge den Start der Batch-Verarbeitung
batch_start_row = tasks_for_openai_batch[0]['row_num']
batch_end_row = tasks_for_openai_batch[-1]['row_num']
self.logger.debug(
f"\n--- Starte Website-Summarization Batch ({len(tasks_for_openai_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
# Rufe die globale Funktion auf, die den OpenAI Call fuer den Batch macht (Block 9). for future in as_completed(future_to_task):
# summarize_batch_openai ist mit retry_on_failure dekoriert (Block 2). task = future_to_task[future]
# Wenn summarize_batch_openai eine Exception wirft (nach Retries), wird diese hier gefangen.
# !!! KORRIGIERTER AUFRUF !!!
try: try:
# Rufen Sie die korrekte globale Funktion auf result_dict = future.result()
# <<< Korrigierter Aufruf (vorher war fälschlicherweise _process_verification_openai_batch) if isinstance(result_dict, dict) and 'row_num' in result_dict:
batch_results = summarize_batch_openai( summarization_results[result_dict['row_num']] = result_dict
tasks_for_openai_batch) if result_dict.get('error'):
# Ergebnisse sollten ein Dictionary {row_num: summary_text} batch_error_count += 1
# sein, auch bei Fehlern. self.logger.warning(f"Worker meldete Fehler bei Zusammenfassung für Zeile {result_dict['row_num']}: {result_dict.get('status_message')}")
else:
self.logger.error(f"Inkonsistentes Ergebnis für Zeile {task['row_num']}: Erwartete dict mit 'row_num', bekam {type(result_dict)}. Überspringe.")
summarization_results[task['row_num']] = {'summary': "FEHLER (Inkonsistenter Rückgabetyp)", 'error': True}
batch_error_count += 1
except Exception as exc:
self.logger.error(f"Unerwarteter Fehler bei Ergebnisabfrage für Zeile {task['row_num']}: {exc}", exc_info=True)
summarization_results[task['row_num']] = {'summary': "FEHLER (Task Exception)", 'error': True}
batch_error_count += 1
# Sammle Sheet Updates (AS, AP) fuer diesen Batch self.logger.info(f" -> Zusammenfassung für Batch beendet. {len(summarization_results)} Ergebnisse erhalten ({batch_error_count} davon mit Fehlern).")
current_version = getattr(
Config, 'VERSION', 'unknown') # Block 1 Config Attribut
batch_sheet_updates = [] # Updates fuer DIESEN spezifischen Batch von Zeilen
# Iteriere ueber die Zeilennummern, die in DIESEM OpenAI # --- 5. Updates für das Google Sheet vorbereiten ---
# Batch waren if summarization_results:
for row_num in rows_in_current_openai_batch: current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Hole das Ergebnis fuer diese Zeile aus dem Ergebnis-Dictionary. current_version = getattr(Config, 'VERSION', 'unknown')
# Fallback auf einen Fehlerstring, wenn das Ergebnis
# fehlt (sollte nicht passieren, wenn
# summarize_batch_openai korrekt ist).
summary = batch_results.get(
row_num, "k.A. (Batch Ergebnis fehlte)")
# Stelle sicher, dass 'k.A.' bei leeren/kurzen
# Summaries gesetzt wird
if not summary or (
isinstance(
summary,
str) and summary.strip().lower() in [
"k.a.",
"k.a. (keine zusammenfassung erhalten)"]):
summary = "k.A. (Keine Zusammenfassung erhalten)"
# Fuege "k.A." oder Fehler an, wenn der Wert von
# summarize_batch_openai ein Fehlerstring ist
elif isinstance(summary, str) and (summary.startswith("k.A. (Fehler") or summary.startswith("FEHLER:")):
pass # Behalte den Fehlerstring von summarize_batch_openai
# Fuege Updates fuer AS und AP hinzu (nutzt interne for row_num, res_dict in summarization_results.items():
# Helfer) # Zusammenfassung, Timestamp (wir überschreiben den alten Scrape-TS) und Version werden zum Update hinzugefügt
batch_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [ all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [[res_dict.get('summary', 'k.A.')]]})
[summary]]}) # Block 1 Column Map all_sheet_updates.append({'range': f'{timestamp_col_letter}{row_num}', 'values': [[current_timestamp]]})
batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [ all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]})
[current_version]]}) # Block 1 Column Map
# Sammle diese Batch-Updates fuer das groessere tasks_for_processing_batch = [] # Batch leeren
# Batch-Update
all_sheet_updates.extend(batch_sheet_updates)
except Exception as e_openai_batch: # --- 6. Sheet-Update auslösen, wenn Update-Batch voll ist ---
# Wenn summarize_batch_openai eine Exception wirft (nach Retries) # Pro Zeile gibt es 3 Updates
# Der Fehler wird bereits vom retry_on_failure Decorator if len(all_sheet_updates) >= (update_batch_row_limit * 3):
# auf summarize_batch_openai geloggt. self.logger.info(f"Sende gesammelte Sheet-Updates für Zusammenfassungen ({len(all_sheet_updates) // 3} Zeilen)...")
self.logger.error( self.sheet_handler.batch_update_cells(all_sheet_updates)
f"Endgueltiger FEHLER beim OpenAI-Batch-Aufruf fuer Zusammenfassung: {e_openai_batch}") # <<< GEÄNDERT
# Logge den Traceback
self.logger.debug(traceback.format_exc()) # <<< GEÄNDERT
# Fügen Sie Fehlerwerte für alle Zeilen im Batch hinzu
current_version = getattr(
Config, 'VERSION', 'unknown') # Block 1 Config Attribut
for row_num in rows_in_current_openai_batch:
# Gekuerzt
error_summary = f"FEHLER OpenAI Batch: {str(e_openai_batch)[:100]}..."
# Fuege Updates mit Fehlerwerten fuer AS und AP hinzu
all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [
[error_summary]]}) # Block 1 Column Map
all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [
[current_version]]}) # Block 1 Column Map
# Leere den OpenAI-Batch zurueck
tasks_for_openai_batch = []
rows_in_current_openai_batch = []
# Sende gesammelte Sheet Updates, wenn das Update-Batch-Limit erreicht ist.
# Updates pro Zeile sind 2 (AS, AP). Anzahl der Zeilen =
# len(all_sheet_updates) / 2.
rows_in_update_batch = len(all_sheet_updates) // 2
if rows_in_update_batch >= update_batch_row_limit:
self.logger.debug(
f" Sende gesammelte Sheet-Updates ({rows_in_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry.
# Wenn es fehlschlaegt, wird es intern geloggt.
success = self.sheet_handler.batch_update_cells(
all_sheet_updates)
if success:
self.logger.info(
f" Sheet-Update fuer {rows_in_update_batch} Zeilen erfolgreich.") # <<< GEÄNDERT
# Der Fehlerfall wird von batch_update_cells geloggt
# Leere die gesammelten Updates nach dem Senden.
all_sheet_updates = [] all_sheet_updates = []
time.sleep(1)
# Kurze Pause nach jedem OpenAI Batch (nutzt Config Block 1). # --- 7. Finale Updates senden ---
# Dies ist wichtig, um Rate Limits zu vermeiden.
pause_duration = getattr(
Config, 'RETRY_DELAY', 5) * 0.5 # 50% der Retry-Wartezeit
self.logger.debug(
f"Warte {pause_duration:.2f}s vor naechstem Batch...") # <<< GEÄNDERT
time.sleep(pause_duration)
# --- Verarbeitung des letzten unvollstaendigen OpenAI Batches nach der Schleife ---
# Wenn nach der Hauptschleife noch Tasks in der Batch-Liste sind.
if tasks_for_openai_batch: # Korrektur: War vorher `current_openai_batch_data`
# Logge den Start des finalen Batches
# Korrektur: War vorher `current_openai_batch_data`
batch_start_row = tasks_for_openai_batch[0]['row_num']
# Korrektur: War vorher `current_openai_batch_data`
batch_end_row = tasks_for_openai_batch[-1]['row_num']
self.logger.debug(
f"\n--- Starte FINALEN Website-Summarization Batch ({len(tasks_for_openai_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
# Rufe die globale Funktion auf, die den OpenAI Call fuer den Batch macht (Block 9).
# summarize_batch_openai ist mit retry_on_failure dekoriert (Block 2).
# Wenn summarize_batch_openai eine Exception wirft (nach Retries),
# wird diese hier gefangen.
batch_results = None
try:
# <<< Korrekter Aufruf Block 9, Korrektur: War vorher `current_openai_batch_data`
batch_results = summarize_batch_openai(tasks_for_openai_batch)
# Ergebnisse sollten ein Dictionary {row_num: summary_text}
# sein, auch bei Fehlern.
# Sammle Sheet Updates (AS, AP) fuer diesen finalen Batch
current_version = getattr(
Config, 'VERSION', 'unknown') # Block 1 Config Attribut
batch_sheet_updates = [] # Updates fuer diesen spezifischen Batch
# Iteriere ueber die Zeilennummern im Batch, fuer die
# Ergebnisse vorliegen.
for row_num in rows_in_current_openai_batch:
# Hole das Ergebnis fuer diese Zeile aus dem
# Ergebnis-Dictionary.
summary = batch_results.get(
row_num, "k.A. (Batch Ergebnis fehlte)") # Fallback
# Stelle sicher, dass 'k.A.' bei leeren/kurzen Summaries
# gesetzt wird
if not summary or (
isinstance(
summary,
str) and summary.strip().lower() in [
"k.a.",
"k.a. (keine zusammenfassung erhalten)"]):
summary = "k.A. (Keine Zusammenfassung erhalten)"
# Fuege "k.A." oder Fehler an, wenn der Wert von
# summarize_batch_openai ein Fehlerstring ist
elif isinstance(summary, str) and (summary.startswith("k.A. (Fehler") or summary.startswith("FEHLER:")):
pass # Behalte den Fehlerstring von summarize_batch_openai
# Fuege Updates fuer AS und AP hinzu
batch_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [
[summary]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [
[current_version]]}) # Block 1 Column Map
# Fuege diese Updates zur globalen Liste hinzu (wird dann nur
# noch einmal gesendet)
all_sheet_updates.extend(batch_sheet_updates)
except Exception as e_openai_batch:
# Wenn summarize_batch_openai eine Exception wirft (nach Retries)
# Der Fehler wird bereits vom retry_on_failure Decorator auf
# summarize_batch_openai geloggt.
self.logger.error(
f"Endgueltiger FEHLER beim FINALEN OpenAI-Batch-Aufruf fuer Zusammenfassung: {e_openai_batch}") # <<< GEÄNDERT
# Logge den Traceback
self.logger.debug(traceback.format_exc()) # <<< GEÄNDERT
# Fügen Sie Fehlerwerte für alle Zeilen im Batch hinzu
current_version = getattr(
Config, 'VERSION', 'unknown') # Block 1 Config Attribut
for row_num in rows_in_current_openai_batch:
# Gekuerzt
error_summary = f"FEHLER OpenAI Batch: {str(e_openai_batch)[:100]}..."
# Fuege Updates mit Fehlerwerten fuer AS und AP hinzu
all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [
[error_summary]]}) # Block 1 Column Map
all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [
[current_version]]}) # Block 1 Column Map
# --- Finale Sheet Updates senden ---
# Sende alle verbleibenden gesammelten Updates in einem letzten
# Batch-Update.
if all_sheet_updates: if all_sheet_updates:
rows_in_final_update_batch = len( self.logger.info(f"Sende finale gesammelte Sheet-Updates für Zusammenfassungen ({len(all_sheet_updates) // 3} Zeilen)...")
all_sheet_updates) // 2 # Updates pro Zeile ist 2 self.sheet_handler.batch_update_cells(all_sheet_updates)
self.logger.info(
f"Sende FINALE gesammelte Sheet-Updates ({rows_in_final_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT self.logger.info(f"Website-Zusammenfassung (Batch) abgeschlossen. {processed_count} Zeilen zur Verarbeitung ausgewählt, {skipped_count} Zeilen übersprungen.")
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block
# 14) mit Retry.
success = self.sheet_handler.batch_update_cells(all_sheet_updates)
if success:
# <<< GEÄNDERT
self.logger.info(f"FINALES Sheet-Update erfolgreich.")
# Der Fehlerfall wird von batch_update_cells geloggt
# Logge den Abschluss des Modus
self.logger.info(
f"Website-Zusammenfassung (Batch) abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen.") # <<< GEÄNDERT
def evaluate_branch_task(self, task_data, openai_semaphore): def evaluate_branch_task(self, task_data, openai_semaphore):
""" """