#!/usr/bin/env python3 """ sync_manager.py Modul für den Datenabgleich zwischen einem D365 Excel-Export und dem Google Sheet. Führt einen intelligenten "Full-Sync" durch, um neue, geänderte und gelöschte Datensätze zu identifizieren und zu verarbeiten. Enthält Logik für: - Smart-Merging von Feldern (z.B. Website). - Automatisches Setzen des Re-Eval-Flags bei Stammdatenänderungen. - Markieren von archivierten Datensätzen. - Protokollieren von Datenkonflikten. """ import pandas as pd import logging from datetime import datetime # Importiere die benötigten Konfigurationen from config import COLUMN_ORDER, COLUMN_MAP class SyncManager: """ Kapselt die Logik für den Abgleich zwischen D365-Export und Google Sheet. """ def __init__(self, sheet_handler, d365_export_path): """ Initialisiert den SyncManager. Args: sheet_handler: Eine instanziierte GoogleSheetHandler-Klasse. d365_export_path (str): Der Dateipfad zur D365 Excel-Exportdatei. """ print("DEBUG: SyncManager wird initialisiert (__init__ wird ausgeführt).") # <<< HINZUGEFÜGT self.sheet_handler = sheet_handler self.d365_export_path = d365_export_path self.logger = logging.getLogger(__name__) # Definiert, welche D365-Spalten welchen GSheet-Spalten entsprechen self.d365_to_gsheet_map = { "Account Name": "CRM Name", "Parent Account": "Parent Account Name", "Website": "CRM Website", "City": "CRM Ort", "Country": "CRM Land", "Description FSM": "CRM Beschreibung", "Branch detail": "CRM Branche", "No. Service Technicians": "CRM Anzahl Techniker", "Annual Revenue (Mio. €)": "CRM Umsatz", "Number of Employees": "CRM Anzahl Mitarbeiter", "GUID": "CRM ID" # <<< KORRIGIERT: Passt jetzt zu deiner Export-Datei } # Definiert die Merge-Strategien für GSheet-Spalten self.d365_wins_cols = ["CRM Name", "Parent Account Name", "CRM Ort", "CRM Land", "CRM Beschreibung", "CRM Branche", "CRM Anzahl Techniker", "CRM Umsatz", "CRM Anzahl Mitarbeiter"] self.smart_merge_cols = ["CRM Website"] def _load_data(self): """Lädt Daten aus der D365-Exportdatei und dem Google Sheet.""" self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...") try: # Lade alle Daten als String, um Formatierungsfehler (besonders bei GUIDs) zu vermeiden self.d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('') # Finde den D365-Spaltennamen für unsere 'CRM ID' d365_guid_col_name = next((k for k, v in self.d365_to_gsheet_map.items() if v == "CRM ID"), None) if d365_guid_col_name and d365_guid_col_name in self.d365_df.columns: # Umbenennen der Spalte für die interne Verarbeitung self.d365_df.rename(columns={d365_guid_col_name: "CRM ID"}, inplace=True) else: self.logger.critical(f"Die erwartete GUID-Spalte '{d365_guid_col_name}' wurde nicht in der D365-Exportdatei gefunden.") raise ValueError("GUID-Spalte nicht in der D365-Exportdatei gefunden.") self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip() except FileNotFoundError: self.logger.critical(f"FEHLER: D365-Exportdatei nicht gefunden unter: {self.d365_export_path}") return False except Exception as e: self.logger.critical(f"FEHLER beim Lesen der Excel-Datei: {e}", exc_info=True) return False self.logger.info("Lade bestehende Daten aus dem Google Sheet...") # KORREKTUR: Verwende die existierende Methode und erstelle den DataFrame manuell all_gsheet_data = self.sheet_handler.get_all_data_with_headers() if not all_gsheet_data or len(all_gsheet_data) < 2: self.logger.warning("Google Sheet ist leer oder enthält nur eine Header-Zeile. Erstelle leeren DataFrame.") self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER) else: header = all_gsheet_data[0] # Stelle sicher, dass die Spaltenanzahl konsistent ist max_cols = len(header) data_rows = [row[:max_cols] for row in all_gsheet_data[1:]] self.gsheet_df = pd.DataFrame(data_rows, columns=header) self.gsheet_df.fillna('', inplace=True) if 'CRM ID' not in self.gsheet_df.columns: self.logger.critical("Spalte 'CRM ID' nicht im Google Sheet gefunden. Abgleich nicht möglich.") return False self.gsheet_df['CRM ID'] = self.gsheet_df['CRM ID'].str.strip() self.logger.info(f"{len(self.d365_df)} Datensätze aus D365 geladen, {len(self.gsheet_df)} im Google Sheet vorhanden.") return True def run_sync(self): """Führt den gesamten Synchronisationsprozess aus.""" print("DEBUG: run_sync() wurde aufgerufen. Versuche jetzt, Daten zu laden.") # <<< HINZUGEFÜGT if not self._load_data(): print("DEBUG: _load_data() ist fehlgeschlagen oder hat False zurückgegeben. Breche ab.") # <<< HINZUGEFÜGT return print("DEBUG: _load_data() war erfolgreich. Beginne mit der Analyse der IDs.") # <<< HINZUGEFÜGT d365_ids = set(self.d365_df[self.d365_df['CRM ID'] != '']['CRM ID']) gsheet_ids = set(self.gsheet_df[self.gsheet_df['CRM ID'] != '']['CRM ID']) new_ids = d365_ids - gsheet_ids deleted_ids = gsheet_ids - d365_ids existing_ids = d365_ids.intersection(gsheet_ids) self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} archivierte, {len(existing_ids)} bestehende Accounts.") print(f"DEBUG: Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} archivierte, {len(existing_ids)} bestehende Accounts.") # <<< HINZUGEFÜGT updates_to_batch = [] rows_to_append = [] # 1. Neue Accounts verarbeiten if new_ids: new_accounts_df = self.d365_df[self.d365_df['CRM ID'].isin(new_ids)] for _, row in new_accounts_df.iterrows(): new_row = [""] * len(COLUMN_ORDER) for d365_col, gsheet_col in self.d365_to_gsheet_map.items(): if d365_col in row: col_idx = COLUMN_MAP[gsheet_col]['index'] new_row[col_idx] = row[d365_col] rows_to_append.append(new_row) # 2. Gelöschte/Archivierte Accounts verarbeiten if deleted_ids: for crm_id in deleted_ids: row_idx_list = self.gsheet_df[self.gsheet_df['CRM ID'] == crm_id].index.tolist() if row_idx_list: row_idx = row_idx_list[0] updates_to_batch.append({ "range": f"{COLUMN_MAP['Archiviert']['Titel']}{row_idx + 2}", # +2 weil 1-basiert und Header "values": [["TRUE"]] }) # 3. Bestehende Accounts intelligent mergen if existing_ids: # Setze 'CRM ID' als Index für schnellen Zugriff if self.d365_df.index.name != 'CRM ID': self.d365_df.set_index('CRM ID', inplace=True) if self.gsheet_df.index.name != 'CRM ID': self.gsheet_df.set_index('CRM ID', inplace=True) for crm_id in existing_ids: d365_row = self.d365_df.loc[crm_id] gsheet_row = self.gsheet_df.loc[crm_id] row_updates = {} conflict_messages = [] needs_reeval = False # Strategie 1: D365 gewinnt immer for gsheet_col in self.d365_wins_cols: d365_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == gsheet_col), None) if d365_col and str(d365_row[d365_col]) != str(gsheet_row[gsheet_col]): row_updates[gsheet_col] = str(d365_row[d365_col]) needs_reeval = True # Strategie 2: Smart-Merge for gsheet_col in self.smart_merge_cols: d365_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == gsheet_col), None) d365_val = str(d365_row.get(d365_col, '')) gsheet_val = str(gsheet_row.get(gsheet_col, '')) if d365_val and not gsheet_val: row_updates[gsheet_col] = d365_val needs_reeval = True elif d365_val and gsheet_val and d365_val != gsheet_val: conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'") # Updates und Flags zusammenstellen if conflict_messages: row_updates["SyncConflict"] = "; ".join(conflict_messages) if needs_reeval: row_updates["ReEval Flag"] = "x" # Batch-Update-Objekte erstellen if row_updates: row_idx_list = self.gsheet_df.index.get_loc(crm_id) # Sicherstellen, dass row_idx ein Integer ist, kein Slice oder Array row_idx = row_idx_list if isinstance(row_idx_list, int) else row_idx_list.start for col_name, value in row_updates.items(): updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{row_idx + 2}", "values": [[value]] }) # 4. Änderungen ins Google Sheet schreiben if rows_to_append: self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...") self.sheet_handler.append_rows(values=rows_to_append) if updates_to_batch: self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...") self.sheet_handler.batch_update_cells(updates=updates_to_batch) if not rows_to_append and not updates_to_batch: print("DEBUG: Keine neuen Zeilen oder Updates zum Senden gefunden.") # <<< HINZUGEFÜGT print("DEBUG: run_sync() am Ende angelangt.") # <<< HINZUGEFÜGT self.logger.info("Synchronisation erfolgreich abgeschlossen.")