From d3ebcf93b4c9a750b64f22aae308c0fd8334d667 Mon Sep 17 00:00:00 2001 From: Floke Date: Mon, 26 May 2025 10:54:41 +0000 Subject: [PATCH] v1.7.7: Implement Parent Account Suggestion via ChatGPT MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Neuer Modus 'suggest_parents' für die Generierung von Vorschlägen zu Muttergesellschaften. - Nutzt ChatGPT zur Analyse von Unternehmensdaten (CRM, Wiki, Website-Zusammenfassung). - Schreibt Vorschläge in Spalte O ('System Vorschlag Parent Account'), setzt Status P auf '?' und Timestamp Q. - Implementiert parallele OpenAI-Anfragen für Batch-Verarbeitung. - Automatische Startzeilenermittlung basierend auf leerer Spalte O. - Optionale Neubewertung von Zeilen mit Status P = '?'. --- brancheneinstufung.py | 310 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 306 insertions(+), 4 deletions(-) diff --git a/brancheneinstufung.py b/brancheneinstufung.py index b86ca465..c70bc090 100644 --- a/brancheneinstufung.py +++ b/brancheneinstufung.py @@ -8,7 +8,7 @@ von Unternehmensdaten, primär aus einem Google Sheet, ergänzt durch Web Scrapi Wikipedia, OpenAI (ChatGPT) und SerpAPI (Google Search, LinkedIn). Autor: Christian Godelmann -Version: v1.7.6 +Version: v1.7.7 Hinweis zur Struktur: Dieser Code wird in logischen Bloecken uebermittelt. Fuegen Sie die Bloecke @@ -107,7 +107,7 @@ PATTERNS_FILE_JSON = "technician_patterns.json" # Neu (Empfohlen) # --- Globale Konfiguration Klasse --- class Config: """Zentrale Konfigurationseinstellungen.""" - VERSION = "v1.7.6" + VERSION = "v1.7.7" LANG = "de" # Sprache fuer Wikipedia etc. # ACHTUNG: SHEET_URL ist hier ein Platzhalter. Ersetzen Sie ihn durch Ihre tatsaechliche URL. SHEET_URL = "https://docs.google.com/spreadsheets/d/1u_gHr9JUfmV1-iviRzbSe3575QEp7KLhK5jFV_gJcgo" # <<< ERSETZEN SIE DIES! @@ -7996,8 +7996,8 @@ class DataProcessor: -# Innerhalb der DataProcessor Klasse -# Innerhalb der DataProcessor Klasse (ersetzen Sie Ihre bestehende Version vollständig hiermit) + # Innerhalb der DataProcessor Klasse + # Innerhalb der DataProcessor Klasse (ersetzen Sie Ihre bestehende Version vollständig hiermit) def _get_numeric_value_for_plausi(self, value_str, is_umsatz=False): logger = logging.getLogger(__name__ + "._get_numeric_value_for_plausi") @@ -8292,6 +8292,298 @@ class DataProcessor: self.logger.info(f"Plausibilitäts-Check-Lauf (mit Konsolidierung) beendet. {processed_rows_count} Zeilen mit Plausi-Checks versehen, {skipped_count} Zeilen initial übersprungen.") + # ========================================================================== + # === Batch Processing Methods (Parent Account Suggestion) ============== + # ========================================================================== + + def _suggest_parent_account_openai_task(self, task_data, openai_semaphore): + """ + Fragt ChatGPT nach einem Parent Account für ein einzelnes Unternehmen. + Läuft in einem separaten Thread für den Parent-Suggestion-Batch. + + Args: + task_data (dict): Enthält die Daten für die Zeile + (row_num, crm_name, crm_website, crm_beschreibung, + wiki_url, wiki_absatz, wiki_kategorien, website_zusammenfassung). + openai_semaphore (threading.Semaphore): Semaphore zur Begrenzung gleichzeitiger OpenAI-Calls. + + Returns: + dict: {'row_num': int, 'suggested_parent': str, 'justification': str, 'error': str or None} + """ + logger = logging.getLogger(__name__ + ".suggest_parent_task") + row_num = task_data['row_num'] + suggested_parent = "k.A." + justification = "Keine Begründung erhalten." + error_msg = None + + # Bereinige Input-Daten für den Prompt + crm_name = str(task_data.get('crm_name', 'N/A')).strip() + crm_website = str(task_data.get('crm_website', 'N/A')).strip() + crm_beschreibung = str(task_data.get('crm_beschreibung', 'N/A')).strip()[:800] # Gekürzt + wiki_url = str(task_data.get('wiki_url', 'N/A')).strip() + wiki_absatz = str(task_data.get('wiki_absatz', 'N/A')).strip()[:800] # Gekürzt + wiki_kategorien = str(task_data.get('wiki_kategorien', 'N/A')).strip()[:500] # Gekürzt + website_zusammenfassung = str(task_data.get('website_zusammenfassung', 'N/A')).strip()[:800] # Gekürzt + + prompt_parts = [ + "Du bist ein Wirtschaftsanalyst und recherchierst Unternehmensstrukturen.", + "Basierend auf den folgenden Informationen, identifiziere bitte den Namen der direkten Muttergesellschaft oder des übergeordneten Konzerns für das genannte Unternehmen.", + "Wenn keine klare Muttergesellschaft ersichtlich ist oder das Unternehmen selbständig zu sein scheint, antworte mit 'k.A.' für den Parent Account.", + "Gib deine Antwort ausschließlich im folgenden Format aus (keine Einleitung, kein Schlusssatz):", + "Vorgeschlagener Parent Account: ", + "Begründung: ", + "\n--- Unternehmensinformationen ---", + f"Unternehmen: {crm_name}", + ] + if crm_website and crm_website.lower() != "n/a": + prompt_parts.append(f"Website: {crm_website}") + if crm_beschreibung and crm_beschreibung.lower() != "n/a": + prompt_parts.append(f"CRM Beschreibung: {crm_beschreibung}") + if wiki_url and "wikipedia.org" in wiki_url.lower(): + prompt_parts.append(f"Wikipedia URL: {wiki_url}") + if wiki_absatz and wiki_absatz.lower() != "n/a": + prompt_parts.append(f"Wikipedia Absatz: {wiki_absatz}") + if wiki_kategorien and wiki_kategorien.lower() != "n/a": + prompt_parts.append(f"Wikipedia Kategorien: {wiki_kategorien}") + if website_zusammenfassung and website_zusammenfassung.lower() != "n/a" and not website_zusammenfassung.startswith("k.A. (Fehler"): + prompt_parts.append(f"Website Zusammenfassung: {website_zusammenfassung}") + + prompt_parts.append("\nBitte gib NUR die Antwort im oben genannten Format.") + prompt = "\n".join(prompt_parts) + + # Token Count (optional, zur Info) + # try: + # pt_count = token_count(prompt) + # logger.debug(f"Zeile {row_num}: Prompt für Parent Suggestion ({pt_count} Tokens): {prompt[:200]}...") + # except Exception: pass + + try: + with openai_semaphore: + # call_openai_chat ist mit @retry_on_failure dekoriert + raw_chat_response = call_openai_chat(prompt, temperature=0.1, model=getattr(Config, 'TOKEN_MODEL', 'gpt-3.5-turbo')) + + if raw_chat_response: + parsed_parent = "k.A." + parsed_justification = "Keine Begründung extrahiert." + + parent_match = re.search(r"Vorgeschlagener Parent Account:\s*(.*)", raw_chat_response, re.IGNORECASE) + if parent_match: + parsed_parent = parent_match.group(1).strip() + if not parsed_parent or parsed_parent.lower() == "k.a.": # Sicherstellen, dass "k.A." korrekt übernommen wird + parsed_parent = "k.A." + + justification_match = re.search(r"Begründung:\s*(.*)", raw_chat_response, re.IGNORECASE) + if justification_match: + parsed_justification = justification_match.group(1).strip() + + suggested_parent = parsed_parent + justification = parsed_justification + logger.debug(f"Zeile {row_num}: ChatGPT Parent Vorschlag='{suggested_parent}', Begründung='{justification[:100]}...'") + else: + error_msg = "Leere Antwort von OpenAI erhalten." + logger.warning(f"Zeile {row_num}: {error_msg}") + justification = error_msg + + except Exception as e: + error_msg = f"Fehler bei OpenAI Call für Parent Suggestion (Zeile {row_num}): {type(e).__name__} - {str(e)[:100]}" + logger.error(error_msg) + justification = error_msg + + return {"row_num": row_num, "suggested_parent": suggested_parent, "justification": justification, "error": error_msg} + + def process_parent_suggestion_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None, re_evaluate_question_mark=False): + """ + Batch-Prozess zur Generierung von Parent-Account-Vorschlägen mittels ChatGPT. + Schreibt Ergebnisse in Spalten O, P, Q. + + Args: + start_sheet_row (int, optional): Die 1-basierte Startzeile. + end_sheet_row (int, optional): Die 1-basierte Endzeile. + limit (int, optional): Maximale Anzahl zu verarbeitender Zeilen. + re_evaluate_question_mark (bool, optional): Wenn True, werden auch Zeilen mit '?' + in Spalte P (Parent Vorschlag Status) erneut bewertet. + """ + self.logger.info(f"Starte Parent Account Suggestion Batch. Bereich: {start_sheet_row if start_sheet_row else 'Start'}-{end_sheet_row if end_sheet_row else 'Ende'}, Limit: {limit if limit else 'Unbegrenzt'}, Re-Eval ?: {re_evaluate_question_mark}") + + # --- Daten laden und Startzeile ermitteln --- + col_o_key = "System Vorschlag Parent Account" + col_p_key = "Parent Vorschlag Status" + col_q_key = "Parent Vorschlag Timestamp" + + if start_sheet_row is None: + self.logger.info(f"Automatische Ermittlung der Startzeile basierend auf leerem '{col_o_key}'...") + start_data_index_no_header = self.sheet_handler.get_start_row_index(check_column_key=col_o_key, min_sheet_row=7) + if start_data_index_no_header == -1: + self.logger.error("FEHLER bei autom. Startzeilenermittlung. Breche ab.") + return + start_sheet_row = start_data_index_no_header + self.sheet_handler._header_rows + 1 + self.logger.info(f"Automatisch ermittelte Startzeile: {start_sheet_row}") + else: + if not self.sheet_handler.load_data(): + self.logger.error("FEHLER beim Laden der Daten für Parent Suggestion Batch.") + return + + all_data = self.sheet_handler.get_all_data_with_headers() + header_rows = self.sheet_handler._header_rows + total_sheet_rows = len(all_data) + + if end_sheet_row is None: end_sheet_row = total_sheet_rows + self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}.") + if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows: + self.logger.info("Start liegt nach Ende oder außerhalb des Sheets. Keine Verarbeitung.") + return + + # --- Indizes --- + required_keys = [ + col_o_key, col_p_key, col_q_key, "CRM Name", "CRM Website", "CRM Beschreibung", + "Wiki URL", "Wiki Absatz", "Wiki Kategorien", "Website Zusammenfassung", "Version" + ] + if not all(key in COLUMN_MAP for key in required_keys): + missing = [k for k in required_keys if k not in COLUMN_MAP] + self.logger.critical(f"FEHLER: Spaltenschlüssel für Parent Suggestion Batch fehlen: {missing}. Abbruch.") + return + + col_o_letter = self.sheet_handler._get_col_letter(COLUMN_MAP[col_o_key] + 1) + col_p_letter = self.sheet_handler._get_col_letter(COLUMN_MAP[col_p_key] + 1) + col_q_letter = self.sheet_handler._get_col_letter(COLUMN_MAP[col_q_key] + 1) + # Begründung kann optional in eine neue Spalte oder hier ins Log + # Fürs Erste nur Logging der Begründung. + + # --- Konfiguration für Parallelverarbeitung --- + openai_sem = threading.Semaphore(getattr(Config, 'OPENAI_CONCURRENCY_LIMIT', 3)) + max_workers = getattr(Config, 'MAX_BRANCH_WORKERS', 10) # Wiederverwendung der Branch Worker Config + processing_batch_size = getattr(Config, 'PROCESSING_BRANCH_BATCH_SIZE', 10) # Batch-Größe für Tasks + update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50) + + + tasks_for_current_openai_batch = [] + all_sheet_updates = [] + processed_count = 0 + skipped_count = 0 + + # Funktion zum Verarbeiten und Schreiben eines Batches + def _execute_and_write_openai_batch(current_tasks): + nonlocal processed_count # Um processed_count im äußeren Scope zu modifizieren + if not current_tasks: + return + + batch_start_log_row = current_tasks[0]['row_num'] + batch_end_log_row = current_tasks[-1]['row_num'] + self.logger.debug(f"\n--- Starte Parent Suggestion OpenAI Batch ({len(current_tasks)} Tasks, Zeilen {batch_start_log_row}-{batch_end_log_row}) ---") + + batch_results_list = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_task_map = {executor.submit(self._suggest_parent_account_openai_task, task, openai_sem): task for task in current_tasks} + for future in concurrent.futures.as_completed(future_to_task_map): + task_info_orig = future_to_task_map[future] + try: + result_data = future.result() + batch_results_list.append(result_data) + except Exception as e_future: + self.logger.error(f"Exception im Future für Parent Suggestion Zeile {task_info_orig['row_num']}: {e_future}") + batch_results_list.append({ + "row_num": task_info_orig['row_num'], + "suggested_parent": "FEHLER_TASK", + "justification": str(e_future)[:150], + "error": str(e_future) + }) + + self.logger.debug(f" OpenAI Batch ({batch_start_log_row}-{batch_end_log_row}) abgeschlossen. {len(batch_results_list)} Ergebnisse erhalten.") + + if batch_results_list: + now_ts_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + current_version_str = getattr(Config, 'VERSION', 'unknown') + updates_for_this_batch = [] + + for res_item in batch_results_list: + rn = res_item['row_num'] + parent_val = res_item.get('suggested_parent', 'k.A. (Fehler)') + + updates_for_this_batch.append({'range': f'{col_o_letter}{rn}', 'values': [[parent_val]]}) + # Nur auf "?" setzen, wenn ein Vorschlag gemacht wurde (nicht "k.A." oder Fehler) + status_val = "?" if parent_val and parent_val.lower() != "k.a." and not parent_val.startswith("FEHLER") else "" + updates_for_this_batch.append({'range': f'{col_p_letter}{rn}', 'values': [[status_val]]}) + updates_for_this_batch.append({'range': f'{col_q_letter}{rn}', 'values': [[now_ts_str]]}) + # Optional: Begründung in eine neue Spalte schreiben + # updates_for_this_batch.append({'range': f'{BEGRUENDUNG_SPALTE_LETTER}{rn}', 'values': [[res_item.get('justification', '')]]}) + + # Logge die Begründung, auch wenn sie nicht ins Sheet geschrieben wird + if res_item.get('justification'): + self.logger.debug(f"Zeile {rn} - Parent Begründung: {res_item.get('justification')[:200]}...") + + + all_sheet_updates.extend(updates_for_this_batch) + processed_count += len(current_tasks) # Erhöhe processed_count hier + + # Sheet Updates in Batches senden + if len(all_sheet_updates) >= update_batch_row_limit * 3: # 3 Spalten pro Update (O, P, Q) + self.logger.info(f"Sende Batch-Updates für Parent Suggestions ({len(all_sheet_updates)//3} Zeilen)...") + self.sheet_handler.batch_update_cells(all_sheet_updates) + all_sheet_updates.clear() + + # Pause nach dem Batch + time.sleep(getattr(Config, 'RETRY_DELAY', 5) * 0.5) + # Ende der Hilfsfunktion _execute_and_write_openai_batch + + # Hauptschleife über die Zeilen + for i in range(start_sheet_row, end_sheet_row + 1): + if limit is not None and processed_count >= limit: + self.logger.info(f"Verarbeitungslimit ({limit}) für Parent Suggestions erreicht.") + break + + row_idx_list = i - 1 + if row_idx_list >= total_sheet_rows: break + row = all_data[row_idx_list] + + if not any(cell and str(cell).strip() for cell in row): + skipped_count += 1 + continue + + # Kriterien für Verarbeitung + val_o = self._get_cell_value_safe(row, col_o_key).strip() + val_p = self._get_cell_value_safe(row, col_p_key).strip() + # val_q = self._get_cell_value_safe(row, col_q_key).strip() # Timestamp nicht direkt für Auswahl relevant + + needs_processing = False + if not val_o or val_o.lower() == "k.a.": # Spalte O ist leer oder k.A. + needs_processing = True + elif re_evaluate_question_mark and val_p == "?": # Neubewertung für Status "?" + needs_processing = True + + if not needs_processing: + skipped_count += 1 + continue + + # Daten für Task sammeln + task_data = { + "row_num": i, + "crm_name": self._get_cell_value_safe(row, "CRM Name"), + "crm_website": self._get_cell_value_safe(row, "CRM Website"), + "crm_beschreibung": self._get_cell_value_safe(row, "CRM Beschreibung"), + "wiki_url": self._get_cell_value_safe(row, "Wiki URL"), + "wiki_absatz": self._get_cell_value_safe(row, "Wiki Absatz"), + "wiki_kategorien": self._get_cell_value_safe(row, "Wiki Kategorien"), + "website_zusammenfassung": self._get_cell_value_safe(row, "Website Zusammenfassung") + } + tasks_for_current_openai_batch.append(task_data) + + if len(tasks_for_current_openai_batch) >= processing_batch_size: + _execute_and_write_openai_batch(tasks_for_current_openai_batch) + tasks_for_current_openai_batch.clear() + + # Letzten Batch verarbeiten + if tasks_for_current_openai_batch: + _execute_and_write_openai_batch(tasks_for_current_openai_batch) + + # Letzte Sheet Updates senden + if all_sheet_updates: + self.logger.info(f"Sende finale Batch-Updates für Parent Suggestions ({len(all_sheet_updates)//3} Zeilen)...") + self.sheet_handler.batch_update_cells(all_sheet_updates) + + self.logger.info(f"Parent Account Suggestion Batch abgeschlossen. {processed_count} Zeilen verarbeitet, {skipped_count} Zeilen übersprungen.") + + + # ========================================================================== # === Utility Methods (ML Data Prep & Training) ============================ # ========================================================================== @@ -9990,6 +10282,7 @@ def main(): "website_scraping", # Uebereinstimmend mit process_website_scraping_batch (Block 27) "summarize_website", # Uebereinstimmend mit process_summarization_batch (Block 28) "branch_eval", # Uebereinstimmend mit process_branch_batch (Block 29) + "suggest_parents", ], "Sequentielle Verarbeitung (Zeilenweise)": [ "full_run", # Nutzt process_rows_sequentially (Block 24) @@ -10575,6 +10868,15 @@ def main(): limit=final_limit_to_use # VERWENDE final_limit_to_use ) + elif selected_mode == "suggest_parents": # <<< NEUER ELIF-BLOCK + data_processor.process_parent_suggestion_batch( + start_sheet_row=args.start_sheet_row, + end_sheet_row=args.end_sheet_row, + limit=final_limit_to_use, # Nutzt das ggf. interaktiv abgefragte Limit + re_evaluate_question_mark=True # Beispiel: Standardmäßig Fragezeichen neu bewerten + # Sie können hierfür auch ein CLI Argument hinzufügen + ) + # ---- Modus nicht gefunden (sollte durch Validierung oben abgefangen werden) ---- else: # Dieser Zweig sollte aufgrund der Validierung am Anfang nie erreicht werden.