data_processor.py aktualisiert
This commit is contained in:
@@ -17,6 +17,7 @@ import threading
|
||||
import pickle
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
import pandas as pd
|
||||
@@ -1008,6 +1009,45 @@ class DataProcessor:
|
||||
# Das `result` Dictionary wird mit den initialen Fehlerwerten zurückgegeben.
|
||||
return result
|
||||
|
||||
def _summarize_task_batch(self, task_info):
|
||||
"""
|
||||
Robuste Worker-Funktion für die parallele Website-Zusammenfassung.
|
||||
Wird vom ThreadPoolExecutor in `process_summarize_website` aufgerufen.
|
||||
Gibt IMMER ein strukturiertes Dictionary zurück.
|
||||
"""
|
||||
row_num = task_info['row_num']
|
||||
raw_text = task_info['raw_text']
|
||||
company_name = task_info.get('company_name', 'einem Unternehmen')
|
||||
self.logger.debug(f" -> Batch-Summarize-Task gestartet für Zeile {row_num}...")
|
||||
|
||||
result = {
|
||||
'row_num': row_num,
|
||||
'summary': 'k.A. (Fehler im Task)',
|
||||
'error': True,
|
||||
'status_message': 'Unbekannter Task-Fehler'
|
||||
}
|
||||
|
||||
try:
|
||||
# Ruft die gehärtete Single-Item-Funktion aus helpers.py auf
|
||||
summary_text = summarize_website_content(raw_text, company_name)
|
||||
|
||||
if summary_text and not summary_text.lower().startswith('k.a.'):
|
||||
result['summary'] = summary_text
|
||||
result['error'] = False
|
||||
result['status_message'] = 'Erfolgreich zusammengefasst'
|
||||
else:
|
||||
result['summary'] = summary_text # Fehlergrund übernehmen
|
||||
result['error'] = True
|
||||
result['status_message'] = 'Zusammenfassung fehlgeschlagen oder Text zu kurz'
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f" -> Kritischer Fehler im Worker-Task `_summarize_task_batch` für Zeile {row_num}: {e}")
|
||||
result['summary'] = f"FEHLER (API-Fehler bei Zusammenfassung)"
|
||||
result['status_message'] = f"Kritischer Task-Fehler: {type(e).__name__}"
|
||||
return result
|
||||
|
||||
|
||||
|
||||
def process_rows_sequentially(
|
||||
@@ -2195,437 +2235,140 @@ class DataProcessor:
|
||||
f"Wikipedia-Verifizierungs-Batch abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen ({skipped_no_wiki_url} wegen fehlender M-URL).") # <<< GEÄNDERT
|
||||
|
||||
|
||||
def process_summarization_batch(
|
||||
self,
|
||||
start_sheet_row=None,
|
||||
end_sheet_row=None,
|
||||
limit=None):
|
||||
def process_summarize_website(self, start_sheet_row=None, end_sheet_row=None, limit=None):
|
||||
"""
|
||||
Batch-Prozess NUR fuer Website-Zusammenfassung.
|
||||
Batch-Prozess NUR für Website-Zusammenfassung.
|
||||
Nutzt einen ThreadPoolExecutor für echte parallele Verarbeitung und verbesserte Stabilität.
|
||||
"""
|
||||
self.logger.info(
|
||||
f"Starte Website-Zusammenfassung (Batch). Bereich: {start_sheet_row if start_sheet_row else 'Start'}-{end_sheet_row if end_sheet_row else 'Ende'}, Limit: {limit if limit else 'Unbegrenzt'}")
|
||||
self.logger.info(f"Starte Website-Zusammenfassung (Parallel Batch, v2.0.3). Bereich: {start_sheet_row or 'Start'}-{end_sheet_row or 'Ende'}, Limit: {limit or 'Unbegrenzt'}")
|
||||
|
||||
# --- 1. Daten laden und Startzeile ermitteln ---
|
||||
if start_sheet_row is None:
|
||||
start_data_index = self.sheet_handler.get_start_row_index(
|
||||
check_column_key="Website Zusammenfassung")
|
||||
if start_data_index == -1:
|
||||
self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren 'Website Zusammenfassung'...")
|
||||
start_data_idx = self.sheet_handler.get_start_row_index(check_column_key="Website Zusammenfassung")
|
||||
if start_data_idx == -1:
|
||||
self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.")
|
||||
return
|
||||
start_sheet_row = start_data_index + self.sheet_handler._header_rows + 1
|
||||
else:
|
||||
if not self.sheet_handler.load_data():
|
||||
return
|
||||
"""
|
||||
Batch-Prozess NUR fuer Website-Zusammenfassung (AS).
|
||||
Laedt Daten neu, prueft, ob Rohtext (AR) vorhanden und Zusammenfassung (AS) fehlt.
|
||||
Fasst Rohtexte im Batch ueber OpenAI zusammen und setzt AS + AP.
|
||||
start_sheet_row = start_data_idx + self.sheet_handler._header_rows + 1
|
||||
self.logger.info(f"Automatisch ermittelte Startzeile: {start_sheet_row}")
|
||||
|
||||
if not self.sheet_handler.load_data():
|
||||
self.logger.error("FEHLER beim Laden der Daten für Summarization-Batch.")
|
||||
return
|
||||
|
||||
Args:
|
||||
start_sheet_row (int, optional): Die 1-basierte Startzeile im Sheet. Defaults to None (automatische Ermittlung basierend auf leeren AS).
|
||||
end_sheet_row (int, optional): Die 1-basierte Endzeile im Sheet. Defaults to None (bis Ende Sheet).
|
||||
limit (int, optional): Maximale Anzahl ZU VERARBEITENDER (nicht uebersprungener) Zeilen. Defaults to None (Unbegrenzt).
|
||||
"""
|
||||
# Verwenden Sie logger, da das Logging jetzt konfiguriert ist
|
||||
# Logge die Konfiguration des Batch-Laufs
|
||||
self.logger.info(
|
||||
f"Starte Website-Zusammenfassung (Batch AS, AP). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") # <<< GEÄNDERT
|
||||
|
||||
# --- Daten laden und Startzeile ermitteln ---
|
||||
# Automatische Ermittlung der Startzeile, wenn nicht manuell gesetzt
|
||||
if start_sheet_row is None:
|
||||
self.logger.info(
|
||||
"Automatische Ermittlung der Startzeile basierend auf leeren AS...") # <<< GEÄNDERT
|
||||
# Nutzt get_start_row_index des Sheet Handlers (Block 14). Prueft auf leeren AS (Block 1 Column Map).
|
||||
# Standardmaessig ab Zeile 7
|
||||
start_data_index_no_header = self.sheet_handler.get_start_row_index(
|
||||
check_column_key="Website Zusammenfassung", min_sheet_row=7)
|
||||
|
||||
# Wenn get_start_row_index -1 zurueckgibt (Fehler)
|
||||
if start_data_index_no_header == -1:
|
||||
self.logger.error(
|
||||
"FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") # <<< GEÄNDERT
|
||||
return # Beende die Methode
|
||||
|
||||
# Berechne die 1-basierte Sheet-Startzeile aus dem 0-basierten
|
||||
# Daten-Index
|
||||
start_sheet_row = start_data_index_no_header + \
|
||||
self.sheet_handler._header_rows + 1 # Block 14 SheetHandler Attribut
|
||||
self.logger.info(
|
||||
f"Automatisch ermittelte Startzeile (erste leere AS Zelle): {start_sheet_row}") # <<< GEÄNDERT
|
||||
else:
|
||||
# Wenn start_sheet_row manuell gesetzt wurde, laden Sie die Daten trotzdem neu, um aktuell zu sein.
|
||||
# Der load_data Aufruf ist mit retry_on_failure dekoriert (Block
|
||||
# 2).
|
||||
if not self.sheet_handler.load_data():
|
||||
self.logger.error(
|
||||
"FEHLER beim Laden der Daten fuer process_summarization_batch.") # <<< GEÄNDERT
|
||||
return # Beende die Methode, wenn das Laden fehlschlaegt
|
||||
|
||||
# Holen Sie die gesamte Datenliste (inklusive Header) aus dem
|
||||
# SheetHandler.
|
||||
all_data = self.sheet_handler.get_all_data_with_headers()
|
||||
# Annahme: header_rows ist als Attribut im SheetHandler verfuegbar
|
||||
# (Block 14).
|
||||
header_rows = self.sheet_handler._header_rows
|
||||
total_sheet_rows = len(all_data) # Gesamtzahl der Zeilen im Sheet
|
||||
total_sheet_rows = len(all_data)
|
||||
effective_end_row = end_sheet_row if end_sheet_row is not None else total_sheet_rows
|
||||
|
||||
self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {effective_end_row}.")
|
||||
if start_sheet_row > effective_end_row:
|
||||
self.logger.info("Start liegt nach dem Ende. Keine Zeilen zu verarbeiten.")
|
||||
return
|
||||
|
||||
# Berechne Endzeile, wenn nicht manuell gesetzt
|
||||
if end_sheet_row is None:
|
||||
end_sheet_row = total_sheet_rows # Bis zur letzten Zeile
|
||||
# --- 2. Spalten-Indizes und Buchstaben vorbereiten ---
|
||||
summary_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Website Zusammenfassung") + 1)
|
||||
version_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Version") + 1)
|
||||
# Wir benötigen auch einen Timestamp für die Zusammenfassung. Da keiner existiert, nutzen wir den Scrape-Timestamp neu.
|
||||
timestamp_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)
|
||||
|
||||
# Logge den verarbeitungsbereich
|
||||
self.logger.info(
|
||||
f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}. Gesamtzeilen im Sheet: {total_sheet_rows}") # <<< GEÄNDERT
|
||||
|
||||
# Pruefe, ob der Bereich gueltig ist (Start <= Ende und Start nicht
|
||||
# ueber Gesamtzeilen)
|
||||
if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows:
|
||||
self.logger.info(
|
||||
"Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.") # <<< GEÄNDERT
|
||||
return # Beende die Methode, wenn der Bereich leer ist
|
||||
|
||||
# --- Indizes und Buchstaben ---
|
||||
# Stellen Sie sicher, dass alle benoetigten Spalten in COLUMN_MAP
|
||||
# (Block 1) vorhanden sind
|
||||
required_keys = [
|
||||
"Website Rohtext", "Website Zusammenfassung", "Version", "CRM Name" # AR, AS, AP, B
|
||||
]
|
||||
# Erstellen Sie ein Dictionary mit Schluesseln und Indizes
|
||||
col_indices = {key: COLUMN_MAP.get(key) for key in required_keys}
|
||||
|
||||
# Pruefen Sie, ob alle benoetigten Schluessel in COLUMN_MAP gefunden
|
||||
# wurden
|
||||
if None in col_indices.values():
|
||||
missing = [k for k, v in col_indices.items() if v is None]
|
||||
self.logger.critical(
|
||||
f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_summarization_batch: {missing}. Breche ab.") # <<< GEÄNDERT
|
||||
return # Beende die Methode bei kritischem Fehler
|
||||
|
||||
# Ermitteln Sie die Indizes und Buchstaben fuer Updates (AS, AP)
|
||||
rohtext_col_idx = col_indices["Website Rohtext"]
|
||||
summary_col_idx = col_indices["Website Zusammenfassung"]
|
||||
version_col_idx = col_indices["Version"]
|
||||
name_col_idx = col_indices["CRM Name"] # Benoetigt fuer Logging
|
||||
|
||||
summary_col_letter = self.sheet_handler._get_col_letter(
|
||||
summary_col_idx + 1) # Block 14 _get_col_letter
|
||||
version_col_letter = self.sheet_handler._get_col_letter(
|
||||
version_col_idx + 1)
|
||||
|
||||
# --- Verarbeitung ---
|
||||
# Holen Sie die Batch-Groesse fuer OpenAI-Aufrufe aus Config (Block 1)
|
||||
openai_batch_size = getattr(Config, 'OPENAI_BATCH_SIZE_LIMIT', 4)
|
||||
# Holen Sie die Batch-Groesse fuer Sheet-Updates aus Config (Block 1)
|
||||
# --- 3. Tasks sammeln ---
|
||||
openai_batch_size = getattr(Config, 'OPENAI_BATCH_SIZE_LIMIT', 10) # Kann höher sein, da parallel
|
||||
max_openai_workers = getattr(Config, 'MAX_SCRAPING_WORKERS', 10) # Gleiche Worker-Anzahl wie beim Scraping
|
||||
update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50)
|
||||
|
||||
# Tasks fuer den aktuellen OpenAI Batch (Liste von Dicts)
|
||||
tasks_for_openai_batch = []
|
||||
# 1-basierte Zeilennummern im aktuellen OpenAI Batch
|
||||
rows_in_current_openai_batch = []
|
||||
# Gesammelte Updates fuer Batch-Schreiben ins Sheet (Liste von Dicts)
|
||||
tasks_for_processing_batch = []
|
||||
all_sheet_updates = []
|
||||
|
||||
# Zaehlt Zeilen, die fuer die Verarbeitung in Frage kommen und in den
|
||||
# Batch aufgenommen werden (im Rahmen des Limits).
|
||||
processed_count = 0
|
||||
# Zaehlt Zeilen, die uebersprungen wurden (wegen fehlendem Rohtext oder
|
||||
# vorhandener Zusammenfassung).
|
||||
skipped_count = 0
|
||||
|
||||
# Iteriere ueber die Sheet-Zeilen im definierten Bereich (1-basierte
|
||||
# Sheet-Zeilennummer)
|
||||
for i in range(start_sheet_row, end_sheet_row + 1):
|
||||
row_index_in_list = i - 1 # 0-basierter Index in der all_data Liste
|
||||
# Pruefen Sie, ob das Ende des Sheets erreicht wurde
|
||||
if row_index_in_list >= total_sheet_rows:
|
||||
break # Ende des Sheets erreicht
|
||||
for i in range(start_sheet_row, effective_end_row + 1):
|
||||
row_index_in_list = i - 1
|
||||
if row_index_in_list >= total_sheet_rows: break
|
||||
|
||||
row = all_data[row_index_in_list]
|
||||
if not any(cell and str(cell).strip() for cell in row):
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
row = all_data[row_index_in_list] # Die Rohdaten fuer diese Zeile
|
||||
# Kriterium: Zusammenfassung ist leer/default UND Rohtext ist valide
|
||||
summary_value = self._get_cell_value_safe(row, "Website Zusammenfassung")
|
||||
summary_is_empty_or_default = not summary_value or str(summary_value).strip().lower() in ["k.a.", "k.a. (keine zusammenfassung erhalten)"]
|
||||
|
||||
raw_text = self._get_cell_value_safe(row, "Website Rohtext")
|
||||
raw_text_is_valid = raw_text and isinstance(raw_text, str) and len(raw_text) > 100 and not str(raw_text).strip().lower().startswith('k.a.')
|
||||
|
||||
# Stellen Sie sicher, dass die Zeile nicht leer ist
|
||||
if not any(cell and isinstance(cell, str) and cell.strip()
|
||||
for cell in row):
|
||||
# self.logger.debug(f"Zeile {i}: Uebersprungen (Leere Zeile).")
|
||||
# # Zu viel Laerm im Debug
|
||||
skipped_count += 1 # Zaehlen als uebersprungen
|
||||
continue # Springe zur naechsten Zeile
|
||||
if summary_is_empty_or_default and raw_text_is_valid:
|
||||
if limit is not None and processed_count >= limit:
|
||||
self.logger.info(f"Verarbeitungslimit ({limit}) erreicht.")
|
||||
break
|
||||
|
||||
company_name = self._get_cell_value_safe(row, "CRM Name")
|
||||
tasks_for_processing_batch.append({"row_num": i, "raw_text": raw_text, "company_name": company_name})
|
||||
processed_count += 1
|
||||
else:
|
||||
skipped_count += 1
|
||||
|
||||
# --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist ---
|
||||
# Kriterium: Website Rohtext (AR) ist vorhanden und gueltig (nicht k.A. oder Fehlerwerte).
|
||||
# UND Website Zusammenfassung (AS) ist leer oder ein
|
||||
# Standard-Fehlerwert.
|
||||
# --- 4. Batch-Verarbeitung auslösen ---
|
||||
if len(tasks_for_processing_batch) >= openai_batch_size or (i == effective_end_row and tasks_for_processing_batch):
|
||||
self.logger.info(f"--- Starte Website-Summarization Batch für {len(tasks_for_processing_batch)} Tasks (max. {max_openai_workers} Worker) ---")
|
||||
|
||||
summarization_results = {}
|
||||
batch_error_count = 0
|
||||
|
||||
with ThreadPoolExecutor(max_workers=max_openai_workers) as executor:
|
||||
future_to_task = {executor.submit(self._summarize_task_batch, task): task for task in tasks_for_processing_batch}
|
||||
|
||||
for future in as_completed(future_to_task):
|
||||
task = future_to_task[future]
|
||||
try:
|
||||
result_dict = future.result()
|
||||
if isinstance(result_dict, dict) and 'row_num' in result_dict:
|
||||
summarization_results[result_dict['row_num']] = result_dict
|
||||
if result_dict.get('error'):
|
||||
batch_error_count += 1
|
||||
self.logger.warning(f"Worker meldete Fehler bei Zusammenfassung für Zeile {result_dict['row_num']}: {result_dict.get('status_message')}")
|
||||
else:
|
||||
self.logger.error(f"Inkonsistentes Ergebnis für Zeile {task['row_num']}: Erwartete dict mit 'row_num', bekam {type(result_dict)}. Überspringe.")
|
||||
summarization_results[task['row_num']] = {'summary': "FEHLER (Inkonsistenter Rückgabetyp)", 'error': True}
|
||||
batch_error_count += 1
|
||||
except Exception as exc:
|
||||
self.logger.error(f"Unerwarteter Fehler bei Ergebnisabfrage für Zeile {task['row_num']}: {exc}", exc_info=True)
|
||||
summarization_results[task['row_num']] = {'summary': "FEHLER (Task Exception)", 'error': True}
|
||||
batch_error_count += 1
|
||||
|
||||
self.logger.info(f" -> Zusammenfassung für Batch beendet. {len(summarization_results)} Ergebnisse erhalten ({batch_error_count} davon mit Fehlern).")
|
||||
|
||||
# Holen Sie den Wert aus Spalte AR (Website Rohtext) (nutzt interne
|
||||
# Helfer _get_cell_value_safe)
|
||||
raw_text = self._get_cell_value_safe(
|
||||
row, "Website Rohtext") # Block 1 Column Map
|
||||
# Pruefen Sie, ob AR gefuellt und gueltig ist.
|
||||
raw_text_is_valid = raw_text and isinstance(
|
||||
raw_text, str) and str(raw_text).strip().lower() not in [
|
||||
"k.a.", "k.a. (nur cookie-banner erkannt)", "k.a. (fehler)"]
|
||||
# --- 5. Updates für das Google Sheet vorbereiten ---
|
||||
if summarization_results:
|
||||
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
current_version = getattr(Config, 'VERSION', 'unknown')
|
||||
|
||||
for row_num, res_dict in summarization_results.items():
|
||||
# Zusammenfassung, Timestamp (wir überschreiben den alten Scrape-TS) und Version werden zum Update hinzugefügt
|
||||
all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [[res_dict.get('summary', 'k.A.')]]})
|
||||
all_sheet_updates.append({'range': f'{timestamp_col_letter}{row_num}', 'values': [[current_timestamp]]})
|
||||
all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]})
|
||||
|
||||
# Holen Sie den Wert aus Spalte AS (Website Zusammenfassung) (nutzt
|
||||
# interne Helfer _get_cell_value_safe)
|
||||
summary_value = self._get_cell_value_safe(
|
||||
row, "Website Zusammenfassung") # Block 1 Column Map
|
||||
# Pruefen Sie, ob AS leer ist oder einen Standard-Fehlerwert
|
||||
# enthaelt.
|
||||
summary_is_empty_or_default = not summary_value or (
|
||||
isinstance(
|
||||
summary_value,
|
||||
str) and str(summary_value).strip().lower() in [
|
||||
"k.a.",
|
||||
"k.a. (keine zusammenfassung erhalten)"])
|
||||
tasks_for_processing_batch = [] # Batch leeren
|
||||
|
||||
# Verarbeitung ist noetig, wenn AR gueltig ist UND AS leer/default
|
||||
# ist.
|
||||
processing_needed_for_row = raw_text_is_valid and summary_is_empty_or_default
|
||||
|
||||
# Loggen der Pruefergebnisse fuer diese Zeile auf Debug-Level
|
||||
log_check = (
|
||||
i < start_sheet_row +
|
||||
5) or (
|
||||
i %
|
||||
100 == 0) or (processing_needed_for_row)
|
||||
if log_check:
|
||||
company_name = self._get_cell_value_safe(
|
||||
row, "CRM Name").strip() # Block 1 Column Map
|
||||
self.logger.debug(
|
||||
f"Zeile {i} ({company_name[:50]}... Website Summarization Check): AR gueltig? {raw_text_is_valid} (len={len(str(raw_text))}), AS leer/default? {summary_is_empty_or_default}. Benötigt Verarbeitung? {processing_needed_for_row}") # <<< GEÄNDERT
|
||||
|
||||
# Wenn die Verarbeitung fuer diese Zeile nicht noetig ist
|
||||
if not processing_needed_for_row:
|
||||
skipped_count += 1 # Zaehlen als uebersprungene Zeile
|
||||
continue # Springe zur naechsten Zeile
|
||||
|
||||
# --- Wenn Verarbeitung noetig: Fuege zur Batch-Liste fuer OpenAI hinzu ---
|
||||
# Zaehle die Zeile, die fuer die Verarbeitung in Frage kommt (im
|
||||
# Rahmen des Limits zaehlen)
|
||||
processed_count += 1
|
||||
|
||||
# Pruefe das Limit fuer verarbeitete Zeilen
|
||||
if limit is not None and isinstance(
|
||||
limit, int) and limit > 0 and processed_count > limit:
|
||||
# Wenn das Limit erreicht ist und es ein positives Limit gibt
|
||||
self.logger.info(
|
||||
f"Verarbeitungslimit ({limit}) fuer process_summarization_batch erreicht. Breche weitere Zeilenpruefung ab.") # <<< GEÄNDERT
|
||||
break # Brich die Schleife ab
|
||||
|
||||
# Fuege die benoetigten Daten fuer den OpenAI Batch hinzu
|
||||
# (Zeilennummer und Rohtext)
|
||||
tasks_for_openai_batch.append({'row_num': i, 'raw_text': raw_text})
|
||||
# Fuege die Zeilennummer zur Liste der Zeilennummern im Batch hinzu
|
||||
rows_in_current_openai_batch.append(i)
|
||||
|
||||
# --- Verarbeite den Batch, wenn voll ---
|
||||
# Pruefe, ob die aktuelle Batch-Liste die definierte Groesse erreicht hat.
|
||||
# openai_batch_size wird aus Config geholt (Block 1).
|
||||
if len(tasks_for_openai_batch) >= openai_batch_size:
|
||||
# Logge den Start der Batch-Verarbeitung
|
||||
batch_start_row = tasks_for_openai_batch[0]['row_num']
|
||||
batch_end_row = tasks_for_openai_batch[-1]['row_num']
|
||||
self.logger.debug(
|
||||
f"\n--- Starte Website-Summarization Batch ({len(tasks_for_openai_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
|
||||
|
||||
# Rufe die globale Funktion auf, die den OpenAI Call fuer den Batch macht (Block 9).
|
||||
# summarize_batch_openai ist mit retry_on_failure dekoriert (Block 2).
|
||||
# Wenn summarize_batch_openai eine Exception wirft (nach Retries), wird diese hier gefangen.
|
||||
# !!! KORRIGIERTER AUFRUF !!!
|
||||
try:
|
||||
# Rufen Sie die korrekte globale Funktion auf
|
||||
# <<< Korrigierter Aufruf (vorher war fälschlicherweise _process_verification_openai_batch)
|
||||
batch_results = summarize_batch_openai(
|
||||
tasks_for_openai_batch)
|
||||
# Ergebnisse sollten ein Dictionary {row_num: summary_text}
|
||||
# sein, auch bei Fehlern.
|
||||
|
||||
# Sammle Sheet Updates (AS, AP) fuer diesen Batch
|
||||
current_version = getattr(
|
||||
Config, 'VERSION', 'unknown') # Block 1 Config Attribut
|
||||
batch_sheet_updates = [] # Updates fuer DIESEN spezifischen Batch von Zeilen
|
||||
|
||||
# Iteriere ueber die Zeilennummern, die in DIESEM OpenAI
|
||||
# Batch waren
|
||||
for row_num in rows_in_current_openai_batch:
|
||||
# Hole das Ergebnis fuer diese Zeile aus dem Ergebnis-Dictionary.
|
||||
# Fallback auf einen Fehlerstring, wenn das Ergebnis
|
||||
# fehlt (sollte nicht passieren, wenn
|
||||
# summarize_batch_openai korrekt ist).
|
||||
summary = batch_results.get(
|
||||
row_num, "k.A. (Batch Ergebnis fehlte)")
|
||||
# Stelle sicher, dass 'k.A.' bei leeren/kurzen
|
||||
# Summaries gesetzt wird
|
||||
if not summary or (
|
||||
isinstance(
|
||||
summary,
|
||||
str) and summary.strip().lower() in [
|
||||
"k.a.",
|
||||
"k.a. (keine zusammenfassung erhalten)"]):
|
||||
summary = "k.A. (Keine Zusammenfassung erhalten)"
|
||||
# Fuege "k.A." oder Fehler an, wenn der Wert von
|
||||
# summarize_batch_openai ein Fehlerstring ist
|
||||
elif isinstance(summary, str) and (summary.startswith("k.A. (Fehler") or summary.startswith("FEHLER:")):
|
||||
pass # Behalte den Fehlerstring von summarize_batch_openai
|
||||
|
||||
# Fuege Updates fuer AS und AP hinzu (nutzt interne
|
||||
# Helfer)
|
||||
batch_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [
|
||||
[summary]]}) # Block 1 Column Map
|
||||
batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [
|
||||
[current_version]]}) # Block 1 Column Map
|
||||
|
||||
# Sammle diese Batch-Updates fuer das groessere
|
||||
# Batch-Update
|
||||
all_sheet_updates.extend(batch_sheet_updates)
|
||||
|
||||
except Exception as e_openai_batch:
|
||||
# Wenn summarize_batch_openai eine Exception wirft (nach Retries)
|
||||
# Der Fehler wird bereits vom retry_on_failure Decorator
|
||||
# auf summarize_batch_openai geloggt.
|
||||
self.logger.error(
|
||||
f"Endgueltiger FEHLER beim OpenAI-Batch-Aufruf fuer Zusammenfassung: {e_openai_batch}") # <<< GEÄNDERT
|
||||
# Logge den Traceback
|
||||
self.logger.debug(traceback.format_exc()) # <<< GEÄNDERT
|
||||
# Fügen Sie Fehlerwerte für alle Zeilen im Batch hinzu
|
||||
current_version = getattr(
|
||||
Config, 'VERSION', 'unknown') # Block 1 Config Attribut
|
||||
for row_num in rows_in_current_openai_batch:
|
||||
# Gekuerzt
|
||||
error_summary = f"FEHLER OpenAI Batch: {str(e_openai_batch)[:100]}..."
|
||||
# Fuege Updates mit Fehlerwerten fuer AS und AP hinzu
|
||||
all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [
|
||||
[error_summary]]}) # Block 1 Column Map
|
||||
all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [
|
||||
[current_version]]}) # Block 1 Column Map
|
||||
|
||||
# Leere den OpenAI-Batch zurueck
|
||||
tasks_for_openai_batch = []
|
||||
rows_in_current_openai_batch = []
|
||||
|
||||
# Sende gesammelte Sheet Updates, wenn das Update-Batch-Limit erreicht ist.
|
||||
# Updates pro Zeile sind 2 (AS, AP). Anzahl der Zeilen =
|
||||
# len(all_sheet_updates) / 2.
|
||||
rows_in_update_batch = len(all_sheet_updates) // 2
|
||||
|
||||
if rows_in_update_batch >= update_batch_row_limit:
|
||||
self.logger.debug(
|
||||
f" Sende gesammelte Sheet-Updates ({rows_in_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT
|
||||
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry.
|
||||
# Wenn es fehlschlaegt, wird es intern geloggt.
|
||||
success = self.sheet_handler.batch_update_cells(
|
||||
all_sheet_updates)
|
||||
if success:
|
||||
self.logger.info(
|
||||
f" Sheet-Update fuer {rows_in_update_batch} Zeilen erfolgreich.") # <<< GEÄNDERT
|
||||
# Der Fehlerfall wird von batch_update_cells geloggt
|
||||
|
||||
# Leere die gesammelten Updates nach dem Senden.
|
||||
# --- 6. Sheet-Update auslösen, wenn Update-Batch voll ist ---
|
||||
# Pro Zeile gibt es 3 Updates
|
||||
if len(all_sheet_updates) >= (update_batch_row_limit * 3):
|
||||
self.logger.info(f"Sende gesammelte Sheet-Updates für Zusammenfassungen ({len(all_sheet_updates) // 3} Zeilen)...")
|
||||
self.sheet_handler.batch_update_cells(all_sheet_updates)
|
||||
all_sheet_updates = []
|
||||
time.sleep(1)
|
||||
|
||||
# Kurze Pause nach jedem OpenAI Batch (nutzt Config Block 1).
|
||||
# Dies ist wichtig, um Rate Limits zu vermeiden.
|
||||
pause_duration = getattr(
|
||||
Config, 'RETRY_DELAY', 5) * 0.5 # 50% der Retry-Wartezeit
|
||||
self.logger.debug(
|
||||
f"Warte {pause_duration:.2f}s vor naechstem Batch...") # <<< GEÄNDERT
|
||||
time.sleep(pause_duration)
|
||||
|
||||
# --- Verarbeitung des letzten unvollstaendigen OpenAI Batches nach der Schleife ---
|
||||
# Wenn nach der Hauptschleife noch Tasks in der Batch-Liste sind.
|
||||
if tasks_for_openai_batch: # Korrektur: War vorher `current_openai_batch_data`
|
||||
# Logge den Start des finalen Batches
|
||||
# Korrektur: War vorher `current_openai_batch_data`
|
||||
batch_start_row = tasks_for_openai_batch[0]['row_num']
|
||||
# Korrektur: War vorher `current_openai_batch_data`
|
||||
batch_end_row = tasks_for_openai_batch[-1]['row_num']
|
||||
self.logger.debug(
|
||||
f"\n--- Starte FINALEN Website-Summarization Batch ({len(tasks_for_openai_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
|
||||
|
||||
# Rufe die globale Funktion auf, die den OpenAI Call fuer den Batch macht (Block 9).
|
||||
# summarize_batch_openai ist mit retry_on_failure dekoriert (Block 2).
|
||||
# Wenn summarize_batch_openai eine Exception wirft (nach Retries),
|
||||
# wird diese hier gefangen.
|
||||
batch_results = None
|
||||
try:
|
||||
# <<< Korrekter Aufruf Block 9, Korrektur: War vorher `current_openai_batch_data`
|
||||
batch_results = summarize_batch_openai(tasks_for_openai_batch)
|
||||
# Ergebnisse sollten ein Dictionary {row_num: summary_text}
|
||||
# sein, auch bei Fehlern.
|
||||
|
||||
# Sammle Sheet Updates (AS, AP) fuer diesen finalen Batch
|
||||
current_version = getattr(
|
||||
Config, 'VERSION', 'unknown') # Block 1 Config Attribut
|
||||
batch_sheet_updates = [] # Updates fuer diesen spezifischen Batch
|
||||
# Iteriere ueber die Zeilennummern im Batch, fuer die
|
||||
# Ergebnisse vorliegen.
|
||||
for row_num in rows_in_current_openai_batch:
|
||||
# Hole das Ergebnis fuer diese Zeile aus dem
|
||||
# Ergebnis-Dictionary.
|
||||
summary = batch_results.get(
|
||||
row_num, "k.A. (Batch Ergebnis fehlte)") # Fallback
|
||||
|
||||
# Stelle sicher, dass 'k.A.' bei leeren/kurzen Summaries
|
||||
# gesetzt wird
|
||||
if not summary or (
|
||||
isinstance(
|
||||
summary,
|
||||
str) and summary.strip().lower() in [
|
||||
"k.a.",
|
||||
"k.a. (keine zusammenfassung erhalten)"]):
|
||||
summary = "k.A. (Keine Zusammenfassung erhalten)"
|
||||
# Fuege "k.A." oder Fehler an, wenn der Wert von
|
||||
# summarize_batch_openai ein Fehlerstring ist
|
||||
elif isinstance(summary, str) and (summary.startswith("k.A. (Fehler") or summary.startswith("FEHLER:")):
|
||||
pass # Behalte den Fehlerstring von summarize_batch_openai
|
||||
|
||||
# Fuege Updates fuer AS und AP hinzu
|
||||
batch_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [
|
||||
[summary]]}) # Block 1 Column Map
|
||||
batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [
|
||||
[current_version]]}) # Block 1 Column Map
|
||||
|
||||
# Fuege diese Updates zur globalen Liste hinzu (wird dann nur
|
||||
# noch einmal gesendet)
|
||||
all_sheet_updates.extend(batch_sheet_updates)
|
||||
|
||||
except Exception as e_openai_batch:
|
||||
# Wenn summarize_batch_openai eine Exception wirft (nach Retries)
|
||||
# Der Fehler wird bereits vom retry_on_failure Decorator auf
|
||||
# summarize_batch_openai geloggt.
|
||||
self.logger.error(
|
||||
f"Endgueltiger FEHLER beim FINALEN OpenAI-Batch-Aufruf fuer Zusammenfassung: {e_openai_batch}") # <<< GEÄNDERT
|
||||
# Logge den Traceback
|
||||
self.logger.debug(traceback.format_exc()) # <<< GEÄNDERT
|
||||
# Fügen Sie Fehlerwerte für alle Zeilen im Batch hinzu
|
||||
current_version = getattr(
|
||||
Config, 'VERSION', 'unknown') # Block 1 Config Attribut
|
||||
for row_num in rows_in_current_openai_batch:
|
||||
# Gekuerzt
|
||||
error_summary = f"FEHLER OpenAI Batch: {str(e_openai_batch)[:100]}..."
|
||||
# Fuege Updates mit Fehlerwerten fuer AS und AP hinzu
|
||||
all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [
|
||||
[error_summary]]}) # Block 1 Column Map
|
||||
all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [
|
||||
[current_version]]}) # Block 1 Column Map
|
||||
|
||||
# --- Finale Sheet Updates senden ---
|
||||
# Sende alle verbleibenden gesammelten Updates in einem letzten
|
||||
# Batch-Update.
|
||||
# --- 7. Finale Updates senden ---
|
||||
if all_sheet_updates:
|
||||
rows_in_final_update_batch = len(
|
||||
all_sheet_updates) // 2 # Updates pro Zeile ist 2
|
||||
self.logger.info(
|
||||
f"Sende FINALE gesammelte Sheet-Updates ({rows_in_final_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT
|
||||
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block
|
||||
# 14) mit Retry.
|
||||
success = self.sheet_handler.batch_update_cells(all_sheet_updates)
|
||||
if success:
|
||||
# <<< GEÄNDERT
|
||||
self.logger.info(f"FINALES Sheet-Update erfolgreich.")
|
||||
# Der Fehlerfall wird von batch_update_cells geloggt
|
||||
self.logger.info(f"Sende finale gesammelte Sheet-Updates für Zusammenfassungen ({len(all_sheet_updates) // 3} Zeilen)...")
|
||||
self.sheet_handler.batch_update_cells(all_sheet_updates)
|
||||
|
||||
self.logger.info(f"Website-Zusammenfassung (Batch) abgeschlossen. {processed_count} Zeilen zur Verarbeitung ausgewählt, {skipped_count} Zeilen übersprungen.")
|
||||
|
||||
|
||||
# Logge den Abschluss des Modus
|
||||
self.logger.info(
|
||||
f"Website-Zusammenfassung (Batch) abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen.") # <<< GEÄNDERT
|
||||
|
||||
def evaluate_branch_task(self, task_data, openai_semaphore):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user