#!/usr/-bin/env python3 """ sync_manager.py Modul für den Datenabgleich zwischen einem D365 Excel-Export und dem Google Sheet. Führt einen intelligenten "Full-Sync" durch, um neue, geänderte und gelöschte Datensätze zu identifizieren und zu verarbeiten. """ import pandas as pd import logging from collections import defaultdict from config import COLUMN_ORDER, COLUMN_MAP, Config class SyncStatistics: """Eine einfache Klasse zum Sammeln von Statistiken während des Sync-Prozesses.""" def __init__(self): self.new_accounts = 0 self.existing_accounts = 0 self.archived_accounts = 0 self.accounts_to_update = set() self.field_updates = defaultdict(int) self.conflict_accounts = set() self.field_conflicts = defaultdict(int) def generate_report(self): report = [ "\n" + "="*50, " Sync-Prozess Abschlussbericht", "="*50, f"| Neue Accounts hinzugefügt: | {self.new_accounts}", f"| Bestehende Accounts analysiert: | {self.existing_accounts}", f"| Accounts für Archivierung markiert:| {self.archived_accounts}", "-"*50, f"| Accounts mit Updates gesamt: | {len(self.accounts_to_update)}", ] if self.field_updates: report.append("| Feld-Updates im Detail:") # Sortiert die Feld-Updates nach Häufigkeit sorted_updates = sorted(self.field_updates.items(), key=lambda item: item[1], reverse=True) for field, count in sorted_updates: report.append(f"| - {field:<25} | {count} mal") else: report.append("| Keine Feld-Updates durchgeführt.") report.append("-" * 50) report.append(f"| Accounts mit Konflikten: | {len(self.conflict_accounts)}") if self.field_conflicts: report.append("| Feld-Konflikte im Detail:") sorted_conflicts = sorted(self.field_conflicts.items(), key=lambda item: item[1], reverse=True) for field, count in sorted_conflicts: report.append(f"| - {field:<25} | {count} mal") else: report.append("| Keine Konflikte festgestellt.") report.append("="*50) return "\n".join(report) class SyncManager: """ Kapselt die Logik für den Abgleich zwischen D365-Export und Google Sheet. """ def __init__(self, sheet_handler, d365_export_path): self.sheet_handler = sheet_handler self.d365_export_path = d365_export_path self.logger = logging.getLogger(__name__) self.stats = SyncStatistics() self.target_sheet_name = None self.d365_to_gsheet_map = { "Account Name": "CRM Name", "Parent Account": "Parent Account Name", "Website": "CRM Website", "City": "CRM Ort", "Country": "CRM Land", "Description FSM": "CRM Beschreibung", "Branch detail": "CRM Branche", "No. Service Technicians": "CRM Anzahl Techniker", "Annual Revenue (Mio. €)": "CRM Umsatz", "Number of Employees": "CRM Anzahl Mitarbeiter", "GUID": "CRM ID" } self.d365_wins_cols = ["CRM Name", "Parent Account Name", "CRM Ort", "CRM Land", "CRM Anzahl Techniker", "CRM Branche", "CRM Umsatz", "CRM Anzahl Mitarbeiter", "CRM Beschreibung"] self.smart_merge_cols = ["CRM Website"] def _load_data(self): """Lädt und bereitet die Daten aus D365 und Google Sheets vor.""" self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...") try: temp_d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('') for d365_col in self.d365_to_gsheet_map.keys(): if d365_col not in temp_d365_df.columns: raise ValueError(f"Erwartete Spalte '{d365_col}' nicht in der D365-Exportdatei gefunden.") self.d365_df = temp_d365_df[list(self.d365_to_gsheet_map.keys())].copy() self.d365_df.rename(columns=self.d365_to_gsheet_map, inplace=True) self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip().str.lower() self.d365_df = self.d365_df[self.d365_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)] except Exception as e: self.logger.critical(f"Fehler beim Laden der Excel-Datei: {e}", exc_info=True) return False self.logger.info("Lade bestehende Daten aus dem Google Sheet...") try: all_data_with_headers = self.sheet_handler.get_all_data_with_headers() if not all_data_with_headers or len(all_data_with_headers) < self.sheet_handler._header_rows: self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER) else: actual_header = all_data_with_headers[self.sheet_handler._header_rows - 1] data_rows = all_data_with_headers[self.sheet_handler._header_rows:] # --- FINALER DATAFRAME-ERSTELLUNGS-FIX --- # Wir erstellen das DataFrame aus einem Dictionary, das ist am robustesten. data_dict = {header_col: [] for header_col in actual_header} for row in data_rows: for i, header_col in enumerate(actual_header): # Fülle fehlende Werte auf, wenn eine Zeile zu kurz ist value = row[i] if i < len(row) else '' data_dict[header_col].append(str(value)) # Sofort zu String konvertieren temp_df = pd.DataFrame(data_dict) for col_name in COLUMN_ORDER: if col_name not in temp_df.columns: temp_df[col_name] = '' self.gsheet_df = temp_df[COLUMN_ORDER] except Exception as e: self.logger.critical(f"Fehler beim Laden/Umwandeln der GSheet-Daten: {e}", exc_info=True) return False self.gsheet_df['CRM ID'] = self.gsheet_df['CRM ID'].str.strip().str.lower() initial_row_count = len(self.gsheet_df) self.gsheet_df = self.gsheet_df[self.gsheet_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)] if initial_row_count > len(self.gsheet_df): self.logger.info(f"GSheet-Daten bereinigt: {initial_row_count - len(self.gsheet_df)} Zeilen ohne gültige GUID entfernt.") self.logger.info(f"{len(self.d365_df)} gültige Datensätze aus D365 geladen, {len(self.gsheet_df)} gültige Datensätze im Google Sheet.") return True def run_sync(self): """Führt den gesamten Synchronisationsprozess aus.""" if not self._load_data(): return self.target_sheet_name = self.sheet_handler.get_main_sheet_name() if not self.target_sheet_name: self.logger.critical("Konnte Namen des Ziel-Sheets nicht ermitteln. Abbruch.") return d365_ids = set(self.d365_df['CRM ID'].dropna()) gsheet_ids = set(self.gsheet_df['CRM ID'].dropna()) new_ids = d365_ids - gsheet_ids deleted_ids = set() self.logger.info("Archivierungs-Schritt wird übersprungen (Teil-Export angenommen).") existing_ids = d365_ids.intersection(gsheet_ids) # Statistik befüllen self.stats.new_accounts = len(new_ids) self.stats.archived_accounts = len(deleted_ids) self.stats.existing_accounts = len(existing_ids) self.logger.info(f"Sync-Analyse: {self.stats.new_accounts} neue, {self.stats.archived_accounts} zu archivierende, {self.stats.existing_accounts} bestehende Accounts.") updates_to_batch, rows_to_append = [], [] if new_ids: new_accounts_df = self.d365_df[self.d365_df['CRM ID'].isin(new_ids)] for _, row in new_accounts_df.iterrows(): new_row_data = [""] * len(COLUMN_ORDER) for gsheet_col in self.d365_to_gsheet_map.values(): if gsheet_col in row: col_idx = COLUMN_MAP[gsheet_col]['index'] new_row_data[col_idx] = row[gsheet_col] rows_to_append.append(new_row_data) if existing_ids: d365_indexed = self.d365_df.set_index('CRM ID') gsheet_to_update_df = self.gsheet_df[self.gsheet_df['CRM ID'].isin(existing_ids)] for original_row_index, gsheet_row in gsheet_to_update_df.iterrows(): crm_id = gsheet_row['CRM ID'] if crm_id not in d365_indexed.index: continue d365_row = d365_indexed.loc[crm_id] row_updates, conflict_messages, needs_reeval = {}, [], False for gsheet_col in self.d365_wins_cols: d365_val = str(d365_row[gsheet_col]).strip() gsheet_val = str(gsheet_row[gsheet_col]).strip() trigger_update = False if gsheet_col == 'CRM Land': d365_code_lower, gsheet_val_lower = d365_val.lower(), gsheet_val.lower() d365_translated_lower = Config.COUNTRY_CODE_MAP.get(d365_code_lower, d365_code_lower).lower() if gsheet_val_lower != d365_code_lower and gsheet_val_lower != d365_translated_lower: trigger_update = True elif gsheet_col == 'CRM Anzahl Techniker': if (d365_val == '-1' or d365_val == '0') and gsheet_val == '': pass elif d365_val != gsheet_val: trigger_update = True elif gsheet_col == 'CRM Branche': if gsheet_row['Chat Vorschlag Branche'] == '' and d365_val != gsheet_val: trigger_update = True elif gsheet_col == 'CRM Umsatz': if gsheet_row['Wiki Umsatz'] == '' and d365_val != gsheet_val: trigger_update = True elif gsheet_col == 'CRM Anzahl Mitarbeiter': if gsheet_row['Wiki Mitarbeiter'] == '' and d365_val != gsheet_val: trigger_update = True elif gsheet_col == 'CRM Beschreibung': if gsheet_row['Website Zusammenfassung'] == '' and d365_val != gsheet_val: trigger_update = True else: if d365_val != gsheet_val: trigger_update = True if trigger_update: row_updates[gsheet_col] = d365_val; needs_reeval = True self.logger.debug(f"Update für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'") for gsheet_col in self.smart_merge_cols: d365_val = str(d365_row.get(gsheet_col, '')).strip() gsheet_val = str(gsheet_row.get(gsheet_col, '')).strip() if d365_val and not gsheet_val: row_updates[gsheet_col] = d365_val; needs_reeval = True elif d365_val and gsheet_val and d365_val != gsheet_val: conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'") if conflict_messages: row_updates["SyncConflict"] = "; ".join(conflict_messages) self.stats.conflict_accounts.add(crm_id) for msg in conflict_messages: self.stats.field_conflicts[msg.split('_CONFLICT')[0]] += 1 if needs_reeval: row_updates["ReEval Flag"] = "x" if row_updates: self.stats.accounts_to_update.add(crm_id) for field in row_updates.keys(): self.stats.field_updates[field] += 1 sheet_row_number = original_row_index + self.sheet_handler._header_rows + 1 for col_name, value in row_updates.items(): updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{sheet_row_number}", "values": [[value]] }) if rows_to_append: self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...") self.sheet_handler.append_rows(sheet_name=self.target_sheet_name, values=rows_to_append) if updates_to_batch: self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...") self.sheet_handler.batch_update_cells(updates_to_batch) # Generiere und logge den Abschlussbericht report = self.stats.generate_report() self.logger.info(report) print(report) self.logger.info("Synchronisation erfolgreich abgeschlossen.") def debug_sync(self, debug_id=None): """ Führt eine Analyse des Sync-Prozesses durch. Ohne debug_id wird eine allgemeine Statistik ausgegeben. Mit debug_id wird eine Tiefenanalyse für einen einzelnen Datensatz durchgeführt. """ self.logger.info("========== START SYNC-DEBUG-MODUS ==========") # Lade die Rohdaten, aber brich die _load_data Funktion noch nicht ab self.logger.info("Lade Rohdaten aus Google Sheet für Tiefenanalyse...") try: all_data_with_headers = self.sheet_handler.get_all_data_with_headers() if not all_data_with_headers: self.logger.error("Debug abgebrochen, Google Sheet ist leer.") return except Exception as e: self.logger.error(f"Debug abgebrochen, Fehler beim Laden der Rohdaten: {e}") return if not debug_id: # Führe den Rest von _load_data aus für die allgemeine Statistik if not self._load_data(): self.logger.error("Debug abgebrochen, da das Laden der Daten fehlschlug.") return self.logger.info("Keine spezifische ID angegeben. Führe allgemeine Statistik-Analyse durch.") d365_ids = set(self.d365_df['CRM ID']) gsheet_ids = set(self.gsheet_df[self.gsheet_df['CRM ID'] != '']['CRM ID'].dropna()) self.logger.info("\n--- Set-Analyse (Vergleich) ---") self.logger.info(f"Anzahl neuer IDs: {len(d365_ids - gsheet_ids)}") self.logger.info(f"Anzahl zu archivierender IDs: {len(gsheet_ids - d365_ids)}") self.logger.info(f"Größe der Schnittmenge: {len(d365_ids.intersection(gsheet_ids))}") self.logger.info("========== ENDE SYNC-DEBUG-MODUS ==========") return # --- TIEFENANALYSE FÜR EINE SPEZIFISCHE ID --- self.logger.info(f"\n--- Tiefenanalyse für CRM ID: {debug_id} ---") debug_id_lower = debug_id.lower().strip() # 1. Finde die Roh-Zeile im Google Sheet self.logger.info("\n--- Rohdaten-Analyse aus Google Sheet ---") header = all_data_with_headers[self.sheet_handler._header_rows - 1] crm_id_index = -1 try: # Finde den Index der 'CRM ID' Spalte im Header crm_id_index = header.index("CRM ID") except ValueError: self.logger.error("Spalte 'CRM ID' nicht im Header des Google Sheets gefunden!") found_raw_row = None if crm_id_index != -1: for i, row in enumerate(all_data_with_headers[self.sheet_handler._header_rows:]): # Stelle sicher, dass die Zeile lang genug ist if len(row) > crm_id_index: if str(row[crm_id_index]).lower().strip() == debug_id_lower: found_raw_row = row self.logger.info(f"Roh-Zeile gefunden bei Index {i} (nach Header):") self.logger.info(found_raw_row) break if not found_raw_row: self.logger.warning("ID in den Rohdaten des Google Sheets nicht gefunden.") # 2. Führe jetzt die normale Datenverarbeitung durch, um das DataFrame zu bekommen if not self._load_data(): self.logger.error("Debug abgebrochen, da das Laden der Daten fehlschlug.") return # 3. Analyse der DataFrames (wie gehabt) d365_row = self.d365_df[self.d365_df['CRM ID'] == debug_id_lower] if d365_row.empty: self.logger.warning("ID in D365-Export nicht gefunden.") else: self.logger.info("\nDatensatz aus D365-Export (nach Verarbeitung):") self.logger.info(d365_row.to_dict('records')[0]) gsheet_row = self.gsheet_df[self.gsheet_df['CRM ID'] == debug_id_lower] if gsheet_row.empty: self.logger.warning("ID im Google Sheet DataFrame nicht gefunden (nach Bereinigung).") else: self.logger.info("\nDatensatz aus Google Sheet (nach Verarbeitung zu DataFrame):") self.logger.info(gsheet_row.to_dict('records')[0]) # 4. Direkter Vergleich des kritischen Feldes if not d365_row.empty and not gsheet_row.empty: self.logger.info("\n--- Direkter Feld-Vergleich: CRM Anzahl Techniker ---") d365_val = d365_row.iloc[0]['CRM Anzahl Techniker'] gsheet_val = gsheet_row.iloc[0]['CRM Anzahl Techniker'] self.logger.info(f"Wert aus D365: '{d365_val}' (Typ: {type(d365_val)})") self.logger.info(f"Wert aus GSheet DataFrame: '{gsheet_val}' (Typ: {type(gsheet_val)})") if str(d365_val).strip() != str(gsheet_val).strip(): self.logger.info("--> Ergebnis: Werte sind UNTERSCHIEDLICH.") else: self.logger.info("--> Ergebnis: Werte sind IDENTISCH.") self.logger.info("========== ENDE SYNC-DEBUG-MODUS ==========")