diff --git a/data_processor.py b/data_processor.py new file mode 100644 index 00000000..b7798274 --- /dev/null +++ b/data_processor.py @@ -0,0 +1,1820 @@ +# --- START OF FILE data_processor.py (Part 1/7) --- + +#!/usr/bin/env python3 +""" +data_processor.py + +Zentrale Klasse zur Orchestrierung und Verarbeitung von Unternehmensdaten. +Enthält die Logik für die Verarbeitung einzelner Zeilen sowie die Steuerung +verschiedener Batch-Modi und Dienstprogramme. +""" + +import logging +import time +import traceback +import concurrent.futures +import threading +import pickle +import json +import os +from datetime import datetime + +import pandas as pd +import numpy as np +from sklearn.model_selection import train_test_split, GridSearchCV +from sklearn.impute import SimpleImputer +from sklearn.ensemble import RandomForestClassifier +from sklearn.metrics import accuracy_score, classification_report, confusion_matrix +from imblearn.over_sampling import SMOTE +from imblearn.pipeline import Pipeline as ImbPipeline + +# Import der abhängigen Module +from config import Config, COLUMN_MAP, MODEL_FILE, IMPUTER_FILE, PATTERNS_FILE_JSON +from helpers import ( + serp_website_lookup, get_website_raw, scrape_website_details, summarize_website_content, + URL_CHECK_MARKER, evaluate_branche_chatgpt, get_numeric_filter_value, + summarize_batch_openai, load_target_schema, ALLOWED_TARGET_BRANCHES, + serp_wikipedia_lookup, search_linkedin_contacts, is_valid_wikipedia_article_url +) +# Klassen-Imports +from google_sheet_handler import GoogleSheetHandler +from wikipedia_scraper import WikipediaScraper + + +class DataProcessor: + """ + Zentrale Klasse zur Orchestrierung und Verarbeitung von Unternehmensdaten. + """ + def __init__(self, sheet_handler, wiki_scraper): + """ + Initialisiert den DataProcessor mit Instanzen von Handler-Klassen. + """ + self.logger = logging.getLogger(__name__ + ".DataProcessor") + self.logger.info("Initialisiere DataProcessor...") + + if not isinstance(sheet_handler, GoogleSheetHandler): + self.logger.critical("DataProcessor Init FEHLER: Kein gueltiger GoogleSheetHandler uebergeben!") + raise ValueError("DataProcessor benoetigt eine gueltige GoogleSheetHandler Instanz.") + if not isinstance(wiki_scraper, WikipediaScraper): + self.logger.critical("DataProcessor Init FEHLER: Kein gueltiger WikipediaScraper uebergeben!") + raise ValueError("DataProcessor benoetigt eine gueltige WikipediaScraper Instanz.") + + self.sheet_handler = sheet_handler + self.wiki_scraper = wiki_scraper + + self.model = None + self.imputer = None + self._expected_features = None + + self.logger.info("DataProcessor initialisiert mit Handlern.") + + self._step_status_map = { + 'wiki_verify': "Wiki Verif. Timestamp", + 'website_scrape': "Website Scrape Timestamp", + 'summarize_website': "Website Scrape Timestamp", + 'branch_eval': "Timestamp letzte Pruefung", + 'find_wiki_serp': "SerpAPI Wiki Search Timestamp", + 'contact_search': "Contact Search Timestamp", + 'wiki_updates_from_chatgpt': "Chat Wiki Konsistenzpruefung", + } + + def _get_cell_value_safe(self, row, column_key): + """ + Hilfsfunktion fuer sicheren Zellenzugriff anhand des COLUMN_MAP Schluessels. + """ + idx = COLUMN_MAP.get(column_key) + if idx is None: + self.logger.error(f"_get_cell_value_safe: Schluessel '{column_key}' nicht in COLUMN_MAP gefunden.") + return '' + if len(row) > idx: + return row[idx] if row[idx] is not None else '' + else: + self.logger.debug(f"_get_cell_value_safe: Index {idx} fuer '{column_key}' ist gueltig, aber Zeile ist zu kurz (Laenge {len(row)}).") + return '' + + def _needs_website_processing(self, row_data, force_reeval): + """ + Prueft, ob Website-Scraping/Summarization fuer diese Zeile noetig ist. + """ + if force_reeval: + return True + at_value = self._get_cell_value_safe(row_data, "Website Scrape Timestamp").strip() + if not at_value: + return True + return False + + def _needs_wiki_processing(self, row_data, force_reeval): + """ + Prueft, ob Wikipedia-Suche/Extraktion fuer diese Zeile noetig ist. + """ + if force_reeval: + return True + an_value = self._get_cell_value_safe(row_data, "Wikipedia Timestamp").strip() + if not an_value: + return True + s_value = self._get_cell_value_safe(row_data, "Chat Wiki Konsistenzpruefung").strip().upper() + if s_value == "X (URL COPIED)": + return True + return False + + def _needs_wiki_verification(self, row_data, force_reeval): + """ + Prueft, ob Wikipedia-Verifizierung fuer diese Zeile noetig ist. + """ + if force_reeval: + return True + ax_value = self._get_cell_value_safe(row_data, "Wiki Verif. Timestamp").strip() + if not ax_value: + m_value = self._get_cell_value_safe(row_data, "Wiki URL").strip() + if m_value and m_value.lower() not in ["k.a.", "kein artikel gefunden", "fehler bei suche", "http:"]: + return True + return False + + def _needs_chat_evaluations(self, row_data, force_reeval, wiki_data_just_updated): + """ + Prueft, ob ChatGPT-Evaluationen fuer diese Zeile noetig sind. + """ + if force_reeval: + return True + ao_value = self._get_cell_value_safe(row_data, "Timestamp letzte Pruefung").strip() + if not ao_value: + return True + if wiki_data_just_updated: + return True + return False + + def _needs_ml_prediction(self, row_data, force_reeval, chat_eval_just_ran): + """ + Prueft, ob die ML-Schaetzung fuer diese Zeile noetig ist. + """ + if force_reeval: + return True + au_value = self._get_cell_value_safe(row_data, "Geschaetzter Techniker Bucket").strip() + if not au_value or au_value.lower() in ["k.a.", "fehler schaetzung"]: + if chat_eval_just_ran: + return True + av_value = self._get_cell_value_safe(row_data, "Finaler Umsatz (Wiki>CRM)").strip() + aw_value = self._get_cell_value_safe(row_data, "Finaler Mitarbeiter (Wiki>CRM)").strip() + if av_value != "k.A." and not av_value.startswith("FEHLER") and aw_value != "k.A." and not aw_value.startswith("FEHLER"): + return True + return False + + def _process_single_row(self, row_num_in_sheet, row_data, + steps_to_run, force_reeval=False, clear_x_flag=False): + """ + Verarbeitet die Daten fuer eine einzelne Zeile im Sheet. Fuehrt ausgewaehlte + Anreicherungs- und Analyseprozesse durch. + """ + self.logger.info(f"--- Starte Verarbeitung fuer Zeile {row_num_in_sheet} {'(Re-Eval)' if force_reeval else ''} (Schritte: {', '.join(steps_to_run) if steps_to_run else 'Keine'}) ---") + updates = [] + now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + any_processing_done = False + wiki_data_updated_in_this_run = False + chat_eval_just_ran = False + + # --- Initiale Werte lesen --- + company_name = self._get_cell_value_safe(row_data, "CRM Name").strip() + website_url = self._get_cell_value_safe(row_data, "CRM Website").strip() + crm_kurzform = self._get_cell_value_safe(row_data, "CRM Kurzform").strip() + crm_branche = self._get_cell_value_safe(row_data, "CRM Branche").strip() + crm_beschreibung = self._get_cell_value_safe(row_data, "CRM Beschreibung").strip() + parent_account_name_d = self._get_cell_value_safe(row_data, "Parent Account Name").strip() + + current_wiki_data = { + 'url': self._get_cell_value_safe(row_data, "Wiki URL") or 'k.A.', + 'sitz_stadt': self._get_cell_value_safe(row_data, "Wiki Sitz Stadt") or 'k.A.', + 'sitz_land': self._get_cell_value_safe(row_data, "Wiki Sitz Land") or 'k.A.', + 'first_paragraph': self._get_cell_value_safe(row_data, "Wiki Absatz") or 'k.A.', + 'branche': self._get_cell_value_safe(row_data, "Wiki Branche") or 'k.A.', + 'umsatz': self._get_cell_value_safe(row_data, "Wiki Umsatz") or 'k.A.', + 'mitarbeiter': self._get_cell_value_safe(row_data, "Wiki Mitarbeiter") or 'k.A.', + 'categories': self._get_cell_value_safe(row_data, "Wiki Kategorien") or 'k.A.' + } + final_wiki_data = current_wiki_data.copy() + + website_raw = self._get_cell_value_safe(row_data, "Website Rohtext") or 'k.A.' + website_summary = self._get_cell_value_safe(row_data, "Website Zusammenfassung") or 'k.A.' + website_meta_details = self._get_cell_value_safe(row_data, "Website Meta-Details") or 'k.A.' + url_pruefstatus = self._get_cell_value_safe(row_data, "URL Prüfstatus") or '' + + # --- 1. Website Handling (Lookup, Scraping, Summarization, Meta) --- + run_website_step = 'web' in steps_to_run + website_processing_needed = self._needs_website_processing(row_data, force_reeval) + + if run_website_step and website_processing_needed: + any_processing_done = True + grund_message = "Re-Eval" if force_reeval else "Timestamp (AJ) leer" + self.logger.info(f"Zeile {row_num_in_sheet}: Fuehre WEBSITE Schritte aus (Grund: {grund_message})...") + + if not website_url or website_url.lower() == "k.a.": + self.logger.debug(" -> Website URL (E) leer, suche ueber SERP...") + try: + new_website = serp_website_lookup(company_name) + if new_website and new_website.lower() != "k.a." and not new_website.startswith("k.A. (Fehler"): + website_url = new_website + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["CRM Website"] + 1)}{row_num_in_sheet}', 'values': [[website_url]]}) + url_pruefstatus = "URL_OK_SERP" + else: + url_pruefstatus = "URL_SERP_FAILED" if not website_url or website_url.lower() == "k.a." else url_pruefstatus + except Exception as e_serp_lookup_web: + self.logger.error(f"FEHLER bei SERP Website Lookup für '{company_name}': {e_serp_lookup_web}") + url_pruefstatus = "URL_SERP_ERROR" + + if website_url and website_url.lower() not in ["k.a.", "http:"]: + self.logger.debug(f" -> Scrape Rohtext & Meta von {website_url[:100]}...") + try: + website_raw = get_website_raw(website_url) + if website_raw == URL_CHECK_MARKER: + url_pruefstatus = URL_CHECK_MARKER + website_summary, website_meta_details = "k.A. (URL prüfen)", "k.A. (URL prüfen)" + elif website_raw and str(website_raw).strip().lower() not in ["k.a.", "k.a. (nur cookie-banner erkannt)", "k.a. (fehler)"]: + url_pruefstatus = "URL_OK_SCRAPED" + website_meta_details = scrape_website_details(website_url) or "k.A. (Keine Meta-Details)" + website_summary = summarize_website_content(website_raw) or "k.A. (Keine Zusammenfassung erhalten)" + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Meta-Details"] + 1)}{row_num_in_sheet}', 'values': [[website_meta_details]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Zusammenfassung"] + 1)}{row_num_in_sheet}', 'values': [[website_summary]]}) + else: + if not str(website_raw).startswith("k.A. (Fehler"): + url_pruefstatus = "URL_SCRAPE_EMPTY_OR_BANNER" + website_summary, website_meta_details = "k.A.", "k.A." + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Zusammenfassung"] + 1)}{row_num_in_sheet}', 'values': [[website_summary]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Meta-Details"] + 1)}{row_num_in_sheet}', 'values': [[website_meta_details]]}) + except Exception as e_scrape_web: + self.logger.error(f"FEHLER beim Website Scraping für '{company_name}': {e_scrape_web}") + website_raw, website_summary, website_meta_details = f"k.A. (Fehler Scraping: {str(e_scrape_web)[:50]})", "k.A.", "k.A." + url_pruefstatus = "URL_SCRAPE_ERROR" + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Zusammenfassung"] + 1)}{row_num_in_sheet}', 'values': [[website_summary]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Meta-Details"] + 1)}{row_num_in_sheet}', 'values': [[website_meta_details]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Rohtext"] + 1)}{row_num_in_sheet}', 'values': [[website_raw]]}) + else: + self.logger.debug(f" -> Keine gültige Website URL für '{company_name}'. Web-Verarbeitung übersprungen.") + website_raw, website_summary, website_meta_details = "k.A. (Keine URL)", "k.A.", "k.A." + if not url_pruefstatus: url_pruefstatus = "URL_MISSING" + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Rohtext"] + 1)}{row_num_in_sheet}', 'values': [[website_raw]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Zusammenfassung"] + 1)}{row_num_in_sheet}', 'values': [[website_summary]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Meta-Details"] + 1)}{row_num_in_sheet}', 'values': [[website_meta_details]]}) + + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["URL Prüfstatus"] + 1)}{row_num_in_sheet}', 'values': [[url_pruefstatus]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Scrape Timestamp"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]}) + + # ====================================================================== + # === 2. Wikipedia Handling (Search, Extraction, Status Reset) ========== + # ====================================================================== + run_wiki_step = 'wiki' in steps_to_run + wiki_processing_needed = self._needs_wiki_processing(row_data, force_reeval) + + if run_wiki_step and wiki_processing_needed: + any_processing_done = True + grund_message_parts_wiki = [] + if force_reeval: grund_message_parts_wiki.append('Re-Eval') + if not self._get_cell_value_safe(row_data, "Wikipedia Timestamp").strip(): grund_message_parts_wiki.append('Z leer') + if self._get_cell_value_safe(row_data, "Chat Wiki Konsistenzpruefung").strip().upper() == "X (URL COPIED)": grund_message_parts_wiki.append("AC='X (URL COPIED)'") + grund_message_wiki = ", ".join(filter(None, grund_message_parts_wiki)) or "Bedingung erfüllt" + self.logger.info(f"Zeile {row_num_in_sheet}: Fuehre WIKI Schritte aus (Grund: {grund_message_wiki})...") + + current_wiki_url_r = self._get_cell_value_safe(row_data, "Wiki URL").strip() + system_suggested_parent_o = self._get_cell_value_safe(row_data, "System Vorschlag Parent Account").strip() + + url_for_extraction = None + source_of_wiki_data_origin_log_msg = "Tochter (Initial)" + additional_info_for_af_col = "" + + if not current_wiki_url_r or current_wiki_url_r.lower() == 'k.a.': + if parent_account_name_d and parent_account_name_d.lower() != 'k.a.': + self.logger.info(f" Zeile {row_num_in_sheet}: R leer, D ('{parent_account_name_d}') gesetzt. Suche Wiki für Parent D.") + try: + potential_url = serp_wikipedia_lookup(parent_account_name_d) + if potential_url and not str(potential_url).startswith("FEHLER"): + url_for_extraction = potential_url + source_of_wiki_data_origin_log_msg = f"Parent D ('{parent_account_name_d}')" + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Wiki Konsistenzpruefung"] + 1)}{row_num_in_sheet}', 'values': [["INFO_PARENT_AUS_D"]]}) + additional_info_for_af_col = f"INFO: Wiki-URL von Parent (D): {parent_account_name_d}. " + else: + additional_info_for_af_col = f"WARN: Kein Wiki für Parent D '{parent_account_name_d}' gefunden. " + except Exception as e_d_lookup: + self.logger.error(f"Fehler bei Wiki-Suche für Parent D '{parent_account_name_d}': {e_d_lookup}") + additional_info_for_af_col = f"ERR: Suche Parent D fehlgeschlagen. " + + if url_for_extraction is None and system_suggested_parent_o and system_suggested_parent_o.lower() != 'k.a.': + self.logger.info(f" Zeile {row_num_in_sheet}: R leer, D nicht erfolgreich. O ('{system_suggested_parent_o}') gesetzt. Suche Wiki für Parent O.") + try: + potential_url = serp_wikipedia_lookup(system_suggested_parent_o) + if potential_url and not str(potential_url).startswith("FEHLER"): + url_for_extraction = potential_url + source_of_wiki_data_origin_log_msg = f"Parent O ('{system_suggested_parent_o}')" + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Wiki Konsistenzpruefung"] + 1)}{row_num_in_sheet}', 'values': [["INFO_PARENT_AUS_O"]]}) + additional_info_for_af_col += f"INFO: Wiki-URL von Parent (O): {system_suggested_parent_o}. " + else: + additional_info_for_af_col += f"WARN: Kein Wiki für Parent O '{system_suggested_parent_o}' gefunden. " + except Exception as e_o_lookup: + self.logger.error(f"Fehler bei Wiki-Suche für Parent O '{system_suggested_parent_o}': {e_o_lookup}") + additional_info_for_af_col += f"ERR: Suche Parent O fehlgeschlagen. " + + if url_for_extraction is None: + search_for_daughter_needed = False + status_ac_reparse = self._get_cell_value_safe(row_data, "Chat Wiki Konsistenzpruefung").strip().upper() == "X (URL COPIED)" + ts_z_empty = not self._get_cell_value_safe(row_data, "Wikipedia Timestamp").strip() + r_url_valid_looking = current_wiki_url_r and "wikipedia.org/wiki/" in current_wiki_url_r.lower() + + if status_ac_reparse or force_reeval or ts_z_empty or not r_url_valid_looking: + if r_url_valid_looking and not (status_ac_reparse or force_reeval): + self.logger.info(f" Zeile {row_num_in_sheet}: Nutze vorhandene Tochter-URL (R): {current_wiki_url_r}") + url_for_extraction = current_wiki_url_r + source_of_wiki_data_origin_log_msg = "Tochter (aus R)" + else: + self.logger.info(f" Zeile {row_num_in_sheet}: Starte neue Suche für Tochter '{company_name}'.") + search_for_daughter_needed = True + + if search_for_daughter_needed: + try: + page_obj = self.wiki_scraper.search_company_article(company_name, website_url) + if page_obj: + url_for_extraction = page_obj.url + source_of_wiki_data_origin_log_msg = "Tochter (Suche erfolgreich)" + else: + url_for_extraction = "Kein Artikel gefunden" + except Exception as e_tochter_suche: + self.logger.error(f"Fehler bei Wiki-Suche für Tochter '{company_name}': {e_tochter_suche}") + url_for_extraction = f"Fehler Suche Tochter: {str(e_tochter_suche)[:50]}" + + if url_for_extraction and isinstance(url_for_extraction, str) and url_for_extraction.lower() not in ["k.a.", "kein artikel gefunden"] and not url_for_extraction.startswith("FEHLER"): + self.logger.info(f" -> Extrahiere Wiki-Daten von URL ({source_of_wiki_data_origin_log_msg}): {url_for_extraction[:100]}...") + try: + extracted_data = self.wiki_scraper.extract_company_data(url_for_extraction) + if extracted_data and extracted_data.get('url') != 'k.A.': + final_wiki_data = extracted_data + wiki_data_updated_in_this_run = True + current_ac_val = self._get_cell_value_safe(row_data, "Chat Wiki Konsistenzpruefung").strip() + if source_of_wiki_data_origin_log_msg.startswith("Parent") and current_ac_val not in ["INFO_PARENT_AUS_D", "INFO_PARENT_AUS_O"]: + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Verif. Timestamp"] + 1)}{row_num_in_sheet}', 'values': [['']]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Wiki Inkonsistenz"] + 1)}{row_num_in_sheet}', 'values': [['']]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Wiki Artikel"] + 1)}{row_num_in_sheet}', 'values': [['']]}) + elif not source_of_wiki_data_origin_log_msg.startswith("Parent"): + self.logger.info(f" -> Setze AC auf '?' für Tochter-Wiki-Update.") + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Wiki Konsistenzpruefung"] + 1)}{row_num_in_sheet}', 'values': [['?']]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Verif. Timestamp"] + 1)}{row_num_in_sheet}', 'values': [['']]}) + else: + final_wiki_data['url'] = url_for_extraction + for key in ['sitz_stadt', 'sitz_land', 'first_paragraph', 'branche', 'umsatz', 'mitarbeiter', 'categories']: + final_wiki_data[key] = 'k.A. (Extraktion fehlgeschlagen)' + wiki_data_updated_in_this_run = True + except Exception as e_extract: + self.logger.error(f"FEHLER bei Wikipedia Datenextraktion von {url_for_extraction[:100]}...: {e_extract}") + final_wiki_data['url'] = url_for_extraction + for key in ['sitz_stadt', 'sitz_land', 'first_paragraph', 'branche', 'umsatz', 'mitarbeiter', 'categories']: + final_wiki_data[key] = 'k.A. (FEHLER Extr.)' + wiki_data_updated_in_this_run = True + elif url_for_extraction: + final_wiki_data['url'] = url_for_extraction + for key in ['sitz_stadt', 'sitz_land', 'first_paragraph', 'branche', 'umsatz', 'mitarbeiter', 'categories']: + final_wiki_data[key] = 'k.A.' + wiki_data_updated_in_this_run = True + + if wiki_data_updated_in_this_run: + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki URL"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('url', 'k.A.')]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Sitz Stadt"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('sitz_stadt', 'k.A.')]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Sitz Land"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('sitz_land', 'k.A.')]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Absatz"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('first_paragraph', 'k.A.')]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Branche"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('branche', 'k.A.')]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Umsatz"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('umsatz', 'k.A.')]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Mitarbeiter"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('mitarbeiter', 'k.A.')]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Kategorien"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('categories', 'k.A.')]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wikipedia Timestamp"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]}) + if additional_info_for_af_col: + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Begruendung bei Abweichung"] + 1)}{row_num_in_sheet}', 'values': [[additional_info_for_af_col]]}) + if source_of_wiki_data_origin_log_msg.startswith("Parent"): + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["SerpAPI Wiki Search Timestamp"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]}) + + # --- 3. ChatGPT Evaluationen (Branch, FSM, etc.) & Plausi --- + run_chat_step = 'chat' in steps_to_run + chat_processing_needed = self._needs_chat_evaluations(row_data, force_reeval, wiki_data_updated_in_this_run) + + if run_chat_step and chat_processing_needed: + any_processing_done = True + chat_eval_just_ran = True + + grund_message_chat = "Re-Eval" if force_reeval else ("Wiki-Daten aktualisiert" if wiki_data_updated_in_this_run else "Timestamp (BN) leer") + self.logger.info(f"Zeile {row_num_in_sheet}: Fuehre CHATGPT Evaluationen & Plausi aus (Grund: {grund_message_chat})...") + + # 3a. Branchen-Einstufung + self.logger.info(f" Zeile {row_num_in_sheet}: Starte Branchen-Einstufung ueber ChatGPT...") + try: + branch_result = evaluate_branche_chatgpt( + crm_branche, + crm_beschreibung, + final_wiki_data.get('branche', 'k.A.'), + final_wiki_data.get('categories', 'k.A.'), + website_summary + ) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get("branch", "FEHLER BRANCH")]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get("confidence", "N/A CONF")]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get("consistency", "error CONS")]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get("justification", "No JUST")]]}) + except Exception as e_branch_eval: + self.logger.error(f"FEHLER bei Branchen-Einstufung für Zeile {row_num_in_sheet}: {e_branch_eval}") + + # 3b, 3c, 3d: Weitere ChatGPT-Evaluationen (hier nicht detailliert implementiert, aber Platzhalter) + # ... Logik für FSM-Relevanz, Mitarbeiter-Schätzung, Umsatz-Schätzung, etc. ... + + # 3e. Konsolidierung Umsatz/Mitarbeiter (BD, BE) + self.logger.debug(f" Zeile {row_num_in_sheet}: Konsolidiere Umsatz (BD) und Mitarbeiter (BE)...") + final_umsatz_str_konsolidiert = "k.A." + final_ma_str_konsolidiert = "k.A." + try: + crm_umsatz_val_str = self._get_cell_value_safe(row_data, "CRM Umsatz") + wiki_umsatz_val_str = final_wiki_data.get('umsatz', 'k.A.') + crm_ma_val_str = self._get_cell_value_safe(row_data, "CRM Anzahl Mitarbeiter") + wiki_ma_val_str = final_wiki_data.get('mitarbeiter', 'k.A.') + + num_crm_umsatz = get_numeric_filter_value(crm_umsatz_val_str, is_umsatz=True) + num_wiki_umsatz = get_numeric_filter_value(wiki_umsatz_val_str, is_umsatz=True) + num_crm_ma = get_numeric_filter_value(crm_ma_val_str, is_umsatz=False) + num_wiki_ma = get_numeric_filter_value(wiki_ma_val_str, is_umsatz=False) + + if parent_account_name_d and parent_account_name_d.lower() != 'k.a.': + self.logger.info(f" -> Parent D ('{parent_account_name_d}') gesetzt. Konsolidiere primär mit CRM-Daten der Tochter.") + final_num_umsatz = num_crm_umsatz if num_crm_umsatz > 0 else num_wiki_umsatz + final_num_ma = num_crm_ma if num_crm_ma > 0 else num_wiki_ma + else: + self.logger.debug(f" -> Parent D leer. Standardkonsolidierung Wiki > CRM.") + final_num_umsatz = num_wiki_umsatz if num_wiki_umsatz > 0 else num_crm_umsatz + final_num_ma = num_wiki_ma if num_wiki_ma > 0 else num_crm_ma + + final_umsatz_str_konsolidiert = str(int(round(final_num_umsatz))) if final_num_umsatz > 0 else 'k.A.' + final_ma_str_konsolidiert = str(int(round(final_num_ma))) if final_num_ma > 0 else 'k.A.' + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Finaler Umsatz (Wiki>CRM)"] + 1)}{row_num_in_sheet}', 'values': [[final_umsatz_str_konsolidiert]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Finaler Mitarbeiter (Wiki>CRM)"] + 1)}{row_num_in_sheet}', 'values': [[final_ma_str_konsolidiert]]}) + except Exception as e_consolidate: + self.logger.error(f"FEHLER bei Konsolidierung (BD/BE) für Zeile {row_num_in_sheet}: {e_consolidate}") + final_umsatz_str_konsolidiert, final_ma_str_konsolidiert = "FEHLER_KONSO", "FEHLER_KONSO" + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Finaler Umsatz (Wiki>CRM)"] + 1)}{row_num_in_sheet}', 'values': [['FEHLER_KONSO']]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Finaler Mitarbeiter (Wiki>CRM)"] + 1)}{row_num_in_sheet}', 'values': [['FEHLER_KONSO']]}) + + # 3f. Plausibilitäts-Checks (BG-BM) + self.logger.debug(f" Zeile {row_num_in_sheet}: Führe Plausibilitäts-Checks durch...") + try: + plausi_input_data = { + "Finaler Umsatz (Wiki>CRM)": final_umsatz_str_konsolidiert, + "Finaler Mitarbeiter (Wiki>CRM)": final_ma_str_konsolidiert, + "CRM Umsatz": self._get_cell_value_safe(row_data, "CRM Umsatz"), + "Wiki Umsatz": final_wiki_data.get('umsatz', 'k.A.'), + "CRM Anzahl Mitarbeiter": self._get_cell_value_safe(row_data, "CRM Anzahl Mitarbeiter"), + "Wiki Mitarbeiter": final_wiki_data.get('mitarbeiter', 'k.A.'), + "Parent Account Name": parent_account_name_d, + "System Vorschlag Parent Account": self._get_cell_value_safe(row_data, "System Vorschlag Parent Account"), + "Parent Vorschlag Status": self._get_cell_value_safe(row_data, "Parent Vorschlag Status") + } + plausi_results = self._check_financial_plausibility(plausi_input_data) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Plausibilität Umsatz"] + 1)}{row_num_in_sheet}', 'values': [[plausi_results.get("plaus_umsatz_flag", "ERR_FLAG")]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Plausibilität Mitarbeiter"] + 1)}{row_num_in_sheet}', 'values': [[plausi_results.get("plaus_ma_flag", "ERR_FLAG")]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Plausibilität Umsatz/MA Ratio"] + 1)}{row_num_in_sheet}', 'values': [[plausi_results.get("plaus_ratio_flag", "ERR_FLAG")]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Abweichung Umsatz CRM/Wiki"] + 1)}{row_num_in_sheet}', 'values': [[plausi_results.get("abweichung_umsatz_flag", "ERR_FLAG")]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Abweichung MA CRM/Wiki"] + 1)}{row_num_in_sheet}', 'values': [[plausi_results.get("abweichung_ma_flag", "ERR_FLAG")]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Plausibilität Begründung"] + 1)}{row_num_in_sheet}', 'values': [[plausi_results.get("plausi_begruendung_final", "Fehler Begr.")]]}) + except Exception as e_plausi: + self.logger.error(f"FEHLER bei Plausi-Checks für Zeile {row_num_in_sheet}: {e_plausi}") + + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Plausibilität Prüfdatum"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]}) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]}) + + # --- 4. Servicetechniker Schaetzung (ML Modell) --- + run_ml_step = 'ml_predict' in steps_to_run + ml_processing_needed = self._needs_ml_prediction(row_data, force_reeval, chat_eval_just_ran) + + if run_ml_step and ml_processing_needed: + any_processing_done = True + self.logger.info(f"Zeile {row_num_in_sheet}: Fuehre ML-Schaetzung aus...") + try: + predicted_bucket = self._predict_technician_bucket(row_data) + if predicted_bucket and isinstance(predicted_bucket, str) and not predicted_bucket.startswith("FEHLER"): + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Geschaetzter Techniker Bucket"] + 1)}{row_num_in_sheet}', 'values': [[predicted_bucket]]}) + self.logger.info(f" -> ML-Schaetzung erfolgreich: Bucket '{predicted_bucket}'.") + else: + self.logger.warning(f" -> ML-Schaetzung lieferte kein gueltiges Ergebnis: '{predicted_bucket}'.") + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Geschaetzter Techniker Bucket"] + 1)}{row_num_in_sheet}', 'values': [['k.A. (Schaetzung fehlgeschlagen)']]}) + except Exception as e_ml: + self.logger.error(f"FEHLER bei ML-Schaetzung fuer Zeile {row_num_in_sheet}: {e_ml}") + self.logger.debug(traceback.format_exc()) + updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Geschaetzter Techniker Bucket"] + 1)}{row_num_in_sheet}', 'values': [[f'FEHLER Schaetzung: {str(e_ml)[:50]}...']]}) + + # ====================================================================== + # === Abschluss der _process_single_row Verarbeitung =================== + # ====================================================================== + + # --- 5. Abschliessende Updates (Version, Tokens, ReEval Flag) --- + if any_processing_done: + version_col_idx = COLUMN_MAP.get("Version") + if version_col_idx is not None: + updates.append({'range': f'{self.sheet_handler._get_col_letter(version_col_idx + 1)}{row_num_in_sheet}', 'values': [[getattr(Config, 'VERSION', 'unknown')]]}) + else: + self.logger.error("FEHLER: Spaltenschluessel 'Version' nicht in COLUMN_MAP gefunden.") + + if force_reeval and clear_x_flag: + reeval_col_idx = COLUMN_MAP.get("ReEval Flag") + if reeval_col_idx is not None: + flag_col_letter = self.sheet_handler._get_col_letter(reeval_col_idx + 1) + if flag_col_letter: + updates.append({'range': f'{flag_col_letter}{row_num_in_sheet}', 'values': [['']]}) + self.logger.debug(f" -> Update zum Loeschen des ReEval-Flags (A{row_num_in_sheet}) vorgemerkt.") + else: + self.logger.error(f"FEHLER: Konnte Spaltenbuchstaben fuer 'ReEval Flag' nicht ermitteln.") + else: + self.logger.error("FEHLER: 'ReEval Flag' Spaltenindex nicht in COLUMN_MAP gefunden.") + + # --- 6. Batch Update fuer diese Zeile --- + if updates: + self.logger.info(f"Zeile {row_num_in_sheet}: Sende Batch-Update mit {len(updates)} Operationen...") + success = self.sheet_handler.batch_update_cells(updates) + if not success: + self.logger.error(f"Zeile {row_num_in_sheet}: ENDGUELTIGER FEHLER beim Batch-Update nach Retries.") + else: + if not any_processing_done: + self.logger.debug(f"Zeile {row_num_in_sheet}: Keine Updates zum Schreiben.") + + pause_duration = max(0.05, getattr(Config, 'RETRY_DELAY', 5) / 20.0) + time.sleep(pause_duration) + self.logger.info(f"--- Verarbeitung fuer Zeile {row_num_in_sheet} abgeschlossen ---") + + + # ========================================================================== + # === Prozess Methoden (Sequentiell & Re-Evaluation) ======================= + # ========================================================================== + + def process_rows_sequentially(self, start_sheet_row, num_to_process, + process_wiki_steps=True, process_chatgpt_steps=True, + process_website_steps=True, process_ml_steps=True, + force_reeval_in_single_row=False): + """ + Verarbeitet eine feste Anzahl von Zeilen beginnend bei einer bestimmten + Sheet-Zeilennummer sequentiell. + """ + header_rows = self.sheet_handler._header_rows + if num_to_process is None or not isinstance(num_to_process, int) or num_to_process <= 0: + self.logger.info("Sequentielle Verarbeitung uebersprungen: num_to_process ist ungueltig oder <= 0.") + return + + self.logger.info(f"Starte sequentielle Verarbeitung von {num_to_process} Zeilen ab Sheet-Zeile {start_sheet_row}...") + selected_steps_log = [] + if process_wiki_steps: selected_steps_log.append("Wiki (wiki)") + if process_chatgpt_steps: selected_steps_log.append("ChatGPT (chat)") + if process_website_steps: selected_steps_log.append("Website (web)") + if process_ml_steps: selected_steps_log.append("ML Predict (ml_predict)") + self.logger.info(f" Ausgewaehlte Schritte: {', '.join(selected_steps_log) if selected_steps_log else 'Keine'}") + + if force_reeval_in_single_row: + self.logger.warning(" !!! force_reeval=True wird fuer alle Zeilen in _process_single_row gesetzt !!!") + + steps_to_run_set = set() + if process_wiki_steps: steps_to_run_set.add('wiki') + if process_chatgpt_steps: steps_to_run_set.add('chat') + if process_website_steps: steps_to_run_set.add('web') + if process_ml_steps: steps_to_run_set.add('ml_predict') + if not steps_to_run_set: + self.logger.warning("Keine Verarbeitungsschritte ausgewaehlt. Modus wird uebersprungen.") + return + + if not self.sheet_handler.load_data(): + self.logger.error("Fehler beim Laden der Daten fuer sequentielle Verarbeitung.") + return + + all_data = self.sheet_handler.get_all_data_with_headers() + total_sheet_rows = len(all_data) + start_index_in_all_data = start_sheet_row - 1 + + if start_index_in_all_data >= total_sheet_rows: + self.logger.warning(f"Start-Sheet-Zeile {start_sheet_row} liegt ausserhalb der Daten. Keine Verarbeitung.") + return + if start_index_in_all_data < header_rows: + self.logger.warning(f"Start-Sheet-Zeile {start_sheet_row} liegt in Headern. Starte ab Zeile {header_rows + 1}.") + start_index_in_all_data = header_rows + + end_index_in_all_data = min(start_index_in_all_data + num_to_process, total_sheet_rows) + self.logger.info(f"Sequentielle Verarbeitung: Index-Bereich [{start_index_in_all_data}, {end_index_in_all_data}).") + + if start_index_in_all_data >= end_index_in_all_data: + self.logger.info(f"Keine Zeilen im definierten Bereich zu verarbeiten.") + return + + processed_count = 0 + for i in range(start_index_in_all_data, end_index_in_all_data): + row_num_in_sheet = i + 1 + row_data = all_data[i] + if row_num_in_sheet <= header_rows: continue + if not any(cell and isinstance(cell, str) and cell.strip() for cell in row_data): + self.logger.debug(f"Ueberspringe leere Zeile {row_num_in_sheet}.") + continue + try: + self._process_single_row( + row_num_in_sheet=row_num_in_sheet, + row_data=row_data, + steps_to_run=steps_to_run_set, + force_reeval=force_reeval_in_single_row, + clear_x_flag=False + ) + processed_count += 1 + except Exception as e_proc: + self.logger.exception(f"FEHLER bei sequentieller Verarbeitung von Zeile {row_num_in_sheet}: {e_proc}") + + self.logger.info(f"Sequentielle Verarbeitung abgeschlossen. {processed_count} Zeilen bearbeitet.") + + def process_reevaluation_rows(self, row_limit=None, clear_flag=True, + process_wiki_steps=True, process_chatgpt_steps=True, + process_website_steps=True, process_ml_steps=True): + """ + Verarbeitet nur Zeilen, die in Spalte A mit 'x' markiert sind. + """ + self.logger.info(f"Starte Re-Evaluierungsmodus (Spalte A = 'x'). Max. Zeilen: {row_limit if row_limit is not None else 'Unbegrenzt'}") + selected_steps_log = [] + if process_wiki_steps: selected_steps_log.append("Wiki") + if process_chatgpt_steps: selected_steps_log.append("ChatGPT") + if process_website_steps: selected_steps_log.append("Website") + if process_ml_steps: selected_steps_log.append("ML Predict") + self.logger.info(f"Ausgewaehlte Schritte fuer Re-Eval: {', '.join(selected_steps_log) if selected_steps_log else 'Keine'}") + + steps_to_run_set = set() + if process_wiki_steps: steps_to_run_set.add('wiki') + if process_chatgpt_steps: steps_to_run_set.add('chat') + if process_website_steps: steps_to_run_set.add('web') + if process_ml_steps: steps_to_run_set.add('ml_predict') + if not steps_to_run_set: + self.logger.warning("Keine Verarbeitungsschritte fuer Re-Eval ausgewaehlt. Modus wird uebersprungen.") + return + + if not self.sheet_handler.load_data(): + self.logger.error("Fehler beim Laden der Daten fuer Re-Evaluation.") + return + + all_data = self.sheet_handler.get_all_data_with_headers() + header_rows = self.sheet_handler._header_rows + if not all_data or len(all_data) <= header_rows: + self.logger.warning("Keine Datenzeilen fuer Re-Evaluation gefunden.") + return + + reeval_col_idx = COLUMN_MAP.get("ReEval Flag") + if reeval_col_idx is None: + self.logger.critical("FEHLER: 'ReEval Flag' Spaltenindex nicht in COLUMN_MAP gefunden. Breche ab.") + return + + rows_to_process = [] + for idx, row_data in enumerate(all_data): + if idx < header_rows: continue + row_num_in_sheet = idx + 1 + cell_a_value = self._get_cell_value_safe(row_data, "ReEval Flag").strip().lower() + if cell_a_value == "x": + rows_to_process.append({'row_num': row_num_in_sheet, 'data': row_data}) + + found_count = len(rows_to_process) + self.logger.info(f"{found_count} Zeilen mit ReEval-Flag 'x' gefunden.") + if found_count == 0: return + + processed_count_actual = 0 + for task in rows_to_process: + if row_limit is not None and processed_count_actual >= row_limit: + self.logger.info(f"Zeilenlimit ({row_limit}) fuer Re-Evaluation erreicht.") + break + + self.logger.info(f"Bearbeite Re-Eval Zeile {task['row_num']}...") + processed_count_actual += 1 + try: + self._process_single_row( + row_num_in_sheet=task['row_num'], + row_data=task['data'], + steps_to_run=steps_to_run_set, + force_reeval=True, + clear_x_flag=clear_flag + ) + except Exception as e_proc_reval: + self.logger.exception(f"FEHLER bei Re-Evaluation von Zeile {task['row_num']}: {e_proc_reval}") + + self.logger.info(f"Re-Evaluierung abgeschlossen. {processed_count_actual} Zeilen verarbeitet.") + + # ========================================================================== + # === Batch Processing Methods ============================================= + # ========================================================================== + + @retry_on_failure + def _process_verification_openai_batch(self, batch_data): + """ + Verarbeitet einen Batch von Wikipedia-Verifizierungsanfragen ueber OpenAI. + """ + if not batch_data: return {} + self.logger.debug(f"Sende OpenAI-Batch fuer Wiki-Verifizierung ({len(batch_data)} Zeilen)...") + + aggregated_prompt = ( + "Du bist ein Experte in der Verifizierung von Wikipedia-Artikeln fuer Unternehmen. " + "Fuer jeden der folgenden Eintraege pruefe, ob der vorhandene Wikipedia-Artikel plausibel zum Firmennamen und zur Beschreibung passt. " + "Gib das Ergebnis fuer jeden Eintrag ausschliesslich im folgenden Format auf einer neuen Zeile aus:\n" + "Eintrag : \n\n" + "Moegliche Antworten:\n" + "- 'OK' (wenn der Artikel gut passt)\n" + "- 'X | Alternativer Artikel: | Begruendung: '\n" + "- 'X | Kein passender Artikel gefunden | Begruendung: '\n\n" + "Eintraege zur Pruefung:\n--------------------\n" + ) + max_desc_length = 200 + for item in batch_data: + entry_text = ( + f"Eintrag {item['row_num']}:\n" + f" Firmenname: {str(item.get('company_name', 'k.A.'))}\n" + f" CRM-Beschreibung: {str(item.get('crm_desc', 'k.A.'))[:max_desc_length]}\n" + f" Wikipedia-URL: {str(item.get('wiki_url', 'k.A.'))}\n" + f" Wiki-Absatz: {str(item.get('wiki_paragraph', 'k.A.'))[:max_desc_length]}\n" + f" Wiki-Kategorien: {str(item.get('wiki_categories', 'k.A.'))[:max_desc_length]}\n----\n" + ) + aggregated_prompt += entry_text + + try: + chat_response = summarize_batch_openai(aggregated_prompt, temperature=0.0) # Using summarize helper, but it's just a call_openai_chat wrapper + if not chat_response: raise openai.error.APIError("Keine Antwort von OpenAI erhalten.") + except Exception as e: + self.logger.error(f"Endgueltiger FEHLER beim OpenAI-Batch-Aufruf fuer Wiki-Verifizierung: {e}") + return {item['row_num']: f"FEHLER API: {str(e)[:100]}" for item in batch_data} + + answers = {} + original_batch_row_nums = {item['row_num'] for item in batch_data} + lines = chat_response.strip().split('\n') + for line in lines: + match = re.match(r"Eintrag (\d+): (.*)", line.strip()) + if match: + row_num, answer_text = int(match.group(1)), match.group(2).strip() + if row_num in original_batch_row_nums: answers[row_num] = answer_text + for row_num in original_batch_row_nums: + if row_num not in answers: answers[row_num] = "FEHLER: Antwort nicht geparst" + return answers + + + def process_verification_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None): + """ + Batch-Prozess nur fuer Wikipedia-Verifizierung (Spalten S-U, V-Y werden geleert). + Laedt Daten neu, prueft fuer jede Zeile im Bereich, ob Timestamp AX (Wiki Verif.) + bereits gesetzt ist, ob eine Wiki URL (M) vorhanden ist und ob Status S + nicht bereits 'OK', 'X (URL Copied)' oder 'X (Invalid Suggestion)' ist. + Setzt AX + AP fuer bearbeitete Zeilen und schreibt S-U in Batches. + + Args: + start_sheet_row (int, optional): Die 1-basierte Startzeile im Sheet. Defaults to None (automatische Ermittlung basierend auf leeren AX). + end_sheet_row (int, optional): Die 1-basierte Endzeile im Sheet. Defaults to None (bis Ende Sheet). + limit (int, optional): Maximale Anzahl ZU VERARBEITENDER (nicht uebersprungener) Zeilen. Defaults to None (Unbegrenzt). + """ + # Verwenden Sie logger, da das Logging jetzt konfiguriert ist + # Logge die Konfiguration des Batch-Laufs + self.logger.info(f"Starte Wikipedia-Verifizierungsmodus (Batch S-U, AX). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") # <<< GEÄNDERT + + + # --- Daten laden und Startzeile ermitteln --- + # Automatische Ermittlung der Startzeile, wenn nicht manuell gesetzt + if start_sheet_row is None: + self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren AX...") # <<< GEÄNDERT + # Nutzt get_start_row_index des Sheet Handlers (Block 14). Prueft auf leeren AX (Block 1 Column Map). + # Standardmaessig ab Zeile 7 + start_data_index_no_header = self.sheet_handler.get_start_row_index(check_column_key="Wiki Verif. Timestamp", min_sheet_row=7) + + # Wenn get_start_row_index -1 zurueckgibt (Fehler) + if start_data_index_no_header == -1: + self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") # <<< GEÄNDERT + return # Beende die Methode + + # Berechne die 1-basierte Sheet-Startzeile aus dem 0-basierten Daten-Index + start_sheet_row = start_data_index_no_header + self.sheet_handler._header_rows + 1 # Block 14 SheetHandler Attribut + self.logger.info(f"Automatisch ermittelte Startzeile (erste leere AX Zelle): {start_sheet_row}") # <<< GEÄNDERT + else: + # Wenn start_sheet_row manuell gesetzt wurde, laden Sie die Daten trotzdem neu, um aktuell zu sein. + # Der load_data Aufruf ist mit retry_on_failure dekoriert (Block 2). + if not self.sheet_handler.load_data(): + self.logger.error("FEHLER beim Laden der Daten fuer process_verification_batch.") # <<< GEÄNDERT + return # Beende die Methode, wenn das Laden fehlschlaegt + + + # Holen Sie die gesamte Datenliste (inklusive Header) aus dem SheetHandler. + all_data = self.sheet_handler.get_all_data_with_headers() + # Annahme: header_rows ist als Attribut im SheetHandler verfuegbar (Block 14). + header_rows = self.sheet_handler._header_rows + total_sheet_rows = len(all_data) # Gesamtzahl der Zeilen im Sheet + + # Berechne Endzeile, wenn nicht manuell gesetzt + if end_sheet_row is None: + end_sheet_row = total_sheet_rows # Bis zur letzten Zeile + + # Logge den verarbeitungsbereich + self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}. Gesamtzeilen im Sheet: {total_sheet_rows}") # <<< GEÄNDERT + + # Pruefe, ob der Bereich gueltig ist (Start <= Ende und Start nicht ueber Gesamtzeilen) + if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows: + self.logger.info("Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.") # <<< GEÄNDERT + return # Beende die Methode, wenn der Bereich leer ist + + + # --- Indizes und Buchstaben --- + # Stellen Sie sicher, dass alle benoetigten Spalten in COLUMN_MAP (Block 1) vorhanden sind + required_keys = [ + "Wiki Verif. Timestamp", "Wiki URL", "Chat Wiki Konsistenzpruefung", # Pruefkriterien / Timestamp (AX, M, S) + "CRM Name", "CRM Beschreibung", "Wiki Absatz", "Wiki Kategorien", # Daten fuer Prompt (B, F, N, R) + "Chat Begruendung Wiki Inkonsistenz", "Chat Vorschlag Wiki Artikel", # Ergebnisspalten (T, U) + "Begruendung bei Abweichung", "Chat Begruendung Abweichung Branche", # Spalten V-Y zum Leeren + "Wikipedia Timestamp", "Timestamp letzte Pruefung", # Spalten AN, AO zum Leeren + "Version", "SerpAPI Wiki Search Timestamp" # Spalten AP, AY zum Leeren + ] + # Erstellen Sie ein Dictionary mit Schluesseln und Indizes + col_indices = {key: COLUMN_MAP.get(key) for key in required_keys} + + # Pruefen Sie, ob alle benoetigten Schluessel in COLUMN_MAP gefunden wurden + if None in col_indices.values(): + missing = [k for k, v in col_indices.items() if v is None] + self.logger.critical(f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_verification_batch: {missing}. Breche ab.") # <<< GEÄNDERT + return # Beende die Methode bei kritischem Fehler + + + # Ermitteln Sie die Spaltenbuchstaben fuer Updates und Leerung (nutzt interne Helfer _get_col_letter Block 14) + ts_ax_letter = self.sheet_handler._get_col_letter(col_indices["Wiki Verif. Timestamp"] + 1) # Timestamp zu setzen (AX) + s_letter = self.sheet_handler._get_col_letter(col_indices["Chat Wiki Konsistenzpruefung"] + 1) # Status S + t_letter = self.sheet_handler._get_col_letter(col_indices["Chat Begruendung Wiki Inkonsistenz"] + 1) # Begruendung T + u_letter = self.sheet_handler._get_col_letter(col_indices["Chat Vorschlag Wiki Artikel"] + 1) # Vorschlag U + + # Spalten V-Y leeren (werden in diesem Modus nicht neu befuellt). + # V ist Begruendung bei Abweichung (von Wiki-URL Pruefung CRM vs Wiki). + # Y ist Begruendung Abweichung Branche (von Chat). + v_idx = col_indices["Begruendung bei Abweichung"] + y_idx = col_indices["Chat Begruendung Abweichung Branche"] # Block 1 Column Map + # Erstellen Sie den Bereichsnamen (z.B. "V:Y") + v_letter = self.sheet_handler._get_col_letter(v_idx + 1) + y_letter = self.sheet_handler._get_col_letter(y_idx + 1) + v_y_range_letter = f'{v_letter}:{y_letter}' # z.B. V:Y + # Erstellen Sie eine Liste von leeren Strings fuer diesen Bereich + empty_vy_values = [''] * (y_idx - v_idx + 1) # Anzahl der Spalten = Y_Index - V_Index + 1 + + + # Timestamps AN, AO, AP, AY leeren. + # Diese werden von anderen Schritten gesetzt und sollen hier zurueckgesetzt werden, + # um sicherzustellen, dass die Zeile bei Bedarf von diesen anderen Schritten erneut bearbeitet wird. + an_letter = self.sheet_handler._get_col_letter(col_indices["Wikipedia Timestamp"] + 1) # AN (Wiki Extraction TS) + ao_letter = self.sheet_handler._get_col_letter(col_indices["Timestamp letzte Pruefung"] + 1) # AO (Chat Evaluation TS) + ap_letter = self.sheet_handler._get_col_letter(col_indices["Version"] + 1) # AP (Version) + ay_letter = self.sheet_handler._get_col_letter(col_indices["SerpAPI Wiki Search Timestamp"] + 1) # AY (SerpAPI Wiki TS) + + + # --- Verarbeitung --- + # Holen Sie die Batch-Groesse fuer OpenAI-Aufrufe aus Config (Block 1) + openai_batch_size = getattr(Config, 'PROCESSING_BATCH_SIZE', 20) # Nutzt dieselbe Batch-Groesse wie Scraping/Summarization + # Holen Sie die Batch-Groesse fuer Sheet-Updates aus Config (Block 1) + update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50) + + + current_openai_batch_data = [] # Daten fuer den aktuellen OpenAI Batch (Liste von Dicts) + rows_in_current_openai_batch = [] # 1-basierte Zeilennummern im aktuellen OpenAI Batch + all_sheet_updates = [] # Gesammelte Updates fuer Batch-Schreiben ins Sheet (Liste von Dicts) + + + processed_count = 0 # Zaehlt Zeilen, die fuer die Verarbeitung in Frage kommen und in den Batch aufgenommen werden (im Rahmen des Limits). + skipped_count = 0 # Zaehlt Zeilen, die uebersprungen wurden (wegen Status, fehlender Daten etc.). + skipped_no_wiki_url = 0 # Zaehlt Zeilen, die speziell wegen fehlender M-URL uebersprungen wurden. + + + # Iteriere ueber die Sheet-Zeilen im definierten Bereich (1-basierte Sheet-Zeilennummer) + for i in range(start_sheet_row, end_sheet_row + 1): + row_index_in_list = i - 1 # 0-basierter Index in der all_data Liste + # Pruefen Sie, ob das Ende des Sheets erreicht wurde + if row_index_in_list >= total_sheet_rows: break # Ende des Sheets erreicht + + + row = all_data[row_index_in_list] # Die Rohdaten fuer diese Zeile + + + # Stellen Sie sicher, dass die Zeile nicht leer ist (mindestens Name vorhanden) + # Nutzt interne Helfer _get_cell_value_safe + company_name = self._get_cell_value_safe(row, "CRM Name").strip() # Block 1 Column Map + if not company_name: + self.logger.debug(f"Zeile {i}: Uebersprungen (Kein Firmenname in Spalte B).") # <<< GEÄNDERT + skipped_count += 1 # Zaehlen als uebersprungen + continue # Springe zur naechsten Zeile + + # --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist --- + # Kriterium: Wiki Verif. Timestamp (AX) ist leer + # UND Wiki URL (M) ist gefuellt und gueltig aussehend (nicht k.A., Fehler etc.) + # UND Status S ist NICHT bereits in einem Endzustand (OK, X (UPDATED/COPIED/INVALID)). + + # Holen Sie die Werte aus den entsprechenden Spalten (nutzt interne Helfer) + ax_value = self._get_cell_value_safe(row, "Wiki Verif. Timestamp").strip() # Block 1 Column Map + m_value = self._get_cell_value_safe(row, "Wiki URL").strip() # Block 1 Column Map + s_value_upper = self._get_cell_value_safe(row, "Chat Wiki Konsistenzpruefung").strip().upper() # Block 1 Column Map + + # Pruefen Sie, ob die Wiki URL (M) gueltig aussieht + is_wiki_url_valid_looking = m_value and isinstance(m_value, str) and "wikipedia.org/wiki/" in m_value.lower() and m_value.lower() not in ["k.a.", "kein artikel gefunden", "fehler bei suche", "http:"] # Fuege "http:" hinzu basierend auf Log + + + # Definieren Sie die Endzustaende von Status S (Grossbuchstaben) + s_end_states = ["OK", "X (UPDATED)", "X (URL COPIED)", "X (INVALID SUGGESTION)"] + # Pruefen Sie, ob Status S in einem Endzustand ist + is_s_in_endstate = s_value_upper in s_end_states # Bugfix: Korrekte Zuweisung + + + # Verarbeitung ist noetig, wenn AX leer UND M gefuellt/gueltig aussieht UND S NICHT im Endzustand ist. + processing_needed_for_row = not ax_value and is_wiki_url_valid_looking and not is_s_in_endstate + + + # Loggen der Pruefergebnisse fuer diese Zeile auf Debug-Level + log_check = (i < start_sheet_row + 5) or (i % 100 == 0) or (processing_needed_for_row) + if log_check: + self.logger.debug(f"Zeile {i} ({company_name[:50]}... Wiki Verif. Check): AX leer? {not ax_value}, M gueltig? {is_wiki_url_valid_looking}, S ('{s_value_upper}') Endzustand? {is_s_in_endstate}. Benötigt Verarbeitung? {processing_needed_for_row}") # <<< GEÄNDERT + + + # Wenn die Verarbeitung fuer diese Zeile nicht noetig ist + if not processing_needed_for_row: + skipped_count += 1 # Zaehlen als uebersprungene Zeile + # Zaehlen Sie separat, wenn die Zeile speziell wegen fehlender M-URL uebersprungen wurde + if not is_wiki_url_valid_looking: skipped_no_wiki_url += 1 + continue # Springe zur naechsten Zeile + + + # --- Wenn Verarbeitung noetig: Fuege zur Batch-Liste fuer OpenAI hinzu --- + processed_count += 1 # Zaehle die Zeile, die fuer die Verarbeitung in Frage kommt (im Rahmen des Limits zaehlen) + + # Pruefe das Limit fuer verarbeitete Zeilen + if limit is not None and isinstance(limit, int) and limit > 0 and processed_count > limit: + # Wenn das Limit erreicht ist und es ein positives Limit gibt + self.logger.info(f"Verarbeitungslimit ({limit}) fuer process_verification_batch erreicht. Breche weitere Zeilenpruefung ab.") # <<< GEÄNDERT + break # Brich die Schleife ab + + + # Sammle die benoetigten Daten fuer den OpenAI Prompt (Block 26 - _process_verification_openai_batch). + # Diese Daten werden in einem Dictionary fuer den Batch gesammelt. + crm_desc = self._get_cell_value_safe(row, "CRM Beschreibung") # Block 1 Column Map + wiki_paragraph = self._get_cell_value_safe(row, "Wiki Absatz") # Block 1 Column Map + wiki_categories = self._get_cell_value_safe(row, "Wiki Kategorien") # Block 1 Column Map + + + # Fuege die Daten dieser Zeile zur aktuellen Batch-Liste fuer OpenAI hinzu + current_openai_batch_data.append({ + 'row_num': i, # Die 1-basierte Sheet-Zeilennummer + 'company_name': company_name, # Nutzt den initial geladenen Namen + 'crm_desc': crm_desc, + 'wiki_url': m_value, # Nutzt die M-URL aus dem Sheet + 'wiki_paragraph': wiki_paragraph, + 'wiki_categories': wiki_categories + }) + # Fuege die Zeilennummer zur Liste der Zeilennummern im Batch hinzu + rows_in_current_openai_batch.append(i) + + + # --- Verarbeite den Batch, wenn voll --- + # Pruefe, ob die aktuelle Batch-Liste die definierte Groesse erreicht hat. + # openai_batch_size wird aus Config geholt (Block 1). + if len(current_openai_batch_data) >= openai_batch_size: + # Logge den Start der Batch-Verarbeitung + batch_start_row = current_openai_batch_data[0]['row_num'] + batch_end_row = current_openai_batch_data[-1]['row_num'] + self.logger.debug(f"\n--- Starte Wiki-Verifizierungs-Batch ({len(current_openai_batch_data)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT + + + # Rufe die interne Methode auf, die den OpenAI Call fuer den Batch macht. + # _process_verification_openai_batch (Block 26) ist mit retry_on_failure dekoriert. + # Wenn _process_verification_openai_batch eine Exception wirft (nach Retries), wird diese hier gefangen. + batch_results = self._process_verification_openai_batch(current_openai_batch_data) + # Ergebnisse sollten ein Dictionary {row_num: raw_chatgpt_answer} sein, auch bei Fehlern. + + + # Sammle Sheet Updates basierend auf den Batch-Ergebnissen. + # Setze immer den Timestamp AX und die Werte in S, T, U und V-Y. + # Der aktuelle Zeitstempel fuer den Batch + current_batch_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + batch_sheet_updates = [] # Updates fuer DIESEN spezifischen Batch von Zeilen + + + # Iteriere ueber die Zeilennummern, die in DIESEM OpenAI Batch waren + for row_num in rows_in_current_openai_batch: + # Hole das Ergebnis fuer diese Zeile aus dem Ergebnis-Dictionary. + # Fallback auf einen Fehlerstring, wenn das Ergebnis fehlt (sollte nicht passieren, wenn _process_verification_openai_batch korrekt ist). + answer = batch_results.get(row_num, "FEHLER: Batch-Ergebnis fehlt") + # self.logger.debug(f"Zeile {row_num} Verifizierungsantwort: '{answer[:100]}...'") # Zu viel Laerm (gekuerzt) + + + # Logik zur Bestimmung der Werte fuer S, T, U basierend auf 'answer' (aehnlich wie in altem _process_batch) + wiki_confirm, alt_article, wiki_explanation = "", "", "" # Initialisiere mit leeren Strings + + # Pruefe auf Standard-Antworten und Fehler-Antworten + if isinstance(answer, str) and answer.upper() == "OK": + wiki_confirm = "OK" + wiki_explanation = "Passt laut KI zur Firma." # Standard Begruendung bei OK + elif isinstance(answer, str) and answer.startswith("X |"): + # Parse die Antwort im Format "X | | " + parts = answer.split("|", 2) # Teile maximal in 3 Teile + wiki_confirm = "X" # Status ist X + if len(parts) > 1: + detail = parts[1].strip() # Zweiter Teil ist Detail (Alternative URL oder "Kein passender Artikel gefunden") + if detail.lower().startswith("alternativer artikel:"): + alt_article = detail.split(":", 1)[1].strip() # Extrahiere URL + elif detail.lower() == "kein passender artikel gefunden": + alt_article = detail # Text "Kein passender Artikel gefunden" + else: + alt_article = detail # Unbekanntes Detail + + if len(parts) > 2: + reason_part = parts[2].strip() # Dritter Teil ist Begruendung + if reason_part.lower().startswith("begruendung:"): + wiki_explanation = reason_part.split(":", 1)[1].strip() # Extrahiere Begruendungstext + else: + wiki_explanation = reason_part # Unbekannte Begruendung + + # Fuege ggf. den rohen Antworttext zur Begruendung hinzu, wenn Parsing unvollstaendig war + if not alt_article or not wiki_explanation: + wiki_explanation += f" (Rohantwort: {answer[:100]}...)" + + + elif isinstance(answer, str) and answer.startswith("FEHLER"): + # Wenn die Batch-Verarbeitung einen Fehler zurueckgegeben hat + wiki_confirm = "FEHLER" + wiki_explanation = answer # Fehlermeldung in Begruendung schreiben + alt_article = "Siehe Begruendung" # Verweis auf Begruendung + + else: # Unerwartetes Format der Antwort (weder OK noch X | noch FEHLER) + wiki_confirm = "?" # Setze Status auf unbekannt + wiki_explanation = f"Unerwartetes Format: {str(answer)[:100]}..." # Speichere Anfang der Antwort in Begruendung (gekuerzt) + alt_article = "Siehe Begruendung" # Verweis auf Begruendung + + # Spalten V-Y (Begruendung bei Abweichung etc.) werden in diesem Modus geleert + # Fuer jede Zeile im Batch fuegen wir das Update hinzu. + # empty_vy_values wurde oben vorbereitet. + v_y_values = empty_vy_values # Liste von leeren Strings + # Fuege Update zum Leeren von V-Y hinzu, falls Index gefunden wurde + if v_y_range_letter: # Pruefe, ob der Bereichsname ermittelt werden konnte + batch_sheet_updates.append({'range': f'{v_letter}{row_num}:{y_letter}{row_num}', 'values': [v_y_values]}) # Block 1 Column Map, interne Helfer + + + # Fuege Updates fuer S, T, U und AX hinzu (nutzt interne Helfer) + batch_sheet_updates.append({'range': f'{s_letter}{row_num}', 'values': [[wiki_confirm]]}) # Block 1 Column Map + batch_sheet_updates.append({'range': f'{t_letter}{row_num}', 'values': [[alt_article]]}) # Block 1 Column Map + batch_sheet_updates.append({'range': f'{u_letter}{row_num}', 'values': [[wiki_explanation]]}) # Block 1 Column Map + # Setze AX Timestamp fuer diese Zeile + batch_sheet_updates.append({'range': f'{ts_ax_letter}{row_num}', 'values': [[current_batch_timestamp]]}) # Block 1 Column Map + + + # --- Sende gesammelte Updates fuer diesen Batch --- + # Sammle die Updates fuer diesen Batch in der globalen Liste. + # all_sheet_updates.extend(batch_sheet_updates) # Nicht hier sammeln, sondern direkt senden + + # Sende die gesammelten Updates fuer DIESEN Batch sofort. + if batch_sheet_updates: + self.logger.debug(f" Sende Sheet-Update fuer {len(rows_in_current_openai_batch)} Zeilen ({len(batch_sheet_updates)} Zellen) dieses Batches...") # <<< GEÄNDERT + # Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry. + # Wenn es fehlschlaegt, wird es intern geloggt. + success = self.sheet_handler.batch_update_cells(batch_sheet_updates) + if success: + self.logger.info(f" Sheet-Update fuer Wiki-Verifizierungs-Batch Zeilen {batch_start_row}-{batch_end_row} erfolgreich.") # <<< GEÄNDERT + # Der Fehlerfall wird von batch_update_cells geloggt + + # Setze Batch-Listen zurueck fuer die naechste Iteration + current_openai_batch_data = [] + rows_in_current_openai_batch = [] + + # Pause nach jedem OpenAI Batch (nutzt Config Block 1). + # Dies ist wichtig, um Rate Limits zu vermeiden. + # Nutze Config.RETRY_DELAY, ggf. kuerzer, da es ein Batch war + pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.5 # 50% der Retry-Wartezeit + self.logger.debug(f"--- Batch Zeilen {batch_start_row}-{batch_end_row} abgeschlossen. Warte {pause_duration:.2f}s vor naechstem Batch ---") # <<< GEÄNDERT + time.sleep(pause_duration) + + + # --- Verarbeitung des letzten unvollstaendigen Batches nach der Schleife --- + # Wenn nach der Hauptschleife noch Tasks in der Batch-Liste sind + if current_openai_batch_data: + # Logge den Start des finalen Batches + batch_start_row = current_openai_batch_data[0]['row_num'] + batch_end_row = current_openai_batch_data[-1]['row_num'] + self.logger.debug(f"\n--- Starte FINALEN Wiki-Verifizierungs-Batch ({len(current_openai_batch_data)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT + + # Rufe die interne Methode auf, die den OpenAI Call macht + batch_results = self._process_verification_openai_batch(current_openai_batch_data) + # Ergebnisse sollten ein Dictionary {row_num: raw_chatgpt_answer} sein, auch bei Fehlern. + + # Sammle Sheet Updates (S, T, U, V-Y, AX) fuer diesen finalen Batch + current_batch_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + batch_sheet_updates = [] # Updates fuer DIESEN spezifischen Batch von Zeilen + + # Iteriere ueber die Zeilennummern, die in DIESEM finalen OpenAI Batch waren + for row_num in rows_in_current_openai_batch: + # Hole das Ergebnis fuer diese Zeile aus dem Ergebnis-Dictionary. + answer = batch_results.get(row_num, "FEHLER: Batch-Ergebnis fehlt") # Fallback + + # Logik zur Bestimmung der Werte fuer S, T, U basierend auf 'answer' + wiki_confirm, alt_article, wiki_explanation = "", "", "" + # Leere V-Y Spalten + v_y_values = empty_vy_values # Liste von leeren Strings + if isinstance(answer, str) and answer.upper() == "OK": wiki_confirm = "OK"; wiki_explanation = "Passt laut KI zur Firma." + elif isinstance(answer, str) and answer.startswith("X |"): + parts = answer.split("|", 2); wiki_confirm = "X" + if len(parts) > 1: detail = parts[1].strip(); alt_article = detail.split(":", 1)[1].strip() if detail.lower().startswith("alternativer artikel:") else detail + if len(parts) > 2: reason_part = parts[2].strip(); wiki_explanation = reason_part.split(":", 1)[1].strip() if reason_part.lower().startswith("begruendung:") else reason_part + if not alt_article or not wiki_explanation: wiki_explanation += f" (Rohantwort: {answer[:100]}...)" + elif isinstance(answer, str) and answer.startswith("FEHLER"): wiki_confirm = "FEHLER"; wiki_explanation = answer; alt_article = "Siehe Begruendung" + else: wiki_confirm = "?"; wiki_explanation = f"Unerwartetes Format: {str(answer)[:100]}..."; alt_article = "Siehe Begruendung" + + + # Fuege Updates fuer S, T, U und AX hinzu + batch_sheet_updates.append({'range': f'{s_letter}{row_num}', 'values': [[wiki_confirm]]}) # Block 1 Column Map + batch_sheet_updates.append({'range': f'{t_letter}{row_num}', 'values': [[alt_article]]}) # Block 1 Column Map + batch_sheet_updates.append({'range': f'{u_letter}{row_num}', 'values': [[wiki_explanation]]}) # Block 1 Column Map + # Setze AX Timestamp + batch_sheet_updates.append({'range': f'{ts_ax_letter}{row_num}', 'values': [[current_batch_timestamp]]}) # Block 1 Column Map + + # Fuege Update zum Leeren von V-Y hinzu, falls Index gefunden wurde + if v_y_range_letter: + batch_sheet_updates.append({'range': f'{v_letter}{row_num}:{y_letter}{row_num}', 'values': [v_y_values]}) # Block 1 Column Map, interne Helfer + + + # Sende die gesammelten Updates fuer DIESEN finalen Batch. + if batch_sheet_updates: + self.logger.debug(f" Sende FINALES Sheet-Update fuer {len(rows_in_current_openai_batch)} Zeilen ({len(batch_sheet_updates)} Zellen)...") # <<< GEÄNDERT + # Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry. + success = self.sheet_handler.batch_update_cells(batch_sheet_updates) + if success: + self.logger.info(f" FINALES Sheet-Update fuer Wiki-Verifizierungs-Batch erfolgreich.") # <<< GEÄNDERT + # Der Fehlerfall wird von batch_update_cells geloggt + + + # Logge den Abschluss des Modus + self.logger.info(f"Wikipedia-Verifizierungs-Batch abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen ({skipped_no_wiki_url} wegen fehlender M-URL).") # <<< GEÄNDERT + # Keine Pause nach diesem Modus noetig, da die naechste Aktion im Dispatcher (Block 34) folgt Kürze übersprungen. Bitte aus Originalcode übernehmen.") + + + def _scrape_raw_text_task(self, task_info, get_website_raw_func): + """ + Scrapt den Rohtext einer Website in einem separaten Thread. + """ + logger = logging.getLogger(__name__ + ".scrape_worker") + row_num, url = task_info['row_num'], task_info['url'] + raw_text, error = "k.A.", None + try: + raw_text = get_website_raw_func(url) + if isinstance(raw_text, str) and (raw_text.startswith("k.A. (Fehler") or raw_text.startswith("FEHLER:")): + error = f"Scraping Fehler: {raw_text[:100]}..." + elif not isinstance(raw_text, str) or not raw_text.strip(): + error = "Scraping Task Fehler: Funktion gab keinen gueltigen String zurueck." + raw_text = "k.A. (Extraktion fehlgeschlagen)" + except Exception as e: + error = f"Unerwarteter Fehler im Scraping Task Zeile {row_num}: {e}" + logger.error(error) + raw_text = "k.A. (Unerwarteter Fehler Task)" + return {"row_num": row_num, "raw_text": raw_text, "error": error} + + def process_website_scraping_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None): + """ + Batch-Prozess NUR fuer Website-Scraping (Rohtext AR). + Laedt Daten neu, prueft Spalte AR auf Inhalt ('', 'k.A.', etc.) und ueberspringt Zeilen mit Inhalt. + Setzt AR + AT + AP fuer bearbeitete Zeilen. Sendet Updates gebuendelt. + + Args: + start_sheet_row (int, optional): Die 1-basierte Startzeile im Sheet. Defaults to None (automatische Ermittlung basierend auf leeren AT). + end_sheet_row (int, optional): Die 1-basierte Endzeile im Sheet. Defaults to None (bis Ende Sheet). + limit (int, optional): Maximale Anzahl ZU VERARBEITENDER (nicht uebersprungener) Zeilen. Defaults to None (Unbegrenzt). + """ + # Verwenden Sie logger, da das Logging jetzt konfiguriert ist + # Logge die Konfiguration des Batch-Laufs + self.logger.info(f"Starte Website-Scraping (Batch AR, AT, AP). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") # <<< GEÄNDERT + + + # --- Daten laden und Startzeile ermitteln --- + # Automatische Ermittlung der Startzeile, wenn nicht manuell gesetzt + if start_sheet_row is None: + self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren AT...") # <<< GEÄNDERT + # Nutzt get_start_row_index des Sheet Handlers (Block 14). Prueft auf leeren AT (Block 1 Column Map). + # Standardmaessig ab Zeile 7 + start_data_index_no_header = self.sheet_handler.get_start_row_index(check_column_key="Website Scrape Timestamp", min_sheet_row=7) + + # Wenn get_start_row_index -1 zurueckgibt (Fehler) + if start_data_index_no_header == -1: + self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") # <<< GEÄNDERT + return # Beende die Methode + + # Berechne die 1-basierte Sheet-Startzeile aus dem 0-basierten Daten-Index + start_sheet_row = start_data_index_no_header + self.sheet_handler._header_rows + 1 # Block 14 SheetHandler Attribut + self.logger.info(f"Automatisch ermittelte Startzeile (erste leere AT Zelle): {start_sheet_row}") # <<< GEÄNDERT + else: + # Wenn start_sheet_row manuell gesetzt wurde, laden Sie die Daten trotzdem neu, um aktuell zu sein. + # Der load_data Aufruf ist mit retry_on_failure dekoriert (Block 2). + if not self.sheet_handler.load_data(): + self.logger.error("FEHLER beim Laden der Daten fuer process_website_scraping_batch.") # <<< GEÄNDERT + return # Beende die Methode, wenn das Laden fehlschlaegt + + + # Holen Sie die gesamte Datenliste (inklusive Header) aus dem SheetHandler. + all_data = self.sheet_handler.get_all_data_with_headers() + # Annahme: header_rows ist als Attribut im SheetHandler verfuegbar (Block 14). + header_rows = self.sheet_handler._header_rows + total_sheet_rows = len(all_data) # Gesamtzahl der Zeilen im Sheet + + + # Berechne Endzeile, wenn nicht manuell gesetzt + if end_sheet_row is None: + end_sheet_row = total_sheet_rows # Bis zur letzten Zeile + + # Logge den verarbeitungsbereich + self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}. Gesamtzeilen im Sheet: {total_sheet_rows}") # <<< GEÄNDERT + + # Pruefe, ob der Bereich gueltig ist (Start <= Ende und Start nicht ueber Gesamtzeilen) + if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows: + self.logger.info("Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.") # <<< GEÄNDERT + return # Beende die Methode, wenn der Bereich leer ist + + + # --- Indizes und Buchstaben --- + # Stellen Sie sicher, dass alle benoetigten Spalten in COLUMN_MAP (Block 1) vorhanden sind + required_keys = [ + "Website Rohtext", "CRM Website", "Version", "Website Scrape Timestamp", "CRM Name" # AR, D, AP, AT, B + ] + # Erstellen Sie ein Dictionary mit Schluesseln und Indizes + col_indices = {key: COLUMN_MAP.get(key) for key in required_keys} + + # Pruefen Sie, ob alle benoetigten Schluessel in COLUMN_MAP gefunden wurden + if None in col_indices.values(): + missing = [k for k, v in col_indices.items() if v is None] + self.logger.critical(f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_website_scraping_batch: {missing}. Breche ab.") # <<< GEÄNDERT + return # Beende die Methode bei kritischem Fehler + + # Ermitteln Sie die Indizes und Buchstaben fuer Updates (AR, AT, AP) + rohtext_col_idx = col_indices["Website Rohtext"] + website_col_idx = col_indices["CRM Website"] + version_col_idx = col_indices["Version"] + timestamp_col_idx = col_indices["Website Scrape Timestamp"] + name_col_idx = col_indices["CRM Name"] + + rohtext_col_letter = self.sheet_handler._get_col_letter(rohtext_col_idx + 1) # Block 14 _get_col_letter + version_col_letter = self.sheet_handler._get_col_letter(version_col_idx + 1) + timestamp_col_letter = self.sheet_handler._get_col_letter(timestamp_col_idx + 1) + + + # --- Hauptlogik: Iteriere und sammle Batches --- + # Holen Sie die Batch-Groesse fuer Verarbeitung (Threading) aus Config (Block 1) + processing_batch_size = getattr(Config, 'PROCESSING_BATCH_SIZE', 20) + # Holen Sie die maximale Anzahl Worker aus Config (Block 1) + max_scraping_workers = getattr(Config, 'MAX_SCRAPING_WORKERS', 10) + # Holen Sie die Batch-Groesse fuer Sheet-Updates aus Config (Block 1) + update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50) + + + tasks_for_processing_batch = [] # Tasks fuer den aktuellen Scraping-Batch (Liste von Dicts) + rows_in_current_scraping_batch = [] # 1-basierte Zeilennummern im aktuellen Batch + all_sheet_updates = [] # Gesammelte Updates fuer Batch-Schreiben ins Sheet (Liste von Dicts) + + + processed_count = 0 # Zaehlt Zeilen, die fuer die Verarbeitung in Frage kommen und in den Batch aufgenommen werden (im Rahmen des Limits). + skipped_count = 0 # Zaehlt Zeilen, die uebersprungen wurden (wegen Inhalt oder fehlender URL). + skipped_no_url = 0 # Zaehlt Zeilen, die speziell wegen fehlender URL uebersprungen wurden. + + + # Iteriere ueber die Sheet-Zeilen im definierten Bereich (1-basierte Sheet-Zeilennummer) + for i in range(start_sheet_row, end_sheet_row + 1): + row_index_in_list = i - 1 # 0-basierter Index in der all_data Liste + # Pruefen Sie, ob das Ende des Sheets erreicht wurde + if row_index_in_list >= total_sheet_rows: break # Ende des Sheets erreicht + + + row = all_data[row_index_in_list] # Die Rohdaten fuer diese Zeile + + + # Stellen Sie sicher, dass die Zeile nicht leer ist + if not any(cell and isinstance(cell, str) and cell.strip() for cell in row): + #self.logger.debug(f"Zeile {i}: Uebersprungen (Leere Zeile).") # Zu viel Laerm im Debug + skipped_count += 1 # Zaehlen als uebersprungen + continue # Springe zur naechsten Zeile + + + # --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist --- + # Kriterium: Website Rohtext (AR) ist leer oder ein Standard-Fehlerwert. + # UND Website URL (D) ist vorhanden und gueltig aussehend. + + # Holen Sie den Wert aus Spalte AR (Website Rohtext) (nutzt interne Helfer _get_cell_value_safe) + cell_value_ar = self._get_cell_value_safe(row, "Website Rohtext") # Block 1 Column Map + # Pruefen Sie, ob AR leer ist oder einen Standard-Fehlerwert enthaelt. + ar_is_empty_or_default = not cell_value_ar or (isinstance(cell_value_ar, str) and str(cell_value_ar).strip().lower() in ["k.a.", "k.a. (nur cookie-banner erkannt)", "k.a. (fehler)"]) + + + # Holen Sie den Wert aus Spalte D (CRM Website) (nutzt interne Helfer _get_cell_value_safe) + website_url = self._get_cell_value_safe(row, "CRM Website").strip() # Block 1 Column Map + # Pruefen Sie, ob die Website URL (D) vorhanden und gueltig aussehend ist. + website_url_is_valid_looking = website_url and isinstance(website_url, str) and website_url.lower() not in ["k.a.", "kein artikel gefunden", "fehler bei suche", "http:"] # Fuege "http:" hinzu basierend auf Log + + + # Verarbeitung ist noetig, wenn AR leer/default ist UND D gefuellt/gueltig aussieht. + processing_needed_for_row = ar_is_empty_or_default and website_url_is_valid_looking + + + # Loggen der Pruefergebnisse fuer diese Zeile auf Debug-Level + log_check = (i < start_sheet_row + 5) or (i % 100 == 0) or (processing_needed_for_row) + if log_check: + company_name = self._get_cell_value_safe(row, "CRM Name").strip() # Block 1 Column Map + self.logger.debug(f"Zeile {i} ({company_name[:50]}... Website Scraping Check): AR leer/default? {ar_is_empty_or_default}, D gueltig? {website_url_is_valid_looking}. Benötigt Verarbeitung? {processing_needed_for_row}") # <<< GEÄNDERT + + + # Wenn die Verarbeitung fuer diese Zeile nicht noetig ist + if not processing_needed_for_row: + skipped_count += 1 # Zaehlen als uebersprungene Zeile + # Zaehlen Sie speziell, wenn die Zeile wegen fehlender gueltiger URL uebersprungen wurde. + if not website_url_is_valid_looking: skipped_no_url += 1 + continue # Springe zur naechsten Zeile + + + # --- Wenn Verarbeitung noetig: Fuege zur Batch-Liste hinzu --- + processed_count += 1 # Zaehle die Zeile, die fuer die Verarbeitung in Frage kommt (im Rahmen des Limits zaehlen) + + # Pruefe das Limit fuer verarbeitete Zeilen + if limit is not None and isinstance(limit, int) and limit > 0 and processed_count > limit: + # Wenn das Limit erreicht ist und es ein positives Limit gibt + self.logger.info(f"Verarbeitungslimit ({limit}) fuer process_website_scraping_batch erreicht. Breche weitere Zeilenpruefung ab.") # <<< GEÄNDERT + break # Brich die Schleife ab + + + # Fuege die benoetigten Daten fuer den Task hinzu (Zeilennummer und URL) + tasks_for_processing_batch.append({"row_num": i, "url": website_url}) + # Fuege die Zeilennummer zur Liste der Zeilennummern im Batch hinzu + rows_in_current_scraping_batch.append(i) + + + # --- Verarbeite den Batch, wenn voll --- + # Pruefe, ob die aktuelle Batch-Liste die definierte Groesse erreicht hat. + # scraping_batch_size wird aus Config geholt (Block 1). + if len(tasks_for_processing_batch) >= processing_batch_size: + # Logge den Start der Batch-Verarbeitung + batch_start_row = tasks_for_processing_batch[0]['row_num'] + batch_end_row = tasks_for_processing_batch[-1]['row_num'] + self.logger.debug(f"\n--- Starte Website-Scraping Batch ({len(tasks_for_processing_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT + + scraping_results = {} # Dictionary zum Speichern der Ergebnisse {row_num: raw_text} + batch_error_count = 0 # Fehlerzaehler fuer diesen spezifischen Batch + + self.logger.debug(f" Scrape {len(tasks_for_processing_batch)} Websites parallel (max {max_scraping_workers} worker)...") # <<< GEÄNDERT + # Nutzt concurrent.futures.ThreadPoolExecutor fuer paralleles Scraping. + # max_workers wird aus Config geholt (Block 1). + with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor: + # Map tasks to futures. Ruft die INTERNE Worker-Funktion auf. + # Uebergibt das task_info Dictionary und die globale Funktion get_website_raw (Block 11) als Argument. + future_to_task = {executor.submit(self._scrape_raw_text_task, task, get_website_raw): task for task in tasks_for_processing_batch} # <<< Korrigiert: interne Methode + + # Verarbeite die Ergebnisse, sobald sie fertig sind. + for future in concurrent.futures.as_completed(future_to_task): + task = future_to_task[future] # Holen Sie die urspruenglichen Task-Daten (Dict) + try: + # Holen Sie das Ergebnis vom Future. Wenn die Worker-Funktion eine Exception wirft, wird diese hier gefangen. + result = future.result() # Ergebnis ist ein Dictionary {'row_num': ..., 'raw_text': ..., 'error': ...} + # Speichere das Ergebnis im scraping_results Dictionary + scraping_results[result['row_num']] = result['raw_text'] + # Wenn der Worker einen Fehler gemeldet hat (z.B. durch Fehlerstring im raw_text oder error-Feld) + if result.get('error'): + batch_error_count += 1 # Erhoehe den Fehlerzaehler fuer diesen Batch + + except Exception as exc: + # Dieser Block faengt unerwartete Fehler ab, die waehrend der Future-Ergebnis-Abfrage auftreten. + # Die meisten Fehler sollten von get_website_raws retry/logging behandelt werden. + row_num = task['row_num'] # Zeilennummer aus den Task-Daten + err_msg = f"Unerwarteter Fehler bei Ergebnisabfrage Scraping Task Zeile {row_num} ({task['url'][:100]}): {type(exc).__name__} - {exc}" # Gekuerzt loggen + self.logger.error(err_msg) # <<< GEÄNDERT + # Setze einen Standard-Fehlerwert fuer diese Zeile im Ergebnis + scraping_results[row_num] = "k.A. (Unerwarteter Fehler Task)" + batch_error_count += 1 # Erhoehe den Fehlerzaehler + + + self.logger.debug(f" Scraping fuer Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).") # <<< GEÄNDERT + + + # Sammle Sheet Updates (AR, AT, AP) fuer diesen Batch. + # Dies geschieht jetzt nach der parallelen Verarbeitung. + if scraping_results: + # Aktueller Zeitstempel und Version + current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut + batch_sheet_updates = [] # Updates fuer DIESEN spezifischen Batch von Zeilen + + + # Iteriere ueber die Zeilennummern im Batch, fuer die Ergebnisse vorliegen. + # Ergebnisse koennen Fehlerwerte enthalten. + for row_num, raw_text_res in scraping_results.items(): + # Fuege Updates fuer AR, AT und AP hinzu (nutzt interne Helfer) + # AR: Roh extrahierter Text (kann auch Fehlerwert sein) + batch_sheet_updates.append({'range': f'{rohtext_col_letter}{row_num}', 'values': [[raw_text_res]]}) # Block 1 Column Map + # AT: Timestamp des Scraping-Versuchs (immer setzen, wenn versucht wurde) + batch_sheet_updates.append({'range': f'{timestamp_col_letter}{row_num}', 'values': [[current_timestamp]]}) # Block 1 Column Map + # AP: Version des Skripts + batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map + + + # Sammle diese Batch-Updates fuer das groessere Batch-Update am Ende oder bei Limit. + # update_batch_row_limit wird aus Config geholt (Block 1). + all_sheet_updates.extend(batch_sheet_updates) + + + # Leere den Scraping-Batch fuer die naechste Iteration + tasks_for_processing_batch = [] + rows_in_current_scraping_batch = [] + + + # Sende gesammelte Sheet Updates, wenn das Update-Batch-Limit erreicht ist. + # Updates pro Zeile sind 3 (AR, AT, AP). Anzahl der Zeilen = len(all_sheet_updates) / 3. + rows_in_update_batch = len(all_sheet_updates) // 3 # Ganzzahl-Division + + if rows_in_update_batch >= update_batch_row_limit: + self.logger.debug(f" Sende gesammelte Sheet-Updates ({rows_in_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT + # Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry. + # Wenn es fehlschlaegt, wird es intern geloggt. + success = self.sheet_handler.batch_update_cells(all_sheet_updates) + if success: + self.logger.info(f" Sheet-Update fuer {rows_in_update_batch} Zeilen erfolgreich.") # <<< GEÄNDERT + # Der Fehlerfall wird von batch_update_cells geloggt + + # Leere die gesammelten Updates nach dem Senden. + all_sheet_updates = [] + # rows_in_update_batch muss nicht explizit zurueckgesetzt werden, da es aus len(all_sheet_updates) berechnet wird. + + + # Keine Pause hier nach jedem kleinen Scraping-Batch, da wir auf batch_update warten. + # Die Pause kommt erst nach dem Batch-Update (oder am Ende des Modus). + # time.sleep(0.1) # Optionale kurze Pause + + + # --- Verarbeitung des letzten unvollstaendigen Scraping-Batches nach der Schleife --- + # Fuehre den letzten Batch aus, wenn nach der Hauptschleife noch Tasks in der Batch-Liste sind. + if tasks_for_processing_batch: + # Logge den Start des finalen Batches + batch_start_row = tasks_for_processing_batch[0]['row_num'] + batch_end_row = tasks_for_processing_batch[-1]['row_num'] + self.logger.debug(f"\n--- Starte FINALEN Website-Scraping Batch ({len(tasks_for_processing_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT + + scraping_results = {} # Dictionary fuer die Ergebnisse + batch_error_count = 0 # Fehlerzaehler + + self.logger.debug(f" Scrape {len(tasks_for_processing_batch)} Websites parallel (max {max_scraping_workers} worker)...") # <<< GEÄNDERT + with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor: + # Map tasks to futures. Ruft die INTERNE Worker-Funktion auf. + future_to_task = {executor.submit(self._scrape_raw_text_task, task, get_website_raw): task for task in tasks_for_processing_batch} # <<< Korrigiert: interne Methode + + # Verarbeite die Ergebnisse + for future in concurrent.futures.as_completed(future_to_task): + task = future_to_task[future] # Holen Sie die urspruenglichen Task-Daten + try: + result = future.result() # Holen Sie das Ergebnis + scraping_results[result['row_num']] = result['raw_text'] + # Pruefe, ob der Worker einen Fehler gemeldet hat + if result.get('error'): batch_error_count += 1 + except Exception as exc: + # Faengt unerwartete Fehler bei der Ergebnisabfrage ab + row_num = task['row_num'] + err_msg = f"Unerwarteter Fehler bei Ergebnisabfrage Scraping Task Zeile {row_num} ({task['url'][:100]}): {type(exc).__name__} - {exc}" # Gekuerzt loggen + self.logger.error(err_msg) # <<< GEÄNDERT + # Setze einen Standard-Fehlerwert + scraping_results[row_num] = "k.A. (Unerwarteter Fehler Task)" + batch_error_count += 1 + + + self.logger.debug(f" FINALER Scraping Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler).") # <<< GEÄNDERT + + # Sammle Sheet Updates (AR, AT, AP) fuer diesen finalen Batch. + if scraping_results: + current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut + batch_sheet_updates = [] # Updates fuer diesen spezifischen Batch + # Iteriere ueber die Zeilennummern im Batch, fuer die Ergebnisse vorliegen. + for row_num, raw_text_res in scraping_results.items(): + # Fuege Updates fuer AR, AT und AP hinzu + batch_sheet_updates.append({'range': f'{rohtext_col_letter}{row_num}', 'values': [[raw_text_res]]}) # Block 1 Column Map + batch_sheet_updates.append({'range': f'{timestamp_col_letter}{row_num}', 'values': [[current_timestamp]]}) # Block 1 Column Map + batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map + # Fuege diese Updates zur globalen Liste hinzu (wird dann nur noch einmal gesendet) + all_sheet_updates.extend(batch_sheet_updates) + + + # --- Finale Sheet Updates senden --- + # Sende alle verbleibenden gesammelten Updates in einem letzten Batch-Update. + if all_sheet_updates: + rows_in_final_update_batch = len(all_sheet_updates) // 3 # Updates pro Zeile ist 3 + self.logger.info(f"Sende FINALE gesammelte Sheet-Updates ({rows_in_final_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT + # Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry. + success = self.sheet_handler.batch_update_cells(all_sheet_updates) + if success: + self.logger.info(f"FINALES Sheet-Update erfolgreich.") # <<< GEÄNDERT + # Der Fehlerfall wird von batch_update_cells geloggt + + + # Logge den Abschluss des Modus + self.logger.info(f"Website-Scraping (Batch) abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen.") # <<< GEÄNDERT + # Keine Pause nach diesem Modus noetig, da die naechste Aktion im Dispatcher (Block 34) folgt. + + def process_summarization_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None): + """ + Batch-Prozess NUR fuer Website-Zusammenfassung (AS). + Laedt Daten neu, prueft, ob Rohtext (AR) vorhanden und Zusammenfassung (AS) fehlt. + Fasst Rohtexte im Batch ueber OpenAI zusammen und setzt AS + AP. + + Args: + start_sheet_row (int, optional): Die 1-basierte Startzeile im Sheet. Defaults to None (automatische Ermittlung basierend auf leeren AS). + end_sheet_row (int, optional): Die 1-basierte Endzeile im Sheet. Defaults to None (bis Ende Sheet). + limit (int, optional): Maximale Anzahl ZU VERARBEITENDER (nicht uebersprungener) Zeilen. Defaults to None (Unbegrenzt). + """ + # Verwenden Sie logger, da das Logging jetzt konfiguriert ist + # Logge die Konfiguration des Batch-Laufs + self.logger.info(f"Starte Website-Zusammenfassung (Batch AS, AP). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") # <<< GEÄNDERT + + + # --- Daten laden und Startzeile ermitteln --- + # Automatische Ermittlung der Startzeile, wenn nicht manuell gesetzt + if start_sheet_row is None: + self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren AS...") # <<< GEÄNDERT + # Nutzt get_start_row_index des Sheet Handlers (Block 14). Prueft auf leeren AS (Block 1 Column Map). + # Standardmaessig ab Zeile 7 + start_data_index_no_header = self.sheet_handler.get_start_row_index(check_column_key="Website Zusammenfassung", min_sheet_row=7) + + # Wenn get_start_row_index -1 zurueckgibt (Fehler) + if start_data_index_no_header == -1: + self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") # <<< GEÄNDERT + return # Beende die Methode + + # Berechne die 1-basierte Sheet-Startzeile aus dem 0-basierten Daten-Index + start_sheet_row = start_data_index_no_header + self.sheet_handler._header_rows + 1 # Block 14 SheetHandler Attribut + self.logger.info(f"Automatisch ermittelte Startzeile (erste leere AS Zelle): {start_sheet_row}") # <<< GEÄNDERT + else: + # Wenn start_sheet_row manuell gesetzt wurde, laden Sie die Daten trotzdem neu, um aktuell zu sein. + # Der load_data Aufruf ist mit retry_on_failure dekoriert (Block 2). + if not self.sheet_handler.load_data(): + self.logger.error("FEHLER beim Laden der Daten fuer process_summarization_batch.") # <<< GEÄNDERT + return # Beende die Methode, wenn das Laden fehlschlaegt + + + # Holen Sie die gesamte Datenliste (inklusive Header) aus dem SheetHandler. + all_data = self.sheet_handler.get_all_data_with_headers(); + # Annahme: header_rows ist als Attribut im SheetHandler verfuegbar (Block 14). + header_rows = self.sheet_handler._header_rows; + total_sheet_rows = len(all_data) # Gesamtzahl der Zeilen im Sheet + + + # Berechne Endzeile, wenn nicht manuell gesetzt + if end_sheet_row is None: + end_sheet_row = total_sheet_rows # Bis zur letzten Zeile + + # Logge den verarbeitungsbereich + self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}. Gesamtzeilen im Sheet: {total_sheet_rows}") # <<< GEÄNDERT + + # Pruefe, ob der Bereich gueltig ist (Start <= Ende und Start nicht ueber Gesamtzeilen) + if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows: + self.logger.info("Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.") # <<< GEÄNDERT + return # Beende die Methode, wenn der Bereich leer ist + + + # --- Indizes und Buchstaben --- + # Stellen Sie sicher, dass alle benoetigten Spalten in COLUMN_MAP (Block 1) vorhanden sind + required_keys = [ + "Website Rohtext", "Website Zusammenfassung", "Version", "CRM Name" # AR, AS, AP, B + ] + # Erstellen Sie ein Dictionary mit Schluesseln und Indizes + col_indices = {key: COLUMN_MAP.get(key) for key in required_keys} + + # Pruefen Sie, ob alle benoetigten Schluessel in COLUMN_MAP gefunden wurden + if None in col_indices.values(): + missing = [k for k, v in col_indices.items() if v is None] + self.logger.critical(f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_summarization_batch: {missing}. Breche ab.") # <<< GEÄNDERT + return # Beende die Methode bei kritischem Fehler + + # Ermitteln Sie die Indizes und Buchstaben fuer Updates (AS, AP) + rohtext_col_idx = col_indices["Website Rohtext"] + summary_col_idx = col_indices["Website Zusammenfassung"] + version_col_idx = col_indices["Version"] + name_col_idx = col_indices["CRM Name"] # Benoetigt fuer Logging + + summary_col_letter = self.sheet_handler._get_col_letter(summary_col_idx + 1) # Block 14 _get_col_letter + version_col_letter = self.sheet_handler._get_col_letter(version_col_idx + 1) + + + # --- Verarbeitung --- + # Holen Sie die Batch-Groesse fuer OpenAI-Aufrufe aus Config (Block 1) + openai_batch_size = getattr(Config, 'OPENAI_BATCH_SIZE_LIMIT', 4) + # Holen Sie die Batch-Groesse fuer Sheet-Updates aus Config (Block 1) + update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50) + + + tasks_for_openai_batch = [] # Tasks fuer den aktuellen OpenAI Batch (Liste von Dicts) + rows_in_current_openai_batch = [] # 1-basierte Zeilennummern im aktuellen OpenAI Batch + all_sheet_updates = [] # Gesammelte Updates fuer Batch-Schreiben ins Sheet (Liste von Dicts) + + + processed_count = 0 # Zaehlt Zeilen, die fuer die Verarbeitung in Frage kommen und in den Batch aufgenommen werden (im Rahmen des Limits). + skipped_count = 0 # Zaehlt Zeilen, die uebersprungen wurden (wegen fehlendem Rohtext oder vorhandener Zusammenfassung). + + + # Iteriere ueber die Sheet-Zeilen im definierten Bereich (1-basierte Sheet-Zeilennummer) + for i in range(start_sheet_row, end_sheet_row + 1): + row_index_in_list = i - 1 # 0-basierter Index in der all_data Liste + # Pruefen Sie, ob das Ende des Sheets erreicht wurde + if row_index_in_list >= total_sheet_rows: break # Ende des Sheets erreicht + + + row = all_data[row_index_in_list] # Die Rohdaten fuer diese Zeile + + + # Stellen Sie sicher, dass die Zeile nicht leer ist + if not any(cell and isinstance(cell, str) and cell.strip() for cell in row): + #self.logger.debug(f"Zeile {i}: Uebersprungen (Leere Zeile).") # Zu viel Laerm im Debug + skipped_count += 1 # Zaehlen als uebersprungen + continue # Springe zur naechsten Zeile + + + # --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist --- + # Kriterium: Website Rohtext (AR) ist vorhanden und gueltig (nicht k.A. oder Fehlerwerte). + # UND Website Zusammenfassung (AS) ist leer oder ein Standard-Fehlerwert. + + # Holen Sie den Wert aus Spalte AR (Website Rohtext) (nutzt interne Helfer _get_cell_value_safe) + raw_text = self._get_cell_value_safe(row, "Website Rohtext") # Block 1 Column Map + # Pruefen Sie, ob AR gefuellt und gueltig ist. + raw_text_is_valid = raw_text and isinstance(raw_text, str) and str(raw_text).strip().lower() not in ["k.a.", "k.a. (nur cookie-banner erkannt)", "k.a. (fehler)"] + + + # Holen Sie den Wert aus Spalte AS (Website Zusammenfassung) (nutzt interne Helfer _get_cell_value_safe) + summary_value = self._get_cell_value_safe(row, "Website Zusammenfassung") # Block 1 Column Map + # Pruefen Sie, ob AS leer ist oder einen Standard-Fehlerwert enthaelt. + summary_is_empty_or_default = not summary_value or (isinstance(summary_value, str) and str(summary_value).strip().lower() in ["k.a.", "k.a. (keine zusammenfassung erhalten)"]) + + + # Verarbeitung ist noetig, wenn AR gueltig ist UND AS leer/default ist. + processing_needed_for_row = raw_text_is_valid and summary_is_empty_or_default + + + # Loggen der Pruefergebnisse fuer diese Zeile auf Debug-Level + log_check = (i < start_sheet_row + 5) or (i % 100 == 0) or (processing_needed_for_row) + if log_check: + company_name = self._get_cell_value_safe(row, "CRM Name").strip() # Block 1 Column Map + self.logger.debug(f"Zeile {i} ({company_name[:50]}... Website Summarization Check): AR gueltig? {raw_text_is_valid} (len={len(str(raw_text))}), AS leer/default? {summary_is_empty_or_default}. Benötigt Verarbeitung? {processing_needed_for_row}") # <<< GEÄNDERT + + + # Wenn die Verarbeitung fuer diese Zeile nicht noetig ist + if not processing_needed_for_row: + skipped_count += 1 # Zaehlen als uebersprungene Zeile + continue # Springe zur naechsten Zeile + + + # --- Wenn Verarbeitung noetig: Fuege zur Batch-Liste fuer OpenAI hinzu --- + processed_count += 1 # Zaehle die Zeile, die fuer die Verarbeitung in Frage kommt (im Rahmen des Limits zaehlen) + + # Pruefe das Limit fuer verarbeitete Zeilen + if limit is not None and isinstance(limit, int) and limit > 0 and processed_count > limit: + # Wenn das Limit erreicht ist und es ein positives Limit gibt + self.logger.info(f"Verarbeitungslimit ({limit}) fuer process_summarization_batch erreicht. Breche weitere Zeilenpruefung ab.") # <<< GEÄNDERT + break # Brich die Schleife ab + + + # Fuege die benoetigten Daten fuer den OpenAI Batch hinzu (Zeilennummer und Rohtext) + tasks_for_openai_batch.append({'row_num': i, 'raw_text': raw_text}) + # Fuege die Zeilennummer zur Liste der Zeilennummern im Batch hinzu + rows_in_current_openai_batch.append(i) + + + # --- Verarbeite den Batch, wenn voll --- + # Pruefe, ob die aktuelle Batch-Liste die definierte Groesse erreicht hat. + # openai_batch_size wird aus Config geholt (Block 1). + if len(tasks_for_openai_batch) >= openai_batch_size: + # Logge den Start der Batch-Verarbeitung + batch_start_row = tasks_for_openai_batch[0]['row_num'] + batch_end_row = tasks_for_openai_batch[-1]['row_num'] + self.logger.debug(f"\n--- Starte Website-Summarization Batch ({len(tasks_for_openai_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT + + # Rufe die globale Funktion auf, die den OpenAI Call fuer den Batch macht (Block 9). + # summarize_batch_openai ist mit retry_on_failure dekoriert (Block 2). + # Wenn summarize_batch_openai eine Exception wirft (nach Retries), wird diese hier gefangen. + # !!! KORRIGIERTER AUFRUF !!! + try: + # Rufen Sie die korrekte globale Funktion auf + batch_results = summarize_batch_openai(tasks_for_openai_batch) # <<< Korrigierter Aufruf (vorher war fälschlicherweise _process_verification_openai_batch) + # Ergebnisse sollten ein Dictionary {row_num: summary_text} sein, auch bei Fehlern. + + # Sammle Sheet Updates (AS, AP) fuer diesen Batch + current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut + batch_sheet_updates = [] # Updates fuer DIESEN spezifischen Batch von Zeilen + + # Iteriere ueber die Zeilennummern, die in DIESEM OpenAI Batch waren + for row_num in rows_in_current_openai_batch: + # Hole das Ergebnis fuer diese Zeile aus dem Ergebnis-Dictionary. + # Fallback auf einen Fehlerstring, wenn das Ergebnis fehlt (sollte nicht passieren, wenn summarize_batch_openai korrekt ist). + summary = batch_results.get(row_num, "k.A. (Batch Ergebnis fehlte)") + # Stelle sicher, dass 'k.A.' bei leeren/kurzen Summaries gesetzt wird + if not summary or (isinstance(summary, str) and summary.strip().lower() in ["k.a.", "k.a. (keine zusammenfassung erhalten)"]): + summary = "k.A. (Keine Zusammenfassung erhalten)" + # Fuege "k.A." oder Fehler an, wenn der Wert von summarize_batch_openai ein Fehlerstring ist + elif isinstance(summary, str) and (summary.startswith("k.A. (Fehler") or summary.startswith("FEHLER:")): + pass # Behalte den Fehlerstring von summarize_batch_openai + + # Fuege Updates fuer AS und AP hinzu (nutzt interne Helfer) + batch_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [[summary]]}) # Block 1 Column Map + batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map + + + # Sammle diese Batch-Updates fuer das groessere Batch-Update + all_sheet_updates.extend(batch_sheet_updates) + + + except Exception as e_openai_batch: + # Wenn summarize_batch_openai eine Exception wirft (nach Retries) + # Der Fehler wird bereits vom retry_on_failure Decorator auf summarize_batch_openai geloggt. + self.logger.error(f"Endgueltiger FEHLER beim OpenAI-Batch-Aufruf fuer Zusammenfassung: {e_openai_batch}") # <<< GEÄNDERT + # Logge den Traceback + self.logger.debug(traceback.format_exc()) # <<< GEÄNDERT + # Fügen Sie Fehlerwerte für alle Zeilen im Batch hinzu + current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut + for row_num in rows_in_current_openai_batch: + error_summary = f"FEHLER OpenAI Batch: {str(e_openai_batch)[:100]}..." # Gekuerzt + # Fuege Updates mit Fehlerwerten fuer AS und AP hinzu + all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [[error_summary]]}) # Block 1 Column Map + all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map + + + # Leere den OpenAI-Batch zurueck + tasks_for_openai_batch = [] + rows_in_current_openai_batch = [] + + + # Sende gesammelte Sheet Updates, wenn das Update-Batch-Limit erreicht ist. + # Updates pro Zeile sind 2 (AS, AP). Anzahl der Zeilen = len(all_sheet_updates) / 2. + rows_in_update_batch = len(all_sheet_updates) // 2 + + if rows_in_update_batch >= update_batch_row_limit: + self.logger.debug(f" Sende gesammelte Sheet-Updates ({rows_in_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT + # Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry. + # Wenn es fehlschlaegt, wird es intern geloggt. + success = self.sheet_handler.batch_update_cells(all_sheet_updates) + if success: + self.logger.info(f" Sheet-Update fuer {rows_in_update_batch} Zeilen erfolgreich.") # <<< GEÄNDERT + # Der Fehlerfall wird von batch_update_cells geloggt + + # Leere die gesammelten Updates nach dem Senden. + all_sheet_updates = [] + + # Kurze Pause nach jedem OpenAI Batch (nutzt Config Block 1). + # Dies ist wichtig, um Rate Limits zu vermeiden. + pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.5 # 50% der Retry-Wartezeit + self.logger.debug(f"Warte {pause_duration:.2f}s vor naechstem Batch...") # <<< GEÄNDERT + time.sleep(pause_duration) + + + # --- Verarbeitung des letzten unvollstaendigen OpenAI Batches nach der Schleife --- + # Wenn nach der Hauptschleife noch Tasks in der Batch-Liste sind. + if tasks_for_openai_batch: # Korrektur: War vorher `current_openai_batch_data` + # Logge den Start des finalen Batches + batch_start_row = tasks_for_openai_batch[0]['row_num'] # Korrektur: War vorher `current_openai_batch_data` + batch_end_row = tasks_for_openai_batch[-1]['row_num'] # Korrektur: War vorher `current_openai_batch_data` + self.logger.debug(f"\n--- Starte FINALEN Website-Summarization Batch ({len(tasks_for_openai_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT + + # Rufe die globale Funktion auf, die den OpenAI Call fuer den Batch macht (Block 9). + # summarize_batch_openai ist mit retry_on_failure dekoriert (Block 2). + # Wenn summarize_batch_openai eine Exception wirft (nach Retries), wird diese hier gefangen. + batch_results = None + try: + batch_results = summarize_batch_openai(tasks_for_openai_batch) # <<< Korrekter Aufruf Block 9, Korrektur: War vorher `current_openai_batch_data` + # Ergebnisse sollten ein Dictionary {row_num: summary_text} sein, auch bei Fehlern. + + # Sammle Sheet Updates (AS, AP) fuer diesen finalen Batch + current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut + batch_sheet_updates = [] # Updates fuer diesen spezifischen Batch + # Iteriere ueber die Zeilennummern im Batch, fuer die Ergebnisse vorliegen. + for row_num in rows_in_current_openai_batch: + # Hole das Ergebnis fuer diese Zeile aus dem Ergebnis-Dictionary. + summary = batch_results.get(row_num, "k.A. (Batch Ergebnis fehlte)") # Fallback + + # Stelle sicher, dass 'k.A.' bei leeren/kurzen Summaries gesetzt wird + if not summary or (isinstance(summary, str) and summary.strip().lower() in ["k.a.", "k.a. (keine zusammenfassung erhalten)"]): + summary = "k.A. (Keine Zusammenfassung erhalten)" + # Fuege "k.A." oder Fehler an, wenn der Wert von summarize_batch_openai ein Fehlerstring ist + elif isinstance(summary, str) and (summary.startswith("k.A. (Fehler") or summary.startswith("FEHLER:")): + pass # Behalte den Fehlerstring von summarize_batch_openai + + + # Fuege Updates fuer AS und AP hinzu + batch_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [[summary]]}) # Block 1 Column Map + batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map + + + # Fuege diese Updates zur globalen Liste hinzu (wird dann nur noch einmal gesendet) + all_sheet_updates.extend(batch_sheet_updates) + + + except Exception as e_openai_batch: + # Wenn summarize_batch_openai eine Exception wirft (nach Retries) + # Der Fehler wird bereits vom retry_on_failure Decorator auf summarize_batch_openai geloggt. + self.logger.error(f"Endgueltiger FEHLER beim FINALEN OpenAI-Batch-Aufruf fuer Zusammenfassung: {e_openai_batch}") # <<< GEÄNDERT + # Logge den Traceback + self.logger.debug(traceback.format_exc()) # <<< GEÄNDERT + # Fügen Sie Fehlerwerte für alle Zeilen im Batch hinzu + current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut + for row_num in rows_in_current_openai_batch: + error_summary = f"FEHLER OpenAI Batch: {str(e_openai_batch)[:100]}..." # Gekuerzt + # Fuege Updates mit Fehlerwerten fuer AS und AP hinzu + all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [[error_summary]]}) # Block 1 Column Map + all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map + + + # --- Finale Sheet Updates senden --- + # Sende alle verbleibenden gesammelten Updates in einem letzten Batch-Update. + if all_sheet_updates: + rows_in_final_update_batch = len(all_sheet_updates) // 2 # Updates pro Zeile ist 2 + self.logger.info(f"Sende FINALE gesammelte Sheet-Updates ({rows_in_final_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT + # Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry. + success = self.sheet_handler.batch_update_cells(all_sheet_updates) + if success: + self.logger.info(f"FINALES Sheet-Update erfolgreich.") # <<< GEÄNDERT + # Der Fehlerfall wird von batch_update_cells geloggt + + + # Logge den Abschluss des Modus + self.logger.info(f"Website-Zusammenfassung (Batch) abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen.") # <<< GEÄNDERT + # Keine Pause nach diesem Modus noetig, da die naechste Aktion im Dispatcher (Block 34) folgt. \ No newline at end of file