Files
Brancheneinstufung2/sync_manager.py
2025-08-28 18:42:36 +00:00

463 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/-bin/env python3
"""
sync_manager.py
Modul für den Datenabgleich zwischen einem D365 Excel-Export und dem Google Sheet.
Führt einen intelligenten "Full-Sync" durch, um neue, geänderte und
gelöschte Datensätze zu identifizieren und zu verarbeiten.
"""
import pandas as pd
import logging
import re, unicodedata
from collections import defaultdict
from config import COLUMN_ORDER, COLUMN_MAP, Config
class SyncStatistics:
"""Eine einfache Klasse zum Sammeln von Statistiken während des Sync-Prozesses."""
def __init__(self):
self.new_accounts = 0
self.existing_accounts = 0
self.archived_accounts = 0
self.accounts_to_update = set()
self.field_updates = defaultdict(int)
self.conflict_accounts = set()
self.field_conflicts = defaultdict(int)
def generate_report(self):
report = [
"\n" + "="*50,
" Sync-Prozess Abschlussbericht",
"="*50,
f"| Neue Accounts hinzugefügt: | {self.new_accounts}",
f"| Bestehende Accounts analysiert: | {self.existing_accounts}",
f"| Accounts für Archivierung markiert:| {self.archived_accounts}",
"-"*50,
f"| Accounts mit Updates gesamt: | {len(self.accounts_to_update)}",
]
if self.field_updates:
report.append("| Feld-Updates im Detail:")
# Sortiert die Feld-Updates nach Häufigkeit
sorted_updates = sorted(self.field_updates.items(), key=lambda item: item[1], reverse=True)
for field, count in sorted_updates:
report.append(f"| - {field:<25} | {count} mal")
else:
report.append("| Keine Feld-Updates durchgeführt.")
report.append("-" * 50)
report.append(f"| Accounts mit Konflikten: | {len(self.conflict_accounts)}")
if self.field_conflicts:
report.append("| Feld-Konflikte im Detail:")
sorted_conflicts = sorted(self.field_conflicts.items(), key=lambda item: item[1], reverse=True)
for field, count in sorted_conflicts:
report.append(f"| - {field:<25} | {count} mal")
else:
report.append("| Keine Konflikte festgestellt.")
report.append("="*50)
return "\n".join(report)
class SyncManager:
"""
Kapselt die Logik für den Abgleich zwischen D365-Export und Google Sheet.
"""
def __init__(self, sheet_handler, d365_export_path):
self.sheet_handler = sheet_handler
self.d365_export_path = d365_export_path
self.logger = logging.getLogger(__name__)
self.stats = SyncStatistics()
self.target_sheet_name = None
self.d365_to_gsheet_map = {
"Account Name": "CRM Name", "Parent Account": "Parent Account Name",
"Website": "CRM Website", "City": "CRM Ort", "Country": "CRM Land",
"Description FSM": "CRM Beschreibung", "Branch detail": "CRM Branche",
"No. Service Technicians": "CRM Anzahl Techniker",
"Annual Revenue (Mio. €)": "CRM Umsatz",
"Number of Employees": "CRM Anzahl Mitarbeiter", "GUID": "CRM ID"
}
self.d365_wins_cols = ["CRM Name", "Parent Account Name", "CRM Ort", "CRM Land",
"CRM Anzahl Techniker", "CRM Branche", "CRM Umsatz",
"CRM Anzahl Mitarbeiter", "CRM Beschreibung"]
self.smart_merge_cols = ["CRM Website"]
def _load_data(self):
"""Lädt und bereitet die Daten aus D365 (Excel) und Google Sheets vor. Hart gegen „verschmutzte“ Header im Sheet."""
# ----------------------------
# D365-EXPORT LADEN (Excel)
# ----------------------------
self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...")
try:
# Alles als String laden und NaN -> '' setzen, damit Vergleiche stabil sind
temp_d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('')
# Erwartete Spalten aus dem D365-Export prüfen
for d365_col in self.d365_to_gsheet_map.keys():
if d365_col not in temp_d365_df.columns:
raise ValueError(f"Erwartete Spalte '{d365_col}' nicht in der D365-Exportdatei gefunden.")
# Auf die relevanten Spalten reduzieren und auf GSheet-Namen umbenennen
self.d365_df = temp_d365_df[list(self.d365_to_gsheet_map.keys())].copy()
self.d365_df.rename(columns=self.d365_to_gsheet_map, inplace=True)
# GUID-Format vereinheitlichen (lowercase, Trim) und nur gültige GUIDs behalten
if 'CRM ID' not in self.d365_df.columns:
raise ValueError("Nach dem Umbenennen fehlt die Spalte 'CRM ID' im D365-DataFrame.")
self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip().str.lower()
self.d365_df = self.d365_df[self.d365_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)]
# Leere DataFrames vermeiden: fehlende Spalten aus COLUMN_ORDER ergänzen
for col_name in COLUMN_ORDER:
if col_name not in self.d365_df.columns:
self.d365_df[col_name] = ''
except Exception as e:
self.logger.critical(f"Fehler beim Laden der Excel-Datei: {e}", exc_info=True)
return False
# ----------------------------
# GOOGLE SHEET LADEN + HEADER NORMALISIEREN
# ----------------------------
self.logger.info("Lade bestehende Daten aus dem Google Sheet...")
try:
all_data_with_headers = self.sheet_handler.get_all_data_with_headers()
if not all_data_with_headers or len(all_data_with_headers) < self.sheet_handler._header_rows:
# Kein valider Header -> leeres DF mit korrekter Spaltenreihenfolge
self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER)
else:
actual_header = all_data_with_headers[self.sheet_handler._header_rows - 1]
data_rows = all_data_with_headers[self.sheet_handler._header_rows:]
# Header im Log als repr ausgeben, um unsichtbare Zeichen später schnell zu finden
try:
self.logger.debug("Roh-Header (repr): " + " | ".join(repr(h) for h in actual_header))
except Exception:
pass
# ---- Header-Normalisierung (NBSP, Zero-Width, BOM, überflüssige Spaces) ----
def _norm_header(s: str) -> str:
if s is None:
return ""
s = str(s)
s = s.replace("\u00A0", " ") # NBSP -> Space
s = s.replace("\u200B", "").replace("\u200E", "").replace("\u200F", "").replace("\ufeff", "") # ZWSP/RTL/BOM raus
# Control/Format Zeichen entfernen
s = "".join(ch for ch in s if unicodedata.category(ch) not in ("Cf", "Cc", "Cs"))
# Whitespace normalisieren
s = re.sub(r"\s+", " ", s).strip()
return s
norm_header = [_norm_header(h) for h in actual_header]
# Evtl. doppelte (normalisierte) Header technisch eindeutig machen
seen = {}
unique_norm_header = []
for h in norm_header:
n = seen.get(h, 0)
unique_norm_header.append(h if n == 0 else f"{h}__dup{n}")
seen[h] = n + 1
# Datenzeilen auf Header-Länge bringen und direkt zu Strings casten
fixed_rows = []
target_len = len(unique_norm_header)
for r in data_rows:
if len(r) < target_len:
r = r + [''] * (target_len - len(r))
else:
r = r[:target_len]
fixed_rows.append([str(v) for v in r])
temp_df = pd.DataFrame(fixed_rows, columns=unique_norm_header)
# Kanonische Namen (COLUMN_ORDER) vorbereiten: normalisiert -> Original
canon_map = {_norm_header(c): c for c in COLUMN_ORDER}
# Spalten umbenennen (normalisierte -> kanonische Namen) und unmappbare loggen
rename_map = {}
unmapped_cols = []
for col in list(temp_df.columns):
base = col.split("__dup")[0] # Duplikatsuffix entfernen
if base in canon_map:
rename_map[col] = canon_map[base]
else:
unmapped_cols.append(col)
if rename_map:
temp_df.rename(columns=rename_map, inplace=True)
if unmapped_cols:
self.logger.warning(
"Folgende GSheet-Spalten konnten NICHT auf COLUMN_ORDER gemappt werden "
"(vermutlich fremde/alte/abweichende Header): "
+ ", ".join([f"{c!r}" for c in unmapped_cols])
)
# Fehlende Spalten (gegenüber COLUMN_ORDER) ergänzen
for col_name in COLUMN_ORDER:
if col_name not in temp_df.columns:
temp_df[col_name] = ""
# Final in gewünschte Reihenfolge bringen
self.gsheet_df = temp_df[COLUMN_ORDER]
# Sanity-Check für den gemeldeten Fall (nur Info-Log)
try:
if "CRM Anzahl Techniker" in self.gsheet_df.columns and "CRM ID" in self.gsheet_df.columns:
probe_guid = "0f68a69d-e330-ec11-b6e6-000d3adbc80e"
probe_row = self.gsheet_df[self.gsheet_df["CRM ID"].str.lower() == probe_guid]
if not probe_row.empty:
val = probe_row.iloc[0]["CRM Anzahl Techniker"]
self.logger.info(
f"Sanity-Check: GSheet['CRM Anzahl Techniker'] für {probe_guid} -> {val!r} (Typ: {type(val)})"
)
except Exception:
# Nur zur Sicherheit Sync soll nicht am Check scheitern
pass
except Exception as e:
self.logger.critical(f"Fehler beim Laden/Umwandeln der GSheet-Daten: {e}", exc_info=True)
return False
# ----------------------------
# ZIEL-SHEET ERMITTELN & SYNC-BASIS BESTIMMEN
# ----------------------------
self.target_sheet_name = self.sheet_handler.get_main_sheet_name()
if not self.target_sheet_name:
self.logger.critical("Konnte Namen des Ziel-Sheets nicht ermitteln. Abbruch.")
return False
# IDs bestimmen (nur auf gefüllte CRM IDs)
d365_ids = set(self.d365_df['CRM ID'].dropna()) if 'CRM ID' in self.d365_df.columns else set()
gsheet_ids = set(self.gsheet_df['CRM ID'].dropna()) if 'CRM ID' in self.gsheet_df.columns else set()
new_ids = d365_ids - gsheet_ids
existing_ids = d365_ids.intersection(gsheet_ids)
# Archivierung wird (wie bisher) übersprungen Teil-Export angenommen
deleted_ids = set()
self.logger.info("Archivierungs-Schritt wird übersprungen (Teil-Export angenommen).")
self.logger.info(
f"Sync-Basis: {len(new_ids)} neu, {len(existing_ids)} vorhanden, {len(deleted_ids)} gelöscht (übersprungen)."
)
# Ergebnisse in Objekt speichern
self.new_ids = new_ids
self.existing_ids = existing_ids
self.deleted_ids = deleted_ids
return True
def run_sync(self):
"""Führt den gesamten Synchronisationsprozess aus."""
if not self._load_data(): return
self.target_sheet_name = self.sheet_handler.get_main_sheet_name()
if not self.target_sheet_name:
self.logger.critical("Konnte Namen des Ziel-Sheets nicht ermitteln. Abbruch.")
return
d365_ids = set(self.d365_df['CRM ID'].dropna())
gsheet_ids = set(self.gsheet_df['CRM ID'].dropna())
new_ids = d365_ids - gsheet_ids
deleted_ids = set()
self.logger.info("Archivierungs-Schritt wird übersprungen (Teil-Export angenommen).")
existing_ids = d365_ids.intersection(gsheet_ids)
self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} zu archivierende, {len(existing_ids)} bestehende Accounts.")
updates_to_batch, rows_to_append = [], []
if new_ids:
new_accounts_df = self.d365_df[self.d365_df['CRM ID'].isin(new_ids)]
for _, row in new_accounts_df.iterrows():
new_row_data = [""] * len(COLUMN_ORDER)
for gsheet_col in self.d365_to_gsheet_map.values():
if gsheet_col in row:
col_idx = COLUMN_MAP[gsheet_col]['index']
new_row_data[col_idx] = row[gsheet_col]
rows_to_append.append(new_row_data)
if existing_ids:
d365_indexed = self.d365_df.set_index('CRM ID')
# --- KORREKTE DATENQUELLE VERWENDEN ---
gsheet_to_update_df = self.gsheet_df[self.gsheet_df['CRM ID'].isin(existing_ids)]
for original_row_index, gsheet_row in gsheet_to_update_df.iterrows():
crm_id = gsheet_row['CRM ID']
if crm_id not in d365_indexed.index: continue
d365_row = d365_indexed.loc[crm_id]
row_updates, conflict_messages, needs_reeval = {}, [], False
for gsheet_col in self.d365_wins_cols:
d365_val = str(d365_row[gsheet_col]).strip()
gsheet_val = str(gsheet_row[gsheet_col]).strip()
trigger_update = False
if gsheet_col == 'CRM Land':
d365_code_lower = d365_val.lower()
gsheet_val_lower = gsheet_val.lower()
d365_translated_lower = Config.COUNTRY_CODE_MAP.get(d365_code_lower, d365_code_lower).lower()
if gsheet_val_lower != d365_code_lower and gsheet_val_lower != d365_translated_lower:
trigger_update = True
elif gsheet_col == 'CRM Anzahl Techniker':
if (d365_val == '-1' or d365_val == '0') and gsheet_val == '': pass
elif d365_val != gsheet_val: trigger_update = True
elif gsheet_col == 'CRM Branche':
if gsheet_row['Chat Vorschlag Branche'] == '' and d365_val != gsheet_val:
trigger_update = True
elif gsheet_col == 'CRM Umsatz':
if gsheet_row['Wiki Umsatz'] == '' and d365_val != gsheet_val:
trigger_update = True
elif gsheet_col == 'CRM Anzahl Mitarbeiter':
if gsheet_row['Wiki Mitarbeiter'] == '' and d365_val != gsheet_val:
trigger_update = True
else:
if d365_val != gsheet_val: trigger_update = True
if trigger_update:
row_updates[gsheet_col] = d365_val
needs_reeval = True
self.logger.debug(f"ReEval für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'")
for gsheet_col in self.smart_merge_cols:
d365_val = str(d365_row.get(gsheet_col, '')).strip()
gsheet_val = str(gsheet_row.get(gsheet_col, '')).strip()
if d365_val and not gsheet_val:
row_updates[gsheet_col] = d365_val
needs_reeval = True
elif d365_val and gsheet_val and d365_val != gsheet_val:
conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'")
if conflict_messages: row_updates["SyncConflict"] = "; ".join(conflict_messages)
if needs_reeval: row_updates["ReEval Flag"] = "x"
if row_updates:
sheet_row_number = original_row_index + self.sheet_handler._header_rows + 1
for col_name, value in row_updates.items():
updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{sheet_row_number}", "values": [[value]] })
if rows_to_append:
self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...")
self.sheet_handler.append_rows(sheet_name=self.target_sheet_name, values=rows_to_append)
if updates_to_batch:
self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...")
self.sheet_handler.batch_update_cells(updates_to_batch)
if not rows_to_append and not updates_to_batch:
self.logger.info("Keine Änderungen festgestellt. Das Google Sheet ist bereits auf dem neuesten Stand.")
self.logger.info("Synchronisation erfolgreich abgeschlossen.")
def debug_sync(self, debug_id=None):
"""
Führt eine Analyse des Sync-Prozesses durch. Ohne debug_id wird eine
allgemeine Statistik ausgegeben. Mit debug_id wird eine Tiefenanalyse
für einen einzelnen Datensatz durchgeführt.
"""
self.logger.info("========== START SYNC-DEBUG-MODUS ==========")
# Lade die Rohdaten, aber brich die _load_data Funktion noch nicht ab
self.logger.info("Lade Rohdaten aus Google Sheet für Tiefenanalyse...")
try:
all_data_with_headers = self.sheet_handler.get_all_data_with_headers()
if not all_data_with_headers:
self.logger.error("Debug abgebrochen, Google Sheet ist leer.")
return
except Exception as e:
self.logger.error(f"Debug abgebrochen, Fehler beim Laden der Rohdaten: {e}")
return
if not debug_id:
# Führe den Rest von _load_data aus für die allgemeine Statistik
if not self._load_data():
self.logger.error("Debug abgebrochen, da das Laden der Daten fehlschlug.")
return
self.logger.info("Keine spezifische ID angegeben. Führe allgemeine Statistik-Analyse durch.")
d365_ids = set(self.d365_df['CRM ID'])
gsheet_ids = set(self.gsheet_df[self.gsheet_df['CRM ID'] != '']['CRM ID'].dropna())
self.logger.info("\n--- Set-Analyse (Vergleich) ---")
self.logger.info(f"Anzahl neuer IDs: {len(d365_ids - gsheet_ids)}")
self.logger.info(f"Anzahl zu archivierender IDs: {len(gsheet_ids - d365_ids)}")
self.logger.info(f"Größe der Schnittmenge: {len(d365_ids.intersection(gsheet_ids))}")
self.logger.info("========== ENDE SYNC-DEBUG-MODUS ==========")
return
# --- TIEFENANALYSE FÜR EINE SPEZIFISCHE ID ---
self.logger.info(f"\n--- Tiefenanalyse für CRM ID: {debug_id} ---")
debug_id_lower = debug_id.lower().strip()
# 1. Finde die Roh-Zeile im Google Sheet
self.logger.info("\n--- Rohdaten-Analyse aus Google Sheet ---")
header = all_data_with_headers[self.sheet_handler._header_rows - 1]
crm_id_index = -1
try:
# Finde den Index der 'CRM ID' Spalte im Header
crm_id_index = header.index("CRM ID")
except ValueError:
self.logger.error("Spalte 'CRM ID' nicht im Header des Google Sheets gefunden!")
found_raw_row = None
if crm_id_index != -1:
for i, row in enumerate(all_data_with_headers[self.sheet_handler._header_rows:]):
# Stelle sicher, dass die Zeile lang genug ist
if len(row) > crm_id_index:
if str(row[crm_id_index]).lower().strip() == debug_id_lower:
found_raw_row = row
self.logger.info(f"Roh-Zeile gefunden bei Index {i} (nach Header):")
self.logger.info(found_raw_row)
break
if not found_raw_row:
self.logger.warning("ID in den Rohdaten des Google Sheets nicht gefunden.")
# 2. Führe jetzt die normale Datenverarbeitung durch, um das DataFrame zu bekommen
if not self._load_data():
self.logger.error("Debug abgebrochen, da das Laden der Daten fehlschlug.")
return
# 3. Analyse der DataFrames (wie gehabt)
d365_row = self.d365_df[self.d365_df['CRM ID'] == debug_id_lower]
if d365_row.empty:
self.logger.warning("ID in D365-Export nicht gefunden.")
else:
self.logger.info("\nDatensatz aus D365-Export (nach Verarbeitung):")
self.logger.info(d365_row.to_dict('records')[0])
gsheet_row = self.gsheet_df[self.gsheet_df['CRM ID'] == debug_id_lower]
if gsheet_row.empty:
self.logger.warning("ID im Google Sheet DataFrame nicht gefunden (nach Bereinigung).")
else:
self.logger.info("\nDatensatz aus Google Sheet (nach Verarbeitung zu DataFrame):")
self.logger.info(gsheet_row.to_dict('records')[0])
# 4. Direkter Vergleich des kritischen Feldes
if not d365_row.empty and not gsheet_row.empty:
self.logger.info("\n--- Direkter Feld-Vergleich: CRM Anzahl Techniker ---")
d365_val = d365_row.iloc[0]['CRM Anzahl Techniker']
gsheet_val = gsheet_row.iloc[0]['CRM Anzahl Techniker']
self.logger.info(f"Wert aus D365: '{d365_val}' (Typ: {type(d365_val)})")
self.logger.info(f"Wert aus GSheet DataFrame: '{gsheet_val}' (Typ: {type(gsheet_val)})")
if str(d365_val).strip() != str(gsheet_val).strip():
self.logger.info("--> Ergebnis: Werte sind UNTERSCHIEDLICH.")
else:
self.logger.info("--> Ergebnis: Werte sind IDENTISCH.")
self.logger.info("========== ENDE SYNC-DEBUG-MODUS ==========")