diff --git a/sync_manager.py b/sync_manager.py index 79a94d47..16b1cb25 100644 --- a/sync_manager.py +++ b/sync_manager.py @@ -1,206 +1,201 @@ -def main(): +#!/usr/bin/env python3 +""" +sync_manager.py + +Modul für den Datenabgleich zwischen einem D365 Excel-Export und dem Google Sheet. +Führt einen intelligenten "Full-Sync" durch, um neue, geänderte und +gelöschte Datensätze zu identifizieren und zu verarbeiten. + +Enthält Logik für: +- Smart-Merging von Feldern (z.B. Website). +- Automatisches Setzen des Re-Eval-Flags bei Stammdatenänderungen. +- Markieren von archivierten Datensätzen. +- Protokollieren von Datenkonflikten. +""" + +import pandas as pd +import logging +from datetime import datetime + +# Importiere die benötigten Konfigurationen +from config import COLUMN_ORDER, COLUMN_MAP + +class SyncManager: """ - Haupteinstiegspunkt des Skripts. - Verarbeitet Kommandozeilen-Argumente, richtet Logging ein, - initialisiert Komponenten und dispatchet zu den passenden Modi. + Kapselt die Logik für den Abgleich zwischen D365-Export und Google Sheet. """ - # --- Importe innerhalb der Funktion, um Abhängigkeiten klar zu halten --- - import argparse - import time - import logging - import os # <<< NEU: für Dateipfad-Prüfung - from config import Config, log_module_versions, create_log_filename - from google_sheet_handler import GoogleSheetHandler - from wikipedia_scraper import WikipediaScraper - from data_processor import DataProcessor - from sync_manager import SyncManager # <<< NEU: SyncManager importieren - import helpers - import google_sheet_handler # Für Version-Logging + def __init__(self, sheet_handler, d365_export_path): + """ + Initialisiert den SyncManager. - # --- Argument Parser --- - parser = argparse.ArgumentParser( - description=f"Firmen-Datenanreicherungs-Skript {Config.VERSION}.", - formatter_class=argparse.RawTextHelpFormatter - ) - mode_categories = { - "Daten-Synchronisation": ["sync"], # <<< NEU: Eigene Kategorie für den Sync - "Batch-Verarbeitung": ["wiki_verify", "website_scraping", "summarize_website", "branch_eval", "suggest_parents", "fsm_pitch"], - "Sequentielle Verarbeitung": ["full_run"], - "Re-Evaluation": ["reeval"], - "Dienstprogramme": ["find_wiki_serp", "check_urls", "contacts", "update_wiki_suggestions", "wiki_reextract_missing_an", "website_details", "train_technician_model", "predict_technicians", "alignment", "reparatur_sitz", "plausi_check_data"], - "Kombinierte Läufe": ["combined_all"], - "Spezial-Modi": ["reclassify_branches"], - } - valid_modes = [mode for modes in mode_categories.values() for mode in modes] - mode_help_text = "Betriebsmodus. Waehlen Sie einen der folgenden:\n" - for category, modes in mode_categories.items(): - mode_help_text += f"\n{category}:\n" + "".join([f" - {mode}\n" for mode in modes]) + Args: + sheet_handler: Eine instanziierte GoogleSheetHandler-Klasse. + d365_export_path (str): Der Dateipfad zur D365 Excel-Exportdatei. + """ + self.sheet_handler = sheet_handler + self.d365_export_path = d365_export_path + self.logger = logging.getLogger(__name__) - parser.add_argument("--mode", type=str, help=mode_help_text) - parser.add_argument("--limit", type=int, help="Maximale Anzahl zu verarbeitender Zeilen.", default=None) - parser.add_argument("--start_sheet_row", type=int, help="Startzeile im Sheet (1-basiert).", default=None) - parser.add_argument("--end_sheet_row", type=int, help="Endzeile im Sheet (1-basiert).", default=None) - - valid_steps = ['wiki', 'chat', 'web', 'ml_predict'] - parser.add_argument("--steps", type=str, help=f"Schritte für 'reeval'/'full_run' (z.B. 'wiki,chat'). Optionen: {', '.join(valid_steps)}.", default=','.join(valid_steps)) - parser.add_argument("--min_umsatz", type=float, help="Mindestumsatz in MIO € für 'find_wiki_serp'.", default=200.0) - parser.add_argument("--min_employees", type=int, help="Mindest-MA für 'find_wiki_serp'.", default=500) - - # <<< NEU: Argument für den Pfad der Sync-Datei - parser.add_argument("--sync_file", type=str, help="Pfad zur D365 Excel-Exportdatei für den 'sync'-Modus.", default="d365_export.xlsx") + # Definiert, welche D365-Spalten welchen GSheet-Spalten entsprechen. + # Dies ist das zentrale Mapping, das auf deiner D365-View basiert. + self.d365_to_gsheet_map = { + "Account Name": "CRM Name", + "Parent Account": "Parent Account Name", + "Website": "CRM Website", + "City": "CRM Ort", + "Country": "CRM Land", + "Description FSM": "CRM Beschreibung", + "Branch detail": "CRM Branche", + "No. Service Technicians": "CRM Anzahl Techniker", + "Annual Revenue (Mio. €)": "CRM Umsatz", + "Number of Employees": "CRM Anzahl Mitarbeiter", + # Dies ist die wichtigste Zeile! Annahme: Die GUID-Spalte im Export heißt 'Account'. + # Falls sie anders heißt (z.B. 'Account ID'), muss nur dieser String angepasst werden. + "Account": "CRM ID" + } - args = parser.parse_args() + # Definiert die Merge-Strategien für GSheet-Spalten + self.d365_wins_cols = ["CRM Name", "Parent Account Name", "CRM Ort", "CRM Land", + "CRM Beschreibung", "CRM Branche", "CRM Anzahl Techniker", + "CRM Umsatz", "CRM Anzahl Mitarbeiter"] + self.smart_merge_cols = ["CRM Website"] - # --- Modusauswahl (interaktiv, wenn nicht über CLI) --- - selected_mode = args.mode.lower() if args.mode else None - if not selected_mode: - print("\nBitte waehlen Sie den Betriebsmodus:") - mode_map = {} - counter = 1 - for category, modes in mode_categories.items(): - print(f"\n{category}:") - for mode in modes: - print(f" {counter}: {mode}") - mode_map[str(counter)] = mode - mode_map[mode] = mode - counter += 1 - print("\n 0: Abbrechen") - mode_map['0'] = 'exit' - - while selected_mode is None: - try: - choice = input("Geben Sie den Modusnamen oder die Zahl ein: ").strip().lower() - if choice in mode_map: - selected_mode = mode_map[choice] - if selected_mode == 'exit': - print("Abgebrochen.") - return - else: - print("Ungueltige Eingabe.") - except (EOFError, KeyboardInterrupt): - print("\nAbgebrochen.") - return - - # --- Logging Konfiguration --- - # Definiere hier die Logging-Konstanten, falls sie nicht global sind - LOG_LEVEL = logging.DEBUG if Config.DEBUG else logging.INFO - LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)-25s - %(message)s' - logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT) - logger = logging.getLogger(__name__) - - # --- Logdatei-Konfiguration abschließen --- - log_file_path = create_log_filename(selected_mode) - if log_file_path: - file_handler = logging.FileHandler(log_file_path, mode='a', encoding='utf-8') - file_handler.setLevel(LOG_LEVEL) - file_handler.setFormatter(logging.Formatter(LOG_FORMAT)) - logging.getLogger('').addHandler(file_handler) - - logger.info(f"===== Skript gestartet: Modus '{selected_mode}' =====") - logger.info(f"Projekt-Version (Config): {Config.VERSION}") - logger.info(f"Logdatei: {log_file_path or 'FEHLER - Keine Logdatei'}") - logger.info(f"CLI Argumente: {args}") - - # --- Hauptlogik --- - try: - # --- Vorbereitung & Initialisierung --- - Config.load_api_keys() - - sheet_handler = GoogleSheetHandler() - - # <<< NEU: Früher Ausstieg für den Sync-Modus, da er keine Scraper/Prozessoren braucht - if selected_mode == "sync": - d365_file_path = args.sync_file - if not os.path.exists(d365_file_path): - logger.critical(f"Export-Datei nicht gefunden: {d365_file_path}") - print(f"\n! FEHLER: Die angegebene Sync-Datei wurde nicht gefunden: {d365_file_path}") + def _load_data(self): + """Lädt Daten aus der D365-Exportdatei und dem Google Sheet.""" + self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...") + try: + # Lade alle Daten als String, um Formatierungsfehler (besonders bei GUIDs) zu vermeiden + self.d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('') + + # Finde den Spaltennamen für die GUID in der Export-Datei basierend auf unserem Mapping + d365_guid_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == "CRM ID"), None) + + if d365_guid_col and d365_guid_col in self.d365_df.columns: + self.d365_df.rename(columns={d365_guid_col: "CRM ID"}, inplace=True) else: - sync_manager = SyncManager(sheet_handler, d365_file_path) - sync_manager.run_sync() - else: - # Bisherige Initialisierung für alle anderen Modi - wiki_scraper = WikipediaScraper() - data_processor = DataProcessor(sheet_handler=sheet_handler, wiki_scraper=wiki_scraper) + self.logger.critical(f"FEHLER: Die erwartete GUID-Spalte '{d365_guid_col}' wurde in der D365-Exportdatei nicht gefunden.") + raise ValueError(f"GUID-Spalte ('{d365_guid_col}') nicht in der D365-Exportdatei gefunden.") - # --- Modul-Versionen loggen --- - modules_to_log = { - "DataProcessor": data_processor, - "GoogleSheetHandler": google_sheet_handler, - "WikipediaScraper": wikipedia_scraper, - "Helpers": helpers - } - log_module_versions(modules_to_log) + # Bereinige CRM IDs und entferne Zeilen ohne gültige ID + self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip() + self.d365_df.dropna(subset=['CRM ID'], inplace=True) + self.d365_df = self.d365_df[self.d365_df['CRM ID'] != ''] + + except FileNotFoundError: + self.logger.critical(f"FEHLER: D365-Exportdatei nicht gefunden unter: {self.d365_export_path}") + return False + except Exception as e: + self.logger.critical(f"Ein unerwarteter Fehler ist beim Laden der Excel-Datei aufgetreten: {e}", exc_info=True) + return False + + self.logger.info("Lade bestehende Daten aus dem Google Sheet...") + self.gsheet_df = self.sheet_handler.get_all_data_as_dataframe() + if self.gsheet_df is None: + self.logger.error("Konnte keine Daten aus dem Google Sheet laden.") + return False - if not data_processor.setup(): - logger.critical("Setup des DataProcessors fehlgeschlagen. Das Skript wird beendet.") - return + self.gsheet_df['CRM ID'] = self.gsheet_df['CRM ID'].str.strip() - # --- Modus-Dispatching --- - start_time = time.time() - - steps_to_run_set = set(step.strip().lower() for step in args.steps.split(',') if step.strip() in valid_steps) if args.steps else set(valid_steps) + self.logger.info(f"{len(self.d365_df)} gültige Datensätze aus D365 geladen, {len(self.gsheet_df)} im Google Sheet vorhanden.") + return True - if selected_mode == "full_run": - start_row = args.start_sheet_row or sheet_handler.get_start_row_index("Timestamp letzte Pruefung") + sheet_handler._header_rows + 1 - num_to_process = args.limit or (len(sheet_handler.get_all_data_with_headers()) - start_row + 1) - data_processor.process_rows_sequentially( - start_sheet_row=start_row, num_to_process=num_to_process, - process_wiki_steps='wiki' in steps_to_run_set, - process_chatgpt_steps='chat' in steps_to_run_set, - process_website_steps='web' in steps_to_run_set, - process_ml_steps='ml_predict' in steps_to_run_set - ) - elif selected_mode == "reeval": - data_processor.process_reevaluation_rows( - row_limit=args.limit, clear_flag=True, - process_wiki_steps='wiki' in steps_to_run_set, - process_chatgpt_steps='chat' in steps_to_run_set, - process_website_steps='web' in steps_to_run_set, - process_ml_steps='ml_predict' in steps_to_run_set - ) - # ... (alle anderen elif-Blöcke bleiben wie sie sind) ... - elif selected_mode == "reclassify_branches": - data_processor.reclassify_all_branches( - start_sheet_row=args.start_sheet_row, - limit=args.limit - ) - elif selected_mode == "alignment": - alignment_demo(sheet_handler) - elif selected_mode == "train_technician_model": - data_processor.train_technician_model() - elif selected_mode == "predict_technicians": - data_processor.process_predict_technicians( - start_sheet_row=args.start_sheet_row, - limit=args.limit - ) - elif hasattr(data_processor, f"process_{selected_mode}"): - method_to_call = getattr(data_processor, f"process_{selected_mode}") - method_args = {} - if "limit" in method_to_call.__code__.co_varnames: method_args["limit"] = args.limit - if "start_sheet_row" in method_to_call.__code__.co_varnames: method_args["start_sheet_row"] = args.start_sheet_row - if "end_sheet_row" in method_to_call.__code__.co_varnames: method_args["end_sheet_row"] = args.end_sheet_row - if "min_umsatz" in method_to_call.__code__.co_varnames: method_args["min_umsatz"] = args.min_umsatz - if "min_employees" in method_to_call.__code__.co_varnames: method_args["min_employees"] = args.min_employees - method_to_call(**method_args) - elif hasattr(data_processor, f"run_{selected_mode}"): - method_to_call = getattr(data_processor, f"run_{selected_mode}") - method_to_call(start_sheet_row=args.start_sheet_row, end_sheet_row=args.end_sheet_row, limit=args.limit) - else: - logger.error(f"Unbekannter Modus '{selected_mode}' im Dispatcher.") + def run_sync(self): + """Führt den gesamten Synchronisationsprozess aus.""" + if not self._load_data(): + return - duration = time.time() - start_time - logger.info(f"Verarbeitung im Modus '{selected_mode}' abgeschlossen. Dauer: {duration:.2f} Sekunden.") + d365_ids = set(self.d365_df['CRM ID']) + gsheet_ids = set(self.gsheet_df[self.gsheet_df['CRM ID'] != '']['CRM ID']) - except (KeyboardInterrupt, EOFError): - logger.warning("Skript durch Benutzer unterbrochen.") - print("\n! Skript wurde manuell beendet.") - except Exception as e: - logger.critical(f"FATAL: Unerwarteter Fehler im Hauptprozess: {e}", exc_info=True) - print(f"\n! Ein kritischer Fehler ist aufgetreten: {e}") - if 'log_file_path' in locals() and log_file_path: - print(f"Bitte pruefen Sie die Logdatei fuer Details: {log_file_path}") - finally: - logger.info(f"===== Skript beendet =====") - logging.shutdown() - if 'selected_mode' in locals() and selected_mode != 'exit' and 'log_file_path' in locals() and log_file_path: - print(f"\nVerarbeitung abgeschlossen. Logfile: {log_file_path}") \ No newline at end of file + new_ids = d365_ids - gsheet_ids + deleted_ids = gsheet_ids - d365_ids + existing_ids = d365_ids.intersection(gsheet_ids) + + self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} zu archivierende, {len(existing_ids)} bestehende Accounts.") + + updates_to_batch = [] + rows_to_append = [] + + # 1. Neue Accounts verarbeiten + if new_ids: + new_accounts_df = self.d365_df[self.d365_df['CRM ID'].isin(new_ids)] + for _, row in new_accounts_df.iterrows(): + new_row_data = [""] * len(COLUMN_ORDER) + for d365_col, gsheet_col in self.d365_to_gsheet_map.items(): + if d365_col in row: + col_idx = COLUMN_MAP[gsheet_col]['index'] + new_row_data[col_idx] = row[d365_col] + rows_to_append.append(new_row_data) + + # 2. Gelöschte/Archivierte Accounts verarbeiten + if deleted_ids: + for crm_id in deleted_ids: + row_indices = self.gsheet_df[self.gsheet_df['CRM ID'] == crm_id].index + if not row_indices.empty: + row_idx = row_indices[0] + updates_to_batch.append({ + "range": f"{COLUMN_MAP['Archiviert']['Titel']}{row_idx + 2}", + "values": [["TRUE"]] + }) + + # 3. Bestehende Accounts intelligent mergen + if existing_ids: + d365_indexed = self.d365_df.set_index('CRM ID') + gsheet_indexed = self.gsheet_df.set_index('CRM ID') + + for crm_id in existing_ids: + d365_row = d365_indexed.loc[crm_id] + gsheet_row = gsheet_indexed.loc[crm_id] + + row_updates = {} + conflict_messages = [] + needs_reeval = False + + # Strategie 1: D365 gewinnt immer + for gsheet_col in self.d365_wins_cols: + d365_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == gsheet_col), None) + if d365_col and d365_col in d365_row and str(d365_row[d365_col]) != str(gsheet_row[gsheet_col]): + row_updates[gsheet_col] = str(d365_row[d365_col]) + needs_reeval = True + + # Strategie 2: Smart-Merge für spezielle Spalten + for gsheet_col in self.smart_merge_cols: + d365_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == gsheet_col), None) + d365_val = str(d365_row.get(d365_col, '')) + gsheet_val = str(gsheet_row.get(gsheet_col, '')) + + if d365_val and not gsheet_val: + row_updates[gsheet_col] = d365_val + needs_reeval = True + elif d365_val and gsheet_val and d365_val != gsheet_val: + conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'") + + # Updates und Flags zusammenstellen + if conflict_messages: + row_updates["SyncConflict"] = "; ".join(conflict_messages) + + if needs_reeval: + row_updates["ReEval Flag"] = "x" + + # Batch-Update-Objekte für die aktuelle Zeile erstellen + if row_updates: + row_idx = gsheet_indexed.index.get_loc(crm_id) + for col_name, value in row_updates.items(): + updates_to_batch.append({ + "range": f"{COLUMN_MAP[col_name]['Titel']}{row_idx + 2}", + "values": [[value]] + }) + + # 4. Änderungen ins Google Sheet schreiben + if rows_to_append: + self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...") + self.sheet_handler.append_rows(rows_to_append) + + if updates_to_batch: + self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...") + self.sheet_handler.batch_update_cells(updates_to_batch) + + if not rows_to_append and not updates_to_batch: + self.logger.info("Keine Änderungen festgestellt. Das Google Sheet ist bereits auf dem neuesten Stand.") + + self.logger.info("Synchronisation erfolgreich abgeschlossen.") \ No newline at end of file