From 22f873e1dc743d7144d434a14fe4eb2076abda00 Mon Sep 17 00:00:00 2001 From: Floke Date: Thu, 28 Aug 2025 18:37:36 +0000 Subject: [PATCH] sync_manager.py aktualisiert --- sync_manager.py | 233 +++++++++++++++++++++++++++++------------------- 1 file changed, 139 insertions(+), 94 deletions(-) diff --git a/sync_manager.py b/sync_manager.py index fabb5753..4436a08d 100644 --- a/sync_manager.py +++ b/sync_manager.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3 +#!/usr/-bin/env python3 """ sync_manager.py @@ -9,7 +9,7 @@ gelöschte Datensätze zu identifizieren und zu verarbeiten. import pandas as pd import logging -import re +import re, unicodedata from collections import defaultdict from config import COLUMN_ORDER, COLUMN_MAP, Config @@ -38,6 +38,7 @@ class SyncStatistics: ] if self.field_updates: report.append("| Feld-Updates im Detail:") + # Sortiert die Feld-Updates nach Häufigkeit sorted_updates = sorted(self.field_updates.items(), key=lambda item: item[1], reverse=True) for field, count in sorted_updates: report.append(f"| - {field:<25} | {count} mal") @@ -81,86 +82,120 @@ class SyncManager: "CRM Anzahl Techniker", "CRM Branche", "CRM Umsatz", "CRM Anzahl Mitarbeiter", "CRM Beschreibung"] self.smart_merge_cols = ["CRM Website"] - - def _normalize_header(self, header_str: str) -> str: - """Bereinigt einen Header-String von unsichtbaren Zeichen und normalisiert Whitespace.""" - if not isinstance(header_str, str): return "" - # 1. Ersetze Non-Breaking-Spaces - normalized = header_str.replace('\u00A0', ' ') - # 2. Entferne Zero-Width-Spaces und Byte Order Mark (BOM) - normalized = re.sub(r'[\u200B\u200E\u200F\ufeff]', '', normalized) - # 3. Fasse mehrere Leerzeichen zusammen und entferne führende/nachfolgende - normalized = re.sub(r'\s+', ' ', normalized).strip() - return normalized def _load_data(self): - """Lädt und bereitet die Daten aus D365 und Google Sheets vor.""" - self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...") - try: - temp_d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('') - for d365_col in self.d365_to_gsheet_map.keys(): - if d365_col not in temp_d365_df.columns: - raise ValueError(f"Erwartete Spalte '{d365_col}' nicht in der D365-Exportdatei gefunden.") - self.d365_df = temp_d365_df[list(self.d365_to_gsheet_map.keys())].copy() - self.d365_df.rename(columns=self.d365_to_gsheet_map, inplace=True) - self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip().str.lower() - self.d365_df = self.d365_df[self.d365_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)] - except Exception as e: - self.logger.critical(f"Fehler beim Laden der Excel-Datei: {e}", exc_info=True) - return False + """ + Lädt Daten aus D365-Export und Google Sheet. + WICHTIG: Header aus dem GSheet werden normalisiert und auf kanonische Namen (COLUMN_ORDER) gemappt, + damit unsichtbare Zeichen (NBSP, Zero-Width, BOM etc.) keine Schatten-Spalten erzeugen. + """ + self.logger.info("Starte _load_data()...") - self.logger.info("Lade bestehende Daten aus dem Google Sheet...") - try: - all_data_with_headers = self.sheet_handler.get_all_data_with_headers() - if not all_data_with_headers or len(all_data_with_headers) < self.sheet_handler._header_rows: - self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER) + # 1) D365-Daten laden (unverändert) + self.logger.debug("Lade D365-Export...") + self.d365_df = self._load_d365_export() # erwartet bestehende Implementierung + if self.d365_df is None or self.d365_df.empty: + self.logger.warning("D365-DataFrame ist leer oder None.") + + # 2) Google Sheet Rohdaten holen (mit Headern) + self.logger.debug("Lade Google Sheet Rohdaten (inkl. Header)...") + all_data_with_headers = self.sheet_handler.get_all_data_with_headers() + if not all_data_with_headers or len(all_data_with_headers) < self.sheet_handler._header_rows: + self.logger.error("Google Sheet enthält keine gültige Header-Zeile.") + self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER) + return + + actual_header = all_data_with_headers[self.sheet_handler._header_rows - 1] + data_rows = all_data_with_headers[self.sheet_handler._header_rows:] + + # Debug: zeige die Roh-Header repräsentiert (um unsichtbare Zeichen sichtbar zu machen) + self.logger.debug("Roh-Header (repr): " + " | ".join(repr(h) for h in actual_header)) + + # 3) Header-Normalisierung + def _norm_header(s: str) -> str: + if s is None: + return "" + s = str(s) + # NBSP -> Space, Zero-Width/RTL/BOM entfernen + s = s.replace("\u00A0", " ").replace("\u200B", "").replace("\u200E", "").replace("\u200F", "").replace("\ufeff", "") + # Control/Format-Zeichen entfernen + s = "".join(ch for ch in s if unicodedata.category(ch) not in ("Cf", "Cc", "Cs")) + # Whitespace normalisieren + s = re.sub(r"\s+", " ", s).strip() + return s + + norm_header = [_norm_header(h) for h in actual_header] + + # 4) Duplikate in den (normalisierten) Headern eindeutig machen + seen = {} + unique_norm_header = [] + for h in norm_header: + n = seen.get(h, 0) + unique_norm_header.append(h if n == 0 else f"{h}__dup{n}") + seen[h] = n + 1 + + # 5) Datenzeilen auf Header-Länge bringen + zu Strings casten (robust ggü. zu kurzen Zeilen) + fixed_rows = [] + target_len = len(unique_norm_header) + for r in data_rows: + if len(r) < target_len: + r = r + [''] * (target_len - len(r)) else: - # --- HIER IST DER FINALE FIX --- - header_raw = all_data_with_headers[self.sheet_handler._header_rows - 1] - data_rows = all_data_with_headers[self.sheet_handler._header_rows:] - - # 1. Normalisiere die gelesenen Header - header_normalized = [self._normalize_header(h) for h in header_raw] + r = r[:target_len] + fixed_rows.append([str(v) for v in r]) - # 2. Härtung: Logge Abweichungen für zukünftige Analysen - for raw, norm in zip(header_raw, header_normalized): - if raw != norm: - self.logger.debug(f"Header normalisiert: {repr(raw)} -> '{norm}'") - - # 3. Erstelle das DataFrame mit den normalisierten Headern - temp_df = pd.DataFrame(data_rows, columns=header_normalized) - - # 4. Stelle sicher, dass alle Spalten aus unserer Config existieren - for col_name in COLUMN_ORDER: - if col_name not in temp_df.columns: - self.logger.warning(f"Spalte '{col_name}' fehlt im GSheet und wird als leere Spalte hinzugefügt.") - temp_df[col_name] = '' - - # 5. Reduziere auf die korrekte Reihenfolge und fülle leere Zellen - self.gsheet_df = temp_df[COLUMN_ORDER].fillna('') + temp_df = pd.DataFrame(fixed_rows, columns=unique_norm_header) - except Exception as e: - self.logger.critical(f"Fehler beim Laden/Umwandeln der GSheet-Daten: {e}", exc_info=True) - return False - - # Konvertiere ALLES im finalen DataFrame zu Strings, um Typenkonflikte zu vermeiden - self.gsheet_df = self.gsheet_df.astype(str) - - self.gsheet_df['CRM ID'] = self.gsheet_df['CRM ID'].str.strip().str.lower() - initial_row_count = len(self.gsheet_df) - self.gsheet_df = self.gsheet_df[self.gsheet_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)] - if initial_row_count > len(self.gsheet_df): - self.logger.info(f"GSheet-Daten bereinigt: {initial_row_count - len(self.gsheet_df)} Zeilen ohne gültige GUID entfernt.") - - self.logger.info(f"{len(self.d365_df)} gültige Datensätze aus D365 geladen, {len(self.gsheet_df)} gültige Datensätze im Google Sheet.") - return True + # 6) Mapping: normalisierte Header -> kanonische Spaltennamen (COLUMN_ORDER) + canon_map = {_norm_header(c): c for c in COLUMN_ORDER} # z. B. {"CRM Anzahl Techniker": "CRM Anzahl Techniker", ...} + + rename_map = {} + unmapped_cols = [] + for col in list(temp_df.columns): + base = col.split("__dup")[0] # Duplikatsuffix entfernen + if base in canon_map: + rename_map[col] = canon_map[base] + else: + unmapped_cols.append(col) + + if rename_map: + temp_df.rename(columns=rename_map, inplace=True) + + # Debug: nicht gemappte Spalten melden (einmalig extrem hilfreich zur Ursachenanalyse) + if unmapped_cols: + self.logger.warning( + "Folgende GSheet-Spalten konnten NICHT auf COLUMN_ORDER gemappt werden " + "(vermutlich fremde/alte/abweichende Header): " + + ", ".join([f"{c!r}" for c in unmapped_cols]) + ) + + # 7) Fehlende Spalten (gegenüber COLUMN_ORDER) hinzufügen + for col_name in COLUMN_ORDER: + if col_name not in temp_df.columns: + temp_df[col_name] = "" + + # 8) Final in die gewünschte Spaltenreihenfolge bringen + self.gsheet_df = temp_df[COLUMN_ORDER] + + # 9) Optional: Sanity-Check auf das bekannte Problemfeld + if "CRM Anzahl Techniker" in self.gsheet_df.columns: + # Beispielhafte Debug-Ausgabe für den vom User genannten GUID-Datensatz + guid_col = "accountid" if "accountid" in self.gsheet_df.columns else None + if guid_col: + probe_guid = "0f68a69d-e330-ec11-b6e6-000d3adbc80e" + probe_row = self.gsheet_df[self.gsheet_df[guid_col] == probe_guid] + if not probe_row.empty: + val = probe_row.iloc[0]["CRM Anzahl Techniker"] + self.logger.info( + f"Sanity-Check: GSheet['CRM Anzahl Techniker'] für {probe_guid} -> {val!r} (Typ: {type(val)})" + ) + + self.logger.info("_load_data() abgeschlossen.") def run_sync(self): """Führt den gesamten Synchronisationsprozess aus.""" - # Diese Methode bleibt exakt wie in der letzten funktionierenden Version. - # Der Fix fand ausschließlich in _load_data() statt. if not self._load_data(): return - + self.target_sheet_name = self.sheet_handler.get_main_sheet_name() if not self.target_sheet_name: self.logger.critical("Konnte Namen des Ziel-Sheets nicht ermitteln. Abbruch.") @@ -174,13 +209,10 @@ class SyncManager: self.logger.info("Archivierungs-Schritt wird übersprungen (Teil-Export angenommen).") existing_ids = d365_ids.intersection(gsheet_ids) - self.stats.new_accounts = len(new_ids) - self.stats.archived_accounts = len(deleted_ids) - self.stats.existing_accounts = len(existing_ids) - self.logger.info(f"Sync-Analyse: {self.stats.new_accounts} neue, {self.stats.archived_accounts} zu archivierende, {self.stats.existing_accounts} bestehende Accounts.") + self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} zu archivierende, {len(existing_ids)} bestehende Accounts.") updates_to_batch, rows_to_append = [], [] - + if new_ids: new_accounts_df = self.d365_df[self.d365_df['CRM ID'].isin(new_ids)] for _, row in new_accounts_df.iterrows(): @@ -193,70 +225,83 @@ class SyncManager: if existing_ids: d365_indexed = self.d365_df.set_index('CRM ID') + + # --- KORREKTE DATENQUELLE VERWENDEN --- gsheet_to_update_df = self.gsheet_df[self.gsheet_df['CRM ID'].isin(existing_ids)] + for original_row_index, gsheet_row in gsheet_to_update_df.iterrows(): crm_id = gsheet_row['CRM ID'] if crm_id not in d365_indexed.index: continue d365_row = d365_indexed.loc[crm_id] + row_updates, conflict_messages, needs_reeval = {}, [], False + for gsheet_col in self.d365_wins_cols: d365_val = str(d365_row[gsheet_col]).strip() gsheet_val = str(gsheet_row[gsheet_col]).strip() + trigger_update = False + if gsheet_col == 'CRM Land': - d365_code_lower, gsheet_val_lower = d365_val.lower(), gsheet_val.lower() + d365_code_lower = d365_val.lower() + gsheet_val_lower = gsheet_val.lower() d365_translated_lower = Config.COUNTRY_CODE_MAP.get(d365_code_lower, d365_code_lower).lower() if gsheet_val_lower != d365_code_lower and gsheet_val_lower != d365_translated_lower: trigger_update = True + elif gsheet_col == 'CRM Anzahl Techniker': if (d365_val == '-1' or d365_val == '0') and gsheet_val == '': pass elif d365_val != gsheet_val: trigger_update = True + elif gsheet_col == 'CRM Branche': if gsheet_row['Chat Vorschlag Branche'] == '' and d365_val != gsheet_val: trigger_update = True + elif gsheet_col == 'CRM Umsatz': if gsheet_row['Wiki Umsatz'] == '' and d365_val != gsheet_val: trigger_update = True + elif gsheet_col == 'CRM Anzahl Mitarbeiter': if gsheet_row['Wiki Mitarbeiter'] == '' and d365_val != gsheet_val: trigger_update = True - elif gsheet_col == 'CRM Beschreibung': - if gsheet_row['Website Zusammenfassung'] == '' and d365_val != gsheet_val: - trigger_update = True + else: if d365_val != gsheet_val: trigger_update = True + if trigger_update: - row_updates[gsheet_col] = d365_val; needs_reeval = True - self.logger.debug(f"Update für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'") + row_updates[gsheet_col] = d365_val + needs_reeval = True + self.logger.debug(f"ReEval für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'") + for gsheet_col in self.smart_merge_cols: d365_val = str(d365_row.get(gsheet_col, '')).strip() gsheet_val = str(gsheet_row.get(gsheet_col, '')).strip() + if d365_val and not gsheet_val: - row_updates[gsheet_col] = d365_val; needs_reeval = True + row_updates[gsheet_col] = d365_val + needs_reeval = True elif d365_val and gsheet_val and d365_val != gsheet_val: conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'") - if conflict_messages: - row_updates["SyncConflict"] = "; ".join(conflict_messages) - self.stats.conflict_accounts.add(crm_id) - for msg in conflict_messages: self.stats.field_conflicts[msg.split('_CONFLICT')[0]] += 1 + + if conflict_messages: row_updates["SyncConflict"] = "; ".join(conflict_messages) if needs_reeval: row_updates["ReEval Flag"] = "x" + if row_updates: - self.stats.accounts_to_update.add(crm_id) - for field in row_updates.keys(): self.stats.field_updates[field] += 1 sheet_row_number = original_row_index + self.sheet_handler._header_rows + 1 for col_name, value in row_updates.items(): updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{sheet_row_number}", "values": [[value]] }) - + if rows_to_append: self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...") self.sheet_handler.append_rows(sheet_name=self.target_sheet_name, values=rows_to_append) + if updates_to_batch: self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...") self.sheet_handler.batch_update_cells(updates_to_batch) - report = self.stats.generate_report() - self.logger.info(report) - print(report) + if not rows_to_append and not updates_to_batch: + self.logger.info("Keine Änderungen festgestellt. Das Google Sheet ist bereits auf dem neuesten Stand.") + self.logger.info("Synchronisation erfolgreich abgeschlossen.") def debug_sync(self, debug_id=None):