From 048559e11f31f47992d1b669178af48a1acda4d6 Mon Sep 17 00:00:00 2001 From: Floke Date: Wed, 27 Aug 2025 16:02:32 +0000 Subject: [PATCH] sync_manager.py aktualisiert --- sync_manager.py | 401 +++++++++++++++++++++++------------------------- 1 file changed, 192 insertions(+), 209 deletions(-) diff --git a/sync_manager.py b/sync_manager.py index f46d00bc..79a94d47 100644 --- a/sync_manager.py +++ b/sync_manager.py @@ -1,223 +1,206 @@ -#!/usr/bin/env python3 -""" -sync_manager.py - -Modul für den Datenabgleich zwischen einem D365 Excel-Export und dem Google Sheet. -Führt einen intelligenten "Full-Sync" durch, um neue, geänderte und -gelöschte Datensätze zu identifizieren und zu verarbeiten. - -Enthält Logik für: -- Smart-Merging von Feldern (z.B. Website). -- Automatisches Setzen des Re-Eval-Flags bei Stammdatenänderungen. -- Markieren von archivierten Datensätzen. -- Protokollieren von Datenkonflikten. -""" - -import pandas as pd -import logging -from datetime import datetime - -# Importiere die benötigten Konfigurationen -from config import COLUMN_ORDER, COLUMN_MAP - -class SyncManager: +def main(): """ - Kapselt die Logik für den Abgleich zwischen D365-Export und Google Sheet. + Haupteinstiegspunkt des Skripts. + Verarbeitet Kommandozeilen-Argumente, richtet Logging ein, + initialisiert Komponenten und dispatchet zu den passenden Modi. """ -def __init__(self, sheet_handler, d365_export_path, target_sheet_name='Tabelle1'): - """ - Initialisiert den SyncManager. + # --- Importe innerhalb der Funktion, um Abhängigkeiten klar zu halten --- + import argparse + import time + import logging + import os # <<< NEU: für Dateipfad-Prüfung + from config import Config, log_module_versions, create_log_filename + from google_sheet_handler import GoogleSheetHandler + from wikipedia_scraper import WikipediaScraper + from data_processor import DataProcessor + from sync_manager import SyncManager # <<< NEU: SyncManager importieren + import helpers + import google_sheet_handler # Für Version-Logging - Args: - sheet_handler: Eine instanziierte GoogleSheetHandler-Klasse. - d365_export_path (str): Der Dateipfad zur D365 Excel-Exportdatei. - target_sheet_name (str): Der Name des Ziel-Tabellenblatts. - """ - self.sheet_handler = sheet_handler - self.d365_export_path = d365_export_path - self.target_sheet_name = target_sheet_name - self.logger = logging.getLogger(__name__) + # --- Argument Parser --- + parser = argparse.ArgumentParser( + description=f"Firmen-Datenanreicherungs-Skript {Config.VERSION}.", + formatter_class=argparse.RawTextHelpFormatter + ) + mode_categories = { + "Daten-Synchronisation": ["sync"], # <<< NEU: Eigene Kategorie für den Sync + "Batch-Verarbeitung": ["wiki_verify", "website_scraping", "summarize_website", "branch_eval", "suggest_parents", "fsm_pitch"], + "Sequentielle Verarbeitung": ["full_run"], + "Re-Evaluation": ["reeval"], + "Dienstprogramme": ["find_wiki_serp", "check_urls", "contacts", "update_wiki_suggestions", "wiki_reextract_missing_an", "website_details", "train_technician_model", "predict_technicians", "alignment", "reparatur_sitz", "plausi_check_data"], + "Kombinierte Läufe": ["combined_all"], + "Spezial-Modi": ["reclassify_branches"], + } + valid_modes = [mode for modes in mode_categories.values() for mode in modes] + mode_help_text = "Betriebsmodus. Waehlen Sie einen der folgenden:\n" + for category, modes in mode_categories.items(): + mode_help_text += f"\n{category}:\n" + "".join([f" - {mode}\n" for mode in modes]) - # Definiert, welche D365-Spalten welchen GSheet-Spalten entsprechen - self.d365_to_gsheet_map = { - "Account Name": "CRM Name", - "Parent Account": "Parent Account Name", - "Website": "CRM Website", - "City": "CRM Ort", - "Country": "CRM Land", - "Description FSM": "CRM Beschreibung", - "Branch detail": "CRM Branche", - "No. Service Technicians": "CRM Anzahl Techniker", - "Annual Revenue (Mio. €)": "CRM Umsatz", - "Number of Employees": "CRM Anzahl Mitarbeiter", - "GUID": "CRM ID" # <<< KORRIGIERT: Passt jetzt zu deiner Export-Datei - } + parser.add_argument("--mode", type=str, help=mode_help_text) + parser.add_argument("--limit", type=int, help="Maximale Anzahl zu verarbeitender Zeilen.", default=None) + parser.add_argument("--start_sheet_row", type=int, help="Startzeile im Sheet (1-basiert).", default=None) + parser.add_argument("--end_sheet_row", type=int, help="Endzeile im Sheet (1-basiert).", default=None) + + valid_steps = ['wiki', 'chat', 'web', 'ml_predict'] + parser.add_argument("--steps", type=str, help=f"Schritte für 'reeval'/'full_run' (z.B. 'wiki,chat'). Optionen: {', '.join(valid_steps)}.", default=','.join(valid_steps)) + parser.add_argument("--min_umsatz", type=float, help="Mindestumsatz in MIO € für 'find_wiki_serp'.", default=200.0) + parser.add_argument("--min_employees", type=int, help="Mindest-MA für 'find_wiki_serp'.", default=500) + + # <<< NEU: Argument für den Pfad der Sync-Datei + parser.add_argument("--sync_file", type=str, help="Pfad zur D365 Excel-Exportdatei für den 'sync'-Modus.", default="d365_export.xlsx") - # Definiert die Merge-Strategien für GSheet-Spalten - self.d365_wins_cols = ["CRM Name", "Parent Account Name", "CRM Ort", "CRM Land", - "CRM Beschreibung", "CRM Branche", "CRM Anzahl Techniker", - "CRM Umsatz", "CRM Anzahl Mitarbeiter"] - self.smart_merge_cols = ["CRM Website"] + args = parser.parse_args() - def _load_data(self): - """Lädt Daten aus der D365-Exportdatei und dem Google Sheet.""" - self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...") - try: - # Lade alle Daten als String, um Formatierungsfehler (besonders bei GUIDs) zu vermeiden - self.d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('') - - # Finde den D365-Spaltennamen für unsere 'CRM ID' - d365_guid_col_name = next((k for k, v in self.d365_to_gsheet_map.items() if v == "CRM ID"), None) + # --- Modusauswahl (interaktiv, wenn nicht über CLI) --- + selected_mode = args.mode.lower() if args.mode else None + if not selected_mode: + print("\nBitte waehlen Sie den Betriebsmodus:") + mode_map = {} + counter = 1 + for category, modes in mode_categories.items(): + print(f"\n{category}:") + for mode in modes: + print(f" {counter}: {mode}") + mode_map[str(counter)] = mode + mode_map[mode] = mode + counter += 1 + print("\n 0: Abbrechen") + mode_map['0'] = 'exit' - if d365_guid_col_name and d365_guid_col_name in self.d365_df.columns: - # Umbenennen der Spalte für die interne Verarbeitung - self.d365_df.rename(columns={d365_guid_col_name: "CRM ID"}, inplace=True) + while selected_mode is None: + try: + choice = input("Geben Sie den Modusnamen oder die Zahl ein: ").strip().lower() + if choice in mode_map: + selected_mode = mode_map[choice] + if selected_mode == 'exit': + print("Abgebrochen.") + return + else: + print("Ungueltige Eingabe.") + except (EOFError, KeyboardInterrupt): + print("\nAbgebrochen.") + return + + # --- Logging Konfiguration --- + # Definiere hier die Logging-Konstanten, falls sie nicht global sind + LOG_LEVEL = logging.DEBUG if Config.DEBUG else logging.INFO + LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s' + logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT) + logger = logging.getLogger(__name__) + + # --- Logdatei-Konfiguration abschließen --- + log_file_path = create_log_filename(selected_mode) + if log_file_path: + file_handler = logging.FileHandler(log_file_path, mode='a', encoding='utf-8') + file_handler.setLevel(LOG_LEVEL) + file_handler.setFormatter(logging.Formatter(LOG_FORMAT)) + logging.getLogger('').addHandler(file_handler) + + logger.info(f"===== Skript gestartet: Modus '{selected_mode}' =====") + logger.info(f"Projekt-Version (Config): {Config.VERSION}") + logger.info(f"Logdatei: {log_file_path or 'FEHLER - Keine Logdatei'}") + logger.info(f"CLI Argumente: {args}") + + # --- Hauptlogik --- + try: + # --- Vorbereitung & Initialisierung --- + Config.load_api_keys() + + sheet_handler = GoogleSheetHandler() + + # <<< NEU: Früher Ausstieg für den Sync-Modus, da er keine Scraper/Prozessoren braucht + if selected_mode == "sync": + d365_file_path = args.sync_file + if not os.path.exists(d365_file_path): + logger.critical(f"Export-Datei nicht gefunden: {d365_file_path}") + print(f"\n! FEHLER: Die angegebene Sync-Datei wurde nicht gefunden: {d365_file_path}") else: - self.logger.critical(f"Die erwartete GUID-Spalte '{d365_guid_col_name}' wurde nicht in der D365-Exportdatei gefunden.") - raise ValueError("GUID-Spalte nicht in der D365-Exportdatei gefunden.") - - self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip() - - except FileNotFoundError: - self.logger.critical(f"FEHLER: D365-Exportdatei nicht gefunden unter: {self.d365_export_path}") - return False - except Exception as e: - self.logger.critical(f"FEHLER beim Lesen der Excel-Datei: {e}", exc_info=True) - return False - - self.logger.info("Lade bestehende Daten aus dem Google Sheet...") - # KORREKTUR: Verwende die existierende Methode und erstelle den DataFrame manuell - all_gsheet_data = self.sheet_handler.get_all_data_with_headers() - - if not all_gsheet_data or len(all_gsheet_data) < 2: - self.logger.warning("Google Sheet ist leer oder enthält nur eine Header-Zeile. Erstelle leeren DataFrame.") - self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER) + sync_manager = SyncManager(sheet_handler, d365_file_path) + sync_manager.run_sync() else: - header = all_gsheet_data[0] - # Stelle sicher, dass die Spaltenanzahl konsistent ist - max_cols = len(header) - data_rows = [row[:max_cols] for row in all_gsheet_data[1:]] - self.gsheet_df = pd.DataFrame(data_rows, columns=header) - self.gsheet_df.fillna('', inplace=True) + # Bisherige Initialisierung für alle anderen Modi + wiki_scraper = WikipediaScraper() + data_processor = DataProcessor(sheet_handler=sheet_handler, wiki_scraper=wiki_scraper) - if 'CRM ID' not in self.gsheet_df.columns: - self.logger.critical("Spalte 'CRM ID' nicht im Google Sheet gefunden. Abgleich nicht möglich.") - return False + # --- Modul-Versionen loggen --- + modules_to_log = { + "DataProcessor": data_processor, + "GoogleSheetHandler": google_sheet_handler, + "WikipediaScraper": wikipedia_scraper, + "Helpers": helpers + } + log_module_versions(modules_to_log) + + if not data_processor.setup(): + logger.critical("Setup des DataProcessors fehlgeschlagen. Das Skript wird beendet.") + return - self.gsheet_df['CRM ID'] = self.gsheet_df['CRM ID'].str.strip() + # --- Modus-Dispatching --- + start_time = time.time() + + steps_to_run_set = set(step.strip().lower() for step in args.steps.split(',') if step.strip() in valid_steps) if args.steps else set(valid_steps) - self.logger.info(f"{len(self.d365_df)} Datensätze aus D365 geladen, {len(self.gsheet_df)} im Google Sheet vorhanden.") - return True + if selected_mode == "full_run": + start_row = args.start_sheet_row or sheet_handler.get_start_row_index("Timestamp letzte Pruefung") + sheet_handler._header_rows + 1 + num_to_process = args.limit or (len(sheet_handler.get_all_data_with_headers()) - start_row + 1) + data_processor.process_rows_sequentially( + start_sheet_row=start_row, num_to_process=num_to_process, + process_wiki_steps='wiki' in steps_to_run_set, + process_chatgpt_steps='chat' in steps_to_run_set, + process_website_steps='web' in steps_to_run_set, + process_ml_steps='ml_predict' in steps_to_run_set + ) + elif selected_mode == "reeval": + data_processor.process_reevaluation_rows( + row_limit=args.limit, clear_flag=True, + process_wiki_steps='wiki' in steps_to_run_set, + process_chatgpt_steps='chat' in steps_to_run_set, + process_website_steps='web' in steps_to_run_set, + process_ml_steps='ml_predict' in steps_to_run_set + ) + # ... (alle anderen elif-Blöcke bleiben wie sie sind) ... + elif selected_mode == "reclassify_branches": + data_processor.reclassify_all_branches( + start_sheet_row=args.start_sheet_row, + limit=args.limit + ) + elif selected_mode == "alignment": + alignment_demo(sheet_handler) + elif selected_mode == "train_technician_model": + data_processor.train_technician_model() + elif selected_mode == "predict_technicians": + data_processor.process_predict_technicians( + start_sheet_row=args.start_sheet_row, + limit=args.limit + ) + elif hasattr(data_processor, f"process_{selected_mode}"): + method_to_call = getattr(data_processor, f"process_{selected_mode}") + method_args = {} + if "limit" in method_to_call.__code__.co_varnames: method_args["limit"] = args.limit + if "start_sheet_row" in method_to_call.__code__.co_varnames: method_args["start_sheet_row"] = args.start_sheet_row + if "end_sheet_row" in method_to_call.__code__.co_varnames: method_args["end_sheet_row"] = args.end_sheet_row + if "min_umsatz" in method_to_call.__code__.co_varnames: method_args["min_umsatz"] = args.min_umsatz + if "min_employees" in method_to_call.__code__.co_varnames: method_args["min_employees"] = args.min_employees + method_to_call(**method_args) + elif hasattr(data_processor, f"run_{selected_mode}"): + method_to_call = getattr(data_processor, f"run_{selected_mode}") + method_to_call(start_sheet_row=args.start_sheet_row, end_sheet_row=args.end_sheet_row, limit=args.limit) + else: + logger.error(f"Unbekannter Modus '{selected_mode}' im Dispatcher.") - def run_sync(self): - """Führt den gesamten Synchronisationsprozess aus.""" - print("DEBUG: run_sync() wurde aufgerufen. Versuche jetzt, Daten zu laden.") # <<< HINZUGEFÜGT - if not self._load_data(): - print("DEBUG: _load_data() ist fehlgeschlagen oder hat False zurückgegeben. Breche ab.") # <<< HINZUGEFÜGT - return + duration = time.time() - start_time + logger.info(f"Verarbeitung im Modus '{selected_mode}' abgeschlossen. Dauer: {duration:.2f} Sekunden.") - print("DEBUG: _load_data() war erfolgreich. Beginne mit der Analyse der IDs.") # <<< HINZUGEFÜGT - d365_ids = set(self.d365_df[self.d365_df['CRM ID'] != '']['CRM ID']) - gsheet_ids = set(self.gsheet_df[self.gsheet_df['CRM ID'] != '']['CRM ID']) - - new_ids = d365_ids - gsheet_ids - deleted_ids = gsheet_ids - d365_ids - existing_ids = d365_ids.intersection(gsheet_ids) - - self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} archivierte, {len(existing_ids)} bestehende Accounts.") - print(f"DEBUG: Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} archivierte, {len(existing_ids)} bestehende Accounts.") # <<< HINZUGEFÜGT - - updates_to_batch = [] - rows_to_append = [] - - # 1. Neue Accounts verarbeiten - if new_ids: - new_accounts_df = self.d365_df[self.d365_df['CRM ID'].isin(new_ids)] - for _, row in new_accounts_df.iterrows(): - new_row = [""] * len(COLUMN_ORDER) - for d365_col, gsheet_col in self.d365_to_gsheet_map.items(): - if d365_col in row: - col_idx = COLUMN_MAP[gsheet_col]['index'] - new_row[col_idx] = row[d365_col] - rows_to_append.append(new_row) - - # 2. Gelöschte/Archivierte Accounts verarbeiten - if deleted_ids: - for crm_id in deleted_ids: - row_idx_list = self.gsheet_df[self.gsheet_df['CRM ID'] == crm_id].index.tolist() - if row_idx_list: - row_idx = row_idx_list[0] - updates_to_batch.append({ - "range": f"{COLUMN_MAP['Archiviert']['Titel']}{row_idx + 2}", # +2 weil 1-basiert und Header - "values": [["TRUE"]] - }) - - # 3. Bestehende Accounts intelligent mergen - if existing_ids: - # Setze 'CRM ID' als Index für schnellen Zugriff - if self.d365_df.index.name != 'CRM ID': - self.d365_df.set_index('CRM ID', inplace=True) - if self.gsheet_df.index.name != 'CRM ID': - self.gsheet_df.set_index('CRM ID', inplace=True) - - - for crm_id in existing_ids: - d365_row = self.d365_df.loc[crm_id] - gsheet_row = self.gsheet_df.loc[crm_id] - - row_updates = {} - conflict_messages = [] - needs_reeval = False - - # Strategie 1: D365 gewinnt immer - for gsheet_col in self.d365_wins_cols: - d365_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == gsheet_col), None) - if d365_col and str(d365_row[d365_col]) != str(gsheet_row[gsheet_col]): - row_updates[gsheet_col] = str(d365_row[d365_col]) - needs_reeval = True - - # Strategie 2: Smart-Merge - for gsheet_col in self.smart_merge_cols: - d365_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == gsheet_col), None) - d365_val = str(d365_row.get(d365_col, '')) - gsheet_val = str(gsheet_row.get(gsheet_col, '')) - - if d365_val and not gsheet_val: - row_updates[gsheet_col] = d365_val - needs_reeval = True - elif d365_val and gsheet_val and d365_val != gsheet_val: - conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'") - - # Updates und Flags zusammenstellen - if conflict_messages: - row_updates["SyncConflict"] = "; ".join(conflict_messages) - - if needs_reeval: - row_updates["ReEval Flag"] = "x" - - # Batch-Update-Objekte erstellen - if row_updates: - row_idx_list = self.gsheet_df.index.get_loc(crm_id) - # Sicherstellen, dass row_idx ein Integer ist, kein Slice oder Array - row_idx = row_idx_list if isinstance(row_idx_list, int) else row_idx_list.start - - for col_name, value in row_updates.items(): - updates_to_batch.append({ - "range": f"{COLUMN_MAP[col_name]['Titel']}{row_idx + 2}", - "values": [[value]] - }) - - # 4. Änderungen ins Google Sheet schreiben - if rows_to_append: - self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet in '{self.target_sheet_name}' hinzu...") - self.sheet_handler.append_rows(values=rows_to_append, sheet_name=self.target_sheet_name) - - if updates_to_batch: - self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...") - self.sheet_handler.batch_update_cells(updates=updates_to_batch) - - if not rows_to_append and not updates_to_batch: - print("DEBUG: Keine neuen Zeilen oder Updates zum Senden gefunden.") # <<< HINZUGEFÜGT - - print("DEBUG: run_sync() am Ende angelangt.") # <<< HINZUGEFÜGT - self.logger.info("Synchronisation erfolgreich abgeschlossen.") + except (KeyboardInterrupt, EOFError): + logger.warning("Skript durch Benutzer unterbrochen.") + print("\n! Skript wurde manuell beendet.") + except Exception as e: + logger.critical(f"FATAL: Unerwarteter Fehler im Hauptprozess: {e}", exc_info=True) + print(f"\n! Ein kritischer Fehler ist aufgetreten: {e}") + if 'log_file_path' in locals() and log_file_path: + print(f"Bitte pruefen Sie die Logdatei fuer Details: {log_file_path}") + finally: + logger.info(f"===== Skript beendet =====") + logging.shutdown() + if 'selected_mode' in locals() and selected_mode != 'exit' and 'log_file_path' in locals() and log_file_path: + print(f"\nVerarbeitung abgeschlossen. Logfile: {log_file_path}") \ No newline at end of file