Files
Brancheneinstufung2/sync_manager.py
2025-08-28 18:37:36 +00:00

403 lines
19 KiB
Python

#!/usr/-bin/env python3
"""
sync_manager.py
Modul für den Datenabgleich zwischen einem D365 Excel-Export und dem Google Sheet.
Führt einen intelligenten "Full-Sync" durch, um neue, geänderte und
gelöschte Datensätze zu identifizieren und zu verarbeiten.
"""
import pandas as pd
import logging
import re, unicodedata
from collections import defaultdict
from config import COLUMN_ORDER, COLUMN_MAP, Config
class SyncStatistics:
"""Eine einfache Klasse zum Sammeln von Statistiken während des Sync-Prozesses."""
def __init__(self):
self.new_accounts = 0
self.existing_accounts = 0
self.archived_accounts = 0
self.accounts_to_update = set()
self.field_updates = defaultdict(int)
self.conflict_accounts = set()
self.field_conflicts = defaultdict(int)
def generate_report(self):
report = [
"\n" + "="*50,
" Sync-Prozess Abschlussbericht",
"="*50,
f"| Neue Accounts hinzugefügt: | {self.new_accounts}",
f"| Bestehende Accounts analysiert: | {self.existing_accounts}",
f"| Accounts für Archivierung markiert:| {self.archived_accounts}",
"-"*50,
f"| Accounts mit Updates gesamt: | {len(self.accounts_to_update)}",
]
if self.field_updates:
report.append("| Feld-Updates im Detail:")
# Sortiert die Feld-Updates nach Häufigkeit
sorted_updates = sorted(self.field_updates.items(), key=lambda item: item[1], reverse=True)
for field, count in sorted_updates:
report.append(f"| - {field:<25} | {count} mal")
else:
report.append("| Keine Feld-Updates durchgeführt.")
report.append("-" * 50)
report.append(f"| Accounts mit Konflikten: | {len(self.conflict_accounts)}")
if self.field_conflicts:
report.append("| Feld-Konflikte im Detail:")
sorted_conflicts = sorted(self.field_conflicts.items(), key=lambda item: item[1], reverse=True)
for field, count in sorted_conflicts:
report.append(f"| - {field:<25} | {count} mal")
else:
report.append("| Keine Konflikte festgestellt.")
report.append("="*50)
return "\n".join(report)
class SyncManager:
"""
Kapselt die Logik für den Abgleich zwischen D365-Export und Google Sheet.
"""
def __init__(self, sheet_handler, d365_export_path):
self.sheet_handler = sheet_handler
self.d365_export_path = d365_export_path
self.logger = logging.getLogger(__name__)
self.stats = SyncStatistics()
self.target_sheet_name = None
self.d365_to_gsheet_map = {
"Account Name": "CRM Name", "Parent Account": "Parent Account Name",
"Website": "CRM Website", "City": "CRM Ort", "Country": "CRM Land",
"Description FSM": "CRM Beschreibung", "Branch detail": "CRM Branche",
"No. Service Technicians": "CRM Anzahl Techniker",
"Annual Revenue (Mio. €)": "CRM Umsatz",
"Number of Employees": "CRM Anzahl Mitarbeiter", "GUID": "CRM ID"
}
self.d365_wins_cols = ["CRM Name", "Parent Account Name", "CRM Ort", "CRM Land",
"CRM Anzahl Techniker", "CRM Branche", "CRM Umsatz",
"CRM Anzahl Mitarbeiter", "CRM Beschreibung"]
self.smart_merge_cols = ["CRM Website"]
def _load_data(self):
"""
Lädt Daten aus D365-Export und Google Sheet.
WICHTIG: Header aus dem GSheet werden normalisiert und auf kanonische Namen (COLUMN_ORDER) gemappt,
damit unsichtbare Zeichen (NBSP, Zero-Width, BOM etc.) keine Schatten-Spalten erzeugen.
"""
self.logger.info("Starte _load_data()...")
# 1) D365-Daten laden (unverändert)
self.logger.debug("Lade D365-Export...")
self.d365_df = self._load_d365_export() # erwartet bestehende Implementierung
if self.d365_df is None or self.d365_df.empty:
self.logger.warning("D365-DataFrame ist leer oder None.")
# 2) Google Sheet Rohdaten holen (mit Headern)
self.logger.debug("Lade Google Sheet Rohdaten (inkl. Header)...")
all_data_with_headers = self.sheet_handler.get_all_data_with_headers()
if not all_data_with_headers or len(all_data_with_headers) < self.sheet_handler._header_rows:
self.logger.error("Google Sheet enthält keine gültige Header-Zeile.")
self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER)
return
actual_header = all_data_with_headers[self.sheet_handler._header_rows - 1]
data_rows = all_data_with_headers[self.sheet_handler._header_rows:]
# Debug: zeige die Roh-Header repräsentiert (um unsichtbare Zeichen sichtbar zu machen)
self.logger.debug("Roh-Header (repr): " + " | ".join(repr(h) for h in actual_header))
# 3) Header-Normalisierung
def _norm_header(s: str) -> str:
if s is None:
return ""
s = str(s)
# NBSP -> Space, Zero-Width/RTL/BOM entfernen
s = s.replace("\u00A0", " ").replace("\u200B", "").replace("\u200E", "").replace("\u200F", "").replace("\ufeff", "")
# Control/Format-Zeichen entfernen
s = "".join(ch for ch in s if unicodedata.category(ch) not in ("Cf", "Cc", "Cs"))
# Whitespace normalisieren
s = re.sub(r"\s+", " ", s).strip()
return s
norm_header = [_norm_header(h) for h in actual_header]
# 4) Duplikate in den (normalisierten) Headern eindeutig machen
seen = {}
unique_norm_header = []
for h in norm_header:
n = seen.get(h, 0)
unique_norm_header.append(h if n == 0 else f"{h}__dup{n}")
seen[h] = n + 1
# 5) Datenzeilen auf Header-Länge bringen + zu Strings casten (robust ggü. zu kurzen Zeilen)
fixed_rows = []
target_len = len(unique_norm_header)
for r in data_rows:
if len(r) < target_len:
r = r + [''] * (target_len - len(r))
else:
r = r[:target_len]
fixed_rows.append([str(v) for v in r])
temp_df = pd.DataFrame(fixed_rows, columns=unique_norm_header)
# 6) Mapping: normalisierte Header -> kanonische Spaltennamen (COLUMN_ORDER)
canon_map = {_norm_header(c): c for c in COLUMN_ORDER} # z. B. {"CRM Anzahl Techniker": "CRM Anzahl Techniker", ...}
rename_map = {}
unmapped_cols = []
for col in list(temp_df.columns):
base = col.split("__dup")[0] # Duplikatsuffix entfernen
if base in canon_map:
rename_map[col] = canon_map[base]
else:
unmapped_cols.append(col)
if rename_map:
temp_df.rename(columns=rename_map, inplace=True)
# Debug: nicht gemappte Spalten melden (einmalig extrem hilfreich zur Ursachenanalyse)
if unmapped_cols:
self.logger.warning(
"Folgende GSheet-Spalten konnten NICHT auf COLUMN_ORDER gemappt werden "
"(vermutlich fremde/alte/abweichende Header): "
+ ", ".join([f"{c!r}" for c in unmapped_cols])
)
# 7) Fehlende Spalten (gegenüber COLUMN_ORDER) hinzufügen
for col_name in COLUMN_ORDER:
if col_name not in temp_df.columns:
temp_df[col_name] = ""
# 8) Final in die gewünschte Spaltenreihenfolge bringen
self.gsheet_df = temp_df[COLUMN_ORDER]
# 9) Optional: Sanity-Check auf das bekannte Problemfeld
if "CRM Anzahl Techniker" in self.gsheet_df.columns:
# Beispielhafte Debug-Ausgabe für den vom User genannten GUID-Datensatz
guid_col = "accountid" if "accountid" in self.gsheet_df.columns else None
if guid_col:
probe_guid = "0f68a69d-e330-ec11-b6e6-000d3adbc80e"
probe_row = self.gsheet_df[self.gsheet_df[guid_col] == probe_guid]
if not probe_row.empty:
val = probe_row.iloc[0]["CRM Anzahl Techniker"]
self.logger.info(
f"Sanity-Check: GSheet['CRM Anzahl Techniker'] für {probe_guid} -> {val!r} (Typ: {type(val)})"
)
self.logger.info("_load_data() abgeschlossen.")
def run_sync(self):
"""Führt den gesamten Synchronisationsprozess aus."""
if not self._load_data(): return
self.target_sheet_name = self.sheet_handler.get_main_sheet_name()
if not self.target_sheet_name:
self.logger.critical("Konnte Namen des Ziel-Sheets nicht ermitteln. Abbruch.")
return
d365_ids = set(self.d365_df['CRM ID'].dropna())
gsheet_ids = set(self.gsheet_df['CRM ID'].dropna())
new_ids = d365_ids - gsheet_ids
deleted_ids = set()
self.logger.info("Archivierungs-Schritt wird übersprungen (Teil-Export angenommen).")
existing_ids = d365_ids.intersection(gsheet_ids)
self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} zu archivierende, {len(existing_ids)} bestehende Accounts.")
updates_to_batch, rows_to_append = [], []
if new_ids:
new_accounts_df = self.d365_df[self.d365_df['CRM ID'].isin(new_ids)]
for _, row in new_accounts_df.iterrows():
new_row_data = [""] * len(COLUMN_ORDER)
for gsheet_col in self.d365_to_gsheet_map.values():
if gsheet_col in row:
col_idx = COLUMN_MAP[gsheet_col]['index']
new_row_data[col_idx] = row[gsheet_col]
rows_to_append.append(new_row_data)
if existing_ids:
d365_indexed = self.d365_df.set_index('CRM ID')
# --- KORREKTE DATENQUELLE VERWENDEN ---
gsheet_to_update_df = self.gsheet_df[self.gsheet_df['CRM ID'].isin(existing_ids)]
for original_row_index, gsheet_row in gsheet_to_update_df.iterrows():
crm_id = gsheet_row['CRM ID']
if crm_id not in d365_indexed.index: continue
d365_row = d365_indexed.loc[crm_id]
row_updates, conflict_messages, needs_reeval = {}, [], False
for gsheet_col in self.d365_wins_cols:
d365_val = str(d365_row[gsheet_col]).strip()
gsheet_val = str(gsheet_row[gsheet_col]).strip()
trigger_update = False
if gsheet_col == 'CRM Land':
d365_code_lower = d365_val.lower()
gsheet_val_lower = gsheet_val.lower()
d365_translated_lower = Config.COUNTRY_CODE_MAP.get(d365_code_lower, d365_code_lower).lower()
if gsheet_val_lower != d365_code_lower and gsheet_val_lower != d365_translated_lower:
trigger_update = True
elif gsheet_col == 'CRM Anzahl Techniker':
if (d365_val == '-1' or d365_val == '0') and gsheet_val == '': pass
elif d365_val != gsheet_val: trigger_update = True
elif gsheet_col == 'CRM Branche':
if gsheet_row['Chat Vorschlag Branche'] == '' and d365_val != gsheet_val:
trigger_update = True
elif gsheet_col == 'CRM Umsatz':
if gsheet_row['Wiki Umsatz'] == '' and d365_val != gsheet_val:
trigger_update = True
elif gsheet_col == 'CRM Anzahl Mitarbeiter':
if gsheet_row['Wiki Mitarbeiter'] == '' and d365_val != gsheet_val:
trigger_update = True
else:
if d365_val != gsheet_val: trigger_update = True
if trigger_update:
row_updates[gsheet_col] = d365_val
needs_reeval = True
self.logger.debug(f"ReEval für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'")
for gsheet_col in self.smart_merge_cols:
d365_val = str(d365_row.get(gsheet_col, '')).strip()
gsheet_val = str(gsheet_row.get(gsheet_col, '')).strip()
if d365_val and not gsheet_val:
row_updates[gsheet_col] = d365_val
needs_reeval = True
elif d365_val and gsheet_val and d365_val != gsheet_val:
conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'")
if conflict_messages: row_updates["SyncConflict"] = "; ".join(conflict_messages)
if needs_reeval: row_updates["ReEval Flag"] = "x"
if row_updates:
sheet_row_number = original_row_index + self.sheet_handler._header_rows + 1
for col_name, value in row_updates.items():
updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{sheet_row_number}", "values": [[value]] })
if rows_to_append:
self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...")
self.sheet_handler.append_rows(sheet_name=self.target_sheet_name, values=rows_to_append)
if updates_to_batch:
self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...")
self.sheet_handler.batch_update_cells(updates_to_batch)
if not rows_to_append and not updates_to_batch:
self.logger.info("Keine Änderungen festgestellt. Das Google Sheet ist bereits auf dem neuesten Stand.")
self.logger.info("Synchronisation erfolgreich abgeschlossen.")
def debug_sync(self, debug_id=None):
"""
Führt eine Analyse des Sync-Prozesses durch. Ohne debug_id wird eine
allgemeine Statistik ausgegeben. Mit debug_id wird eine Tiefenanalyse
für einen einzelnen Datensatz durchgeführt.
"""
self.logger.info("========== START SYNC-DEBUG-MODUS ==========")
# Lade die Rohdaten, aber brich die _load_data Funktion noch nicht ab
self.logger.info("Lade Rohdaten aus Google Sheet für Tiefenanalyse...")
try:
all_data_with_headers = self.sheet_handler.get_all_data_with_headers()
if not all_data_with_headers:
self.logger.error("Debug abgebrochen, Google Sheet ist leer.")
return
except Exception as e:
self.logger.error(f"Debug abgebrochen, Fehler beim Laden der Rohdaten: {e}")
return
if not debug_id:
# Führe den Rest von _load_data aus für die allgemeine Statistik
if not self._load_data():
self.logger.error("Debug abgebrochen, da das Laden der Daten fehlschlug.")
return
self.logger.info("Keine spezifische ID angegeben. Führe allgemeine Statistik-Analyse durch.")
d365_ids = set(self.d365_df['CRM ID'])
gsheet_ids = set(self.gsheet_df[self.gsheet_df['CRM ID'] != '']['CRM ID'].dropna())
self.logger.info("\n--- Set-Analyse (Vergleich) ---")
self.logger.info(f"Anzahl neuer IDs: {len(d365_ids - gsheet_ids)}")
self.logger.info(f"Anzahl zu archivierender IDs: {len(gsheet_ids - d365_ids)}")
self.logger.info(f"Größe der Schnittmenge: {len(d365_ids.intersection(gsheet_ids))}")
self.logger.info("========== ENDE SYNC-DEBUG-MODUS ==========")
return
# --- TIEFENANALYSE FÜR EINE SPEZIFISCHE ID ---
self.logger.info(f"\n--- Tiefenanalyse für CRM ID: {debug_id} ---")
debug_id_lower = debug_id.lower().strip()
# 1. Finde die Roh-Zeile im Google Sheet
self.logger.info("\n--- Rohdaten-Analyse aus Google Sheet ---")
header = all_data_with_headers[self.sheet_handler._header_rows - 1]
crm_id_index = -1
try:
# Finde den Index der 'CRM ID' Spalte im Header
crm_id_index = header.index("CRM ID")
except ValueError:
self.logger.error("Spalte 'CRM ID' nicht im Header des Google Sheets gefunden!")
found_raw_row = None
if crm_id_index != -1:
for i, row in enumerate(all_data_with_headers[self.sheet_handler._header_rows:]):
# Stelle sicher, dass die Zeile lang genug ist
if len(row) > crm_id_index:
if str(row[crm_id_index]).lower().strip() == debug_id_lower:
found_raw_row = row
self.logger.info(f"Roh-Zeile gefunden bei Index {i} (nach Header):")
self.logger.info(found_raw_row)
break
if not found_raw_row:
self.logger.warning("ID in den Rohdaten des Google Sheets nicht gefunden.")
# 2. Führe jetzt die normale Datenverarbeitung durch, um das DataFrame zu bekommen
if not self._load_data():
self.logger.error("Debug abgebrochen, da das Laden der Daten fehlschlug.")
return
# 3. Analyse der DataFrames (wie gehabt)
d365_row = self.d365_df[self.d365_df['CRM ID'] == debug_id_lower]
if d365_row.empty:
self.logger.warning("ID in D365-Export nicht gefunden.")
else:
self.logger.info("\nDatensatz aus D365-Export (nach Verarbeitung):")
self.logger.info(d365_row.to_dict('records')[0])
gsheet_row = self.gsheet_df[self.gsheet_df['CRM ID'] == debug_id_lower]
if gsheet_row.empty:
self.logger.warning("ID im Google Sheet DataFrame nicht gefunden (nach Bereinigung).")
else:
self.logger.info("\nDatensatz aus Google Sheet (nach Verarbeitung zu DataFrame):")
self.logger.info(gsheet_row.to_dict('records')[0])
# 4. Direkter Vergleich des kritischen Feldes
if not d365_row.empty and not gsheet_row.empty:
self.logger.info("\n--- Direkter Feld-Vergleich: CRM Anzahl Techniker ---")
d365_val = d365_row.iloc[0]['CRM Anzahl Techniker']
gsheet_val = gsheet_row.iloc[0]['CRM Anzahl Techniker']
self.logger.info(f"Wert aus D365: '{d365_val}' (Typ: {type(d365_val)})")
self.logger.info(f"Wert aus GSheet DataFrame: '{gsheet_val}' (Typ: {type(gsheet_val)})")
if str(d365_val).strip() != str(gsheet_val).strip():
self.logger.info("--> Ergebnis: Werte sind UNTERSCHIEDLICH.")
else:
self.logger.info("--> Ergebnis: Werte sind IDENTISCH.")
self.logger.info("========== ENDE SYNC-DEBUG-MODUS ==========")