Files
Brancheneinstufung2/data_processor.py
2025-06-26 14:50:35 +00:00

1820 lines
119 KiB
Python

# --- START OF FILE data_processor.py (Part 1/7) ---
#!/usr/bin/env python3
"""
data_processor.py
Zentrale Klasse zur Orchestrierung und Verarbeitung von Unternehmensdaten.
Enthält die Logik für die Verarbeitung einzelner Zeilen sowie die Steuerung
verschiedener Batch-Modi und Dienstprogramme.
"""
import logging
import time
import traceback
import concurrent.futures
import threading
import pickle
import json
import os
from datetime import datetime
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
# Import der abhängigen Module
from config import Config, COLUMN_MAP, MODEL_FILE, IMPUTER_FILE, PATTERNS_FILE_JSON
from helpers import (
serp_website_lookup, get_website_raw, scrape_website_details, summarize_website_content,
URL_CHECK_MARKER, evaluate_branche_chatgpt, get_numeric_filter_value,
summarize_batch_openai, load_target_schema, ALLOWED_TARGET_BRANCHES,
serp_wikipedia_lookup, search_linkedin_contacts, is_valid_wikipedia_article_url
)
# Klassen-Imports
from google_sheet_handler import GoogleSheetHandler
from wikipedia_scraper import WikipediaScraper
class DataProcessor:
"""
Zentrale Klasse zur Orchestrierung und Verarbeitung von Unternehmensdaten.
"""
def __init__(self, sheet_handler, wiki_scraper):
"""
Initialisiert den DataProcessor mit Instanzen von Handler-Klassen.
"""
self.logger = logging.getLogger(__name__ + ".DataProcessor")
self.logger.info("Initialisiere DataProcessor...")
if not isinstance(sheet_handler, GoogleSheetHandler):
self.logger.critical("DataProcessor Init FEHLER: Kein gueltiger GoogleSheetHandler uebergeben!")
raise ValueError("DataProcessor benoetigt eine gueltige GoogleSheetHandler Instanz.")
if not isinstance(wiki_scraper, WikipediaScraper):
self.logger.critical("DataProcessor Init FEHLER: Kein gueltiger WikipediaScraper uebergeben!")
raise ValueError("DataProcessor benoetigt eine gueltige WikipediaScraper Instanz.")
self.sheet_handler = sheet_handler
self.wiki_scraper = wiki_scraper
self.model = None
self.imputer = None
self._expected_features = None
self.logger.info("DataProcessor initialisiert mit Handlern.")
self._step_status_map = {
'wiki_verify': "Wiki Verif. Timestamp",
'website_scrape': "Website Scrape Timestamp",
'summarize_website': "Website Scrape Timestamp",
'branch_eval': "Timestamp letzte Pruefung",
'find_wiki_serp': "SerpAPI Wiki Search Timestamp",
'contact_search': "Contact Search Timestamp",
'wiki_updates_from_chatgpt': "Chat Wiki Konsistenzpruefung",
}
def _get_cell_value_safe(self, row, column_key):
"""
Hilfsfunktion fuer sicheren Zellenzugriff anhand des COLUMN_MAP Schluessels.
"""
idx = COLUMN_MAP.get(column_key)
if idx is None:
self.logger.error(f"_get_cell_value_safe: Schluessel '{column_key}' nicht in COLUMN_MAP gefunden.")
return ''
if len(row) > idx:
return row[idx] if row[idx] is not None else ''
else:
self.logger.debug(f"_get_cell_value_safe: Index {idx} fuer '{column_key}' ist gueltig, aber Zeile ist zu kurz (Laenge {len(row)}).")
return ''
def _needs_website_processing(self, row_data, force_reeval):
"""
Prueft, ob Website-Scraping/Summarization fuer diese Zeile noetig ist.
"""
if force_reeval:
return True
at_value = self._get_cell_value_safe(row_data, "Website Scrape Timestamp").strip()
if not at_value:
return True
return False
def _needs_wiki_processing(self, row_data, force_reeval):
"""
Prueft, ob Wikipedia-Suche/Extraktion fuer diese Zeile noetig ist.
"""
if force_reeval:
return True
an_value = self._get_cell_value_safe(row_data, "Wikipedia Timestamp").strip()
if not an_value:
return True
s_value = self._get_cell_value_safe(row_data, "Chat Wiki Konsistenzpruefung").strip().upper()
if s_value == "X (URL COPIED)":
return True
return False
def _needs_wiki_verification(self, row_data, force_reeval):
"""
Prueft, ob Wikipedia-Verifizierung fuer diese Zeile noetig ist.
"""
if force_reeval:
return True
ax_value = self._get_cell_value_safe(row_data, "Wiki Verif. Timestamp").strip()
if not ax_value:
m_value = self._get_cell_value_safe(row_data, "Wiki URL").strip()
if m_value and m_value.lower() not in ["k.a.", "kein artikel gefunden", "fehler bei suche", "http:"]:
return True
return False
def _needs_chat_evaluations(self, row_data, force_reeval, wiki_data_just_updated):
"""
Prueft, ob ChatGPT-Evaluationen fuer diese Zeile noetig sind.
"""
if force_reeval:
return True
ao_value = self._get_cell_value_safe(row_data, "Timestamp letzte Pruefung").strip()
if not ao_value:
return True
if wiki_data_just_updated:
return True
return False
def _needs_ml_prediction(self, row_data, force_reeval, chat_eval_just_ran):
"""
Prueft, ob die ML-Schaetzung fuer diese Zeile noetig ist.
"""
if force_reeval:
return True
au_value = self._get_cell_value_safe(row_data, "Geschaetzter Techniker Bucket").strip()
if not au_value or au_value.lower() in ["k.a.", "fehler schaetzung"]:
if chat_eval_just_ran:
return True
av_value = self._get_cell_value_safe(row_data, "Finaler Umsatz (Wiki>CRM)").strip()
aw_value = self._get_cell_value_safe(row_data, "Finaler Mitarbeiter (Wiki>CRM)").strip()
if av_value != "k.A." and not av_value.startswith("FEHLER") and aw_value != "k.A." and not aw_value.startswith("FEHLER"):
return True
return False
def _process_single_row(self, row_num_in_sheet, row_data,
steps_to_run, force_reeval=False, clear_x_flag=False):
"""
Verarbeitet die Daten fuer eine einzelne Zeile im Sheet. Fuehrt ausgewaehlte
Anreicherungs- und Analyseprozesse durch.
"""
self.logger.info(f"--- Starte Verarbeitung fuer Zeile {row_num_in_sheet} {'(Re-Eval)' if force_reeval else ''} (Schritte: {', '.join(steps_to_run) if steps_to_run else 'Keine'}) ---")
updates = []
now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
any_processing_done = False
wiki_data_updated_in_this_run = False
chat_eval_just_ran = False
# --- Initiale Werte lesen ---
company_name = self._get_cell_value_safe(row_data, "CRM Name").strip()
website_url = self._get_cell_value_safe(row_data, "CRM Website").strip()
crm_kurzform = self._get_cell_value_safe(row_data, "CRM Kurzform").strip()
crm_branche = self._get_cell_value_safe(row_data, "CRM Branche").strip()
crm_beschreibung = self._get_cell_value_safe(row_data, "CRM Beschreibung").strip()
parent_account_name_d = self._get_cell_value_safe(row_data, "Parent Account Name").strip()
current_wiki_data = {
'url': self._get_cell_value_safe(row_data, "Wiki URL") or 'k.A.',
'sitz_stadt': self._get_cell_value_safe(row_data, "Wiki Sitz Stadt") or 'k.A.',
'sitz_land': self._get_cell_value_safe(row_data, "Wiki Sitz Land") or 'k.A.',
'first_paragraph': self._get_cell_value_safe(row_data, "Wiki Absatz") or 'k.A.',
'branche': self._get_cell_value_safe(row_data, "Wiki Branche") or 'k.A.',
'umsatz': self._get_cell_value_safe(row_data, "Wiki Umsatz") or 'k.A.',
'mitarbeiter': self._get_cell_value_safe(row_data, "Wiki Mitarbeiter") or 'k.A.',
'categories': self._get_cell_value_safe(row_data, "Wiki Kategorien") or 'k.A.'
}
final_wiki_data = current_wiki_data.copy()
website_raw = self._get_cell_value_safe(row_data, "Website Rohtext") or 'k.A.'
website_summary = self._get_cell_value_safe(row_data, "Website Zusammenfassung") or 'k.A.'
website_meta_details = self._get_cell_value_safe(row_data, "Website Meta-Details") or 'k.A.'
url_pruefstatus = self._get_cell_value_safe(row_data, "URL Prüfstatus") or ''
# --- 1. Website Handling (Lookup, Scraping, Summarization, Meta) ---
run_website_step = 'web' in steps_to_run
website_processing_needed = self._needs_website_processing(row_data, force_reeval)
if run_website_step and website_processing_needed:
any_processing_done = True
grund_message = "Re-Eval" if force_reeval else "Timestamp (AJ) leer"
self.logger.info(f"Zeile {row_num_in_sheet}: Fuehre WEBSITE Schritte aus (Grund: {grund_message})...")
if not website_url or website_url.lower() == "k.a.":
self.logger.debug(" -> Website URL (E) leer, suche ueber SERP...")
try:
new_website = serp_website_lookup(company_name)
if new_website and new_website.lower() != "k.a." and not new_website.startswith("k.A. (Fehler"):
website_url = new_website
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["CRM Website"] + 1)}{row_num_in_sheet}', 'values': [[website_url]]})
url_pruefstatus = "URL_OK_SERP"
else:
url_pruefstatus = "URL_SERP_FAILED" if not website_url or website_url.lower() == "k.a." else url_pruefstatus
except Exception as e_serp_lookup_web:
self.logger.error(f"FEHLER bei SERP Website Lookup für '{company_name}': {e_serp_lookup_web}")
url_pruefstatus = "URL_SERP_ERROR"
if website_url and website_url.lower() not in ["k.a.", "http:"]:
self.logger.debug(f" -> Scrape Rohtext & Meta von {website_url[:100]}...")
try:
website_raw = get_website_raw(website_url)
if website_raw == URL_CHECK_MARKER:
url_pruefstatus = URL_CHECK_MARKER
website_summary, website_meta_details = "k.A. (URL prüfen)", "k.A. (URL prüfen)"
elif website_raw and str(website_raw).strip().lower() not in ["k.a.", "k.a. (nur cookie-banner erkannt)", "k.a. (fehler)"]:
url_pruefstatus = "URL_OK_SCRAPED"
website_meta_details = scrape_website_details(website_url) or "k.A. (Keine Meta-Details)"
website_summary = summarize_website_content(website_raw) or "k.A. (Keine Zusammenfassung erhalten)"
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Meta-Details"] + 1)}{row_num_in_sheet}', 'values': [[website_meta_details]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Zusammenfassung"] + 1)}{row_num_in_sheet}', 'values': [[website_summary]]})
else:
if not str(website_raw).startswith("k.A. (Fehler"):
url_pruefstatus = "URL_SCRAPE_EMPTY_OR_BANNER"
website_summary, website_meta_details = "k.A.", "k.A."
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Zusammenfassung"] + 1)}{row_num_in_sheet}', 'values': [[website_summary]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Meta-Details"] + 1)}{row_num_in_sheet}', 'values': [[website_meta_details]]})
except Exception as e_scrape_web:
self.logger.error(f"FEHLER beim Website Scraping für '{company_name}': {e_scrape_web}")
website_raw, website_summary, website_meta_details = f"k.A. (Fehler Scraping: {str(e_scrape_web)[:50]})", "k.A.", "k.A."
url_pruefstatus = "URL_SCRAPE_ERROR"
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Zusammenfassung"] + 1)}{row_num_in_sheet}', 'values': [[website_summary]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Meta-Details"] + 1)}{row_num_in_sheet}', 'values': [[website_meta_details]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Rohtext"] + 1)}{row_num_in_sheet}', 'values': [[website_raw]]})
else:
self.logger.debug(f" -> Keine gültige Website URL für '{company_name}'. Web-Verarbeitung übersprungen.")
website_raw, website_summary, website_meta_details = "k.A. (Keine URL)", "k.A.", "k.A."
if not url_pruefstatus: url_pruefstatus = "URL_MISSING"
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Rohtext"] + 1)}{row_num_in_sheet}', 'values': [[website_raw]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Zusammenfassung"] + 1)}{row_num_in_sheet}', 'values': [[website_summary]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Meta-Details"] + 1)}{row_num_in_sheet}', 'values': [[website_meta_details]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["URL Prüfstatus"] + 1)}{row_num_in_sheet}', 'values': [[url_pruefstatus]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Scrape Timestamp"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]})
# ======================================================================
# === 2. Wikipedia Handling (Search, Extraction, Status Reset) ==========
# ======================================================================
run_wiki_step = 'wiki' in steps_to_run
wiki_processing_needed = self._needs_wiki_processing(row_data, force_reeval)
if run_wiki_step and wiki_processing_needed:
any_processing_done = True
grund_message_parts_wiki = []
if force_reeval: grund_message_parts_wiki.append('Re-Eval')
if not self._get_cell_value_safe(row_data, "Wikipedia Timestamp").strip(): grund_message_parts_wiki.append('Z leer')
if self._get_cell_value_safe(row_data, "Chat Wiki Konsistenzpruefung").strip().upper() == "X (URL COPIED)": grund_message_parts_wiki.append("AC='X (URL COPIED)'")
grund_message_wiki = ", ".join(filter(None, grund_message_parts_wiki)) or "Bedingung erfüllt"
self.logger.info(f"Zeile {row_num_in_sheet}: Fuehre WIKI Schritte aus (Grund: {grund_message_wiki})...")
current_wiki_url_r = self._get_cell_value_safe(row_data, "Wiki URL").strip()
system_suggested_parent_o = self._get_cell_value_safe(row_data, "System Vorschlag Parent Account").strip()
url_for_extraction = None
source_of_wiki_data_origin_log_msg = "Tochter (Initial)"
additional_info_for_af_col = ""
if not current_wiki_url_r or current_wiki_url_r.lower() == 'k.a.':
if parent_account_name_d and parent_account_name_d.lower() != 'k.a.':
self.logger.info(f" Zeile {row_num_in_sheet}: R leer, D ('{parent_account_name_d}') gesetzt. Suche Wiki für Parent D.")
try:
potential_url = serp_wikipedia_lookup(parent_account_name_d)
if potential_url and not str(potential_url).startswith("FEHLER"):
url_for_extraction = potential_url
source_of_wiki_data_origin_log_msg = f"Parent D ('{parent_account_name_d}')"
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Wiki Konsistenzpruefung"] + 1)}{row_num_in_sheet}', 'values': [["INFO_PARENT_AUS_D"]]})
additional_info_for_af_col = f"INFO: Wiki-URL von Parent (D): {parent_account_name_d}. "
else:
additional_info_for_af_col = f"WARN: Kein Wiki für Parent D '{parent_account_name_d}' gefunden. "
except Exception as e_d_lookup:
self.logger.error(f"Fehler bei Wiki-Suche für Parent D '{parent_account_name_d}': {e_d_lookup}")
additional_info_for_af_col = f"ERR: Suche Parent D fehlgeschlagen. "
if url_for_extraction is None and system_suggested_parent_o and system_suggested_parent_o.lower() != 'k.a.':
self.logger.info(f" Zeile {row_num_in_sheet}: R leer, D nicht erfolgreich. O ('{system_suggested_parent_o}') gesetzt. Suche Wiki für Parent O.")
try:
potential_url = serp_wikipedia_lookup(system_suggested_parent_o)
if potential_url and not str(potential_url).startswith("FEHLER"):
url_for_extraction = potential_url
source_of_wiki_data_origin_log_msg = f"Parent O ('{system_suggested_parent_o}')"
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Wiki Konsistenzpruefung"] + 1)}{row_num_in_sheet}', 'values': [["INFO_PARENT_AUS_O"]]})
additional_info_for_af_col += f"INFO: Wiki-URL von Parent (O): {system_suggested_parent_o}. "
else:
additional_info_for_af_col += f"WARN: Kein Wiki für Parent O '{system_suggested_parent_o}' gefunden. "
except Exception as e_o_lookup:
self.logger.error(f"Fehler bei Wiki-Suche für Parent O '{system_suggested_parent_o}': {e_o_lookup}")
additional_info_for_af_col += f"ERR: Suche Parent O fehlgeschlagen. "
if url_for_extraction is None:
search_for_daughter_needed = False
status_ac_reparse = self._get_cell_value_safe(row_data, "Chat Wiki Konsistenzpruefung").strip().upper() == "X (URL COPIED)"
ts_z_empty = not self._get_cell_value_safe(row_data, "Wikipedia Timestamp").strip()
r_url_valid_looking = current_wiki_url_r and "wikipedia.org/wiki/" in current_wiki_url_r.lower()
if status_ac_reparse or force_reeval or ts_z_empty or not r_url_valid_looking:
if r_url_valid_looking and not (status_ac_reparse or force_reeval):
self.logger.info(f" Zeile {row_num_in_sheet}: Nutze vorhandene Tochter-URL (R): {current_wiki_url_r}")
url_for_extraction = current_wiki_url_r
source_of_wiki_data_origin_log_msg = "Tochter (aus R)"
else:
self.logger.info(f" Zeile {row_num_in_sheet}: Starte neue Suche für Tochter '{company_name}'.")
search_for_daughter_needed = True
if search_for_daughter_needed:
try:
page_obj = self.wiki_scraper.search_company_article(company_name, website_url)
if page_obj:
url_for_extraction = page_obj.url
source_of_wiki_data_origin_log_msg = "Tochter (Suche erfolgreich)"
else:
url_for_extraction = "Kein Artikel gefunden"
except Exception as e_tochter_suche:
self.logger.error(f"Fehler bei Wiki-Suche für Tochter '{company_name}': {e_tochter_suche}")
url_for_extraction = f"Fehler Suche Tochter: {str(e_tochter_suche)[:50]}"
if url_for_extraction and isinstance(url_for_extraction, str) and url_for_extraction.lower() not in ["k.a.", "kein artikel gefunden"] and not url_for_extraction.startswith("FEHLER"):
self.logger.info(f" -> Extrahiere Wiki-Daten von URL ({source_of_wiki_data_origin_log_msg}): {url_for_extraction[:100]}...")
try:
extracted_data = self.wiki_scraper.extract_company_data(url_for_extraction)
if extracted_data and extracted_data.get('url') != 'k.A.':
final_wiki_data = extracted_data
wiki_data_updated_in_this_run = True
current_ac_val = self._get_cell_value_safe(row_data, "Chat Wiki Konsistenzpruefung").strip()
if source_of_wiki_data_origin_log_msg.startswith("Parent") and current_ac_val not in ["INFO_PARENT_AUS_D", "INFO_PARENT_AUS_O"]:
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Verif. Timestamp"] + 1)}{row_num_in_sheet}', 'values': [['']]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Wiki Inkonsistenz"] + 1)}{row_num_in_sheet}', 'values': [['']]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Wiki Artikel"] + 1)}{row_num_in_sheet}', 'values': [['']]})
elif not source_of_wiki_data_origin_log_msg.startswith("Parent"):
self.logger.info(f" -> Setze AC auf '?' für Tochter-Wiki-Update.")
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Wiki Konsistenzpruefung"] + 1)}{row_num_in_sheet}', 'values': [['?']]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Verif. Timestamp"] + 1)}{row_num_in_sheet}', 'values': [['']]})
else:
final_wiki_data['url'] = url_for_extraction
for key in ['sitz_stadt', 'sitz_land', 'first_paragraph', 'branche', 'umsatz', 'mitarbeiter', 'categories']:
final_wiki_data[key] = 'k.A. (Extraktion fehlgeschlagen)'
wiki_data_updated_in_this_run = True
except Exception as e_extract:
self.logger.error(f"FEHLER bei Wikipedia Datenextraktion von {url_for_extraction[:100]}...: {e_extract}")
final_wiki_data['url'] = url_for_extraction
for key in ['sitz_stadt', 'sitz_land', 'first_paragraph', 'branche', 'umsatz', 'mitarbeiter', 'categories']:
final_wiki_data[key] = 'k.A. (FEHLER Extr.)'
wiki_data_updated_in_this_run = True
elif url_for_extraction:
final_wiki_data['url'] = url_for_extraction
for key in ['sitz_stadt', 'sitz_land', 'first_paragraph', 'branche', 'umsatz', 'mitarbeiter', 'categories']:
final_wiki_data[key] = 'k.A.'
wiki_data_updated_in_this_run = True
if wiki_data_updated_in_this_run:
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki URL"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('url', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Sitz Stadt"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('sitz_stadt', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Sitz Land"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('sitz_land', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Absatz"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('first_paragraph', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Branche"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('branche', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Umsatz"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('umsatz', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Mitarbeiter"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('mitarbeiter', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Kategorien"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('categories', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wikipedia Timestamp"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]})
if additional_info_for_af_col:
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Begruendung bei Abweichung"] + 1)}{row_num_in_sheet}', 'values': [[additional_info_for_af_col]]})
if source_of_wiki_data_origin_log_msg.startswith("Parent"):
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["SerpAPI Wiki Search Timestamp"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]})
# --- 3. ChatGPT Evaluationen (Branch, FSM, etc.) & Plausi ---
run_chat_step = 'chat' in steps_to_run
chat_processing_needed = self._needs_chat_evaluations(row_data, force_reeval, wiki_data_updated_in_this_run)
if run_chat_step and chat_processing_needed:
any_processing_done = True
chat_eval_just_ran = True
grund_message_chat = "Re-Eval" if force_reeval else ("Wiki-Daten aktualisiert" if wiki_data_updated_in_this_run else "Timestamp (BN) leer")
self.logger.info(f"Zeile {row_num_in_sheet}: Fuehre CHATGPT Evaluationen & Plausi aus (Grund: {grund_message_chat})...")
# 3a. Branchen-Einstufung
self.logger.info(f" Zeile {row_num_in_sheet}: Starte Branchen-Einstufung ueber ChatGPT...")
try:
branch_result = evaluate_branche_chatgpt(
crm_branche,
crm_beschreibung,
final_wiki_data.get('branche', 'k.A.'),
final_wiki_data.get('categories', 'k.A.'),
website_summary
)
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get("branch", "FEHLER BRANCH")]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Branche Konfidenz"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get("confidence", "N/A CONF")]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get("consistency", "error CONS")]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begruendung Abweichung Branche"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get("justification", "No JUST")]]})
except Exception as e_branch_eval:
self.logger.error(f"FEHLER bei Branchen-Einstufung für Zeile {row_num_in_sheet}: {e_branch_eval}")
# 3b, 3c, 3d: Weitere ChatGPT-Evaluationen (hier nicht detailliert implementiert, aber Platzhalter)
# ... Logik für FSM-Relevanz, Mitarbeiter-Schätzung, Umsatz-Schätzung, etc. ...
# 3e. Konsolidierung Umsatz/Mitarbeiter (BD, BE)
self.logger.debug(f" Zeile {row_num_in_sheet}: Konsolidiere Umsatz (BD) und Mitarbeiter (BE)...")
final_umsatz_str_konsolidiert = "k.A."
final_ma_str_konsolidiert = "k.A."
try:
crm_umsatz_val_str = self._get_cell_value_safe(row_data, "CRM Umsatz")
wiki_umsatz_val_str = final_wiki_data.get('umsatz', 'k.A.')
crm_ma_val_str = self._get_cell_value_safe(row_data, "CRM Anzahl Mitarbeiter")
wiki_ma_val_str = final_wiki_data.get('mitarbeiter', 'k.A.')
num_crm_umsatz = get_numeric_filter_value(crm_umsatz_val_str, is_umsatz=True)
num_wiki_umsatz = get_numeric_filter_value(wiki_umsatz_val_str, is_umsatz=True)
num_crm_ma = get_numeric_filter_value(crm_ma_val_str, is_umsatz=False)
num_wiki_ma = get_numeric_filter_value(wiki_ma_val_str, is_umsatz=False)
if parent_account_name_d and parent_account_name_d.lower() != 'k.a.':
self.logger.info(f" -> Parent D ('{parent_account_name_d}') gesetzt. Konsolidiere primär mit CRM-Daten der Tochter.")
final_num_umsatz = num_crm_umsatz if num_crm_umsatz > 0 else num_wiki_umsatz
final_num_ma = num_crm_ma if num_crm_ma > 0 else num_wiki_ma
else:
self.logger.debug(f" -> Parent D leer. Standardkonsolidierung Wiki > CRM.")
final_num_umsatz = num_wiki_umsatz if num_wiki_umsatz > 0 else num_crm_umsatz
final_num_ma = num_wiki_ma if num_wiki_ma > 0 else num_crm_ma
final_umsatz_str_konsolidiert = str(int(round(final_num_umsatz))) if final_num_umsatz > 0 else 'k.A.'
final_ma_str_konsolidiert = str(int(round(final_num_ma))) if final_num_ma > 0 else 'k.A.'
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Finaler Umsatz (Wiki>CRM)"] + 1)}{row_num_in_sheet}', 'values': [[final_umsatz_str_konsolidiert]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Finaler Mitarbeiter (Wiki>CRM)"] + 1)}{row_num_in_sheet}', 'values': [[final_ma_str_konsolidiert]]})
except Exception as e_consolidate:
self.logger.error(f"FEHLER bei Konsolidierung (BD/BE) für Zeile {row_num_in_sheet}: {e_consolidate}")
final_umsatz_str_konsolidiert, final_ma_str_konsolidiert = "FEHLER_KONSO", "FEHLER_KONSO"
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Finaler Umsatz (Wiki>CRM)"] + 1)}{row_num_in_sheet}', 'values': [['FEHLER_KONSO']]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Finaler Mitarbeiter (Wiki>CRM)"] + 1)}{row_num_in_sheet}', 'values': [['FEHLER_KONSO']]})
# 3f. Plausibilitäts-Checks (BG-BM)
self.logger.debug(f" Zeile {row_num_in_sheet}: Führe Plausibilitäts-Checks durch...")
try:
plausi_input_data = {
"Finaler Umsatz (Wiki>CRM)": final_umsatz_str_konsolidiert,
"Finaler Mitarbeiter (Wiki>CRM)": final_ma_str_konsolidiert,
"CRM Umsatz": self._get_cell_value_safe(row_data, "CRM Umsatz"),
"Wiki Umsatz": final_wiki_data.get('umsatz', 'k.A.'),
"CRM Anzahl Mitarbeiter": self._get_cell_value_safe(row_data, "CRM Anzahl Mitarbeiter"),
"Wiki Mitarbeiter": final_wiki_data.get('mitarbeiter', 'k.A.'),
"Parent Account Name": parent_account_name_d,
"System Vorschlag Parent Account": self._get_cell_value_safe(row_data, "System Vorschlag Parent Account"),
"Parent Vorschlag Status": self._get_cell_value_safe(row_data, "Parent Vorschlag Status")
}
plausi_results = self._check_financial_plausibility(plausi_input_data)
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Plausibilität Umsatz"] + 1)}{row_num_in_sheet}', 'values': [[plausi_results.get("plaus_umsatz_flag", "ERR_FLAG")]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Plausibilität Mitarbeiter"] + 1)}{row_num_in_sheet}', 'values': [[plausi_results.get("plaus_ma_flag", "ERR_FLAG")]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Plausibilität Umsatz/MA Ratio"] + 1)}{row_num_in_sheet}', 'values': [[plausi_results.get("plaus_ratio_flag", "ERR_FLAG")]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Abweichung Umsatz CRM/Wiki"] + 1)}{row_num_in_sheet}', 'values': [[plausi_results.get("abweichung_umsatz_flag", "ERR_FLAG")]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Abweichung MA CRM/Wiki"] + 1)}{row_num_in_sheet}', 'values': [[plausi_results.get("abweichung_ma_flag", "ERR_FLAG")]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Plausibilität Begründung"] + 1)}{row_num_in_sheet}', 'values': [[plausi_results.get("plausi_begruendung_final", "Fehler Begr.")]]})
except Exception as e_plausi:
self.logger.error(f"FEHLER bei Plausi-Checks für Zeile {row_num_in_sheet}: {e_plausi}")
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Plausibilität Prüfdatum"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Pruefung"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]})
# --- 4. Servicetechniker Schaetzung (ML Modell) ---
run_ml_step = 'ml_predict' in steps_to_run
ml_processing_needed = self._needs_ml_prediction(row_data, force_reeval, chat_eval_just_ran)
if run_ml_step and ml_processing_needed:
any_processing_done = True
self.logger.info(f"Zeile {row_num_in_sheet}: Fuehre ML-Schaetzung aus...")
try:
predicted_bucket = self._predict_technician_bucket(row_data)
if predicted_bucket and isinstance(predicted_bucket, str) and not predicted_bucket.startswith("FEHLER"):
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Geschaetzter Techniker Bucket"] + 1)}{row_num_in_sheet}', 'values': [[predicted_bucket]]})
self.logger.info(f" -> ML-Schaetzung erfolgreich: Bucket '{predicted_bucket}'.")
else:
self.logger.warning(f" -> ML-Schaetzung lieferte kein gueltiges Ergebnis: '{predicted_bucket}'.")
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Geschaetzter Techniker Bucket"] + 1)}{row_num_in_sheet}', 'values': [['k.A. (Schaetzung fehlgeschlagen)']]})
except Exception as e_ml:
self.logger.error(f"FEHLER bei ML-Schaetzung fuer Zeile {row_num_in_sheet}: {e_ml}")
self.logger.debug(traceback.format_exc())
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Geschaetzter Techniker Bucket"] + 1)}{row_num_in_sheet}', 'values': [[f'FEHLER Schaetzung: {str(e_ml)[:50]}...']]})
# ======================================================================
# === Abschluss der _process_single_row Verarbeitung ===================
# ======================================================================
# --- 5. Abschliessende Updates (Version, Tokens, ReEval Flag) ---
if any_processing_done:
version_col_idx = COLUMN_MAP.get("Version")
if version_col_idx is not None:
updates.append({'range': f'{self.sheet_handler._get_col_letter(version_col_idx + 1)}{row_num_in_sheet}', 'values': [[getattr(Config, 'VERSION', 'unknown')]]})
else:
self.logger.error("FEHLER: Spaltenschluessel 'Version' nicht in COLUMN_MAP gefunden.")
if force_reeval and clear_x_flag:
reeval_col_idx = COLUMN_MAP.get("ReEval Flag")
if reeval_col_idx is not None:
flag_col_letter = self.sheet_handler._get_col_letter(reeval_col_idx + 1)
if flag_col_letter:
updates.append({'range': f'{flag_col_letter}{row_num_in_sheet}', 'values': [['']]})
self.logger.debug(f" -> Update zum Loeschen des ReEval-Flags (A{row_num_in_sheet}) vorgemerkt.")
else:
self.logger.error(f"FEHLER: Konnte Spaltenbuchstaben fuer 'ReEval Flag' nicht ermitteln.")
else:
self.logger.error("FEHLER: 'ReEval Flag' Spaltenindex nicht in COLUMN_MAP gefunden.")
# --- 6. Batch Update fuer diese Zeile ---
if updates:
self.logger.info(f"Zeile {row_num_in_sheet}: Sende Batch-Update mit {len(updates)} Operationen...")
success = self.sheet_handler.batch_update_cells(updates)
if not success:
self.logger.error(f"Zeile {row_num_in_sheet}: ENDGUELTIGER FEHLER beim Batch-Update nach Retries.")
else:
if not any_processing_done:
self.logger.debug(f"Zeile {row_num_in_sheet}: Keine Updates zum Schreiben.")
pause_duration = max(0.05, getattr(Config, 'RETRY_DELAY', 5) / 20.0)
time.sleep(pause_duration)
self.logger.info(f"--- Verarbeitung fuer Zeile {row_num_in_sheet} abgeschlossen ---")
# ==========================================================================
# === Prozess Methoden (Sequentiell & Re-Evaluation) =======================
# ==========================================================================
def process_rows_sequentially(self, start_sheet_row, num_to_process,
process_wiki_steps=True, process_chatgpt_steps=True,
process_website_steps=True, process_ml_steps=True,
force_reeval_in_single_row=False):
"""
Verarbeitet eine feste Anzahl von Zeilen beginnend bei einer bestimmten
Sheet-Zeilennummer sequentiell.
"""
header_rows = self.sheet_handler._header_rows
if num_to_process is None or not isinstance(num_to_process, int) or num_to_process <= 0:
self.logger.info("Sequentielle Verarbeitung uebersprungen: num_to_process ist ungueltig oder <= 0.")
return
self.logger.info(f"Starte sequentielle Verarbeitung von {num_to_process} Zeilen ab Sheet-Zeile {start_sheet_row}...")
selected_steps_log = []
if process_wiki_steps: selected_steps_log.append("Wiki (wiki)")
if process_chatgpt_steps: selected_steps_log.append("ChatGPT (chat)")
if process_website_steps: selected_steps_log.append("Website (web)")
if process_ml_steps: selected_steps_log.append("ML Predict (ml_predict)")
self.logger.info(f" Ausgewaehlte Schritte: {', '.join(selected_steps_log) if selected_steps_log else 'Keine'}")
if force_reeval_in_single_row:
self.logger.warning(" !!! force_reeval=True wird fuer alle Zeilen in _process_single_row gesetzt !!!")
steps_to_run_set = set()
if process_wiki_steps: steps_to_run_set.add('wiki')
if process_chatgpt_steps: steps_to_run_set.add('chat')
if process_website_steps: steps_to_run_set.add('web')
if process_ml_steps: steps_to_run_set.add('ml_predict')
if not steps_to_run_set:
self.logger.warning("Keine Verarbeitungsschritte ausgewaehlt. Modus wird uebersprungen.")
return
if not self.sheet_handler.load_data():
self.logger.error("Fehler beim Laden der Daten fuer sequentielle Verarbeitung.")
return
all_data = self.sheet_handler.get_all_data_with_headers()
total_sheet_rows = len(all_data)
start_index_in_all_data = start_sheet_row - 1
if start_index_in_all_data >= total_sheet_rows:
self.logger.warning(f"Start-Sheet-Zeile {start_sheet_row} liegt ausserhalb der Daten. Keine Verarbeitung.")
return
if start_index_in_all_data < header_rows:
self.logger.warning(f"Start-Sheet-Zeile {start_sheet_row} liegt in Headern. Starte ab Zeile {header_rows + 1}.")
start_index_in_all_data = header_rows
end_index_in_all_data = min(start_index_in_all_data + num_to_process, total_sheet_rows)
self.logger.info(f"Sequentielle Verarbeitung: Index-Bereich [{start_index_in_all_data}, {end_index_in_all_data}).")
if start_index_in_all_data >= end_index_in_all_data:
self.logger.info(f"Keine Zeilen im definierten Bereich zu verarbeiten.")
return
processed_count = 0
for i in range(start_index_in_all_data, end_index_in_all_data):
row_num_in_sheet = i + 1
row_data = all_data[i]
if row_num_in_sheet <= header_rows: continue
if not any(cell and isinstance(cell, str) and cell.strip() for cell in row_data):
self.logger.debug(f"Ueberspringe leere Zeile {row_num_in_sheet}.")
continue
try:
self._process_single_row(
row_num_in_sheet=row_num_in_sheet,
row_data=row_data,
steps_to_run=steps_to_run_set,
force_reeval=force_reeval_in_single_row,
clear_x_flag=False
)
processed_count += 1
except Exception as e_proc:
self.logger.exception(f"FEHLER bei sequentieller Verarbeitung von Zeile {row_num_in_sheet}: {e_proc}")
self.logger.info(f"Sequentielle Verarbeitung abgeschlossen. {processed_count} Zeilen bearbeitet.")
def process_reevaluation_rows(self, row_limit=None, clear_flag=True,
process_wiki_steps=True, process_chatgpt_steps=True,
process_website_steps=True, process_ml_steps=True):
"""
Verarbeitet nur Zeilen, die in Spalte A mit 'x' markiert sind.
"""
self.logger.info(f"Starte Re-Evaluierungsmodus (Spalte A = 'x'). Max. Zeilen: {row_limit if row_limit is not None else 'Unbegrenzt'}")
selected_steps_log = []
if process_wiki_steps: selected_steps_log.append("Wiki")
if process_chatgpt_steps: selected_steps_log.append("ChatGPT")
if process_website_steps: selected_steps_log.append("Website")
if process_ml_steps: selected_steps_log.append("ML Predict")
self.logger.info(f"Ausgewaehlte Schritte fuer Re-Eval: {', '.join(selected_steps_log) if selected_steps_log else 'Keine'}")
steps_to_run_set = set()
if process_wiki_steps: steps_to_run_set.add('wiki')
if process_chatgpt_steps: steps_to_run_set.add('chat')
if process_website_steps: steps_to_run_set.add('web')
if process_ml_steps: steps_to_run_set.add('ml_predict')
if not steps_to_run_set:
self.logger.warning("Keine Verarbeitungsschritte fuer Re-Eval ausgewaehlt. Modus wird uebersprungen.")
return
if not self.sheet_handler.load_data():
self.logger.error("Fehler beim Laden der Daten fuer Re-Evaluation.")
return
all_data = self.sheet_handler.get_all_data_with_headers()
header_rows = self.sheet_handler._header_rows
if not all_data or len(all_data) <= header_rows:
self.logger.warning("Keine Datenzeilen fuer Re-Evaluation gefunden.")
return
reeval_col_idx = COLUMN_MAP.get("ReEval Flag")
if reeval_col_idx is None:
self.logger.critical("FEHLER: 'ReEval Flag' Spaltenindex nicht in COLUMN_MAP gefunden. Breche ab.")
return
rows_to_process = []
for idx, row_data in enumerate(all_data):
if idx < header_rows: continue
row_num_in_sheet = idx + 1
cell_a_value = self._get_cell_value_safe(row_data, "ReEval Flag").strip().lower()
if cell_a_value == "x":
rows_to_process.append({'row_num': row_num_in_sheet, 'data': row_data})
found_count = len(rows_to_process)
self.logger.info(f"{found_count} Zeilen mit ReEval-Flag 'x' gefunden.")
if found_count == 0: return
processed_count_actual = 0
for task in rows_to_process:
if row_limit is not None and processed_count_actual >= row_limit:
self.logger.info(f"Zeilenlimit ({row_limit}) fuer Re-Evaluation erreicht.")
break
self.logger.info(f"Bearbeite Re-Eval Zeile {task['row_num']}...")
processed_count_actual += 1
try:
self._process_single_row(
row_num_in_sheet=task['row_num'],
row_data=task['data'],
steps_to_run=steps_to_run_set,
force_reeval=True,
clear_x_flag=clear_flag
)
except Exception as e_proc_reval:
self.logger.exception(f"FEHLER bei Re-Evaluation von Zeile {task['row_num']}: {e_proc_reval}")
self.logger.info(f"Re-Evaluierung abgeschlossen. {processed_count_actual} Zeilen verarbeitet.")
# ==========================================================================
# === Batch Processing Methods =============================================
# ==========================================================================
@retry_on_failure
def _process_verification_openai_batch(self, batch_data):
"""
Verarbeitet einen Batch von Wikipedia-Verifizierungsanfragen ueber OpenAI.
"""
if not batch_data: return {}
self.logger.debug(f"Sende OpenAI-Batch fuer Wiki-Verifizierung ({len(batch_data)} Zeilen)...")
aggregated_prompt = (
"Du bist ein Experte in der Verifizierung von Wikipedia-Artikeln fuer Unternehmen. "
"Fuer jeden der folgenden Eintraege pruefe, ob der vorhandene Wikipedia-Artikel plausibel zum Firmennamen und zur Beschreibung passt. "
"Gib das Ergebnis fuer jeden Eintrag ausschliesslich im folgenden Format auf einer neuen Zeile aus:\n"
"Eintrag <Zeilennummer>: <Antwort>\n\n"
"Moegliche Antworten:\n"
"- 'OK' (wenn der Artikel gut passt)\n"
"- 'X | Alternativer Artikel: <URL> | Begruendung: <Kurze Begruendung>'\n"
"- 'X | Kein passender Artikel gefunden | Begruendung: <Kurze Begruendung>'\n\n"
"Eintraege zur Pruefung:\n--------------------\n"
)
max_desc_length = 200
for item in batch_data:
entry_text = (
f"Eintrag {item['row_num']}:\n"
f" Firmenname: {str(item.get('company_name', 'k.A.'))}\n"
f" CRM-Beschreibung: {str(item.get('crm_desc', 'k.A.'))[:max_desc_length]}\n"
f" Wikipedia-URL: {str(item.get('wiki_url', 'k.A.'))}\n"
f" Wiki-Absatz: {str(item.get('wiki_paragraph', 'k.A.'))[:max_desc_length]}\n"
f" Wiki-Kategorien: {str(item.get('wiki_categories', 'k.A.'))[:max_desc_length]}\n----\n"
)
aggregated_prompt += entry_text
try:
chat_response = summarize_batch_openai(aggregated_prompt, temperature=0.0) # Using summarize helper, but it's just a call_openai_chat wrapper
if not chat_response: raise openai.error.APIError("Keine Antwort von OpenAI erhalten.")
except Exception as e:
self.logger.error(f"Endgueltiger FEHLER beim OpenAI-Batch-Aufruf fuer Wiki-Verifizierung: {e}")
return {item['row_num']: f"FEHLER API: {str(e)[:100]}" for item in batch_data}
answers = {}
original_batch_row_nums = {item['row_num'] for item in batch_data}
lines = chat_response.strip().split('\n')
for line in lines:
match = re.match(r"Eintrag (\d+): (.*)", line.strip())
if match:
row_num, answer_text = int(match.group(1)), match.group(2).strip()
if row_num in original_batch_row_nums: answers[row_num] = answer_text
for row_num in original_batch_row_nums:
if row_num not in answers: answers[row_num] = "FEHLER: Antwort nicht geparst"
return answers
def process_verification_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
"""
Batch-Prozess nur fuer Wikipedia-Verifizierung (Spalten S-U, V-Y werden geleert).
Laedt Daten neu, prueft fuer jede Zeile im Bereich, ob Timestamp AX (Wiki Verif.)
bereits gesetzt ist, ob eine Wiki URL (M) vorhanden ist und ob Status S
nicht bereits 'OK', 'X (URL Copied)' oder 'X (Invalid Suggestion)' ist.
Setzt AX + AP fuer bearbeitete Zeilen und schreibt S-U in Batches.
Args:
start_sheet_row (int, optional): Die 1-basierte Startzeile im Sheet. Defaults to None (automatische Ermittlung basierend auf leeren AX).
end_sheet_row (int, optional): Die 1-basierte Endzeile im Sheet. Defaults to None (bis Ende Sheet).
limit (int, optional): Maximale Anzahl ZU VERARBEITENDER (nicht uebersprungener) Zeilen. Defaults to None (Unbegrenzt).
"""
# Verwenden Sie logger, da das Logging jetzt konfiguriert ist
# Logge die Konfiguration des Batch-Laufs
self.logger.info(f"Starte Wikipedia-Verifizierungsmodus (Batch S-U, AX). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") # <<< GEÄNDERT
# --- Daten laden und Startzeile ermitteln ---
# Automatische Ermittlung der Startzeile, wenn nicht manuell gesetzt
if start_sheet_row is None:
self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren AX...") # <<< GEÄNDERT
# Nutzt get_start_row_index des Sheet Handlers (Block 14). Prueft auf leeren AX (Block 1 Column Map).
# Standardmaessig ab Zeile 7
start_data_index_no_header = self.sheet_handler.get_start_row_index(check_column_key="Wiki Verif. Timestamp", min_sheet_row=7)
# Wenn get_start_row_index -1 zurueckgibt (Fehler)
if start_data_index_no_header == -1:
self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") # <<< GEÄNDERT
return # Beende die Methode
# Berechne die 1-basierte Sheet-Startzeile aus dem 0-basierten Daten-Index
start_sheet_row = start_data_index_no_header + self.sheet_handler._header_rows + 1 # Block 14 SheetHandler Attribut
self.logger.info(f"Automatisch ermittelte Startzeile (erste leere AX Zelle): {start_sheet_row}") # <<< GEÄNDERT
else:
# Wenn start_sheet_row manuell gesetzt wurde, laden Sie die Daten trotzdem neu, um aktuell zu sein.
# Der load_data Aufruf ist mit retry_on_failure dekoriert (Block 2).
if not self.sheet_handler.load_data():
self.logger.error("FEHLER beim Laden der Daten fuer process_verification_batch.") # <<< GEÄNDERT
return # Beende die Methode, wenn das Laden fehlschlaegt
# Holen Sie die gesamte Datenliste (inklusive Header) aus dem SheetHandler.
all_data = self.sheet_handler.get_all_data_with_headers()
# Annahme: header_rows ist als Attribut im SheetHandler verfuegbar (Block 14).
header_rows = self.sheet_handler._header_rows
total_sheet_rows = len(all_data) # Gesamtzahl der Zeilen im Sheet
# Berechne Endzeile, wenn nicht manuell gesetzt
if end_sheet_row is None:
end_sheet_row = total_sheet_rows # Bis zur letzten Zeile
# Logge den verarbeitungsbereich
self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}. Gesamtzeilen im Sheet: {total_sheet_rows}") # <<< GEÄNDERT
# Pruefe, ob der Bereich gueltig ist (Start <= Ende und Start nicht ueber Gesamtzeilen)
if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows:
self.logger.info("Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.") # <<< GEÄNDERT
return # Beende die Methode, wenn der Bereich leer ist
# --- Indizes und Buchstaben ---
# Stellen Sie sicher, dass alle benoetigten Spalten in COLUMN_MAP (Block 1) vorhanden sind
required_keys = [
"Wiki Verif. Timestamp", "Wiki URL", "Chat Wiki Konsistenzpruefung", # Pruefkriterien / Timestamp (AX, M, S)
"CRM Name", "CRM Beschreibung", "Wiki Absatz", "Wiki Kategorien", # Daten fuer Prompt (B, F, N, R)
"Chat Begruendung Wiki Inkonsistenz", "Chat Vorschlag Wiki Artikel", # Ergebnisspalten (T, U)
"Begruendung bei Abweichung", "Chat Begruendung Abweichung Branche", # Spalten V-Y zum Leeren
"Wikipedia Timestamp", "Timestamp letzte Pruefung", # Spalten AN, AO zum Leeren
"Version", "SerpAPI Wiki Search Timestamp" # Spalten AP, AY zum Leeren
]
# Erstellen Sie ein Dictionary mit Schluesseln und Indizes
col_indices = {key: COLUMN_MAP.get(key) for key in required_keys}
# Pruefen Sie, ob alle benoetigten Schluessel in COLUMN_MAP gefunden wurden
if None in col_indices.values():
missing = [k for k, v in col_indices.items() if v is None]
self.logger.critical(f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_verification_batch: {missing}. Breche ab.") # <<< GEÄNDERT
return # Beende die Methode bei kritischem Fehler
# Ermitteln Sie die Spaltenbuchstaben fuer Updates und Leerung (nutzt interne Helfer _get_col_letter Block 14)
ts_ax_letter = self.sheet_handler._get_col_letter(col_indices["Wiki Verif. Timestamp"] + 1) # Timestamp zu setzen (AX)
s_letter = self.sheet_handler._get_col_letter(col_indices["Chat Wiki Konsistenzpruefung"] + 1) # Status S
t_letter = self.sheet_handler._get_col_letter(col_indices["Chat Begruendung Wiki Inkonsistenz"] + 1) # Begruendung T
u_letter = self.sheet_handler._get_col_letter(col_indices["Chat Vorschlag Wiki Artikel"] + 1) # Vorschlag U
# Spalten V-Y leeren (werden in diesem Modus nicht neu befuellt).
# V ist Begruendung bei Abweichung (von Wiki-URL Pruefung CRM vs Wiki).
# Y ist Begruendung Abweichung Branche (von Chat).
v_idx = col_indices["Begruendung bei Abweichung"]
y_idx = col_indices["Chat Begruendung Abweichung Branche"] # Block 1 Column Map
# Erstellen Sie den Bereichsnamen (z.B. "V:Y")
v_letter = self.sheet_handler._get_col_letter(v_idx + 1)
y_letter = self.sheet_handler._get_col_letter(y_idx + 1)
v_y_range_letter = f'{v_letter}:{y_letter}' # z.B. V:Y
# Erstellen Sie eine Liste von leeren Strings fuer diesen Bereich
empty_vy_values = [''] * (y_idx - v_idx + 1) # Anzahl der Spalten = Y_Index - V_Index + 1
# Timestamps AN, AO, AP, AY leeren.
# Diese werden von anderen Schritten gesetzt und sollen hier zurueckgesetzt werden,
# um sicherzustellen, dass die Zeile bei Bedarf von diesen anderen Schritten erneut bearbeitet wird.
an_letter = self.sheet_handler._get_col_letter(col_indices["Wikipedia Timestamp"] + 1) # AN (Wiki Extraction TS)
ao_letter = self.sheet_handler._get_col_letter(col_indices["Timestamp letzte Pruefung"] + 1) # AO (Chat Evaluation TS)
ap_letter = self.sheet_handler._get_col_letter(col_indices["Version"] + 1) # AP (Version)
ay_letter = self.sheet_handler._get_col_letter(col_indices["SerpAPI Wiki Search Timestamp"] + 1) # AY (SerpAPI Wiki TS)
# --- Verarbeitung ---
# Holen Sie die Batch-Groesse fuer OpenAI-Aufrufe aus Config (Block 1)
openai_batch_size = getattr(Config, 'PROCESSING_BATCH_SIZE', 20) # Nutzt dieselbe Batch-Groesse wie Scraping/Summarization
# Holen Sie die Batch-Groesse fuer Sheet-Updates aus Config (Block 1)
update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50)
current_openai_batch_data = [] # Daten fuer den aktuellen OpenAI Batch (Liste von Dicts)
rows_in_current_openai_batch = [] # 1-basierte Zeilennummern im aktuellen OpenAI Batch
all_sheet_updates = [] # Gesammelte Updates fuer Batch-Schreiben ins Sheet (Liste von Dicts)
processed_count = 0 # Zaehlt Zeilen, die fuer die Verarbeitung in Frage kommen und in den Batch aufgenommen werden (im Rahmen des Limits).
skipped_count = 0 # Zaehlt Zeilen, die uebersprungen wurden (wegen Status, fehlender Daten etc.).
skipped_no_wiki_url = 0 # Zaehlt Zeilen, die speziell wegen fehlender M-URL uebersprungen wurden.
# Iteriere ueber die Sheet-Zeilen im definierten Bereich (1-basierte Sheet-Zeilennummer)
for i in range(start_sheet_row, end_sheet_row + 1):
row_index_in_list = i - 1 # 0-basierter Index in der all_data Liste
# Pruefen Sie, ob das Ende des Sheets erreicht wurde
if row_index_in_list >= total_sheet_rows: break # Ende des Sheets erreicht
row = all_data[row_index_in_list] # Die Rohdaten fuer diese Zeile
# Stellen Sie sicher, dass die Zeile nicht leer ist (mindestens Name vorhanden)
# Nutzt interne Helfer _get_cell_value_safe
company_name = self._get_cell_value_safe(row, "CRM Name").strip() # Block 1 Column Map
if not company_name:
self.logger.debug(f"Zeile {i}: Uebersprungen (Kein Firmenname in Spalte B).") # <<< GEÄNDERT
skipped_count += 1 # Zaehlen als uebersprungen
continue # Springe zur naechsten Zeile
# --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist ---
# Kriterium: Wiki Verif. Timestamp (AX) ist leer
# UND Wiki URL (M) ist gefuellt und gueltig aussehend (nicht k.A., Fehler etc.)
# UND Status S ist NICHT bereits in einem Endzustand (OK, X (UPDATED/COPIED/INVALID)).
# Holen Sie die Werte aus den entsprechenden Spalten (nutzt interne Helfer)
ax_value = self._get_cell_value_safe(row, "Wiki Verif. Timestamp").strip() # Block 1 Column Map
m_value = self._get_cell_value_safe(row, "Wiki URL").strip() # Block 1 Column Map
s_value_upper = self._get_cell_value_safe(row, "Chat Wiki Konsistenzpruefung").strip().upper() # Block 1 Column Map
# Pruefen Sie, ob die Wiki URL (M) gueltig aussieht
is_wiki_url_valid_looking = m_value and isinstance(m_value, str) and "wikipedia.org/wiki/" in m_value.lower() and m_value.lower() not in ["k.a.", "kein artikel gefunden", "fehler bei suche", "http:"] # Fuege "http:" hinzu basierend auf Log
# Definieren Sie die Endzustaende von Status S (Grossbuchstaben)
s_end_states = ["OK", "X (UPDATED)", "X (URL COPIED)", "X (INVALID SUGGESTION)"]
# Pruefen Sie, ob Status S in einem Endzustand ist
is_s_in_endstate = s_value_upper in s_end_states # Bugfix: Korrekte Zuweisung
# Verarbeitung ist noetig, wenn AX leer UND M gefuellt/gueltig aussieht UND S NICHT im Endzustand ist.
processing_needed_for_row = not ax_value and is_wiki_url_valid_looking and not is_s_in_endstate
# Loggen der Pruefergebnisse fuer diese Zeile auf Debug-Level
log_check = (i < start_sheet_row + 5) or (i % 100 == 0) or (processing_needed_for_row)
if log_check:
self.logger.debug(f"Zeile {i} ({company_name[:50]}... Wiki Verif. Check): AX leer? {not ax_value}, M gueltig? {is_wiki_url_valid_looking}, S ('{s_value_upper}') Endzustand? {is_s_in_endstate}. Benötigt Verarbeitung? {processing_needed_for_row}") # <<< GEÄNDERT
# Wenn die Verarbeitung fuer diese Zeile nicht noetig ist
if not processing_needed_for_row:
skipped_count += 1 # Zaehlen als uebersprungene Zeile
# Zaehlen Sie separat, wenn die Zeile speziell wegen fehlender M-URL uebersprungen wurde
if not is_wiki_url_valid_looking: skipped_no_wiki_url += 1
continue # Springe zur naechsten Zeile
# --- Wenn Verarbeitung noetig: Fuege zur Batch-Liste fuer OpenAI hinzu ---
processed_count += 1 # Zaehle die Zeile, die fuer die Verarbeitung in Frage kommt (im Rahmen des Limits zaehlen)
# Pruefe das Limit fuer verarbeitete Zeilen
if limit is not None and isinstance(limit, int) and limit > 0 and processed_count > limit:
# Wenn das Limit erreicht ist und es ein positives Limit gibt
self.logger.info(f"Verarbeitungslimit ({limit}) fuer process_verification_batch erreicht. Breche weitere Zeilenpruefung ab.") # <<< GEÄNDERT
break # Brich die Schleife ab
# Sammle die benoetigten Daten fuer den OpenAI Prompt (Block 26 - _process_verification_openai_batch).
# Diese Daten werden in einem Dictionary fuer den Batch gesammelt.
crm_desc = self._get_cell_value_safe(row, "CRM Beschreibung") # Block 1 Column Map
wiki_paragraph = self._get_cell_value_safe(row, "Wiki Absatz") # Block 1 Column Map
wiki_categories = self._get_cell_value_safe(row, "Wiki Kategorien") # Block 1 Column Map
# Fuege die Daten dieser Zeile zur aktuellen Batch-Liste fuer OpenAI hinzu
current_openai_batch_data.append({
'row_num': i, # Die 1-basierte Sheet-Zeilennummer
'company_name': company_name, # Nutzt den initial geladenen Namen
'crm_desc': crm_desc,
'wiki_url': m_value, # Nutzt die M-URL aus dem Sheet
'wiki_paragraph': wiki_paragraph,
'wiki_categories': wiki_categories
})
# Fuege die Zeilennummer zur Liste der Zeilennummern im Batch hinzu
rows_in_current_openai_batch.append(i)
# --- Verarbeite den Batch, wenn voll ---
# Pruefe, ob die aktuelle Batch-Liste die definierte Groesse erreicht hat.
# openai_batch_size wird aus Config geholt (Block 1).
if len(current_openai_batch_data) >= openai_batch_size:
# Logge den Start der Batch-Verarbeitung
batch_start_row = current_openai_batch_data[0]['row_num']
batch_end_row = current_openai_batch_data[-1]['row_num']
self.logger.debug(f"\n--- Starte Wiki-Verifizierungs-Batch ({len(current_openai_batch_data)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
# Rufe die interne Methode auf, die den OpenAI Call fuer den Batch macht.
# _process_verification_openai_batch (Block 26) ist mit retry_on_failure dekoriert.
# Wenn _process_verification_openai_batch eine Exception wirft (nach Retries), wird diese hier gefangen.
batch_results = self._process_verification_openai_batch(current_openai_batch_data)
# Ergebnisse sollten ein Dictionary {row_num: raw_chatgpt_answer} sein, auch bei Fehlern.
# Sammle Sheet Updates basierend auf den Batch-Ergebnissen.
# Setze immer den Timestamp AX und die Werte in S, T, U und V-Y.
# Der aktuelle Zeitstempel fuer den Batch
current_batch_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
batch_sheet_updates = [] # Updates fuer DIESEN spezifischen Batch von Zeilen
# Iteriere ueber die Zeilennummern, die in DIESEM OpenAI Batch waren
for row_num in rows_in_current_openai_batch:
# Hole das Ergebnis fuer diese Zeile aus dem Ergebnis-Dictionary.
# Fallback auf einen Fehlerstring, wenn das Ergebnis fehlt (sollte nicht passieren, wenn _process_verification_openai_batch korrekt ist).
answer = batch_results.get(row_num, "FEHLER: Batch-Ergebnis fehlt")
# self.logger.debug(f"Zeile {row_num} Verifizierungsantwort: '{answer[:100]}...'") # Zu viel Laerm (gekuerzt)
# Logik zur Bestimmung der Werte fuer S, T, U basierend auf 'answer' (aehnlich wie in altem _process_batch)
wiki_confirm, alt_article, wiki_explanation = "", "", "" # Initialisiere mit leeren Strings
# Pruefe auf Standard-Antworten und Fehler-Antworten
if isinstance(answer, str) and answer.upper() == "OK":
wiki_confirm = "OK"
wiki_explanation = "Passt laut KI zur Firma." # Standard Begruendung bei OK
elif isinstance(answer, str) and answer.startswith("X |"):
# Parse die Antwort im Format "X | <Detail> | <Begruendung>"
parts = answer.split("|", 2) # Teile maximal in 3 Teile
wiki_confirm = "X" # Status ist X
if len(parts) > 1:
detail = parts[1].strip() # Zweiter Teil ist Detail (Alternative URL oder "Kein passender Artikel gefunden")
if detail.lower().startswith("alternativer artikel:"):
alt_article = detail.split(":", 1)[1].strip() # Extrahiere URL
elif detail.lower() == "kein passender artikel gefunden":
alt_article = detail # Text "Kein passender Artikel gefunden"
else:
alt_article = detail # Unbekanntes Detail
if len(parts) > 2:
reason_part = parts[2].strip() # Dritter Teil ist Begruendung
if reason_part.lower().startswith("begruendung:"):
wiki_explanation = reason_part.split(":", 1)[1].strip() # Extrahiere Begruendungstext
else:
wiki_explanation = reason_part # Unbekannte Begruendung
# Fuege ggf. den rohen Antworttext zur Begruendung hinzu, wenn Parsing unvollstaendig war
if not alt_article or not wiki_explanation:
wiki_explanation += f" (Rohantwort: {answer[:100]}...)"
elif isinstance(answer, str) and answer.startswith("FEHLER"):
# Wenn die Batch-Verarbeitung einen Fehler zurueckgegeben hat
wiki_confirm = "FEHLER"
wiki_explanation = answer # Fehlermeldung in Begruendung schreiben
alt_article = "Siehe Begruendung" # Verweis auf Begruendung
else: # Unerwartetes Format der Antwort (weder OK noch X | noch FEHLER)
wiki_confirm = "?" # Setze Status auf unbekannt
wiki_explanation = f"Unerwartetes Format: {str(answer)[:100]}..." # Speichere Anfang der Antwort in Begruendung (gekuerzt)
alt_article = "Siehe Begruendung" # Verweis auf Begruendung
# Spalten V-Y (Begruendung bei Abweichung etc.) werden in diesem Modus geleert
# Fuer jede Zeile im Batch fuegen wir das Update hinzu.
# empty_vy_values wurde oben vorbereitet.
v_y_values = empty_vy_values # Liste von leeren Strings
# Fuege Update zum Leeren von V-Y hinzu, falls Index gefunden wurde
if v_y_range_letter: # Pruefe, ob der Bereichsname ermittelt werden konnte
batch_sheet_updates.append({'range': f'{v_letter}{row_num}:{y_letter}{row_num}', 'values': [v_y_values]}) # Block 1 Column Map, interne Helfer
# Fuege Updates fuer S, T, U und AX hinzu (nutzt interne Helfer)
batch_sheet_updates.append({'range': f'{s_letter}{row_num}', 'values': [[wiki_confirm]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{t_letter}{row_num}', 'values': [[alt_article]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{u_letter}{row_num}', 'values': [[wiki_explanation]]}) # Block 1 Column Map
# Setze AX Timestamp fuer diese Zeile
batch_sheet_updates.append({'range': f'{ts_ax_letter}{row_num}', 'values': [[current_batch_timestamp]]}) # Block 1 Column Map
# --- Sende gesammelte Updates fuer diesen Batch ---
# Sammle die Updates fuer diesen Batch in der globalen Liste.
# all_sheet_updates.extend(batch_sheet_updates) # Nicht hier sammeln, sondern direkt senden
# Sende die gesammelten Updates fuer DIESEN Batch sofort.
if batch_sheet_updates:
self.logger.debug(f" Sende Sheet-Update fuer {len(rows_in_current_openai_batch)} Zeilen ({len(batch_sheet_updates)} Zellen) dieses Batches...") # <<< GEÄNDERT
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry.
# Wenn es fehlschlaegt, wird es intern geloggt.
success = self.sheet_handler.batch_update_cells(batch_sheet_updates)
if success:
self.logger.info(f" Sheet-Update fuer Wiki-Verifizierungs-Batch Zeilen {batch_start_row}-{batch_end_row} erfolgreich.") # <<< GEÄNDERT
# Der Fehlerfall wird von batch_update_cells geloggt
# Setze Batch-Listen zurueck fuer die naechste Iteration
current_openai_batch_data = []
rows_in_current_openai_batch = []
# Pause nach jedem OpenAI Batch (nutzt Config Block 1).
# Dies ist wichtig, um Rate Limits zu vermeiden.
# Nutze Config.RETRY_DELAY, ggf. kuerzer, da es ein Batch war
pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.5 # 50% der Retry-Wartezeit
self.logger.debug(f"--- Batch Zeilen {batch_start_row}-{batch_end_row} abgeschlossen. Warte {pause_duration:.2f}s vor naechstem Batch ---") # <<< GEÄNDERT
time.sleep(pause_duration)
# --- Verarbeitung des letzten unvollstaendigen Batches nach der Schleife ---
# Wenn nach der Hauptschleife noch Tasks in der Batch-Liste sind
if current_openai_batch_data:
# Logge den Start des finalen Batches
batch_start_row = current_openai_batch_data[0]['row_num']
batch_end_row = current_openai_batch_data[-1]['row_num']
self.logger.debug(f"\n--- Starte FINALEN Wiki-Verifizierungs-Batch ({len(current_openai_batch_data)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
# Rufe die interne Methode auf, die den OpenAI Call macht
batch_results = self._process_verification_openai_batch(current_openai_batch_data)
# Ergebnisse sollten ein Dictionary {row_num: raw_chatgpt_answer} sein, auch bei Fehlern.
# Sammle Sheet Updates (S, T, U, V-Y, AX) fuer diesen finalen Batch
current_batch_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
batch_sheet_updates = [] # Updates fuer DIESEN spezifischen Batch von Zeilen
# Iteriere ueber die Zeilennummern, die in DIESEM finalen OpenAI Batch waren
for row_num in rows_in_current_openai_batch:
# Hole das Ergebnis fuer diese Zeile aus dem Ergebnis-Dictionary.
answer = batch_results.get(row_num, "FEHLER: Batch-Ergebnis fehlt") # Fallback
# Logik zur Bestimmung der Werte fuer S, T, U basierend auf 'answer'
wiki_confirm, alt_article, wiki_explanation = "", "", ""
# Leere V-Y Spalten
v_y_values = empty_vy_values # Liste von leeren Strings
if isinstance(answer, str) and answer.upper() == "OK": wiki_confirm = "OK"; wiki_explanation = "Passt laut KI zur Firma."
elif isinstance(answer, str) and answer.startswith("X |"):
parts = answer.split("|", 2); wiki_confirm = "X"
if len(parts) > 1: detail = parts[1].strip(); alt_article = detail.split(":", 1)[1].strip() if detail.lower().startswith("alternativer artikel:") else detail
if len(parts) > 2: reason_part = parts[2].strip(); wiki_explanation = reason_part.split(":", 1)[1].strip() if reason_part.lower().startswith("begruendung:") else reason_part
if not alt_article or not wiki_explanation: wiki_explanation += f" (Rohantwort: {answer[:100]}...)"
elif isinstance(answer, str) and answer.startswith("FEHLER"): wiki_confirm = "FEHLER"; wiki_explanation = answer; alt_article = "Siehe Begruendung"
else: wiki_confirm = "?"; wiki_explanation = f"Unerwartetes Format: {str(answer)[:100]}..."; alt_article = "Siehe Begruendung"
# Fuege Updates fuer S, T, U und AX hinzu
batch_sheet_updates.append({'range': f'{s_letter}{row_num}', 'values': [[wiki_confirm]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{t_letter}{row_num}', 'values': [[alt_article]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{u_letter}{row_num}', 'values': [[wiki_explanation]]}) # Block 1 Column Map
# Setze AX Timestamp
batch_sheet_updates.append({'range': f'{ts_ax_letter}{row_num}', 'values': [[current_batch_timestamp]]}) # Block 1 Column Map
# Fuege Update zum Leeren von V-Y hinzu, falls Index gefunden wurde
if v_y_range_letter:
batch_sheet_updates.append({'range': f'{v_letter}{row_num}:{y_letter}{row_num}', 'values': [v_y_values]}) # Block 1 Column Map, interne Helfer
# Sende die gesammelten Updates fuer DIESEN finalen Batch.
if batch_sheet_updates:
self.logger.debug(f" Sende FINALES Sheet-Update fuer {len(rows_in_current_openai_batch)} Zeilen ({len(batch_sheet_updates)} Zellen)...") # <<< GEÄNDERT
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry.
success = self.sheet_handler.batch_update_cells(batch_sheet_updates)
if success:
self.logger.info(f" FINALES Sheet-Update fuer Wiki-Verifizierungs-Batch erfolgreich.") # <<< GEÄNDERT
# Der Fehlerfall wird von batch_update_cells geloggt
# Logge den Abschluss des Modus
self.logger.info(f"Wikipedia-Verifizierungs-Batch abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen ({skipped_no_wiki_url} wegen fehlender M-URL).") # <<< GEÄNDERT
# Keine Pause nach diesem Modus noetig, da die naechste Aktion im Dispatcher (Block 34) folgt Kürze übersprungen. Bitte aus Originalcode übernehmen.")
def _scrape_raw_text_task(self, task_info, get_website_raw_func):
"""
Scrapt den Rohtext einer Website in einem separaten Thread.
"""
logger = logging.getLogger(__name__ + ".scrape_worker")
row_num, url = task_info['row_num'], task_info['url']
raw_text, error = "k.A.", None
try:
raw_text = get_website_raw_func(url)
if isinstance(raw_text, str) and (raw_text.startswith("k.A. (Fehler") or raw_text.startswith("FEHLER:")):
error = f"Scraping Fehler: {raw_text[:100]}..."
elif not isinstance(raw_text, str) or not raw_text.strip():
error = "Scraping Task Fehler: Funktion gab keinen gueltigen String zurueck."
raw_text = "k.A. (Extraktion fehlgeschlagen)"
except Exception as e:
error = f"Unerwarteter Fehler im Scraping Task Zeile {row_num}: {e}"
logger.error(error)
raw_text = "k.A. (Unerwarteter Fehler Task)"
return {"row_num": row_num, "raw_text": raw_text, "error": error}
def process_website_scraping_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
"""
Batch-Prozess NUR fuer Website-Scraping (Rohtext AR).
Laedt Daten neu, prueft Spalte AR auf Inhalt ('', 'k.A.', etc.) und ueberspringt Zeilen mit Inhalt.
Setzt AR + AT + AP fuer bearbeitete Zeilen. Sendet Updates gebuendelt.
Args:
start_sheet_row (int, optional): Die 1-basierte Startzeile im Sheet. Defaults to None (automatische Ermittlung basierend auf leeren AT).
end_sheet_row (int, optional): Die 1-basierte Endzeile im Sheet. Defaults to None (bis Ende Sheet).
limit (int, optional): Maximale Anzahl ZU VERARBEITENDER (nicht uebersprungener) Zeilen. Defaults to None (Unbegrenzt).
"""
# Verwenden Sie logger, da das Logging jetzt konfiguriert ist
# Logge die Konfiguration des Batch-Laufs
self.logger.info(f"Starte Website-Scraping (Batch AR, AT, AP). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") # <<< GEÄNDERT
# --- Daten laden und Startzeile ermitteln ---
# Automatische Ermittlung der Startzeile, wenn nicht manuell gesetzt
if start_sheet_row is None:
self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren AT...") # <<< GEÄNDERT
# Nutzt get_start_row_index des Sheet Handlers (Block 14). Prueft auf leeren AT (Block 1 Column Map).
# Standardmaessig ab Zeile 7
start_data_index_no_header = self.sheet_handler.get_start_row_index(check_column_key="Website Scrape Timestamp", min_sheet_row=7)
# Wenn get_start_row_index -1 zurueckgibt (Fehler)
if start_data_index_no_header == -1:
self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") # <<< GEÄNDERT
return # Beende die Methode
# Berechne die 1-basierte Sheet-Startzeile aus dem 0-basierten Daten-Index
start_sheet_row = start_data_index_no_header + self.sheet_handler._header_rows + 1 # Block 14 SheetHandler Attribut
self.logger.info(f"Automatisch ermittelte Startzeile (erste leere AT Zelle): {start_sheet_row}") # <<< GEÄNDERT
else:
# Wenn start_sheet_row manuell gesetzt wurde, laden Sie die Daten trotzdem neu, um aktuell zu sein.
# Der load_data Aufruf ist mit retry_on_failure dekoriert (Block 2).
if not self.sheet_handler.load_data():
self.logger.error("FEHLER beim Laden der Daten fuer process_website_scraping_batch.") # <<< GEÄNDERT
return # Beende die Methode, wenn das Laden fehlschlaegt
# Holen Sie die gesamte Datenliste (inklusive Header) aus dem SheetHandler.
all_data = self.sheet_handler.get_all_data_with_headers()
# Annahme: header_rows ist als Attribut im SheetHandler verfuegbar (Block 14).
header_rows = self.sheet_handler._header_rows
total_sheet_rows = len(all_data) # Gesamtzahl der Zeilen im Sheet
# Berechne Endzeile, wenn nicht manuell gesetzt
if end_sheet_row is None:
end_sheet_row = total_sheet_rows # Bis zur letzten Zeile
# Logge den verarbeitungsbereich
self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}. Gesamtzeilen im Sheet: {total_sheet_rows}") # <<< GEÄNDERT
# Pruefe, ob der Bereich gueltig ist (Start <= Ende und Start nicht ueber Gesamtzeilen)
if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows:
self.logger.info("Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.") # <<< GEÄNDERT
return # Beende die Methode, wenn der Bereich leer ist
# --- Indizes und Buchstaben ---
# Stellen Sie sicher, dass alle benoetigten Spalten in COLUMN_MAP (Block 1) vorhanden sind
required_keys = [
"Website Rohtext", "CRM Website", "Version", "Website Scrape Timestamp", "CRM Name" # AR, D, AP, AT, B
]
# Erstellen Sie ein Dictionary mit Schluesseln und Indizes
col_indices = {key: COLUMN_MAP.get(key) for key in required_keys}
# Pruefen Sie, ob alle benoetigten Schluessel in COLUMN_MAP gefunden wurden
if None in col_indices.values():
missing = [k for k, v in col_indices.items() if v is None]
self.logger.critical(f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_website_scraping_batch: {missing}. Breche ab.") # <<< GEÄNDERT
return # Beende die Methode bei kritischem Fehler
# Ermitteln Sie die Indizes und Buchstaben fuer Updates (AR, AT, AP)
rohtext_col_idx = col_indices["Website Rohtext"]
website_col_idx = col_indices["CRM Website"]
version_col_idx = col_indices["Version"]
timestamp_col_idx = col_indices["Website Scrape Timestamp"]
name_col_idx = col_indices["CRM Name"]
rohtext_col_letter = self.sheet_handler._get_col_letter(rohtext_col_idx + 1) # Block 14 _get_col_letter
version_col_letter = self.sheet_handler._get_col_letter(version_col_idx + 1)
timestamp_col_letter = self.sheet_handler._get_col_letter(timestamp_col_idx + 1)
# --- Hauptlogik: Iteriere und sammle Batches ---
# Holen Sie die Batch-Groesse fuer Verarbeitung (Threading) aus Config (Block 1)
processing_batch_size = getattr(Config, 'PROCESSING_BATCH_SIZE', 20)
# Holen Sie die maximale Anzahl Worker aus Config (Block 1)
max_scraping_workers = getattr(Config, 'MAX_SCRAPING_WORKERS', 10)
# Holen Sie die Batch-Groesse fuer Sheet-Updates aus Config (Block 1)
update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50)
tasks_for_processing_batch = [] # Tasks fuer den aktuellen Scraping-Batch (Liste von Dicts)
rows_in_current_scraping_batch = [] # 1-basierte Zeilennummern im aktuellen Batch
all_sheet_updates = [] # Gesammelte Updates fuer Batch-Schreiben ins Sheet (Liste von Dicts)
processed_count = 0 # Zaehlt Zeilen, die fuer die Verarbeitung in Frage kommen und in den Batch aufgenommen werden (im Rahmen des Limits).
skipped_count = 0 # Zaehlt Zeilen, die uebersprungen wurden (wegen Inhalt oder fehlender URL).
skipped_no_url = 0 # Zaehlt Zeilen, die speziell wegen fehlender URL uebersprungen wurden.
# Iteriere ueber die Sheet-Zeilen im definierten Bereich (1-basierte Sheet-Zeilennummer)
for i in range(start_sheet_row, end_sheet_row + 1):
row_index_in_list = i - 1 # 0-basierter Index in der all_data Liste
# Pruefen Sie, ob das Ende des Sheets erreicht wurde
if row_index_in_list >= total_sheet_rows: break # Ende des Sheets erreicht
row = all_data[row_index_in_list] # Die Rohdaten fuer diese Zeile
# Stellen Sie sicher, dass die Zeile nicht leer ist
if not any(cell and isinstance(cell, str) and cell.strip() for cell in row):
#self.logger.debug(f"Zeile {i}: Uebersprungen (Leere Zeile).") # Zu viel Laerm im Debug
skipped_count += 1 # Zaehlen als uebersprungen
continue # Springe zur naechsten Zeile
# --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist ---
# Kriterium: Website Rohtext (AR) ist leer oder ein Standard-Fehlerwert.
# UND Website URL (D) ist vorhanden und gueltig aussehend.
# Holen Sie den Wert aus Spalte AR (Website Rohtext) (nutzt interne Helfer _get_cell_value_safe)
cell_value_ar = self._get_cell_value_safe(row, "Website Rohtext") # Block 1 Column Map
# Pruefen Sie, ob AR leer ist oder einen Standard-Fehlerwert enthaelt.
ar_is_empty_or_default = not cell_value_ar or (isinstance(cell_value_ar, str) and str(cell_value_ar).strip().lower() in ["k.a.", "k.a. (nur cookie-banner erkannt)", "k.a. (fehler)"])
# Holen Sie den Wert aus Spalte D (CRM Website) (nutzt interne Helfer _get_cell_value_safe)
website_url = self._get_cell_value_safe(row, "CRM Website").strip() # Block 1 Column Map
# Pruefen Sie, ob die Website URL (D) vorhanden und gueltig aussehend ist.
website_url_is_valid_looking = website_url and isinstance(website_url, str) and website_url.lower() not in ["k.a.", "kein artikel gefunden", "fehler bei suche", "http:"] # Fuege "http:" hinzu basierend auf Log
# Verarbeitung ist noetig, wenn AR leer/default ist UND D gefuellt/gueltig aussieht.
processing_needed_for_row = ar_is_empty_or_default and website_url_is_valid_looking
# Loggen der Pruefergebnisse fuer diese Zeile auf Debug-Level
log_check = (i < start_sheet_row + 5) or (i % 100 == 0) or (processing_needed_for_row)
if log_check:
company_name = self._get_cell_value_safe(row, "CRM Name").strip() # Block 1 Column Map
self.logger.debug(f"Zeile {i} ({company_name[:50]}... Website Scraping Check): AR leer/default? {ar_is_empty_or_default}, D gueltig? {website_url_is_valid_looking}. Benötigt Verarbeitung? {processing_needed_for_row}") # <<< GEÄNDERT
# Wenn die Verarbeitung fuer diese Zeile nicht noetig ist
if not processing_needed_for_row:
skipped_count += 1 # Zaehlen als uebersprungene Zeile
# Zaehlen Sie speziell, wenn die Zeile wegen fehlender gueltiger URL uebersprungen wurde.
if not website_url_is_valid_looking: skipped_no_url += 1
continue # Springe zur naechsten Zeile
# --- Wenn Verarbeitung noetig: Fuege zur Batch-Liste hinzu ---
processed_count += 1 # Zaehle die Zeile, die fuer die Verarbeitung in Frage kommt (im Rahmen des Limits zaehlen)
# Pruefe das Limit fuer verarbeitete Zeilen
if limit is not None and isinstance(limit, int) and limit > 0 and processed_count > limit:
# Wenn das Limit erreicht ist und es ein positives Limit gibt
self.logger.info(f"Verarbeitungslimit ({limit}) fuer process_website_scraping_batch erreicht. Breche weitere Zeilenpruefung ab.") # <<< GEÄNDERT
break # Brich die Schleife ab
# Fuege die benoetigten Daten fuer den Task hinzu (Zeilennummer und URL)
tasks_for_processing_batch.append({"row_num": i, "url": website_url})
# Fuege die Zeilennummer zur Liste der Zeilennummern im Batch hinzu
rows_in_current_scraping_batch.append(i)
# --- Verarbeite den Batch, wenn voll ---
# Pruefe, ob die aktuelle Batch-Liste die definierte Groesse erreicht hat.
# scraping_batch_size wird aus Config geholt (Block 1).
if len(tasks_for_processing_batch) >= processing_batch_size:
# Logge den Start der Batch-Verarbeitung
batch_start_row = tasks_for_processing_batch[0]['row_num']
batch_end_row = tasks_for_processing_batch[-1]['row_num']
self.logger.debug(f"\n--- Starte Website-Scraping Batch ({len(tasks_for_processing_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
scraping_results = {} # Dictionary zum Speichern der Ergebnisse {row_num: raw_text}
batch_error_count = 0 # Fehlerzaehler fuer diesen spezifischen Batch
self.logger.debug(f" Scrape {len(tasks_for_processing_batch)} Websites parallel (max {max_scraping_workers} worker)...") # <<< GEÄNDERT
# Nutzt concurrent.futures.ThreadPoolExecutor fuer paralleles Scraping.
# max_workers wird aus Config geholt (Block 1).
with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
# Map tasks to futures. Ruft die INTERNE Worker-Funktion auf.
# Uebergibt das task_info Dictionary und die globale Funktion get_website_raw (Block 11) als Argument.
future_to_task = {executor.submit(self._scrape_raw_text_task, task, get_website_raw): task for task in tasks_for_processing_batch} # <<< Korrigiert: interne Methode
# Verarbeite die Ergebnisse, sobald sie fertig sind.
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future] # Holen Sie die urspruenglichen Task-Daten (Dict)
try:
# Holen Sie das Ergebnis vom Future. Wenn die Worker-Funktion eine Exception wirft, wird diese hier gefangen.
result = future.result() # Ergebnis ist ein Dictionary {'row_num': ..., 'raw_text': ..., 'error': ...}
# Speichere das Ergebnis im scraping_results Dictionary
scraping_results[result['row_num']] = result['raw_text']
# Wenn der Worker einen Fehler gemeldet hat (z.B. durch Fehlerstring im raw_text oder error-Feld)
if result.get('error'):
batch_error_count += 1 # Erhoehe den Fehlerzaehler fuer diesen Batch
except Exception as exc:
# Dieser Block faengt unerwartete Fehler ab, die waehrend der Future-Ergebnis-Abfrage auftreten.
# Die meisten Fehler sollten von get_website_raws retry/logging behandelt werden.
row_num = task['row_num'] # Zeilennummer aus den Task-Daten
err_msg = f"Unerwarteter Fehler bei Ergebnisabfrage Scraping Task Zeile {row_num} ({task['url'][:100]}): {type(exc).__name__} - {exc}" # Gekuerzt loggen
self.logger.error(err_msg) # <<< GEÄNDERT
# Setze einen Standard-Fehlerwert fuer diese Zeile im Ergebnis
scraping_results[row_num] = "k.A. (Unerwarteter Fehler Task)"
batch_error_count += 1 # Erhoehe den Fehlerzaehler
self.logger.debug(f" Scraping fuer Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).") # <<< GEÄNDERT
# Sammle Sheet Updates (AR, AT, AP) fuer diesen Batch.
# Dies geschieht jetzt nach der parallelen Verarbeitung.
if scraping_results:
# Aktueller Zeitstempel und Version
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut
batch_sheet_updates = [] # Updates fuer DIESEN spezifischen Batch von Zeilen
# Iteriere ueber die Zeilennummern im Batch, fuer die Ergebnisse vorliegen.
# Ergebnisse koennen Fehlerwerte enthalten.
for row_num, raw_text_res in scraping_results.items():
# Fuege Updates fuer AR, AT und AP hinzu (nutzt interne Helfer)
# AR: Roh extrahierter Text (kann auch Fehlerwert sein)
batch_sheet_updates.append({'range': f'{rohtext_col_letter}{row_num}', 'values': [[raw_text_res]]}) # Block 1 Column Map
# AT: Timestamp des Scraping-Versuchs (immer setzen, wenn versucht wurde)
batch_sheet_updates.append({'range': f'{timestamp_col_letter}{row_num}', 'values': [[current_timestamp]]}) # Block 1 Column Map
# AP: Version des Skripts
batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map
# Sammle diese Batch-Updates fuer das groessere Batch-Update am Ende oder bei Limit.
# update_batch_row_limit wird aus Config geholt (Block 1).
all_sheet_updates.extend(batch_sheet_updates)
# Leere den Scraping-Batch fuer die naechste Iteration
tasks_for_processing_batch = []
rows_in_current_scraping_batch = []
# Sende gesammelte Sheet Updates, wenn das Update-Batch-Limit erreicht ist.
# Updates pro Zeile sind 3 (AR, AT, AP). Anzahl der Zeilen = len(all_sheet_updates) / 3.
rows_in_update_batch = len(all_sheet_updates) // 3 # Ganzzahl-Division
if rows_in_update_batch >= update_batch_row_limit:
self.logger.debug(f" Sende gesammelte Sheet-Updates ({rows_in_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry.
# Wenn es fehlschlaegt, wird es intern geloggt.
success = self.sheet_handler.batch_update_cells(all_sheet_updates)
if success:
self.logger.info(f" Sheet-Update fuer {rows_in_update_batch} Zeilen erfolgreich.") # <<< GEÄNDERT
# Der Fehlerfall wird von batch_update_cells geloggt
# Leere die gesammelten Updates nach dem Senden.
all_sheet_updates = []
# rows_in_update_batch muss nicht explizit zurueckgesetzt werden, da es aus len(all_sheet_updates) berechnet wird.
# Keine Pause hier nach jedem kleinen Scraping-Batch, da wir auf batch_update warten.
# Die Pause kommt erst nach dem Batch-Update (oder am Ende des Modus).
# time.sleep(0.1) # Optionale kurze Pause
# --- Verarbeitung des letzten unvollstaendigen Scraping-Batches nach der Schleife ---
# Fuehre den letzten Batch aus, wenn nach der Hauptschleife noch Tasks in der Batch-Liste sind.
if tasks_for_processing_batch:
# Logge den Start des finalen Batches
batch_start_row = tasks_for_processing_batch[0]['row_num']
batch_end_row = tasks_for_processing_batch[-1]['row_num']
self.logger.debug(f"\n--- Starte FINALEN Website-Scraping Batch ({len(tasks_for_processing_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
scraping_results = {} # Dictionary fuer die Ergebnisse
batch_error_count = 0 # Fehlerzaehler
self.logger.debug(f" Scrape {len(tasks_for_processing_batch)} Websites parallel (max {max_scraping_workers} worker)...") # <<< GEÄNDERT
with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
# Map tasks to futures. Ruft die INTERNE Worker-Funktion auf.
future_to_task = {executor.submit(self._scrape_raw_text_task, task, get_website_raw): task for task in tasks_for_processing_batch} # <<< Korrigiert: interne Methode
# Verarbeite die Ergebnisse
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future] # Holen Sie die urspruenglichen Task-Daten
try:
result = future.result() # Holen Sie das Ergebnis
scraping_results[result['row_num']] = result['raw_text']
# Pruefe, ob der Worker einen Fehler gemeldet hat
if result.get('error'): batch_error_count += 1
except Exception as exc:
# Faengt unerwartete Fehler bei der Ergebnisabfrage ab
row_num = task['row_num']
err_msg = f"Unerwarteter Fehler bei Ergebnisabfrage Scraping Task Zeile {row_num} ({task['url'][:100]}): {type(exc).__name__} - {exc}" # Gekuerzt loggen
self.logger.error(err_msg) # <<< GEÄNDERT
# Setze einen Standard-Fehlerwert
scraping_results[row_num] = "k.A. (Unerwarteter Fehler Task)"
batch_error_count += 1
self.logger.debug(f" FINALER Scraping Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler).") # <<< GEÄNDERT
# Sammle Sheet Updates (AR, AT, AP) fuer diesen finalen Batch.
if scraping_results:
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut
batch_sheet_updates = [] # Updates fuer diesen spezifischen Batch
# Iteriere ueber die Zeilennummern im Batch, fuer die Ergebnisse vorliegen.
for row_num, raw_text_res in scraping_results.items():
# Fuege Updates fuer AR, AT und AP hinzu
batch_sheet_updates.append({'range': f'{rohtext_col_letter}{row_num}', 'values': [[raw_text_res]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{timestamp_col_letter}{row_num}', 'values': [[current_timestamp]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map
# Fuege diese Updates zur globalen Liste hinzu (wird dann nur noch einmal gesendet)
all_sheet_updates.extend(batch_sheet_updates)
# --- Finale Sheet Updates senden ---
# Sende alle verbleibenden gesammelten Updates in einem letzten Batch-Update.
if all_sheet_updates:
rows_in_final_update_batch = len(all_sheet_updates) // 3 # Updates pro Zeile ist 3
self.logger.info(f"Sende FINALE gesammelte Sheet-Updates ({rows_in_final_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry.
success = self.sheet_handler.batch_update_cells(all_sheet_updates)
if success:
self.logger.info(f"FINALES Sheet-Update erfolgreich.") # <<< GEÄNDERT
# Der Fehlerfall wird von batch_update_cells geloggt
# Logge den Abschluss des Modus
self.logger.info(f"Website-Scraping (Batch) abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen.") # <<< GEÄNDERT
# Keine Pause nach diesem Modus noetig, da die naechste Aktion im Dispatcher (Block 34) folgt.
def process_summarization_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
"""
Batch-Prozess NUR fuer Website-Zusammenfassung (AS).
Laedt Daten neu, prueft, ob Rohtext (AR) vorhanden und Zusammenfassung (AS) fehlt.
Fasst Rohtexte im Batch ueber OpenAI zusammen und setzt AS + AP.
Args:
start_sheet_row (int, optional): Die 1-basierte Startzeile im Sheet. Defaults to None (automatische Ermittlung basierend auf leeren AS).
end_sheet_row (int, optional): Die 1-basierte Endzeile im Sheet. Defaults to None (bis Ende Sheet).
limit (int, optional): Maximale Anzahl ZU VERARBEITENDER (nicht uebersprungener) Zeilen. Defaults to None (Unbegrenzt).
"""
# Verwenden Sie logger, da das Logging jetzt konfiguriert ist
# Logge die Konfiguration des Batch-Laufs
self.logger.info(f"Starte Website-Zusammenfassung (Batch AS, AP). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") # <<< GEÄNDERT
# --- Daten laden und Startzeile ermitteln ---
# Automatische Ermittlung der Startzeile, wenn nicht manuell gesetzt
if start_sheet_row is None:
self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren AS...") # <<< GEÄNDERT
# Nutzt get_start_row_index des Sheet Handlers (Block 14). Prueft auf leeren AS (Block 1 Column Map).
# Standardmaessig ab Zeile 7
start_data_index_no_header = self.sheet_handler.get_start_row_index(check_column_key="Website Zusammenfassung", min_sheet_row=7)
# Wenn get_start_row_index -1 zurueckgibt (Fehler)
if start_data_index_no_header == -1:
self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") # <<< GEÄNDERT
return # Beende die Methode
# Berechne die 1-basierte Sheet-Startzeile aus dem 0-basierten Daten-Index
start_sheet_row = start_data_index_no_header + self.sheet_handler._header_rows + 1 # Block 14 SheetHandler Attribut
self.logger.info(f"Automatisch ermittelte Startzeile (erste leere AS Zelle): {start_sheet_row}") # <<< GEÄNDERT
else:
# Wenn start_sheet_row manuell gesetzt wurde, laden Sie die Daten trotzdem neu, um aktuell zu sein.
# Der load_data Aufruf ist mit retry_on_failure dekoriert (Block 2).
if not self.sheet_handler.load_data():
self.logger.error("FEHLER beim Laden der Daten fuer process_summarization_batch.") # <<< GEÄNDERT
return # Beende die Methode, wenn das Laden fehlschlaegt
# Holen Sie die gesamte Datenliste (inklusive Header) aus dem SheetHandler.
all_data = self.sheet_handler.get_all_data_with_headers();
# Annahme: header_rows ist als Attribut im SheetHandler verfuegbar (Block 14).
header_rows = self.sheet_handler._header_rows;
total_sheet_rows = len(all_data) # Gesamtzahl der Zeilen im Sheet
# Berechne Endzeile, wenn nicht manuell gesetzt
if end_sheet_row is None:
end_sheet_row = total_sheet_rows # Bis zur letzten Zeile
# Logge den verarbeitungsbereich
self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}. Gesamtzeilen im Sheet: {total_sheet_rows}") # <<< GEÄNDERT
# Pruefe, ob der Bereich gueltig ist (Start <= Ende und Start nicht ueber Gesamtzeilen)
if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows:
self.logger.info("Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.") # <<< GEÄNDERT
return # Beende die Methode, wenn der Bereich leer ist
# --- Indizes und Buchstaben ---
# Stellen Sie sicher, dass alle benoetigten Spalten in COLUMN_MAP (Block 1) vorhanden sind
required_keys = [
"Website Rohtext", "Website Zusammenfassung", "Version", "CRM Name" # AR, AS, AP, B
]
# Erstellen Sie ein Dictionary mit Schluesseln und Indizes
col_indices = {key: COLUMN_MAP.get(key) for key in required_keys}
# Pruefen Sie, ob alle benoetigten Schluessel in COLUMN_MAP gefunden wurden
if None in col_indices.values():
missing = [k for k, v in col_indices.items() if v is None]
self.logger.critical(f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_summarization_batch: {missing}. Breche ab.") # <<< GEÄNDERT
return # Beende die Methode bei kritischem Fehler
# Ermitteln Sie die Indizes und Buchstaben fuer Updates (AS, AP)
rohtext_col_idx = col_indices["Website Rohtext"]
summary_col_idx = col_indices["Website Zusammenfassung"]
version_col_idx = col_indices["Version"]
name_col_idx = col_indices["CRM Name"] # Benoetigt fuer Logging
summary_col_letter = self.sheet_handler._get_col_letter(summary_col_idx + 1) # Block 14 _get_col_letter
version_col_letter = self.sheet_handler._get_col_letter(version_col_idx + 1)
# --- Verarbeitung ---
# Holen Sie die Batch-Groesse fuer OpenAI-Aufrufe aus Config (Block 1)
openai_batch_size = getattr(Config, 'OPENAI_BATCH_SIZE_LIMIT', 4)
# Holen Sie die Batch-Groesse fuer Sheet-Updates aus Config (Block 1)
update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50)
tasks_for_openai_batch = [] # Tasks fuer den aktuellen OpenAI Batch (Liste von Dicts)
rows_in_current_openai_batch = [] # 1-basierte Zeilennummern im aktuellen OpenAI Batch
all_sheet_updates = [] # Gesammelte Updates fuer Batch-Schreiben ins Sheet (Liste von Dicts)
processed_count = 0 # Zaehlt Zeilen, die fuer die Verarbeitung in Frage kommen und in den Batch aufgenommen werden (im Rahmen des Limits).
skipped_count = 0 # Zaehlt Zeilen, die uebersprungen wurden (wegen fehlendem Rohtext oder vorhandener Zusammenfassung).
# Iteriere ueber die Sheet-Zeilen im definierten Bereich (1-basierte Sheet-Zeilennummer)
for i in range(start_sheet_row, end_sheet_row + 1):
row_index_in_list = i - 1 # 0-basierter Index in der all_data Liste
# Pruefen Sie, ob das Ende des Sheets erreicht wurde
if row_index_in_list >= total_sheet_rows: break # Ende des Sheets erreicht
row = all_data[row_index_in_list] # Die Rohdaten fuer diese Zeile
# Stellen Sie sicher, dass die Zeile nicht leer ist
if not any(cell and isinstance(cell, str) and cell.strip() for cell in row):
#self.logger.debug(f"Zeile {i}: Uebersprungen (Leere Zeile).") # Zu viel Laerm im Debug
skipped_count += 1 # Zaehlen als uebersprungen
continue # Springe zur naechsten Zeile
# --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist ---
# Kriterium: Website Rohtext (AR) ist vorhanden und gueltig (nicht k.A. oder Fehlerwerte).
# UND Website Zusammenfassung (AS) ist leer oder ein Standard-Fehlerwert.
# Holen Sie den Wert aus Spalte AR (Website Rohtext) (nutzt interne Helfer _get_cell_value_safe)
raw_text = self._get_cell_value_safe(row, "Website Rohtext") # Block 1 Column Map
# Pruefen Sie, ob AR gefuellt und gueltig ist.
raw_text_is_valid = raw_text and isinstance(raw_text, str) and str(raw_text).strip().lower() not in ["k.a.", "k.a. (nur cookie-banner erkannt)", "k.a. (fehler)"]
# Holen Sie den Wert aus Spalte AS (Website Zusammenfassung) (nutzt interne Helfer _get_cell_value_safe)
summary_value = self._get_cell_value_safe(row, "Website Zusammenfassung") # Block 1 Column Map
# Pruefen Sie, ob AS leer ist oder einen Standard-Fehlerwert enthaelt.
summary_is_empty_or_default = not summary_value or (isinstance(summary_value, str) and str(summary_value).strip().lower() in ["k.a.", "k.a. (keine zusammenfassung erhalten)"])
# Verarbeitung ist noetig, wenn AR gueltig ist UND AS leer/default ist.
processing_needed_for_row = raw_text_is_valid and summary_is_empty_or_default
# Loggen der Pruefergebnisse fuer diese Zeile auf Debug-Level
log_check = (i < start_sheet_row + 5) or (i % 100 == 0) or (processing_needed_for_row)
if log_check:
company_name = self._get_cell_value_safe(row, "CRM Name").strip() # Block 1 Column Map
self.logger.debug(f"Zeile {i} ({company_name[:50]}... Website Summarization Check): AR gueltig? {raw_text_is_valid} (len={len(str(raw_text))}), AS leer/default? {summary_is_empty_or_default}. Benötigt Verarbeitung? {processing_needed_for_row}") # <<< GEÄNDERT
# Wenn die Verarbeitung fuer diese Zeile nicht noetig ist
if not processing_needed_for_row:
skipped_count += 1 # Zaehlen als uebersprungene Zeile
continue # Springe zur naechsten Zeile
# --- Wenn Verarbeitung noetig: Fuege zur Batch-Liste fuer OpenAI hinzu ---
processed_count += 1 # Zaehle die Zeile, die fuer die Verarbeitung in Frage kommt (im Rahmen des Limits zaehlen)
# Pruefe das Limit fuer verarbeitete Zeilen
if limit is not None and isinstance(limit, int) and limit > 0 and processed_count > limit:
# Wenn das Limit erreicht ist und es ein positives Limit gibt
self.logger.info(f"Verarbeitungslimit ({limit}) fuer process_summarization_batch erreicht. Breche weitere Zeilenpruefung ab.") # <<< GEÄNDERT
break # Brich die Schleife ab
# Fuege die benoetigten Daten fuer den OpenAI Batch hinzu (Zeilennummer und Rohtext)
tasks_for_openai_batch.append({'row_num': i, 'raw_text': raw_text})
# Fuege die Zeilennummer zur Liste der Zeilennummern im Batch hinzu
rows_in_current_openai_batch.append(i)
# --- Verarbeite den Batch, wenn voll ---
# Pruefe, ob die aktuelle Batch-Liste die definierte Groesse erreicht hat.
# openai_batch_size wird aus Config geholt (Block 1).
if len(tasks_for_openai_batch) >= openai_batch_size:
# Logge den Start der Batch-Verarbeitung
batch_start_row = tasks_for_openai_batch[0]['row_num']
batch_end_row = tasks_for_openai_batch[-1]['row_num']
self.logger.debug(f"\n--- Starte Website-Summarization Batch ({len(tasks_for_openai_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
# Rufe die globale Funktion auf, die den OpenAI Call fuer den Batch macht (Block 9).
# summarize_batch_openai ist mit retry_on_failure dekoriert (Block 2).
# Wenn summarize_batch_openai eine Exception wirft (nach Retries), wird diese hier gefangen.
# !!! KORRIGIERTER AUFRUF !!!
try:
# Rufen Sie die korrekte globale Funktion auf
batch_results = summarize_batch_openai(tasks_for_openai_batch) # <<< Korrigierter Aufruf (vorher war fälschlicherweise _process_verification_openai_batch)
# Ergebnisse sollten ein Dictionary {row_num: summary_text} sein, auch bei Fehlern.
# Sammle Sheet Updates (AS, AP) fuer diesen Batch
current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut
batch_sheet_updates = [] # Updates fuer DIESEN spezifischen Batch von Zeilen
# Iteriere ueber die Zeilennummern, die in DIESEM OpenAI Batch waren
for row_num in rows_in_current_openai_batch:
# Hole das Ergebnis fuer diese Zeile aus dem Ergebnis-Dictionary.
# Fallback auf einen Fehlerstring, wenn das Ergebnis fehlt (sollte nicht passieren, wenn summarize_batch_openai korrekt ist).
summary = batch_results.get(row_num, "k.A. (Batch Ergebnis fehlte)")
# Stelle sicher, dass 'k.A.' bei leeren/kurzen Summaries gesetzt wird
if not summary or (isinstance(summary, str) and summary.strip().lower() in ["k.a.", "k.a. (keine zusammenfassung erhalten)"]):
summary = "k.A. (Keine Zusammenfassung erhalten)"
# Fuege "k.A." oder Fehler an, wenn der Wert von summarize_batch_openai ein Fehlerstring ist
elif isinstance(summary, str) and (summary.startswith("k.A. (Fehler") or summary.startswith("FEHLER:")):
pass # Behalte den Fehlerstring von summarize_batch_openai
# Fuege Updates fuer AS und AP hinzu (nutzt interne Helfer)
batch_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [[summary]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map
# Sammle diese Batch-Updates fuer das groessere Batch-Update
all_sheet_updates.extend(batch_sheet_updates)
except Exception as e_openai_batch:
# Wenn summarize_batch_openai eine Exception wirft (nach Retries)
# Der Fehler wird bereits vom retry_on_failure Decorator auf summarize_batch_openai geloggt.
self.logger.error(f"Endgueltiger FEHLER beim OpenAI-Batch-Aufruf fuer Zusammenfassung: {e_openai_batch}") # <<< GEÄNDERT
# Logge den Traceback
self.logger.debug(traceback.format_exc()) # <<< GEÄNDERT
# Fügen Sie Fehlerwerte für alle Zeilen im Batch hinzu
current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut
for row_num in rows_in_current_openai_batch:
error_summary = f"FEHLER OpenAI Batch: {str(e_openai_batch)[:100]}..." # Gekuerzt
# Fuege Updates mit Fehlerwerten fuer AS und AP hinzu
all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [[error_summary]]}) # Block 1 Column Map
all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map
# Leere den OpenAI-Batch zurueck
tasks_for_openai_batch = []
rows_in_current_openai_batch = []
# Sende gesammelte Sheet Updates, wenn das Update-Batch-Limit erreicht ist.
# Updates pro Zeile sind 2 (AS, AP). Anzahl der Zeilen = len(all_sheet_updates) / 2.
rows_in_update_batch = len(all_sheet_updates) // 2
if rows_in_update_batch >= update_batch_row_limit:
self.logger.debug(f" Sende gesammelte Sheet-Updates ({rows_in_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry.
# Wenn es fehlschlaegt, wird es intern geloggt.
success = self.sheet_handler.batch_update_cells(all_sheet_updates)
if success:
self.logger.info(f" Sheet-Update fuer {rows_in_update_batch} Zeilen erfolgreich.") # <<< GEÄNDERT
# Der Fehlerfall wird von batch_update_cells geloggt
# Leere die gesammelten Updates nach dem Senden.
all_sheet_updates = []
# Kurze Pause nach jedem OpenAI Batch (nutzt Config Block 1).
# Dies ist wichtig, um Rate Limits zu vermeiden.
pause_duration = getattr(Config, 'RETRY_DELAY', 5) * 0.5 # 50% der Retry-Wartezeit
self.logger.debug(f"Warte {pause_duration:.2f}s vor naechstem Batch...") # <<< GEÄNDERT
time.sleep(pause_duration)
# --- Verarbeitung des letzten unvollstaendigen OpenAI Batches nach der Schleife ---
# Wenn nach der Hauptschleife noch Tasks in der Batch-Liste sind.
if tasks_for_openai_batch: # Korrektur: War vorher `current_openai_batch_data`
# Logge den Start des finalen Batches
batch_start_row = tasks_for_openai_batch[0]['row_num'] # Korrektur: War vorher `current_openai_batch_data`
batch_end_row = tasks_for_openai_batch[-1]['row_num'] # Korrektur: War vorher `current_openai_batch_data`
self.logger.debug(f"\n--- Starte FINALEN Website-Summarization Batch ({len(tasks_for_openai_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
# Rufe die globale Funktion auf, die den OpenAI Call fuer den Batch macht (Block 9).
# summarize_batch_openai ist mit retry_on_failure dekoriert (Block 2).
# Wenn summarize_batch_openai eine Exception wirft (nach Retries), wird diese hier gefangen.
batch_results = None
try:
batch_results = summarize_batch_openai(tasks_for_openai_batch) # <<< Korrekter Aufruf Block 9, Korrektur: War vorher `current_openai_batch_data`
# Ergebnisse sollten ein Dictionary {row_num: summary_text} sein, auch bei Fehlern.
# Sammle Sheet Updates (AS, AP) fuer diesen finalen Batch
current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut
batch_sheet_updates = [] # Updates fuer diesen spezifischen Batch
# Iteriere ueber die Zeilennummern im Batch, fuer die Ergebnisse vorliegen.
for row_num in rows_in_current_openai_batch:
# Hole das Ergebnis fuer diese Zeile aus dem Ergebnis-Dictionary.
summary = batch_results.get(row_num, "k.A. (Batch Ergebnis fehlte)") # Fallback
# Stelle sicher, dass 'k.A.' bei leeren/kurzen Summaries gesetzt wird
if not summary or (isinstance(summary, str) and summary.strip().lower() in ["k.a.", "k.a. (keine zusammenfassung erhalten)"]):
summary = "k.A. (Keine Zusammenfassung erhalten)"
# Fuege "k.A." oder Fehler an, wenn der Wert von summarize_batch_openai ein Fehlerstring ist
elif isinstance(summary, str) and (summary.startswith("k.A. (Fehler") or summary.startswith("FEHLER:")):
pass # Behalte den Fehlerstring von summarize_batch_openai
# Fuege Updates fuer AS und AP hinzu
batch_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [[summary]]}) # Block 1 Column Map
batch_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map
# Fuege diese Updates zur globalen Liste hinzu (wird dann nur noch einmal gesendet)
all_sheet_updates.extend(batch_sheet_updates)
except Exception as e_openai_batch:
# Wenn summarize_batch_openai eine Exception wirft (nach Retries)
# Der Fehler wird bereits vom retry_on_failure Decorator auf summarize_batch_openai geloggt.
self.logger.error(f"Endgueltiger FEHLER beim FINALEN OpenAI-Batch-Aufruf fuer Zusammenfassung: {e_openai_batch}") # <<< GEÄNDERT
# Logge den Traceback
self.logger.debug(traceback.format_exc()) # <<< GEÄNDERT
# Fügen Sie Fehlerwerte für alle Zeilen im Batch hinzu
current_version = getattr(Config, 'VERSION', 'unknown') # Block 1 Config Attribut
for row_num in rows_in_current_openai_batch:
error_summary = f"FEHLER OpenAI Batch: {str(e_openai_batch)[:100]}..." # Gekuerzt
# Fuege Updates mit Fehlerwerten fuer AS und AP hinzu
all_sheet_updates.append({'range': f'{summary_col_letter}{row_num}', 'values': [[error_summary]]}) # Block 1 Column Map
all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map
# --- Finale Sheet Updates senden ---
# Sende alle verbleibenden gesammelten Updates in einem letzten Batch-Update.
if all_sheet_updates:
rows_in_final_update_batch = len(all_sheet_updates) // 2 # Updates pro Zeile ist 2
self.logger.info(f"Sende FINALE gesammelte Sheet-Updates ({rows_in_final_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry.
success = self.sheet_handler.batch_update_cells(all_sheet_updates)
if success:
self.logger.info(f"FINALES Sheet-Update erfolgreich.") # <<< GEÄNDERT
# Der Fehlerfall wird von batch_update_cells geloggt
# Logge den Abschluss des Modus
self.logger.info(f"Website-Zusammenfassung (Batch) abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen.") # <<< GEÄNDERT
# Keine Pause nach diesem Modus noetig, da die naechste Aktion im Dispatcher (Block 34) folgt.