This commit is contained in:
2025-04-22 12:29:03 +00:00
parent 564aef9d20
commit e5741a23c9

View File

@@ -3892,215 +3892,250 @@ class DataProcessor:
Args:
sheet_handler (GoogleSheetHandler): Eine initialisierte Instanz des GoogleSheetHandlers.
"""
# WICHTIG: Stelle sicher, dass sheet_handler übergeben wird und nicht None ist
if sheet_handler is None:
# Kritischer Fehler, da der Handler benötigt wird
logging.critical("DataProcessor Init FEHLER: Kein gültiger sheet_handler übergeben!")
# Hier ist es sinnvoll, einen Fehler zu werfen, um das Problem sofort sichtbar zu machen
raise ValueError("DataProcessor benötigt einen gültigen GoogleSheetHandler.")
self.sheet_handler = sheet_handler
# Erstelle eine Instanz des Scrapers für diesen Prozessor
# Annahme: WikipediaScraper ist importiert
self.wiki_scraper = WikipediaScraper()
# Annahme: WikipediaScraper ist importiert und korrekt
try:
self.wiki_scraper = WikipediaScraper() # Geht davon aus, dass Config etc. verfügbar ist
except NameError:
logging.critical("DataProcessor Init FEHLER: WikipediaScraper Klasse nicht gefunden/importiert!")
raise
except Exception as e:
logging.critical(f"DataProcessor Init FEHLER beim Initialisieren von WikipediaScraper: {e}")
raise
logging.info("DataProcessor initialisiert.")
# Die zentrale Methode zur Verarbeitung einer einzelnen Zeile
# @retry_on_failure
def _process_single_row(self, row_num_in_sheet, row_data,
process_wiki=True, process_chatgpt=True, process_website=True,
force_reeval=False):
logging.info(f"--- Starte Verarbeitung für Zeile {row_num_in_sheet} {'(Re-Eval)' if force_reeval else ''} ---")
updates = []
now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
any_processing_done = False
wiki_data_updated_in_this_run = False
# @retry_on_failure # Retry auf der gesamten Zeile ist riskant
def _process_single_row(self, row_num_in_sheet, row_data,
process_wiki=True, process_chatgpt=True, process_website=True,
force_reeval=False):
"""
Verarbeitet die Daten für eine einzelne Zeile.
Priorisiert Wiki-Artikelsuche/-Validierung VOR Extraktion.
Prüft Timestamps, es sei denn force_reeval=True.
"""
logging.info(f"--- Starte Verarbeitung für Zeile {row_num_in_sheet} {'(Re-Eval)' if force_reeval else ''} ---")
updates = []
now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
any_processing_done = False
wiki_data_updated_in_this_run = False
# ... (Hilfsfunktion get_cell_value und initiale Werte lesen bleiben gleich) ...
company_name = get_cell_value("CRM Name")
website_url = get_cell_value("CRM Website")
crm_branche = get_cell_value("CRM Branche"); crm_beschreibung = get_cell_value("CRM Beschreibung")
konsistenz_s = get_cell_value("Chat Wiki Konsistenzprüfung")
website_raw = get_cell_value("Website Rohtext") or "k.A."
website_summary = get_cell_value("Website Zusammenfassung") or "k.A."
# Hilfsfunktion für sicheren Zellenzugriff
def get_cell_value(key):
# Annahme: COLUMN_MAP ist global verfügbar
idx = COLUMN_MAP.get(key)
if idx is not None and len(row_data) > idx:
# Konvertiere leere Strings explizit zu '' statt None, falls gspread das tut
return row_data[idx] if row_data[idx] is not None else ''
return ""
final_wiki_data = { # Initialisieren
'url': get_cell_value("Wiki URL") or 'k.A.', 'first_paragraph': get_cell_value("Wiki Absatz") or 'k.A.',
'branche': get_cell_value("Wiki Branche") or 'k.A.', 'umsatz': get_cell_value("Wiki Umsatz") or 'k.A.',
'mitarbeiter': get_cell_value("Wiki Mitarbeiter") or 'k.A.', 'categories': get_cell_value("Wiki Kategorien") or 'k.A.'
}
# Lese initiale Werte
company_name = get_cell_value("CRM Name")
website_url = get_cell_value("CRM Website"); original_website = website_url
crm_branche = get_cell_value("CRM Branche"); crm_beschreibung = get_cell_value("CRM Beschreibung")
konsistenz_s = get_cell_value("Chat Wiki Konsistenzprüfung")
website_raw = get_cell_value("Website Rohtext") or "k.A."
website_summary = get_cell_value("Website Zusammenfassung") or "k.A."
# --- 1. Website Handling (bleibt wie in letzter Version, prüft force_reeval or AT fehlt) ---
website_ts_missing = not get_cell_value("Website Scrape Timestamp").strip()
website_processing_needed = process_website and (force_reeval or website_ts_missing)
if website_processing_needed:
# ... (Komplette Website-Logik wie gehabt) ...
any_processing_done = True
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Scrape Timestamp"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]})
elif process_website:
logging.debug(f"Zeile {row_num_in_sheet}: Überspringe Website (AT vorhanden und kein Re-Eval).")
final_wiki_data = {
'url': get_cell_value("Wiki URL") or 'k.A.',
'first_paragraph': get_cell_value("Wiki Absatz") or 'k.A.',
'branche': get_cell_value("Wiki Branche") or 'k.A.',
'umsatz': get_cell_value("Wiki Umsatz") or 'k.A.',
'mitarbeiter': get_cell_value("Wiki Mitarbeiter") or 'k.A.',
'categories': get_cell_value("Wiki Kategorien") or 'k.A.'
}
# --- 1. Website Handling (Prüft AT oder force_reeval) ---
website_ts_missing = not get_cell_value("Website Scrape Timestamp").strip()
website_processing_needed = process_website and (force_reeval or website_ts_missing)
# --- 2. Wikipedia Verarbeitung (Überarbeitete Logik) ---
wiki_ts_an_missing = not get_cell_value("Wikipedia Timestamp").strip()
status_s_indicates_reparse = konsistenz_s.strip().upper() == "X (URL COPIED)"
wiki_processing_needed = process_wiki and (force_reeval or wiki_ts_an_missing or status_s_indicates_reparse)
if wiki_processing_needed:
any_processing_done = True
logging.info(f"Zeile {row_num_in_sheet}: Starte Wikipedia Verarbeitung (Grund: {'Re-Eval' if force_reeval else f'AN fehlt? {wiki_ts_an_missing}, S=X(Copied)? {status_s_indicates_reparse}'})...")
url_in_m = get_cell_value("Wiki URL").strip()
url_to_extract = None # Die URL, von der WIRKLICH extrahiert wird
# --- NEUE LOGIK: Priorisiere M, suche nur wenn nötig ---
if url_in_m and url_in_m.lower() not in ["k.a.", "kein artikel gefunden"] and url_in_m.lower().startswith("http"):
# Wenn eine URL in M steht: Versuche diese zu verwenden, es sei denn S sagt explizit "neu suchen"
if status_s_indicates_reparse:
logging.warning(f" -> Status S ist 'X (URL Copied)', ignoriere URL '{url_in_m}' in M und starte neue Suche...")
# Führe neue Suche durch
validated_page = self.wiki_scraper.search_company_article(company_name, website_url)
if validated_page:
url_to_extract = validated_page.url
else: # Wenn Suche erfolglos
url_to_extract = None
final_wiki_data = {'url': 'Kein Artikel gefunden', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.'}
wiki_data_updated_in_this_run = True # Wird überschrieben
if website_processing_needed:
any_processing_done = True
logging.info(f"Zeile {row_num_in_sheet}: Starte Website Verarbeitung (Grund: {'Re-Eval' if force_reeval else 'AT fehlt'})...")
if not website_url or website_url.strip().lower() == "k.a.":
logging.debug(" -> Suche Website via SERP...")
# Annahme: serp_website_lookup existiert und nutzt logging
new_website = serp_website_lookup(company_name)
if new_website != "k.A.":
website_url = new_website # Wichtig: website_url für Wiki-Validierung aktualisieren!
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["CRM Website"] + 1)}{row_num_in_sheet}', 'values': [[website_url]]})
if website_url and website_url.strip().lower() != "k.a.":
logging.debug(f" -> Scrape Rohtext von {website_url}...")
# Annahme: get_website_raw existiert und nutzt logging
new_website_raw = get_website_raw(website_url)
logging.debug(f" -> Fasse Rohtext zusammen (Länge: {len(str(new_website_raw))})...")
# Annahme: summarize_website_content existiert und nutzt logging
new_website_summary = summarize_website_content(new_website_raw)
website_raw = new_website_raw # Wichtig: globale Variable für ChatGPT aktualisieren
website_summary = new_website_summary # Wichtig: globale Variable für ChatGPT aktualisieren
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Rohtext"] + 1)}{row_num_in_sheet}', 'values': [[website_raw]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Zusammenfassung"] + 1)}{row_num_in_sheet}', 'values': [[website_summary]]})
else:
# Nutze die URL aus M für die Extraktion (keine erneute Validierung hier nötig, da reeval)
logging.info(f" -> Nutze vorhandene URL aus Spalte M für Extraktion: {url_in_m}")
url_to_extract = url_in_m
logging.warning(f" -> Keine gültige Website gefunden/vorhanden für {company_name}.")
website_raw, website_summary = "k.A.", "k.A."
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Rohtext"] + 1)}{row_num_in_sheet}', 'values': [['k.A.']]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Zusammenfassung"] + 1)}{row_num_in_sheet}', 'values': [['k.A.']]})
# Setze AT Timestamp
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Website Scrape Timestamp"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]})
elif process_website:
logging.debug(f"Zeile {row_num_in_sheet}: Überspringe Website (AT vorhanden und kein Re-Eval).")
# --- 2. Wikipedia Verarbeitung (Überarbeitete Logik) ---
wiki_ts_an_missing = not get_cell_value("Wikipedia Timestamp").strip()
status_s_indicates_reparse = konsistenz_s.strip().upper() == "X (URL COPIED)"
wiki_processing_needed = process_wiki and (force_reeval or wiki_ts_an_missing or status_s_indicates_reparse)
url_in_m = get_cell_value("Wiki URL").strip() # URL, die aktuell in Spalte M steht
if wiki_processing_needed:
any_processing_done = True
logging.info(f"Zeile {row_num_in_sheet}: Starte Wikipedia Artikel Findung/Validierung (Grund: {'Re-Eval' if force_reeval else f'AN fehlt? {wiki_ts_an_missing}, S=X(Copied)? {status_s_indicates_reparse}'})...")
validated_page = None
url_to_extract = None
# --- NEUE LOGIK: Priorisiere M, suche nur wenn nötig ---
if url_in_m and url_in_m.lower() not in ["k.a.", "kein artikel gefunden"] and url_in_m.lower().startswith("http"):
if status_s_indicates_reparse:
logging.warning(f" -> Status S ist 'X (URL Copied)', ignoriere URL '{url_in_m}' in M und starte neue Suche...")
validated_page = self.wiki_scraper.search_company_article(company_name, website_url) # self. hinzufügen
if validated_page:
url_to_extract = validated_page.url
else: # Wenn Suche erfolglos
final_wiki_data = {'url': 'Kein Artikel gefunden', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.'}
wiki_data_updated_in_this_run = True
else:
# Bei reeval oder fehlendem AN, nutze die URL aus M, aber validiere sie trotzdem (als Sicherheitscheck)
logging.debug(f" -> Prüfe Validität der vorhandenen URL aus Spalte M: {url_in_m}")
try:
page_from_m = wikipedia.page(url_in_m.split('/wiki/')[-1].replace('_', ' '), auto_suggest=False, preload=True)
# Wichtig: Nutze die aktuelle website_url (könnte sich in Schritt 1 geändert haben)
if self.wiki_scraper._validate_article(page_from_m, company_name, website_url): # self. hinzufügen
url_to_extract = page_from_m.url # Nimm die ggf. weitergeleitete URL
logging.info(f" -> Vorhandene URL aus M '{url_to_extract}' ist valide und wird verwendet.")
else:
# Wenn force_reeval aktiv ist und Validierung fehlschlägt, trotzdem verwenden?
# Oder neue Suche starten? Aktuell: Neue Suche starten.
logging.warning(f" -> Vorhandene URL aus M '{page_from_m.title}' ist NICHT valide. Starte neue Suche...")
validated_page = self.wiki_scraper.search_company_article(company_name, website_url) # self. hinzufügen
if validated_page:
url_to_extract = validated_page.url
else:
final_wiki_data = {'url': 'Kein Artikel gefunden', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.'}
wiki_data_updated_in_this_run = True
except wikipedia.exceptions.PageError:
logging.warning(f" -> Seite für vorhandene URL aus M '{url_in_m}' nicht gefunden (PageError). Starte neue Suche...")
validated_page = self.wiki_scraper.search_company_article(company_name, website_url) # self. hinzufügen
if validated_page: url_to_extract = validated_page.url
else: final_wiki_data = {'url': 'Kein Artikel gefunden', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.'}; wiki_data_updated_in_this_run = True
except wikipedia.exceptions.DisambiguationError as e_disamb_m:
logging.info(f" -> Vorhandene URL aus M '{url_in_m}' ist eine Begriffsklärung. Starte Suche...")
validated_page = self.wiki_scraper.search_company_article(company_name, website_url) # self. hinzufügen
if validated_page: url_to_extract = validated_page.url
else: final_wiki_data = {'url': 'Kein Artikel gefunden', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.'}; wiki_data_updated_in_this_run = True
except Exception as e_val_m:
logging.error(f" -> Fehler beim Prüfen der URL aus M '{url_in_m}': {e_val_m}. Starte neue Suche...")
validated_page = self.wiki_scraper.search_company_article(company_name, website_url) # self. hinzufügen
if validated_page: url_to_extract = validated_page.url
else: final_wiki_data = {'url': 'Kein Artikel gefunden', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.'}; wiki_data_updated_in_this_run = True
else:
# Wenn M leer oder 'k.A.' ist, starte neue Suche
logging.info(f" -> Spalte M leer oder 'k.A.'. Starte Wikipedia-Suche für '{company_name}'...")
validated_page = self.wiki_scraper.search_company_article(company_name, website_url) # self. hinzufügen
if validated_page:
url_to_extract = validated_page.url
else: # Wenn Suche erfolglos
final_wiki_data = {'url': 'Kein Artikel gefunden', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.'}
wiki_data_updated_in_this_run = True
# --- ENDE NEUE LOGIK ---
# --- Datenextraktion ---
if url_to_extract:
logging.info(f" -> Extrahiere Daten von URL: {url_to_extract}...")
# Verwende die wiki_scraper Instanz der Klasse
extracted_data = self.wiki_scraper.extract_company_data(url_to_extract)
if extracted_data:
final_wiki_data = extracted_data
wiki_data_updated_in_this_run = True
logging.info(f" -> Datenextraktion erfolgreich.")
else:
logging.error(f" -> Fehler bei Datenextraktion von {url_to_extract}. Setze Daten auf 'k.A.'")
final_wiki_data = {'url': url_to_extract, 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.'}
wiki_data_updated_in_this_run = True
# --- Sheet Updates für M-R und AN ---
# Schreibe IMMER das Ergebnis von final_wiki_data
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki URL"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('url', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Absatz"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('first_paragraph', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Branche"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('branche', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Umsatz"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('umsatz', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Mitarbeiter"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('mitarbeiter', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Kategorien"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('categories', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wikipedia Timestamp"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]})
# Setze S zurück, wenn nötig
if status_s_indicates_reparse or force_reeval or (url_in_m != final_wiki_data.get('url')):
s_idx = COLUMN_MAP.get("Chat Wiki Konsistenzprüfung")
if s_idx is not None:
s_let = self.sheet_handler._get_col_letter(s_idx + 1)
updates.append({'range': f'{s_let}{row_num_in_sheet}', 'values': [["?"]]})
logging.info(f" -> Status S zurückgesetzt auf '?' für erneute Verifikation.")
elif process_wiki:
logging.debug(f"Zeile {row_num_in_sheet}: Überspringe Wikipedia Verarbeitung (AN vorhanden, kein S=X(Copied) und kein Re-Eval).")
# --- 3. ChatGPT Evaluationen (Branch etc.) ---
chat_ts_ao_missing = not get_cell_value("Timestamp letzte Prüfung").strip()
run_chat_eval = process_chatgpt and (force_reeval or chat_ts_ao_missing or wiki_data_updated_in_this_run)
if run_chat_eval:
logging.info(f"Zeile {row_num_in_sheet}: Starte ChatGPT Evaluationen (Grund: {'Re-Eval' if force_reeval else f'AO fehlt? {chat_ts_ao_missing}, Wiki gerade aktualisiert? {wiki_data_updated_in_this_run}'})...")
any_processing_done = True
# Annahme: evaluate_branche_chatgpt existiert und nutzt logging
branch_result = evaluate_branche_chatgpt(
crm_branche, crm_beschreibung,
final_wiki_data.get('branche', 'k.A.'),
final_wiki_data.get('categories', 'k.A.'),
website_summary # Kommt aus Schritt 1 oder initialen Werten
)
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get('branch', 'Fehler')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get('consistency', 'Fehler')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begründung Abweichung Branche"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get('justification', 'Fehler')]]})
# --- Hier Platz für weitere ChatGPT-Calls ---
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Prüfung"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]})
elif process_chatgpt:
logging.debug(f"Zeile {row_num_in_sheet}: Überspringe ChatGPT Evaluationen (AO vorhanden, Wiki nicht gerade aktualisiert und kein Re-Eval).")
# --- 4. Abschließende Updates ---
if any_processing_done:
# Annahme: Config ist verfügbar
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{row_num_in_sheet}', 'values': [[Config.VERSION]]})
# --- 5. Batch Update für diese Zeile ---
if updates:
logging.info(f"Zeile {row_num_in_sheet}: Sende Batch-Update mit {len(updates)} Operationen...")
success = self.sheet_handler.batch_update_cells(updates) # Annahme: nutzt logging
if not success: logging.error(f"Zeile {row_num_in_sheet}: FEHLER beim Batch-Update.")
else:
# Wenn M leer oder 'k.A.' ist, starte neue Suche
logging.info(f" -> Spalte M leer oder 'k.A.'. Starte Wikipedia-Suche für '{company_name}'...")
validated_page = self.wiki_scraper.search_company_article(company_name, website_url)
if validated_page:
url_to_extract = validated_page.url
else: # Wenn Suche erfolglos
url_to_extract = None
final_wiki_data = {'url': 'Kein Artikel gefunden', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.'}
wiki_data_updated_in_this_run = True # Wird überschrieben
# --- ENDE NEUE LOGIK ---
if not any_processing_done:
logging.info(f"Zeile {row_num_in_sheet}: Keine Updates zum Schreiben (alle Schritte übersprungen).")
# --- Datenextraktion (nur wenn eine URL zum Extrahieren gefunden/bestimmt wurde) ---
if url_to_extract:
logging.info(f" -> Extrahiere Daten von URL: {url_to_extract}...")
extracted_data = self.wiki_scraper.extract_company_data(url_to_extract)
# Nur wenn die Extraktion erfolgreich war (nicht None zurückgab)
if extracted_data:
final_wiki_data = extracted_data
wiki_data_updated_in_this_run = True
logging.info(f" -> Datenextraktion erfolgreich.")
else:
# Fehler wurde von extract_company_data geloggt
logging.error(f" -> Fehler bei Datenextraktion von {url_to_extract}. Setze Daten auf 'k.A.'")
final_wiki_data = {'url': url_to_extract, 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.'}
wiki_data_updated_in_this_run = True # Markieren, dass überschrieben wird
# --- Sheet Updates für M-R und AN ---
# Schreibe IMMER das Ergebnis von final_wiki_data, auch wenn es "k.A." ist
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki URL"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('url', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Absatz"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('first_paragraph', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Branche"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('branche', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Umsatz"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('umsatz', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Mitarbeiter"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('mitarbeiter', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wiki Kategorien"] + 1)}{row_num_in_sheet}', 'values': [[final_wiki_data.get('categories', 'k.A.')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Wikipedia Timestamp"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]}) # Setze AN Timestamp
# Setze S zurück, wenn Trigger 'X(Copied)' war, Re-Eval erzwungen wurde, oder URL sich geändert hat
if status_s_indicates_reparse or force_reeval or (url_in_m != final_wiki_data.get('url')):
s_idx = COLUMN_MAP.get("Chat Wiki Konsistenzprüfung")
if s_idx is not None:
s_let = self.sheet_handler._get_col_letter(s_idx + 1)
updates.append({'range': f'{s_let}{row_num_in_sheet}', 'values': [["?"]]})
logging.info(f" -> Status S zurückgesetzt auf '?' für erneute Verifikation.")
elif process_wiki:
logging.debug(f"Zeile {row_num_in_sheet}: Überspringe Wikipedia Verarbeitung (AN vorhanden, kein S=X(Copied) und kein Re-Eval).")
# final_wiki_data behält die initial gelesenen Werte
# --- 3. ChatGPT Evaluationen (Branch etc.) ---
chat_ts_ao_missing = not get_cell_value("Timestamp letzte Prüfung").strip()
run_chat_eval = process_chatgpt and (force_reeval or chat_ts_ao_missing or wiki_data_updated_in_this_run)
if run_chat_eval:
logging.info(f"Zeile {row_num_in_sheet}: Starte ChatGPT Evaluationen (Grund: {'Re-Eval' if force_reeval else f'AO fehlt? {chat_ts_ao_missing}, Wiki gerade aktualisiert? {wiki_data_updated_in_this_run}'})...")
any_processing_done = True
# Annahme: evaluate_branche_chatgpt existiert und nutzt logging
branch_result = evaluate_branche_chatgpt(
crm_branche, crm_beschreibung,
final_wiki_data.get('branche', 'k.A.'),
final_wiki_data.get('categories', 'k.A.'),
website_summary # Kommt aus Schritt 1 oder initialen Werten
)
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Vorschlag Branche"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get('branch', 'Fehler')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Konsistenz Branche"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get('consistency', 'Fehler')]]})
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Chat Begründung Abweichung Branche"] + 1)}{row_num_in_sheet}', 'values': [[branch_result.get('justification', 'Fehler')]]})
# --- Hier Platz für weitere ChatGPT-Calls ---
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Timestamp letzte Prüfung"] + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]})
elif process_chatgpt:
logging.debug(f"Zeile {row_num_in_sheet}: Überspringe ChatGPT Evaluationen (AO vorhanden, Wiki nicht gerade aktualisiert und kein Re-Eval).")
# --- 4. Abschließende Updates ---
if any_processing_done:
logging.info(f"--- Verarbeitung für Zeile {row_num_in_sheet} abgeschlossen ---")
# Annahme: Config ist verfügbar
updates.append({'range': f'{self.sheet_handler._get_col_letter(COLUMN_MAP["Version"] + 1)}{row_num_in_sheet}', 'values': [[Config.VERSION]]})
# --- 5. Batch Update für diese Zeile ---
if updates:
logging.info(f"Zeile {row_num_in_sheet}: Sende Batch-Update mit {len(updates)} Operationen...")
success = self.sheet_handler.batch_update_cells(updates) # Annahme: nutzt logging
if not success: logging.error(f"Zeile {row_num_in_sheet}: FEHLER beim Batch-Update.")
else:
if not any_processing_done:
logging.info(f"Zeile {row_num_in_sheet}: Keine Updates zum Schreiben (alle Schritte übersprungen).")
logging.info(f"--- Verarbeitung für Zeile {row_num_in_sheet} abgeschlossen ---")
# Annahme: Config ist verfügbar
time.sleep(max(0.1, getattr(Config, 'RETRY_DELAY', 5) / 20)) # Kleine Pause
# Methode zur sequenziellen Verarbeitung (ruft _process_single_row ohne force_reeval)
def process_rows_sequentially(self, start_data_index, num_rows_to_process,
process_wiki=True, process_chatgpt=True, process_website=True):
""" Verarbeitet Zeilen sequentiell ab einem Startindex. """
# Annahme: sheet_handler wurde im __init__ gesetzt
if not self.sheet_handler or not self.sheet_handler.sheet_values:
logging.error("Sheet Handler nicht verfügbar oder keine Daten geladen in process_rows_sequentially.")
return
data_rows = self.sheet_handler.get_data() # Daten ohne Header
header_rows = 5 # Annahme
if start_data_index >= len(data_rows):
logging.warning(f"Startindex {start_data_index} liegt hinter der letzten Datenzeile ({len(data_rows)}). Keine Verarbeitung.")
return
end_row_index = min(start_data_index + num_rows_to_process, len(data_rows))
actual_rows_to_process = end_row_index - start_data_index
if actual_rows_to_process <= 0:
logging.info("Keine Zeilen zur sequenziellen Verarbeitung übrig.")
return
logging.info(f"Verarbeite {actual_rows_to_process} Zeilen sequenziell (Daten-Index {start_data_index} bis {end_row_index - 1})...")
for i in range(start_data_index, end_row_index):
if i >= len(data_rows):
logging.warning(f"WARNUNG: Index {i} überschreitet Datenlänge ({len(data_rows)}). Breche Schleife ab.")
break
row_data = data_rows[i]
row_num_in_sheet = i + header_rows + 1
# Rufe die zentrale Verarbeitungsmethode auf, OHNE force_reeval
try:
self._process_single_row(row_num_in_sheet, row_data,
process_wiki, process_chatgpt, process_website,
force_reeval=False) # HIER ist der Unterschied zu reeval
except Exception as e_seq:
# Fange Fehler bei der Verarbeitung einzelner Zeilen ab, um den Lauf nicht zu stoppen
logging.exception(f"Fehler bei der sequenziellen Verarbeitung von Zeile {row_num_in_sheet}: {e_seq}")
# Optional: Markiere die Zeile im Sheet als fehlerhaft?
logging.info(f"Sequenzielle Verarbeitung von {actual_rows_to_process} Zeilen abgeschlossen.")
time.sleep(max(0.1, getattr(Config, 'RETRY_DELAY', 5) / 20))
# Methode für den Re-Eval Modus (ruft _process_single_row MIT force_reeval)
def process_reevaluation_rows(self, row_limit=None, clear_flag=True):
@@ -4144,6 +4179,7 @@ def _process_single_row(self, row_num_in_sheet, row_data,
rows_actually_processed = []
for task in rows_to_process:
# Limit-Prüfung VOR der Verarbeitung
if row_limit is not None and processed_count >= row_limit:
logging.info(f"Zeilenlimit ({row_limit}) für Re-Evaluation erreicht. Breche weitere Verarbeitung ab.")
break
@@ -4166,6 +4202,7 @@ def _process_single_row(self, row_num_in_sheet, row_data,
except Exception as e_proc:
logging.exception(f"FEHLER bei Re-Evaluation von Zeile {row_num}: {e_proc}")
# Lösche Flags am Ende
if clear_flag and updates_clear_flag:
logging.info(f"Lösche ReEval-Flags für {len(updates_clear_flag)} erfolgreich verarbeitete Zeilen ({rows_actually_processed})...")
success = self.sheet_handler.batch_update_cells(updates_clear_flag)
@@ -4174,12 +4211,10 @@ def _process_single_row(self, row_num_in_sheet, row_data,
logging.info(f"Re-Evaluierung abgeschlossen. {processed_count} Zeilen verarbeitet (Limit war: {row_limit}, Gefunden: {found_count}).")
# Methode für SERP API Website Lookup
def process_serp_website_lookup_for_empty(self):
""" Neuer Modus 22: Füllt fehlende Websites via SERP API. """
""" Füllt fehlende Websites via SERP API. """
logging.info("Starte Modus: SERP API Website Lookup für leere Zellen in Spalte D.")
# Annahme: sheet_handler wurde initialisiert
if not self.sheet_handler.load_data(): return
data_rows = self.sheet_handler.get_data()
header_rows = 5
@@ -4199,10 +4234,12 @@ def _process_single_row(self, row_num_in_sheet, row_data,
updates = []
for i, row in enumerate(data_rows):
row_num_in_sheet = i + header_rows + 1
current_website = row[website_col_idx] if len(row) > website_col_idx else ""
current_website = ""
if len(row) > website_col_idx: current_website = row[website_col_idx]
if not current_website or current_website.strip().lower() == "k.a.":
company_name = row[name_col_idx] if len(row) > name_col_idx else ""
company_name = ""
if len(row) > name_col_idx: company_name = row[name_col_idx]
if not company_name:
logging.warning(f"Zeile {row_num_in_sheet}: Übersprungen (kein Firmenname für Lookup).")
continue
@@ -4210,7 +4247,7 @@ def _process_single_row(self, row_num_in_sheet, row_data,
logging.info(f"Zeile {row_num_in_sheet}: Suche Website für '{company_name}'...")
# Annahme: serp_website_lookup existiert und nutzt logging
new_website = serp_website_lookup(company_name)
time.sleep(getattr(Config, 'RETRY_DELAY', 5) * 0.3) # Kurze Pause
time.sleep(getattr(Config, 'RETRY_DELAY', 5) * 0.3)
if new_website != "k.A.":
updates.append({'range': f'{website_col_letter}{row_num_in_sheet}', 'values': [[new_website]]})
@@ -4218,20 +4255,17 @@ def _process_single_row(self, row_num_in_sheet, row_data,
rows_processed += 1
else:
logging.info(f"Zeile {row_num_in_sheet}: Keine Website gefunden.")
# Optional: Limit für diesen Modus?
# if row_limit is not None and rows_processed >= row_limit: break
if updates:
logging.info(f"Sende Batch-Update für {rows_processed} gefundene Websites...")
self.sheet_handler.batch_update_cells(updates)
else:
logging.info("Keine fehlenden Websites gefunden zum Aktualisieren.")
logging.info(f"Modus 'website_lookup' abgeschlossen. {rows_processed} Websites ergänzt.")
# Methode für experimentelle Website Details
def process_website_details_for_marked_rows(self):
""" Neuer Modus 23 (EXPERIMENTELL): Extrahiert Website-Details für markierte Zeilen. """
""" EXPERIMENTELL: Extrahiert Website-Details für markierte Zeilen. """
logging.info("Starte Modus (EXPERIMENTELL): Website Detail Extraction für Zeilen mit 'x' in Spalte A.")
if not self.sheet_handler.load_data(): return
data_rows = self.sheet_handler.get_data()
@@ -4241,9 +4275,11 @@ def _process_single_row(self, row_num_in_sheet, row_data,
try:
reeval_col_idx = COLUMN_MAP["ReEval Flag"]
website_col_idx = COLUMN_MAP["CRM Website"]
details_col_idx = COLUMN_MAP["Website Rohtext"] # Nutze AR für Details? Besser neue Spalte!
details_col_idx = COLUMN_MAP.get("Website Details", None) # Versuche neue Spalte oder Fallback
if details_col_idx is None:
details_col_idx = COLUMN_MAP["Website Rohtext"] # Fallback auf AR
logging.warning("Keine Spalte 'Website Details' in COLUMN_MAP, nutze 'Website Rohtext' (AR).")
details_col_letter = self.sheet_handler._get_col_letter(details_col_idx + 1)
# at_col_letter = self.sheet_handler._get_col_letter(COLUMN_MAP["Website Scrape Timestamp"] + 1) # Für Timestamp
except KeyError as e:
logging.critical(f"FEHLER: Benötigte Spalte '{e}' für Modus nicht in COLUMN_MAP.")
return
@@ -4255,14 +4291,15 @@ def _process_single_row(self, row_num_in_sheet, row_data,
for i, row in enumerate(data_rows):
row_num_in_sheet = i + header_rows + 1
if len(row) > reeval_col_idx and row[reeval_col_idx].strip().lower() == "x":
website_url = row[website_col_idx] if len(row) > website_col_idx else ""
website_url = ""
if len(row) > website_col_idx: website_url = row[website_col_idx]
if not website_url or website_url.strip().lower() == "k.a.":
logging.warning(f"Zeile {row_num_in_sheet}: Keine gültige Website in Spalte D vorhanden, überspringe.")
continue
logging.info(f"Zeile {row_num_in_sheet}: Extrahiere Website Details von {website_url}...")
# Annahme: Funktion scrape_website_details existiert
try:
# Annahme: Funktion scrape_website_details existiert
details = scrape_website_details(website_url)
except NameError:
logging.error("Funktion 'scrape_website_details' ist nicht definiert!")
@@ -4271,22 +4308,17 @@ def _process_single_row(self, row_num_in_sheet, row_data,
logging.exception(f"Fehler bei scrape_website_details für {website_url}: {e_detail}")
details = f"FEHLER: {e_detail}"
updates.append({'range': f'{details_col_letter}{row_num_in_sheet}', 'values': [[str(details)]]}) # In AR schreiben
# Optional: Timestamp in AT setzen
# updates.append({'range': f'{at_col_letter}{row_num_in_sheet}', 'values': [[datetime.now().strftime("%Y-%m-%d %H:%M:%S")]]})
updates.append({'range': f'{details_col_letter}{row_num_in_sheet}', 'values': [[str(details)]]})
rows_processed += 1
time.sleep(getattr(Config, 'RETRY_DELAY', 5) * 0.2) # Kleine Pause
time.sleep(getattr(Config, 'RETRY_DELAY', 5) * 0.2)
if updates:
logging.info(f"Sende Batch-Update für {rows_processed} Detail-Extraktionen...")
self.sheet_handler.batch_update_cells(updates)
else:
logging.info("Keine Zeilen mit 'x' gefunden für Detail-Extraktion.")
logging.info(f"Modus 'website_details' abgeschlossen. {rows_processed} Zeilen verarbeitet.")
# Methode zur Datenvorbereitung für ML
def prepare_data_for_modeling(self):
"""
@@ -4294,19 +4326,18 @@ def _process_single_row(self, row_num_in_sheet, row_data,
bereitet sie für das Decision Tree Modell vor.
"""
logging.info("Starte Datenvorbereitung für Modellierung...")
# Annahme: sheet_handler ist initialisiert und hat Daten geladen
if not self.sheet_handler or not self.sheet_handler.sheet_values:
logging.error("Fehler: Sheet Handler nicht initialisiert oder keine Daten geladen für prepare_data.")
if not self.sheet_handler.load_data(): # Versuch nachzuladen
if not self.sheet_handler.load_data():
logging.critical("Konnte Daten auch nach erneutem Versuch nicht laden.")
return None
all_data = self.sheet_handler.sheet_values # Verwende die Daten aus dem Handler
all_data = self.sheet_handler.sheet_values
if len(all_data) <= 5:
logging.error("Fehler: Nicht genügend Datenzeilen im Sheet gefunden für Modellierung.")
return None
try: # Fange Fehler beim Zugriff auf Header ab
try:
headers = all_data[0]
except IndexError:
logging.critical("FEHLER: Sheet scheint leer zu sein, keine Header gefunden.")
@@ -4316,26 +4347,23 @@ def _process_single_row(self, row_num_in_sheet, row_data,
df = pd.DataFrame(data_rows, columns=headers)
logging.info(f"DataFrame für Modellierung erstellt mit {len(df)} Zeilen und {len(df.columns)} Spalten.")
# Notwendige Schlüssel aus COLUMN_MAP holen
try:
col_keys = {
"name": "CRM Name", "branche": "CRM Branche", "umsatz_crm": "CRM Umsatz",
"umsatz_wiki": "Wiki Umsatz", "ma_crm": "CRM Anzahl Mitarbeiter",
"ma_wiki": "Wiki Mitarbeiter", "techniker": "CRM Anzahl Techniker" # ANPASSEN WENN NÖTIG
"ma_wiki": "Wiki Mitarbeiter", "techniker": "CRM Anzahl Techniker"
}
col_indices = {key: COLUMN_MAP[val] for key, val in col_keys.items()}
# Erstelle Liste der Spaltennamen basierend auf den Headern im Sheet
cols_to_select = [headers[idx] for idx in col_indices.values()]
# Mapping von echtem Header zu internem Namen
rename_map = {headers[idx]: key for key, idx in col_indices.items()}
except KeyError as e:
logging.critical(f"FEHLER: Konnte Mapping für Schlüssel '{e}' in COLUMN_MAP nicht finden.")
logging.critical(f"FEHLER: Konnte Mapping für Schlüssel '{e}' nicht in COLUMN_MAP finden.")
return None
except IndexError as e:
logging.critical(f"FEHLER: Spaltenindex aus COLUMN_MAP ({e}) außerhalb der Grenzen der Header-Zeile ({len(headers)} Spalten). Prüfe COLUMN_MAP!")
return None
try: # Fange Fehler beim Auswählen der Spalten ab
try:
df_subset = df[cols_to_select].copy()
df_subset.rename(columns=rename_map, inplace=True)
except KeyError as e:
@@ -4344,12 +4372,13 @@ def _process_single_row(self, row_num_in_sheet, row_data,
logging.info(f"Benötigte Spalten für Modellierung ausgewählt und umbenannt: {list(df_subset.columns)}")
# --- Konsolidierung (wie in vorheriger Antwort, mit Logging) ---
# --- Konsolidierung ---
def get_valid_numeric(value_str):
# ... (Implementierung wie zuletzt, mit Apostroph-Entfernung) ...
if value_str is None or pd.isna(value_str) or str(value_str).strip() == '': return np.nan
original_value = value_str
try:
cleaned_str = str(value_str).replace('.', '').replace("'", "").replace(',', '.') # Auch Apostroph entfernen
cleaned_str = str(value_str).replace('.', '').replace("'", "").replace(',', '.')
val = float(cleaned_str)
return val if val > 0 else np.nan
except (ValueError, TypeError):
@@ -4373,10 +4402,10 @@ def _process_single_row(self, row_num_in_sheet, row_data,
if crm_col not in df_subset.columns: df_subset[crm_col] = np.nan
wiki_numeric = df_subset[wiki_col].apply(get_valid_numeric)
crm_numeric = df_subset[crm_col].apply(get_valid_numeric)
df_subset[final_col] = np.where(wiki_numeric.notna(), wiki_numeric, crm_numeric) # Vereinfacht: Wiki > CRM, sonst NaN
df_subset[final_col] = np.where(wiki_numeric.notna(), wiki_numeric, crm_numeric)
logging.info(f" -> {df_subset[final_col].notna().sum()} gültige '{final_col}' Werte erstellt.")
# --- Zielvariable vorbereiten (wie in vorheriger Antwort, mit Logging) ---
# --- Zielvariable vorbereiten ---
techniker_col = "techniker"
logging.info(f"Verarbeite Zielvariable '{techniker_col}'...")
df_subset['Anzahl_Servicetechniker_Numeric'] = pd.to_numeric(df_subset[techniker_col], errors='coerce')
@@ -4388,25 +4417,25 @@ def _process_single_row(self, row_num_in_sheet, row_data,
logging.info(f"Verbleibende Zeilen für Modellierung: {filtered_rows}")
if filtered_rows == 0: logging.error("FEHLER: Keine Zeilen mit gültiger Technikerzahl übrig!"); return None
# --- Techniker-Buckets erstellen (wie gehabt) ---
# --- Techniker-Buckets erstellen ---
bins = [-1, 0, 19, 49, 99, 249, 499, float('inf')]
labels = ['Bucket_1_(0)', 'Bucket_2_(<20)', 'Bucket_3_(<50)', 'Bucket_4_(<100)', 'Bucket_5_(<250)', 'Bucket_6_(<500)', 'Bucket_7_(>499)']
df_filtered['Techniker_Bucket'] = pd.cut(df_filtered['Anzahl_Servicetechniker_Numeric'], bins=bins, labels=labels, right=True)
logging.info("Techniker-Buckets erstellt.")
logging.debug(f"Verteilung der Buckets:\n{df_filtered['Techniker_Bucket'].value_counts(normalize=True).round(3)}")
# --- Kategoriale Features (Branche) (wie gehabt) ---
# --- Kategoriale Features (Branche) ---
branche_col = "branche"
logging.info(f"Verarbeite kategoriales Feature '{branche_col}' für One-Hot Encoding...")
df_filtered[branche_col] = df_filtered[branche_col].astype(str).fillna('Unbekannt').str.strip()
df_encoded = pd.get_dummies(df_filtered, columns=[branche_col], prefix='Branche', dummy_na=False)
logging.info(f"One-Hot Encoding für '{branche_col}' durchgeführt.")
# --- Finale Auswahl (wie gehabt) ---
# --- Finale Auswahl ---
feature_columns = [col for col in df_encoded.columns if col.startswith('Branche_')]
feature_columns.extend(['Finaler_Umsatz', 'Finaler_Mitarbeiter'])
target_column = 'Techniker_Bucket'
original_data_cols = ['name', 'Anzahl_Servicetechniker_Numeric'] # 'name' statt 'CRM Name' intern
original_data_cols = ['name', 'Anzahl_Servicetechniker_Numeric']
df_model_ready = df_encoded[original_data_cols + feature_columns + [target_column]].copy()
for col in ['Finaler_Umsatz', 'Finaler_Mitarbeiter']:
df_model_ready[col] = pd.to_numeric(df_model_ready[col], errors='coerce')