diff --git a/data_processor.py b/data_processor.py index f7696a39..5e09c7bf 100644 --- a/data_processor.py +++ b/data_processor.py @@ -17,6 +17,7 @@ import threading import pickle import json import os +import re from datetime import datetime import pandas as pd @@ -1008,6 +1009,45 @@ class DataProcessor: # Das `result` Dictionary wird mit den initialen Fehlerwerten zurückgegeben. return result +def _summarize_task_batch(self, task_info): + """ + Robuste Worker-Funktion für die parallele Website-Zusammenfassung. + Wird vom ThreadPoolExecutor in `process_summarize_website` aufgerufen. + Gibt IMMER ein strukturiertes Dictionary zurück. + """ + row_num = task_info['row_num'] + raw_text = task_info['raw_text'] + company_name = task_info.get('company_name', 'einem Unternehmen') + self.logger.debug(f" -> Batch-Summarize-Task gestartet für Zeile {row_num}...") + + result = { + 'row_num': row_num, + 'summary': 'k.A. (Fehler im Task)', + 'error': True, + 'status_message': 'Unbekannter Task-Fehler' + } + + try: + # Ruft die gehärtete Single-Item-Funktion aus helpers.py auf + summary_text = summarize_website_content(raw_text, company_name) + + if summary_text and not summary_text.lower().startswith('k.a.'): + result['summary'] = summary_text + result['error'] = False + result['status_message'] = 'Erfolgreich zusammengefasst' + else: + result['summary'] = summary_text # Fehlergrund übernehmen + result['error'] = True + result['status_message'] = 'Zusammenfassung fehlgeschlagen oder Text zu kurz' + + return result + + except Exception as e: + self.logger.error(f" -> Kritischer Fehler im Worker-Task `_summarize_task_batch` für Zeile {row_num}: {e}") + result['summary'] = f"FEHLER (API-Fehler bei Zusammenfassung)" + result['status_message'] = f"Kritischer Task-Fehler: {type(e).__name__}" + return result + def process_rows_sequentially( @@ -2195,437 +2235,140 @@ class DataProcessor: f"Wikipedia-Verifizierungs-Batch abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen ({skipped_no_wiki_url} wegen fehlender M-URL).") # <<< GEÄNDERT - def process_summarization_batch( - self, - start_sheet_row=None, - end_sheet_row=None, - limit=None): + def process_summarize_website(self, start_sheet_row=None, end_sheet_row=None, limit=None): """ - Batch-Prozess NUR fuer Website-Zusammenfassung. + Batch-Prozess NUR für Website-Zusammenfassung. + Nutzt einen ThreadPoolExecutor für echte parallele Verarbeitung und verbesserte Stabilität. """ - self.logger.info( - f"Starte Website-Zusammenfassung (Batch). Bereich: {start_sheet_row if start_sheet_row else 'Start'}-{end_sheet_row if end_sheet_row else 'Ende'}, Limit: {limit if limit else 'Unbegrenzt'}") + self.logger.info(f"Starte Website-Zusammenfassung (Parallel Batch, v2.0.3). Bereich: {start_sheet_row or 'Start'}-{end_sheet_row or 'Ende'}, Limit: {limit or 'Unbegrenzt'}") + + # --- 1. Daten laden und Startzeile ermitteln --- if start_sheet_row is None: - start_data_index = self.sheet_handler.get_start_row_index( - check_column_key="Website Zusammenfassung") - if start_data_index == -1: + self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren 'Website Zusammenfassung'...") + start_data_idx = self.sheet_handler.get_start_row_index(check_column_key="Website Zusammenfassung") + if start_data_idx == -1: + self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") return - start_sheet_row = start_data_index + self.sheet_handler._header_rows + 1 - else: - if not self.sheet_handler.load_data(): - return - """ - Batch-Prozess NUR fuer Website-Zusammenfassung (AS). - Laedt Daten neu, prueft, ob Rohtext (AR) vorhanden und Zusammenfassung (AS) fehlt. - Fasst Rohtexte im Batch ueber OpenAI zusammen und setzt AS + AP. + start_sheet_row = start_data_idx + self.sheet_handler._header_rows + 1 + self.logger.info(f"Automatisch ermittelte Startzeile: {start_sheet_row}") + + if not self.sheet_handler.load_data(): + self.logger.error("FEHLER beim Laden der Daten für Summarization-Batch.") + return - Args: - start_sheet_row (int, optional): Die 1-basierte Startzeile im Sheet. Defaults to None (automatische Ermittlung basierend auf leeren AS). - end_sheet_row (int, optional): Die 1-basierte Endzeile im Sheet. Defaults to None (bis Ende Sheet). - limit (int, optional): Maximale Anzahl ZU VERARBEITENDER (nicht uebersprungener) Zeilen. Defaults to None (Unbegrenzt). - """ - # Verwenden Sie logger, da das Logging jetzt konfiguriert ist - # Logge die Konfiguration des Batch-Laufs - self.logger.info( - f"Starte Website-Zusammenfassung (Batch AS, AP). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") # <<< GEÄNDERT - - # --- Daten laden und Startzeile ermitteln --- - # Automatische Ermittlung der Startzeile, wenn nicht manuell gesetzt - if start_sheet_row is None: - self.logger.info( - "Automatische Ermittlung der Startzeile basierend auf leeren AS...") # <<< GEÄNDERT - # Nutzt get_start_row_index des Sheet Handlers (Block 14). Prueft auf leeren AS (Block 1 Column Map). - # Standardmaessig ab Zeile 7 - start_data_index_no_header = self.sheet_handler.get_start_row_index( - check_column_key="Website Zusammenfassung", min_sheet_row=7) - - # Wenn get_start_row_index -1 zurueckgibt (Fehler) - if start_data_index_no_header == -1: - self.logger.error( - "FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") # <<< GEÄNDERT - return # Beende die Methode - - # Berechne die 1-basierte Sheet-Startzeile aus dem 0-basierten - # Daten-Index - start_sheet_row = start_data_index_no_header + \ - self.sheet_handler._header_rows + 1 # Block 14 SheetHandler Attribut - self.logger.info( - f"Automatisch ermittelte Startzeile (erste leere AS Zelle): {start_sheet_row}") # <<< GEÄNDERT - else: - # Wenn start_sheet_row manuell gesetzt wurde, laden Sie die Daten trotzdem neu, um aktuell zu sein. - # Der load_data Aufruf ist mit retry_on_failure dekoriert (Block - # 2). - if not self.sheet_handler.load_data(): - self.logger.error( - "FEHLER beim Laden der Daten fuer process_summarization_batch.") # <<< GEÄNDERT - return # Beende die Methode, wenn das Laden fehlschlaegt - - # Holen Sie die gesamte Datenliste (inklusive Header) aus dem - # SheetHandler. all_data = self.sheet_handler.get_all_data_with_headers() - # Annahme: header_rows ist als Attribut im SheetHandler verfuegbar - # (Block 14). header_rows = self.sheet_handler._header_rows - total_sheet_rows = len(all_data) # Gesamtzahl der Zeilen im Sheet + total_sheet_rows = len(all_data) + effective_end_row = end_sheet_row if end_sheet_row is not None else total_sheet_rows + + self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {effective_end_row}.") + if start_sheet_row > effective_end_row: + self.logger.info("Start liegt nach dem Ende. Keine Zeilen zu verarbeiten.") + return - # Berechne Endzeile, wenn nicht manuell gesetzt - if end_sheet_row is None: - end_sheet_row = total_sheet_rows # Bis zur letzten Zeile + # --- 2. Spalten-Indizes und Buchstaben vorbereiten --- + summary_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Website Zusammenfassung") + 1) + version_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Version") + 1) + # Wir benötigen auch einen Timestamp für die Zusammenfassung. Da keiner existiert, nutzen wir den Scrape-Timestamp neu. + timestamp_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1) - # Logge den verarbeitungsbereich - self.logger.info( - f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}. Gesamtzeilen im Sheet: {total_sheet_rows}") # <<< GEÄNDERT - # Pruefe, ob der Bereich gueltig ist (Start <= Ende und Start nicht - # ueber Gesamtzeilen) - if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows: - self.logger.info( - "Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.") # <<< GEÄNDERT - return # Beende die Methode, wenn der Bereich leer ist - - # --- Indizes und Buchstaben --- - # Stellen Sie sicher, dass alle benoetigten Spalten in COLUMN_MAP - # (Block 1) vorhanden sind - required_keys = [ - "Website Rohtext", "Website Zusammenfassung", "Version", "CRM Name" # AR, AS, AP, B - ] - # Erstellen Sie ein Dictionary mit Schluesseln und Indizes - col_indices = {key: COLUMN_MAP.get(key) for key in required_keys} - - # Pruefen Sie, ob alle benoetigten Schluessel in COLUMN_MAP gefunden - # wurden - if None in col_indices.values(): - missing = [k for k, v in col_indices.items() if v is None] - self.logger.critical( - f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_summarization_batch: {missing}. Breche ab.") # <<< GEÄNDERT - return # Beende die Methode bei kritischem Fehler - - # Ermitteln Sie die Indizes und Buchstaben fuer Updates (AS, AP) - rohtext_col_idx = col_indices["Website Rohtext"] - summary_col_idx = col_indices["Website Zusammenfassung"] - version_col_idx = col_indices["Version"] - name_col_idx = col_indices["CRM Name"] # Benoetigt fuer Logging - - summary_col_letter = self.sheet_handler._get_col_letter( - summary_col_idx + 1) # Block 14 _get_col_letter - version_col_letter = self.sheet_handler._get_col_letter( - version_col_idx + 1) - - # --- Verarbeitung --- - # Holen Sie die Batch-Groesse fuer OpenAI-Aufrufe aus Config (Block 1) - openai_batch_size = getattr(Config, 'OPENAI_BATCH_SIZE_LIMIT', 4) - # Holen Sie die Batch-Groesse fuer Sheet-Updates aus Config (Block 1) + # --- 3. Tasks sammeln --- + openai_batch_size = getattr(Config, 'OPENAI_BATCH_SIZE_LIMIT', 10) # Kann höher sein, da parallel + max_openai_workers = getattr(Config, 'MAX_SCRAPING_WORKERS', 10) # Gleiche Worker-Anzahl wie beim Scraping update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50) - # Tasks fuer den aktuellen OpenAI Batch (Liste von Dicts) - tasks_for_openai_batch = [] - # 1-basierte Zeilennummern im aktuellen OpenAI Batch - rows_in_current_openai_batch = [] - # Gesammelte Updates fuer Batch-Schreiben ins Sheet (Liste von Dicts) + tasks_for_processing_batch = [] all_sheet_updates = [] - - # Zaehlt Zeilen, die fuer die Verarbeitung in Frage kommen und in den - # Batch aufgenommen werden (im Rahmen des Limits). processed_count = 0 - # Zaehlt Zeilen, die uebersprungen wurden (wegen fehlendem Rohtext oder - # vorhandener Zusammenfassung). skipped_count = 0 - # Iteriere ueber die Sheet-Zeilen im definierten Bereich (1-basierte - # Sheet-Zeilennummer) - for i in range(start_sheet_row, end_sheet_row + 1): - row_index_in_list = i - 1 # 0-basierter Index in der all_data Liste - # Pruefen Sie, ob das Ende des Sheets erreicht wurde - if row_index_in_list >= total_sheet_rows: - break # Ende des Sheets erreicht + for i in range(start_sheet_row, effective_end_row + 1): + row_index_in_list = i - 1 + if row_index_in_list >= total_sheet_rows: break + + row = all_data[row_index_in_list] + if not any(cell and str(cell).strip() for cell in row): + skipped_count += 1 + continue - row = all_data[row_index_in_list] # Die Rohdaten fuer diese Zeile + # Kriterium: Zusammenfassung ist leer/default UND Rohtext ist valide + summary_value = self._get_cell_value_safe(row, "Website Zusammenfassung") + summary_is_empty_or_default = not summary_value or str(summary_value).strip().lower() in ["k.a.", "k.a. (keine zusammenfassung erhalten)"] + + raw_text = self._get_cell_value_safe(row, "Website Rohtext") + raw_text_is_valid = raw_text and isinstance(raw_text, str) and len(raw_text) > 100 and not str(raw_text).strip().lower().startswith('k.a.') - # Stellen Sie sicher, dass die Zeile nicht leer ist - if not any(cell and isinstance(cell, str) and cell.strip() - for cell in row): - # self.logger.debug(f"Zeile {i}: Uebersprungen (Leere Zeile).") - # # Zu viel Laerm im Debug - skipped_count += 1 # Zaehlen als uebersprungen - continue # Springe zur naechsten Zeile + if summary_is_empty_or_default and raw_text_is_valid: + if limit is not None and processed_count >= limit: + self.logger.info(f"Verarbeitungslimit ({limit}) erreicht.") + break + + company_name = self._get_cell_value_safe(row, "CRM Name") + tasks_for_processing_batch.append({"row_num": i, "raw_text": raw_text, "company_name": company_name}) + processed_count += 1 + else: + skipped_count += 1 - # --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist --- - # Kriterium: Website Rohtext (AR) ist vorhanden und gueltig (nicht k.A. oder Fehlerwerte). - # UND Website Zusammenfassung (AS) ist leer oder ein - # Standard-Fehlerwert. + # --- 4. Batch-Verarbeitung auslösen --- + if len(tasks_for_processing_batch) >= openai_batch_size or (i == effective_end_row and tasks_for_processing_batch): + self.logger.info(f"--- Starte Website-Summarization Batch für {len(tasks_for_processing_batch)} Tasks (max. {max_openai_workers} Worker) ---") + + summarization_results = {} + batch_error_count = 0 + + with ThreadPoolExecutor(max_workers=max_openai_workers) as executor: + future_to_task = {executor.submit(self._summarize_task_batch, task): task for task in tasks_for_processing_batch} + + for future in as_completed(future_to_task): + task = future_to_task[future] + try: + result_dict = future.result() + if isinstance(result_dict, dict) and 'row_num' in result_dict: + summarization_results[result_dict['row_num']] = result_dict + if result_dict.get('error'): + batch_error_count += 1 + self.logger.warning(f"Worker meldete Fehler bei Zusammenfassung für Zeile {result_dict['row_num']}: {result_dict.get('status_message')}") + else: + self.logger.error(f"Inkonsistentes Ergebnis für Zeile {task['row_num']}: Erwartete dict mit 'row_num', bekam {type(result_dict)}. Überspringe.") + summarization_results[task['row_num']] = {'summary': "FEHLER (Inkonsistenter Rückgabetyp)", 'error': True} + batch_error_count += 1 + except Exception as exc: + self.logger.error(f"Unerwarteter Fehler bei Ergebnisabfrage für Zeile {task['row_num']}: {exc}", exc_info=True) + summarization_results[task['row_num']] = {'summary': "FEHLER (Task Exception)", 'error': True} + batch_error_count += 1 + + self.logger.info(f" -> Zusammenfassung für Batch beendet. {len(summarization_results)} Ergebnisse erhalten ({batch_error_count} davon mit Fehlern).") - # Holen Sie den Wert aus Spalte AR (Website Rohtext) (nutzt interne - # Helfer _get_cell_value_safe) - raw_text = self._get_cell_value_safe( - row, "Website Rohtext") # Block 1 Column Map - # Pruefen Sie, ob AR gefuellt und gueltig ist. - raw_text_is_valid = raw_text and isinstance( - raw_text, str) and str(raw_text).strip().lower() not in [ - "k.a.", "k.a. (nur cookie-banner erkannt)", "k.a. (fehler)"] + # --- 5. Updates für das Google Sheet vorbereiten --- + if summarization_results: + current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + current_version = getattr(Config, 'VERSION', 'unknown') + + for row_num, res_dict in summarization_results.items(): + # Zusammenfassung, Timestamp (wir überschreiben den alten Scrape-TS) und Version werden zum Update hinzugefügt + all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [[res_dict.get('summary', 'k.A.')]]}) + all_sheet_updates.append({'range': f'{timestamp_col_letter}{row_num}', 'values': [[current_timestamp]]}) + all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) - # Holen Sie den Wert aus Spalte AS (Website Zusammenfassung) (nutzt - # interne Helfer _get_cell_value_safe) - summary_value = self._get_cell_value_safe( - row, "Website Zusammenfassung") # Block 1 Column Map - # Pruefen Sie, ob AS leer ist oder einen Standard-Fehlerwert - # enthaelt. - summary_is_empty_or_default = not summary_value or ( - isinstance( - summary_value, - str) and str(summary_value).strip().lower() in [ - "k.a.", - "k.a. (keine zusammenfassung erhalten)"]) + tasks_for_processing_batch = [] # Batch leeren - # Verarbeitung ist noetig, wenn AR gueltig ist UND AS leer/default - # ist. - processing_needed_for_row = raw_text_is_valid and summary_is_empty_or_default - - # Loggen der Pruefergebnisse fuer diese Zeile auf Debug-Level - log_check = ( - i < start_sheet_row + - 5) or ( - i % - 100 == 0) or (processing_needed_for_row) - if log_check: - company_name = self._get_cell_value_safe( - row, "CRM Name").strip() # Block 1 Column Map - self.logger.debug( - f"Zeile {i} ({company_name[:50]}... Website Summarization Check): AR gueltig? {raw_text_is_valid} (len={len(str(raw_text))}), AS leer/default? {summary_is_empty_or_default}. Benötigt Verarbeitung? {processing_needed_for_row}") # <<< GEÄNDERT - - # Wenn die Verarbeitung fuer diese Zeile nicht noetig ist - if not processing_needed_for_row: - skipped_count += 1 # Zaehlen als uebersprungene Zeile - continue # Springe zur naechsten Zeile - - # --- Wenn Verarbeitung noetig: Fuege zur Batch-Liste fuer OpenAI hinzu --- - # Zaehle die Zeile, die fuer die Verarbeitung in Frage kommt (im - # Rahmen des Limits zaehlen) - processed_count += 1 - - # Pruefe das Limit fuer verarbeitete Zeilen - if limit is not None and isinstance( - limit, int) and limit > 0 and processed_count > limit: - # Wenn das Limit erreicht ist und es ein positives Limit gibt - self.logger.info( - f"Verarbeitungslimit ({limit}) fuer process_summarization_batch erreicht. Breche weitere Zeilenpruefung ab.") # <<< GEÄNDERT - break # Brich die Schleife ab - - # Fuege die benoetigten Daten fuer den OpenAI Batch hinzu - # (Zeilennummer und Rohtext) - tasks_for_openai_batch.append({'row_num': i, 'raw_text': raw_text}) - # Fuege die Zeilennummer zur Liste der Zeilennummern im Batch hinzu - rows_in_current_openai_batch.append(i) - - # --- Verarbeite den Batch, wenn voll --- - # Pruefe, ob die aktuelle Batch-Liste die definierte Groesse erreicht hat. - # openai_batch_size wird aus Config geholt (Block 1). - if len(tasks_for_openai_batch) >= openai_batch_size: - # Logge den Start der Batch-Verarbeitung - batch_start_row = tasks_for_openai_batch[0]['row_num'] - batch_end_row = tasks_for_openai_batch[-1]['row_num'] - self.logger.debug( - f"\n--- Starte Website-Summarization Batch ({len(tasks_for_openai_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT - - # Rufe die globale Funktion auf, die den OpenAI Call fuer den Batch macht (Block 9). - # summarize_batch_openai ist mit retry_on_failure dekoriert (Block 2). - # Wenn summarize_batch_openai eine Exception wirft (nach Retries), wird diese hier gefangen. - # !!! KORRIGIERTER AUFRUF !!! - try: - # Rufen Sie die korrekte globale Funktion auf - # <<< Korrigierter Aufruf (vorher war fälschlicherweise _process_verification_openai_batch) - batch_results = summarize_batch_openai( - tasks_for_openai_batch) - # Ergebnisse sollten ein Dictionary {row_num: summary_text} - # sein, auch bei Fehlern. - - # Sammle Sheet Updates (AS, AP) fuer diesen Batch - current_version = getattr( - Config, 'VERSION', 'unknown') # Block 1 Config Attribut - batch_sheet_updates = [] # Updates fuer DIESEN spezifischen Batch von Zeilen - - # Iteriere ueber die Zeilennummern, die in DIESEM OpenAI - # Batch waren - for row_num in rows_in_current_openai_batch: - # Hole das Ergebnis fuer diese Zeile aus dem Ergebnis-Dictionary. - # Fallback auf einen Fehlerstring, wenn das Ergebnis - # fehlt (sollte nicht passieren, wenn - # summarize_batch_openai korrekt ist). - summary = batch_results.get( - row_num, "k.A. (Batch Ergebnis fehlte)") - # Stelle sicher, dass 'k.A.' bei leeren/kurzen - # Summaries gesetzt wird - if not summary or ( - isinstance( - summary, - str) and summary.strip().lower() in [ - "k.a.", - "k.a. (keine zusammenfassung erhalten)"]): - summary = "k.A. (Keine Zusammenfassung erhalten)" - # Fuege "k.A." oder Fehler an, wenn der Wert von - # summarize_batch_openai ein Fehlerstring ist - elif isinstance(summary, str) and (summary.startswith("k.A. (Fehler") or summary.startswith("FEHLER:")): - pass # Behalte den Fehlerstring von summarize_batch_openai - - # Fuege Updates fuer AS und AP hinzu (nutzt interne - # Helfer) - batch_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [ - [summary]]}) # Block 1 Column Map - batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [ - [current_version]]}) # Block 1 Column Map - - # Sammle diese Batch-Updates fuer das groessere - # Batch-Update - all_sheet_updates.extend(batch_sheet_updates) - - except Exception as e_openai_batch: - # Wenn summarize_batch_openai eine Exception wirft (nach Retries) - # Der Fehler wird bereits vom retry_on_failure Decorator - # auf summarize_batch_openai geloggt. - self.logger.error( - f"Endgueltiger FEHLER beim OpenAI-Batch-Aufruf fuer Zusammenfassung: {e_openai_batch}") # <<< GEÄNDERT - # Logge den Traceback - self.logger.debug(traceback.format_exc()) # <<< GEÄNDERT - # Fügen Sie Fehlerwerte für alle Zeilen im Batch hinzu - current_version = getattr( - Config, 'VERSION', 'unknown') # Block 1 Config Attribut - for row_num in rows_in_current_openai_batch: - # Gekuerzt - error_summary = f"FEHLER OpenAI Batch: {str(e_openai_batch)[:100]}..." - # Fuege Updates mit Fehlerwerten fuer AS und AP hinzu - all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [ - [error_summary]]}) # Block 1 Column Map - all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [ - [current_version]]}) # Block 1 Column Map - - # Leere den OpenAI-Batch zurueck - tasks_for_openai_batch = [] - rows_in_current_openai_batch = [] - - # Sende gesammelte Sheet Updates, wenn das Update-Batch-Limit erreicht ist. - # Updates pro Zeile sind 2 (AS, AP). Anzahl der Zeilen = - # len(all_sheet_updates) / 2. - rows_in_update_batch = len(all_sheet_updates) // 2 - - if rows_in_update_batch >= update_batch_row_limit: - self.logger.debug( - f" Sende gesammelte Sheet-Updates ({rows_in_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT - # Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry. - # Wenn es fehlschlaegt, wird es intern geloggt. - success = self.sheet_handler.batch_update_cells( - all_sheet_updates) - if success: - self.logger.info( - f" Sheet-Update fuer {rows_in_update_batch} Zeilen erfolgreich.") # <<< GEÄNDERT - # Der Fehlerfall wird von batch_update_cells geloggt - - # Leere die gesammelten Updates nach dem Senden. + # --- 6. Sheet-Update auslösen, wenn Update-Batch voll ist --- + # Pro Zeile gibt es 3 Updates + if len(all_sheet_updates) >= (update_batch_row_limit * 3): + self.logger.info(f"Sende gesammelte Sheet-Updates für Zusammenfassungen ({len(all_sheet_updates) // 3} Zeilen)...") + self.sheet_handler.batch_update_cells(all_sheet_updates) all_sheet_updates = [] + time.sleep(1) - # Kurze Pause nach jedem OpenAI Batch (nutzt Config Block 1). - # Dies ist wichtig, um Rate Limits zu vermeiden. - pause_duration = getattr( - Config, 'RETRY_DELAY', 5) * 0.5 # 50% der Retry-Wartezeit - self.logger.debug( - f"Warte {pause_duration:.2f}s vor naechstem Batch...") # <<< GEÄNDERT - time.sleep(pause_duration) - - # --- Verarbeitung des letzten unvollstaendigen OpenAI Batches nach der Schleife --- - # Wenn nach der Hauptschleife noch Tasks in der Batch-Liste sind. - if tasks_for_openai_batch: # Korrektur: War vorher `current_openai_batch_data` - # Logge den Start des finalen Batches - # Korrektur: War vorher `current_openai_batch_data` - batch_start_row = tasks_for_openai_batch[0]['row_num'] - # Korrektur: War vorher `current_openai_batch_data` - batch_end_row = tasks_for_openai_batch[-1]['row_num'] - self.logger.debug( - f"\n--- Starte FINALEN Website-Summarization Batch ({len(tasks_for_openai_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT - - # Rufe die globale Funktion auf, die den OpenAI Call fuer den Batch macht (Block 9). - # summarize_batch_openai ist mit retry_on_failure dekoriert (Block 2). - # Wenn summarize_batch_openai eine Exception wirft (nach Retries), - # wird diese hier gefangen. - batch_results = None - try: - # <<< Korrekter Aufruf Block 9, Korrektur: War vorher `current_openai_batch_data` - batch_results = summarize_batch_openai(tasks_for_openai_batch) - # Ergebnisse sollten ein Dictionary {row_num: summary_text} - # sein, auch bei Fehlern. - - # Sammle Sheet Updates (AS, AP) fuer diesen finalen Batch - current_version = getattr( - Config, 'VERSION', 'unknown') # Block 1 Config Attribut - batch_sheet_updates = [] # Updates fuer diesen spezifischen Batch - # Iteriere ueber die Zeilennummern im Batch, fuer die - # Ergebnisse vorliegen. - for row_num in rows_in_current_openai_batch: - # Hole das Ergebnis fuer diese Zeile aus dem - # Ergebnis-Dictionary. - summary = batch_results.get( - row_num, "k.A. (Batch Ergebnis fehlte)") # Fallback - - # Stelle sicher, dass 'k.A.' bei leeren/kurzen Summaries - # gesetzt wird - if not summary or ( - isinstance( - summary, - str) and summary.strip().lower() in [ - "k.a.", - "k.a. (keine zusammenfassung erhalten)"]): - summary = "k.A. (Keine Zusammenfassung erhalten)" - # Fuege "k.A." oder Fehler an, wenn der Wert von - # summarize_batch_openai ein Fehlerstring ist - elif isinstance(summary, str) and (summary.startswith("k.A. (Fehler") or summary.startswith("FEHLER:")): - pass # Behalte den Fehlerstring von summarize_batch_openai - - # Fuege Updates fuer AS und AP hinzu - batch_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [ - [summary]]}) # Block 1 Column Map - batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [ - [current_version]]}) # Block 1 Column Map - - # Fuege diese Updates zur globalen Liste hinzu (wird dann nur - # noch einmal gesendet) - all_sheet_updates.extend(batch_sheet_updates) - - except Exception as e_openai_batch: - # Wenn summarize_batch_openai eine Exception wirft (nach Retries) - # Der Fehler wird bereits vom retry_on_failure Decorator auf - # summarize_batch_openai geloggt. - self.logger.error( - f"Endgueltiger FEHLER beim FINALEN OpenAI-Batch-Aufruf fuer Zusammenfassung: {e_openai_batch}") # <<< GEÄNDERT - # Logge den Traceback - self.logger.debug(traceback.format_exc()) # <<< GEÄNDERT - # Fügen Sie Fehlerwerte für alle Zeilen im Batch hinzu - current_version = getattr( - Config, 'VERSION', 'unknown') # Block 1 Config Attribut - for row_num in rows_in_current_openai_batch: - # Gekuerzt - error_summary = f"FEHLER OpenAI Batch: {str(e_openai_batch)[:100]}..." - # Fuege Updates mit Fehlerwerten fuer AS und AP hinzu - all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [ - [error_summary]]}) # Block 1 Column Map - all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [ - [current_version]]}) # Block 1 Column Map - - # --- Finale Sheet Updates senden --- - # Sende alle verbleibenden gesammelten Updates in einem letzten - # Batch-Update. + # --- 7. Finale Updates senden --- if all_sheet_updates: - rows_in_final_update_batch = len( - all_sheet_updates) // 2 # Updates pro Zeile ist 2 - self.logger.info( - f"Sende FINALE gesammelte Sheet-Updates ({rows_in_final_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT - # Nutzt die batch_update_cells Methode des Sheet Handlers (Block - # 14) mit Retry. - success = self.sheet_handler.batch_update_cells(all_sheet_updates) - if success: - # <<< GEÄNDERT - self.logger.info(f"FINALES Sheet-Update erfolgreich.") - # Der Fehlerfall wird von batch_update_cells geloggt + self.logger.info(f"Sende finale gesammelte Sheet-Updates für Zusammenfassungen ({len(all_sheet_updates) // 3} Zeilen)...") + self.sheet_handler.batch_update_cells(all_sheet_updates) + + self.logger.info(f"Website-Zusammenfassung (Batch) abgeschlossen. {processed_count} Zeilen zur Verarbeitung ausgewählt, {skipped_count} Zeilen übersprungen.") + - # Logge den Abschluss des Modus - self.logger.info( - f"Website-Zusammenfassung (Batch) abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen.") # <<< GEÄNDERT def evaluate_branch_task(self, task_data, openai_semaphore): """