#!/usr/bin/env python3 """ sync_manager.py Modul für den Datenabgleich zwischen einem D365 Excel-Export und dem Google Sheet. Führt einen intelligenten "Full-Sync" durch, um neue, geänderte und gelöschte Datensätze zu identifizieren und zu verarbeiten. """ import pandas as pd import logging from datetime import datetime from config import COLUMN_ORDER, COLUMN_MAP class SyncManager: """ Kapselt die Logik für den Abgleich zwischen D365-Export und Google Sheet. """ def __init__(self, sheet_handler, d365_export_path): self.sheet_handler = sheet_handler self.d365_export_path = d365_export_path self.logger = logging.getLogger(__name__) self.target_sheet_name = None self.d365_to_gsheet_map = { "Account Name": "CRM Name", "Parent Account": "Parent Account Name", "Website": "CRM Website", "City": "CRM Ort", "Country": "CRM Land", "Description FSM": "CRM Beschreibung", "Branch detail": "CRM Branche", "No. Service Technicians": "CRM Anzahl Techniker", "Annual Revenue (Mio. €)": "CRM Umsatz", "Number of Employees": "CRM Anzahl Mitarbeiter", "GUID": "CRM ID" } self.d365_wins_cols = ["CRM Name", "Parent Account Name", "CRM Ort", "CRM Land", "CRM Beschreibung", "CRM Branche", "CRM Anzahl Techniker", "CRM Umsatz", "CRM Anzahl Mitarbeiter"] self.smart_merge_cols = ["CRM Website"] def _load_data(self): """Lädt und bereitet die Daten aus D365 und Google Sheets vor.""" self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...") try: temp_d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('') # --- FINALE, ROBUSTE LOGIK --- # 1. Prüfen, ob alle benötigten Original-Spalten vorhanden sind for d365_col in self.d365_to_gsheet_map.keys(): if d365_col not in temp_d365_df.columns: raise ValueError(f"Erwartete Spalte '{d365_col}' nicht in der D365-Exportdatei gefunden.") # 2. Erstelle ein neues, sauberes DataFrame, das nur die Spalten enthält, die wir brauchen self.d365_df = temp_d365_df[list(self.d365_to_gsheet_map.keys())].copy() # 3. Benenne die Spalten im neuen DataFrame um self.d365_df.rename(columns=self.d365_to_gsheet_map, inplace=True) # 4. Bereinige die GUIDs self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip().str.lower() self.d365_df = self.d365_df[self.d365_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)] except Exception as e: self.logger.critical(f"Fehler beim Laden der Excel-Datei: {e}", exc_info=True) return False self.logger.info("Lade bestehende Daten aus dem Google Sheet...") try: all_data_with_headers = self.sheet_handler.get_all_data_with_headers() if not all_data_with_headers or len(all_data_with_headers) < self.sheet_handler._header_rows: self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER).fillna('') else: actual_header = all_data_with_headers[self.sheet_handler._header_rows - 1] data_rows = all_data_with_headers[self.sheet_handler._header_rows:] temp_df = pd.DataFrame(data_rows) if temp_df.empty: temp_df = pd.DataFrame(columns=actual_header) else: if temp_df.shape[1] > len(actual_header): temp_df = temp_df.iloc[:, :len(actual_header)] temp_df.columns = actual_header temp_df = temp_df.fillna('') for col_name in COLUMN_ORDER: if col_name not in temp_df.columns: temp_df[col_name] = '' self.gsheet_df = temp_df[COLUMN_ORDER] except Exception as e: self.logger.critical(f"Fehler beim Laden/Umwandeln der GSheet-Daten: {e}", exc_info=True) return False self.gsheet_df['CRM ID'] = self.gsheet_df['CRM ID'].str.strip().str.lower() initial_row_count = len(self.gsheet_df) self.gsheet_df = self.gsheet_df[self.gsheet_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)] final_row_count = len(self.gsheet_df) if initial_row_count > final_row_count: self.logger.info(f"GSheet-Daten bereinigt: {initial_row_count - final_row_count} Zeilen ohne gültige GUID entfernt.") self.logger.info(f"{len(self.d365_df)} gültige Datensätze aus D365 geladen, {len(self.gsheet_df)} gültige Datensätze im Google Sheet.") return True def run_sync(self): """Führt den gesamten Synchronisationsprozess aus.""" if not self._load_data(): return self.target_sheet_name = self.sheet_handler.get_main_sheet_name() if not self.target_sheet_name: self.logger.critical("Konnte Namen des Ziel-Sheets nicht ermitteln. Abbruch.") return d365_ids = set(self.d365_df['CRM ID'].dropna()) gsheet_ids = set(self.gsheet_df[self.gsheet_df['CRM ID'] != '']['CRM ID'].dropna()) new_ids = d365_ids - gsheet_ids deleted_ids = set() self.logger.info("Archivierungs-Schritt wird übersprungen (Teil-Export angenommen).") existing_ids = d365_ids.intersection(gsheet_ids) self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} zu archivierende, {len(existing_ids)} bestehende Accounts.") updates_to_batch, rows_to_append = [], [] if new_ids: new_accounts_df = self.d365_df[self.d365_df['CRM ID'].isin(new_ids)] for _, row in new_accounts_df.iterrows(): new_row_data = [""] * len(COLUMN_ORDER) for gsheet_col in self.d365_to_gsheet_map.values(): if gsheet_col in row: col_idx = COLUMN_MAP[gsheet_col]['index'] new_row_data[col_idx] = row[gsheet_col] rows_to_append.append(new_row_data) if existing_ids: d365_indexed = self.d365_df.set_index('CRM ID') gsheet_indexed = self.gsheet_df.set_index('CRM ID') for row_idx, gsheet_row in gsheet_indexed.iterrows(): crm_id = gsheet_row.name if crm_id not in d365_indexed.index: continue d365_row = d365_indexed.loc[crm_id] row_updates, conflict_messages, needs_reeval = {}, [], False for gsheet_col in self.d365_wins_cols: d365_val = str(d365_row[gsheet_col]) gsheet_val = str(gsheet_row[gsheet_col]) # --- NEUE SPEZIAL-LOGIK FÜR LÄNDER --- is_different = False if gsheet_col == 'CRM Land': # Vergleiche den übersetzten Wert, nicht den Code translated_d365_val = Config.COUNTRY_CODE_MAP.get(d365_val.lower(), d365_val) if translated_d365_val.lower() != gsheet_val.lower(): is_different = True else: # Standard-Vergleich für alle anderen Spalten if d365_val != gsheet_val: is_different = True # --- ENDE DER SPEZIAL-LOGIK --- if is_different: row_updates[gsheet_col] = d365_val needs_reeval = True self.logger.debug(f"ReEval für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'") # (Rest der Funktion bleibt unverändert) for gsheet_col in self.smart_merge_cols: d365_val = str(d365_row.get(gsheet_col, '')) gsheet_val = str(gsheet_row.get(gsheet_col, '')) if d365_val and not gsheet_val: row_updates[gsheet_col] = d365_val needs_reeval = True self.logger.debug(f"ReEval für {crm_id} durch '{gsheet_col}' (GSheet war leer): D365='{d365_val}'") elif d365_val and gsheet_val and d365_val != gsheet_val: conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'") if conflict_messages: row_updates["SyncConflict"] = "; ".join(conflict_messages) if needs_reeval: row_updates["ReEval Flag"] = "x" if row_updates: sheet_row_number = row_idx + self.sheet_handler._header_rows + 1 for col_name, value in row_updates.items(): updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{sheet_row_number}", "values": [[value]] }) if rows_to_append: self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...") self.sheet_handler.append_rows(sheet_name=self.target_sheet_name, values=rows_to_append) if updates_to_batch: self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...") self.sheet_handler.batch_update_cells(updates_to_batch) if not rows_to_append and not updates_to_batch: self.logger.info("Keine Änderungen festgestellt. Das Google Sheet ist bereits auf dem neuesten Stand.") self.logger.info("Synchronisation erfolgreich abgeschlossen.") def debug_sync(self): """ Führt eine reine Analyse des Sync-Prozesses durch, ohne Daten zu schreiben. Gibt detaillierte Debug-Informationen im Log aus. """ self.logger.info("========== START SYNC-DEBUG-MODUS ==========") if not self._load_data(): self.logger.error("Debug abgebrochen, da das Laden der Daten fehlschlug.") return # 1. Analyse des D365 DataFrames self.logger.info("\n--- D365 DataFrame Analyse ---") d365_ids_series = self.d365_df['CRM ID'].dropna() d365_ids = set(d365_ids_series) self.logger.info(f"Anzahl Zeilen im D365 DataFrame: {len(self.d365_df)}") self.logger.info(f"Anzahl nicht-leerer GUIDs in D365: {len(d365_ids)}") self.logger.info(f"Erste 5 D365 GUIDs:\n{d365_ids_series.head().to_string()}") self.logger.info(f"Letzte 5 D365 GUIDs:\n{d365_ids_series.tail().to_string()}") # 2. Analyse des Google Sheet DataFrames self.logger.info("\n--- Google Sheet DataFrame Analyse ---") gsheet_ids_series = self.gsheet_df[self.gsheet_df['CRM ID'] != '']['CRM ID'].dropna() gsheet_ids = set(gsheet_ids_series) self.logger.info(f"Anzahl Zeilen im GSheet DataFrame: {len(self.gsheet_df)}") self.logger.info(f"Anzahl nicht-leerer GUIDs im GSheet: {len(gsheet_ids)}") self.logger.info(f"Erste 5 GSheet GUIDs:\n{gsheet_ids_series.head().to_string()}") self.logger.info(f"Letzte 5 GSheet GUIDs:\n{gsheet_ids_series.tail().to_string()}") # 3. Analyse der Set-Operationen self.logger.info("\n--- Set-Analyse (Vergleich) ---") new_ids = d365_ids - gsheet_ids deleted_ids = gsheet_ids - d365_ids existing_ids = d365_ids.intersection(gsheet_ids) self.logger.info(f"Anzahl neuer IDs (in D365, nicht in GSheet): {len(new_ids)}") self.logger.info(f"Anzahl zu archivierender IDs (in GSheet, nicht in D365): {len(deleted_ids)}") self.logger.info(f"Größe der Schnittmenge (in beiden vorhanden): {len(existing_ids)}") if len(existing_ids) < 90 and len(d365_ids) > 90: self.logger.warning("WARNUNG: Die Schnittmenge ist unerwartet klein. Dies bestätigt den Verdacht eines Matching-Problems!") # Zeige ein paar IDs, die hätten übereinstimmen sollen if gsheet_ids: self.logger.info(f"Beispiel-GUID aus GSheet, die nicht gefunden wurde: {next(iter(gsheet_ids))}") self.logger.info("========== ENDE SYNC-DEBUG-MODUS ==========")