v1.7.7: Implement Parent Account Suggestion via ChatGPT

- Neuer Modus 'suggest_parents' für die Generierung von Vorschlägen zu Muttergesellschaften.
- Nutzt ChatGPT zur Analyse von Unternehmensdaten (CRM, Wiki, Website-Zusammenfassung).
- Schreibt Vorschläge in Spalte O ('System Vorschlag Parent Account'), setzt Status P auf '?' und Timestamp Q.
- Implementiert parallele OpenAI-Anfragen für Batch-Verarbeitung.
- Automatische Startzeilenermittlung basierend auf leerer Spalte O.
- Optionale Neubewertung von Zeilen mit Status P = '?'.
This commit is contained in:
2025-05-26 10:54:41 +00:00
parent 04734153c2
commit d3ebcf93b4

View File

@@ -8,7 +8,7 @@ von Unternehmensdaten, primär aus einem Google Sheet, ergänzt durch Web Scrapi
Wikipedia, OpenAI (ChatGPT) und SerpAPI (Google Search, LinkedIn).
Autor: Christian Godelmann
Version: v1.7.6
Version: v1.7.7
Hinweis zur Struktur:
Dieser Code wird in logischen Bloecken uebermittelt. Fuegen Sie die Bloecke
@@ -107,7 +107,7 @@ PATTERNS_FILE_JSON = "technician_patterns.json" # Neu (Empfohlen)
# --- Globale Konfiguration Klasse ---
class Config:
"""Zentrale Konfigurationseinstellungen."""
VERSION = "v1.7.6"
VERSION = "v1.7.7"
LANG = "de" # Sprache fuer Wikipedia etc.
# ACHTUNG: SHEET_URL ist hier ein Platzhalter. Ersetzen Sie ihn durch Ihre tatsaechliche URL.
SHEET_URL = "https://docs.google.com/spreadsheets/d/1u_gHr9JUfmV1-iviRzbSe3575QEp7KLhK5jFV_gJcgo" # <<< ERSETZEN SIE DIES!
@@ -7996,8 +7996,8 @@ class DataProcessor:
# Innerhalb der DataProcessor Klasse
# Innerhalb der DataProcessor Klasse (ersetzen Sie Ihre bestehende Version vollständig hiermit)
# Innerhalb der DataProcessor Klasse
# Innerhalb der DataProcessor Klasse (ersetzen Sie Ihre bestehende Version vollständig hiermit)
def _get_numeric_value_for_plausi(self, value_str, is_umsatz=False):
logger = logging.getLogger(__name__ + "._get_numeric_value_for_plausi")
@@ -8292,6 +8292,298 @@ class DataProcessor:
self.logger.info(f"Plausibilitäts-Check-Lauf (mit Konsolidierung) beendet. {processed_rows_count} Zeilen mit Plausi-Checks versehen, {skipped_count} Zeilen initial übersprungen.")
# ==========================================================================
# === Batch Processing Methods (Parent Account Suggestion) ==============
# ==========================================================================
def _suggest_parent_account_openai_task(self, task_data, openai_semaphore):
"""
Fragt ChatGPT nach einem Parent Account für ein einzelnes Unternehmen.
Läuft in einem separaten Thread für den Parent-Suggestion-Batch.
Args:
task_data (dict): Enthält die Daten für die Zeile
(row_num, crm_name, crm_website, crm_beschreibung,
wiki_url, wiki_absatz, wiki_kategorien, website_zusammenfassung).
openai_semaphore (threading.Semaphore): Semaphore zur Begrenzung gleichzeitiger OpenAI-Calls.
Returns:
dict: {'row_num': int, 'suggested_parent': str, 'justification': str, 'error': str or None}
"""
logger = logging.getLogger(__name__ + ".suggest_parent_task")
row_num = task_data['row_num']
suggested_parent = "k.A."
justification = "Keine Begründung erhalten."
error_msg = None
# Bereinige Input-Daten für den Prompt
crm_name = str(task_data.get('crm_name', 'N/A')).strip()
crm_website = str(task_data.get('crm_website', 'N/A')).strip()
crm_beschreibung = str(task_data.get('crm_beschreibung', 'N/A')).strip()[:800] # Gekürzt
wiki_url = str(task_data.get('wiki_url', 'N/A')).strip()
wiki_absatz = str(task_data.get('wiki_absatz', 'N/A')).strip()[:800] # Gekürzt
wiki_kategorien = str(task_data.get('wiki_kategorien', 'N/A')).strip()[:500] # Gekürzt
website_zusammenfassung = str(task_data.get('website_zusammenfassung', 'N/A')).strip()[:800] # Gekürzt
prompt_parts = [
"Du bist ein Wirtschaftsanalyst und recherchierst Unternehmensstrukturen.",
"Basierend auf den folgenden Informationen, identifiziere bitte den Namen der direkten Muttergesellschaft oder des übergeordneten Konzerns für das genannte Unternehmen.",
"Wenn keine klare Muttergesellschaft ersichtlich ist oder das Unternehmen selbständig zu sein scheint, antworte mit 'k.A.' für den Parent Account.",
"Gib deine Antwort ausschließlich im folgenden Format aus (keine Einleitung, kein Schlusssatz):",
"Vorgeschlagener Parent Account: <Name des Parent Accounts oder k.A.>",
"Begründung: <Sehr kurze Begründung für deinen Vorschlag oder warum keiner gemacht werden kann. Erwähne die Informationsquelle, falls möglich (z.B. Wikipedia Infobox, Website).>",
"\n--- Unternehmensinformationen ---",
f"Unternehmen: {crm_name}",
]
if crm_website and crm_website.lower() != "n/a":
prompt_parts.append(f"Website: {crm_website}")
if crm_beschreibung and crm_beschreibung.lower() != "n/a":
prompt_parts.append(f"CRM Beschreibung: {crm_beschreibung}")
if wiki_url and "wikipedia.org" in wiki_url.lower():
prompt_parts.append(f"Wikipedia URL: {wiki_url}")
if wiki_absatz and wiki_absatz.lower() != "n/a":
prompt_parts.append(f"Wikipedia Absatz: {wiki_absatz}")
if wiki_kategorien and wiki_kategorien.lower() != "n/a":
prompt_parts.append(f"Wikipedia Kategorien: {wiki_kategorien}")
if website_zusammenfassung and website_zusammenfassung.lower() != "n/a" and not website_zusammenfassung.startswith("k.A. (Fehler"):
prompt_parts.append(f"Website Zusammenfassung: {website_zusammenfassung}")
prompt_parts.append("\nBitte gib NUR die Antwort im oben genannten Format.")
prompt = "\n".join(prompt_parts)
# Token Count (optional, zur Info)
# try:
# pt_count = token_count(prompt)
# logger.debug(f"Zeile {row_num}: Prompt für Parent Suggestion ({pt_count} Tokens): {prompt[:200]}...")
# except Exception: pass
try:
with openai_semaphore:
# call_openai_chat ist mit @retry_on_failure dekoriert
raw_chat_response = call_openai_chat(prompt, temperature=0.1, model=getattr(Config, 'TOKEN_MODEL', 'gpt-3.5-turbo'))
if raw_chat_response:
parsed_parent = "k.A."
parsed_justification = "Keine Begründung extrahiert."
parent_match = re.search(r"Vorgeschlagener Parent Account:\s*(.*)", raw_chat_response, re.IGNORECASE)
if parent_match:
parsed_parent = parent_match.group(1).strip()
if not parsed_parent or parsed_parent.lower() == "k.a.": # Sicherstellen, dass "k.A." korrekt übernommen wird
parsed_parent = "k.A."
justification_match = re.search(r"Begründung:\s*(.*)", raw_chat_response, re.IGNORECASE)
if justification_match:
parsed_justification = justification_match.group(1).strip()
suggested_parent = parsed_parent
justification = parsed_justification
logger.debug(f"Zeile {row_num}: ChatGPT Parent Vorschlag='{suggested_parent}', Begründung='{justification[:100]}...'")
else:
error_msg = "Leere Antwort von OpenAI erhalten."
logger.warning(f"Zeile {row_num}: {error_msg}")
justification = error_msg
except Exception as e:
error_msg = f"Fehler bei OpenAI Call für Parent Suggestion (Zeile {row_num}): {type(e).__name__} - {str(e)[:100]}"
logger.error(error_msg)
justification = error_msg
return {"row_num": row_num, "suggested_parent": suggested_parent, "justification": justification, "error": error_msg}
def process_parent_suggestion_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None, re_evaluate_question_mark=False):
"""
Batch-Prozess zur Generierung von Parent-Account-Vorschlägen mittels ChatGPT.
Schreibt Ergebnisse in Spalten O, P, Q.
Args:
start_sheet_row (int, optional): Die 1-basierte Startzeile.
end_sheet_row (int, optional): Die 1-basierte Endzeile.
limit (int, optional): Maximale Anzahl zu verarbeitender Zeilen.
re_evaluate_question_mark (bool, optional): Wenn True, werden auch Zeilen mit '?'
in Spalte P (Parent Vorschlag Status) erneut bewertet.
"""
self.logger.info(f"Starte Parent Account Suggestion Batch. Bereich: {start_sheet_row if start_sheet_row else 'Start'}-{end_sheet_row if end_sheet_row else 'Ende'}, Limit: {limit if limit else 'Unbegrenzt'}, Re-Eval ?: {re_evaluate_question_mark}")
# --- Daten laden und Startzeile ermitteln ---
col_o_key = "System Vorschlag Parent Account"
col_p_key = "Parent Vorschlag Status"
col_q_key = "Parent Vorschlag Timestamp"
if start_sheet_row is None:
self.logger.info(f"Automatische Ermittlung der Startzeile basierend auf leerem '{col_o_key}'...")
start_data_index_no_header = self.sheet_handler.get_start_row_index(check_column_key=col_o_key, min_sheet_row=7)
if start_data_index_no_header == -1:
self.logger.error("FEHLER bei autom. Startzeilenermittlung. Breche ab.")
return
start_sheet_row = start_data_index_no_header + self.sheet_handler._header_rows + 1
self.logger.info(f"Automatisch ermittelte Startzeile: {start_sheet_row}")
else:
if not self.sheet_handler.load_data():
self.logger.error("FEHLER beim Laden der Daten für Parent Suggestion Batch.")
return
all_data = self.sheet_handler.get_all_data_with_headers()
header_rows = self.sheet_handler._header_rows
total_sheet_rows = len(all_data)
if end_sheet_row is None: end_sheet_row = total_sheet_rows
self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}.")
if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows:
self.logger.info("Start liegt nach Ende oder außerhalb des Sheets. Keine Verarbeitung.")
return
# --- Indizes ---
required_keys = [
col_o_key, col_p_key, col_q_key, "CRM Name", "CRM Website", "CRM Beschreibung",
"Wiki URL", "Wiki Absatz", "Wiki Kategorien", "Website Zusammenfassung", "Version"
]
if not all(key in COLUMN_MAP for key in required_keys):
missing = [k for k in required_keys if k not in COLUMN_MAP]
self.logger.critical(f"FEHLER: Spaltenschlüssel für Parent Suggestion Batch fehlen: {missing}. Abbruch.")
return
col_o_letter = self.sheet_handler._get_col_letter(COLUMN_MAP[col_o_key] + 1)
col_p_letter = self.sheet_handler._get_col_letter(COLUMN_MAP[col_p_key] + 1)
col_q_letter = self.sheet_handler._get_col_letter(COLUMN_MAP[col_q_key] + 1)
# Begründung kann optional in eine neue Spalte oder hier ins Log
# Fürs Erste nur Logging der Begründung.
# --- Konfiguration für Parallelverarbeitung ---
openai_sem = threading.Semaphore(getattr(Config, 'OPENAI_CONCURRENCY_LIMIT', 3))
max_workers = getattr(Config, 'MAX_BRANCH_WORKERS', 10) # Wiederverwendung der Branch Worker Config
processing_batch_size = getattr(Config, 'PROCESSING_BRANCH_BATCH_SIZE', 10) # Batch-Größe für Tasks
update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50)
tasks_for_current_openai_batch = []
all_sheet_updates = []
processed_count = 0
skipped_count = 0
# Funktion zum Verarbeiten und Schreiben eines Batches
def _execute_and_write_openai_batch(current_tasks):
nonlocal processed_count # Um processed_count im äußeren Scope zu modifizieren
if not current_tasks:
return
batch_start_log_row = current_tasks[0]['row_num']
batch_end_log_row = current_tasks[-1]['row_num']
self.logger.debug(f"\n--- Starte Parent Suggestion OpenAI Batch ({len(current_tasks)} Tasks, Zeilen {batch_start_log_row}-{batch_end_log_row}) ---")
batch_results_list = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_task_map = {executor.submit(self._suggest_parent_account_openai_task, task, openai_sem): task for task in current_tasks}
for future in concurrent.futures.as_completed(future_to_task_map):
task_info_orig = future_to_task_map[future]
try:
result_data = future.result()
batch_results_list.append(result_data)
except Exception as e_future:
self.logger.error(f"Exception im Future für Parent Suggestion Zeile {task_info_orig['row_num']}: {e_future}")
batch_results_list.append({
"row_num": task_info_orig['row_num'],
"suggested_parent": "FEHLER_TASK",
"justification": str(e_future)[:150],
"error": str(e_future)
})
self.logger.debug(f" OpenAI Batch ({batch_start_log_row}-{batch_end_log_row}) abgeschlossen. {len(batch_results_list)} Ergebnisse erhalten.")
if batch_results_list:
now_ts_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_version_str = getattr(Config, 'VERSION', 'unknown')
updates_for_this_batch = []
for res_item in batch_results_list:
rn = res_item['row_num']
parent_val = res_item.get('suggested_parent', 'k.A. (Fehler)')
updates_for_this_batch.append({'range': f'{col_o_letter}{rn}', 'values': [[parent_val]]})
# Nur auf "?" setzen, wenn ein Vorschlag gemacht wurde (nicht "k.A." oder Fehler)
status_val = "?" if parent_val and parent_val.lower() != "k.a." and not parent_val.startswith("FEHLER") else ""
updates_for_this_batch.append({'range': f'{col_p_letter}{rn}', 'values': [[status_val]]})
updates_for_this_batch.append({'range': f'{col_q_letter}{rn}', 'values': [[now_ts_str]]})
# Optional: Begründung in eine neue Spalte schreiben
# updates_for_this_batch.append({'range': f'{BEGRUENDUNG_SPALTE_LETTER}{rn}', 'values': [[res_item.get('justification', '')]]})
# Logge die Begründung, auch wenn sie nicht ins Sheet geschrieben wird
if res_item.get('justification'):
self.logger.debug(f"Zeile {rn} - Parent Begründung: {res_item.get('justification')[:200]}...")
all_sheet_updates.extend(updates_for_this_batch)
processed_count += len(current_tasks) # Erhöhe processed_count hier
# Sheet Updates in Batches senden
if len(all_sheet_updates) >= update_batch_row_limit * 3: # 3 Spalten pro Update (O, P, Q)
self.logger.info(f"Sende Batch-Updates für Parent Suggestions ({len(all_sheet_updates)//3} Zeilen)...")
self.sheet_handler.batch_update_cells(all_sheet_updates)
all_sheet_updates.clear()
# Pause nach dem Batch
time.sleep(getattr(Config, 'RETRY_DELAY', 5) * 0.5)
# Ende der Hilfsfunktion _execute_and_write_openai_batch
# Hauptschleife über die Zeilen
for i in range(start_sheet_row, end_sheet_row + 1):
if limit is not None and processed_count >= limit:
self.logger.info(f"Verarbeitungslimit ({limit}) für Parent Suggestions erreicht.")
break
row_idx_list = i - 1
if row_idx_list >= total_sheet_rows: break
row = all_data[row_idx_list]
if not any(cell and str(cell).strip() for cell in row):
skipped_count += 1
continue
# Kriterien für Verarbeitung
val_o = self._get_cell_value_safe(row, col_o_key).strip()
val_p = self._get_cell_value_safe(row, col_p_key).strip()
# val_q = self._get_cell_value_safe(row, col_q_key).strip() # Timestamp nicht direkt für Auswahl relevant
needs_processing = False
if not val_o or val_o.lower() == "k.a.": # Spalte O ist leer oder k.A.
needs_processing = True
elif re_evaluate_question_mark and val_p == "?": # Neubewertung für Status "?"
needs_processing = True
if not needs_processing:
skipped_count += 1
continue
# Daten für Task sammeln
task_data = {
"row_num": i,
"crm_name": self._get_cell_value_safe(row, "CRM Name"),
"crm_website": self._get_cell_value_safe(row, "CRM Website"),
"crm_beschreibung": self._get_cell_value_safe(row, "CRM Beschreibung"),
"wiki_url": self._get_cell_value_safe(row, "Wiki URL"),
"wiki_absatz": self._get_cell_value_safe(row, "Wiki Absatz"),
"wiki_kategorien": self._get_cell_value_safe(row, "Wiki Kategorien"),
"website_zusammenfassung": self._get_cell_value_safe(row, "Website Zusammenfassung")
}
tasks_for_current_openai_batch.append(task_data)
if len(tasks_for_current_openai_batch) >= processing_batch_size:
_execute_and_write_openai_batch(tasks_for_current_openai_batch)
tasks_for_current_openai_batch.clear()
# Letzten Batch verarbeiten
if tasks_for_current_openai_batch:
_execute_and_write_openai_batch(tasks_for_current_openai_batch)
# Letzte Sheet Updates senden
if all_sheet_updates:
self.logger.info(f"Sende finale Batch-Updates für Parent Suggestions ({len(all_sheet_updates)//3} Zeilen)...")
self.sheet_handler.batch_update_cells(all_sheet_updates)
self.logger.info(f"Parent Account Suggestion Batch abgeschlossen. {processed_count} Zeilen verarbeitet, {skipped_count} Zeilen übersprungen.")
# ==========================================================================
# === Utility Methods (ML Data Prep & Training) ============================
# ==========================================================================
@@ -9990,6 +10282,7 @@ def main():
"website_scraping", # Uebereinstimmend mit process_website_scraping_batch (Block 27)
"summarize_website", # Uebereinstimmend mit process_summarization_batch (Block 28)
"branch_eval", # Uebereinstimmend mit process_branch_batch (Block 29)
"suggest_parents",
],
"Sequentielle Verarbeitung (Zeilenweise)": [
"full_run", # Nutzt process_rows_sequentially (Block 24)
@@ -10575,6 +10868,15 @@ def main():
limit=final_limit_to_use # VERWENDE final_limit_to_use
)
elif selected_mode == "suggest_parents": # <<< NEUER ELIF-BLOCK
data_processor.process_parent_suggestion_batch(
start_sheet_row=args.start_sheet_row,
end_sheet_row=args.end_sheet_row,
limit=final_limit_to_use, # Nutzt das ggf. interaktiv abgefragte Limit
re_evaluate_question_mark=True # Beispiel: Standardmäßig Fragezeichen neu bewerten
# Sie können hierfür auch ein CLI Argument hinzufügen
)
# ---- Modus nicht gefunden (sollte durch Validierung oben abgefangen werden) ----
else:
# Dieser Zweig sollte aufgrund der Validierung am Anfang nie erreicht werden.