sync_manager.py updated

Changed file: sync_manager.py

@@ -84,113 +84,173 @@ class SyncManager:

Unchanged context at the top of the hunk:

        self.smart_merge_cols = ["CRM Website"]

Removed (old implementation of _load_data()):

    def _load_data(self):
        """
        Loads data from the D365 export and the Google Sheet.

        IMPORTANT: headers from the GSheet are normalized and mapped to canonical names (COLUMN_ORDER),
        so that invisible characters (NBSP, zero-width, BOM, etc.) do not create shadow columns.
        """
        self.logger.info("Starting _load_data()...")

        # 1) Load the D365 data (unchanged)
        self.logger.debug("Loading D365 export...")
        self.d365_df = self._load_d365_export()  # relies on the existing implementation
        if self.d365_df is None or self.d365_df.empty:
            self.logger.warning("D365 DataFrame is empty or None.")

        # 2) Fetch the raw Google Sheet data (including headers)
        self.logger.debug("Loading raw Google Sheet data (incl. headers)...")
        all_data_with_headers = self.sheet_handler.get_all_data_with_headers()
        if not all_data_with_headers or len(all_data_with_headers) < self.sheet_handler._header_rows:
            self.logger.error("The Google Sheet contains no valid header row.")
            self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER)
            return

        actual_header = all_data_with_headers[self.sheet_handler._header_rows - 1]
        data_rows = all_data_with_headers[self.sheet_handler._header_rows:]

        # Debug: log the raw headers via repr() to make invisible characters visible
        self.logger.debug("Raw headers (repr): " + " | ".join(repr(h) for h in actual_header))

        # 3) Header normalization
        def _norm_header(s: str) -> str:
            if s is None:
                return ""
            s = str(s)
            # NBSP -> space; strip zero-width/RTL/BOM characters
            s = s.replace("\u00A0", " ").replace("\u200B", "").replace("\u200E", "").replace("\u200F", "").replace("\ufeff", "")
            # Strip control/format characters
            s = "".join(ch for ch in s if unicodedata.category(ch) not in ("Cf", "Cc", "Cs"))
            # Normalize whitespace
            s = re.sub(r"\s+", " ", s).strip()
            return s

        norm_header = [_norm_header(h) for h in actual_header]

        # 4) Make duplicates in the (normalized) headers unique
        seen = {}
        unique_norm_header = []
        for h in norm_header:
            n = seen.get(h, 0)
            unique_norm_header.append(h if n == 0 else f"{h}__dup{n}")
            seen[h] = n + 1

        # 5) Pad data rows to header length and cast to strings (robust against short rows)
        fixed_rows = []
        target_len = len(unique_norm_header)
        for r in data_rows:
            if len(r) < target_len:
                r = r + [''] * (target_len - len(r))
            else:
                r = r[:target_len]
            fixed_rows.append([str(v) for v in r])

        temp_df = pd.DataFrame(fixed_rows, columns=unique_norm_header)

        # 6) Mapping: normalized headers -> canonical column names (COLUMN_ORDER)
        canon_map = {_norm_header(c): c for c in COLUMN_ORDER}  # e.g. {"CRM Anzahl Techniker": "CRM Anzahl Techniker", ...}
        rename_map = {}
        unmapped_cols = []
        for col in list(temp_df.columns):
            base = col.split("__dup")[0]  # strip the duplicate suffix
            if base in canon_map:
                rename_map[col] = canon_map[base]
            else:
                unmapped_cols.append(col)

        if rename_map:
            temp_df.rename(columns=rename_map, inplace=True)

        # Debug: report unmapped columns (extremely helpful for one-off root-cause analysis)
        if unmapped_cols:
            self.logger.warning(
                "The following GSheet columns could NOT be mapped to COLUMN_ORDER "
                "(probably foreign/old/deviating headers): "
                + ", ".join([f"{c!r}" for c in unmapped_cols])
            )

        # 7) Add columns that are missing relative to COLUMN_ORDER
        for col_name in COLUMN_ORDER:
            if col_name not in temp_df.columns:
                temp_df[col_name] = ""

        # 8) Finally bring everything into the desired column order
        self.gsheet_df = temp_df[COLUMN_ORDER]

        # 9) Optional: sanity check on the known problem field
        if "CRM Anzahl Techniker" in self.gsheet_df.columns:
            # Example debug output for the GUID record reported by the user
            guid_col = "accountid" if "accountid" in self.gsheet_df.columns else None
            if guid_col:
                probe_guid = "0f68a69d-e330-ec11-b6e6-000d3adbc80e"
                probe_row = self.gsheet_df[self.gsheet_df[guid_col] == probe_guid]
                if not probe_row.empty:
                    val = probe_row.iloc[0]["CRM Anzahl Techniker"]
                    self.logger.info(
                        f"Sanity check: GSheet['CRM Anzahl Techniker'] for {probe_guid} -> {val!r} (type: {type(val)})"
                    )

        self.logger.info("_load_data() finished.")

Added (new implementation of _load_data()). The first of its three blocks loads the D365 export directly from Excel, instead of delegating to _load_d365_export():

    def _load_data(self):
        """Loads and prepares the data from D365 (Excel) and Google Sheets. Hardened against 'dirty' headers in the sheet."""
        # ----------------------------
        # LOAD D365 EXPORT (Excel)
        # ----------------------------
        self.logger.info(f"Loading data from D365 export: '{self.d365_export_path}'...")
        try:
            # Load everything as strings and map NaN -> '' so that comparisons stay stable
            temp_d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('')

            # Check for the columns expected from the D365 export
            for d365_col in self.d365_to_gsheet_map.keys():
                if d365_col not in temp_d365_df.columns:
                    raise ValueError(f"Expected column '{d365_col}' not found in the D365 export file.")

            # Reduce to the relevant columns and rename them to the GSheet names
            self.d365_df = temp_d365_df[list(self.d365_to_gsheet_map.keys())].copy()
            self.d365_df.rename(columns=self.d365_to_gsheet_map, inplace=True)

            # Unify the GUID format (lowercase, trimmed) and keep only valid GUIDs
            if 'CRM ID' not in self.d365_df.columns:
                raise ValueError("Column 'CRM ID' is missing from the D365 DataFrame after renaming.")
            self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip().str.lower()
            self.d365_df = self.d365_df[self.d365_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)]

            # Avoid empty DataFrames: add any columns missing from COLUMN_ORDER
            for col_name in COLUMN_ORDER:
                if col_name not in self.d365_df.columns:
                    self.d365_df[col_name] = ''

        except Exception as e:
            self.logger.critical(f"Error while loading the Excel file: {e}", exc_info=True)
            return False
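
The GUID filter is what guards the join key: str.strip()/str.lower() unify the format, and str.match with na=False silently drops empty or malformed IDs instead of raising. A minimal standalone sketch of the same pattern; the sample values are invented for illustration:

    import pandas as pd

    GUID_RE = r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$'

    # Invented samples: a valid GUID with noise around it, a garbage value, an empty cell
    df = pd.DataFrame({'CRM ID': [' 0F68A69D-E330-EC11-B6E6-000D3ADBC80E ', 'not-a-guid', '']})

    df['CRM ID'] = df['CRM ID'].str.strip().str.lower()
    df = df[df['CRM ID'].str.match(GUID_RE, na=False)]
    print(df)  # only the first row survives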

The second block loads the Google Sheet and normalizes its headers:

        # ----------------------------
        # LOAD GOOGLE SHEET + NORMALIZE HEADERS
        # ----------------------------
        self.logger.info("Loading existing data from the Google Sheet...")
        try:
            all_data_with_headers = self.sheet_handler.get_all_data_with_headers()

            if not all_data_with_headers or len(all_data_with_headers) < self.sheet_handler._header_rows:
                # No valid header -> empty DF with the correct column order
                self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER)
            else:
                actual_header = all_data_with_headers[self.sheet_handler._header_rows - 1]
                data_rows = all_data_with_headers[self.sheet_handler._header_rows:]

                # Log the headers via repr() so invisible characters can be spotted quickly later
                try:
                    self.logger.debug("Raw headers (repr): " + " | ".join(repr(h) for h in actual_header))
                except Exception:
                    pass

                # ---- Header normalization (NBSP, zero-width, BOM, superfluous spaces) ----
                def _norm_header(s: str) -> str:
                    if s is None:
                        return ""
                    s = str(s)
                    s = s.replace("\u00A0", " ")  # NBSP -> space
                    s = s.replace("\u200B", "").replace("\u200E", "").replace("\u200F", "").replace("\ufeff", "")  # strip ZWSP/RTL/BOM
                    # Strip control/format characters
                    s = "".join(ch for ch in s if unicodedata.category(ch) not in ("Cf", "Cc", "Cs"))
                    # Normalize whitespace
                    s = re.sub(r"\s+", " ", s).strip()
                    return s

                norm_header = [_norm_header(h) for h in actual_header]
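
What _norm_header actually repairs is easiest to see in isolation. The polluted headers below are invented examples of the kind of contamination the commit targets (BOM prefix, NBSP instead of a space, trailing zero-width space):

    import re
    import unicodedata

    def _norm_header(s: str) -> str:
        if s is None:
            return ""
        s = str(s)
        s = s.replace("\u00A0", " ")  # NBSP -> space
        s = s.replace("\u200B", "").replace("\u200E", "").replace("\u200F", "").replace("\ufeff", "")
        s = "".join(ch for ch in s if unicodedata.category(ch) not in ("Cf", "Cc", "Cs"))
        s = re.sub(r"\s+", " ", s).strip()
        return s

    for dirty in ["\ufeffCRM ID", "CRM\u00A0Anzahl Techniker", "CRM Website\u200b"]:
        print(repr(dirty), "->", repr(_norm_header(dirty)))
    # All three normalize to the plain canonical names and therefore map onto COLUMN_ORDER.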

The rest of the Google Sheet block deduplicates headers, pads the data rows, and maps everything onto COLUMN_ORDER:

                # Make any duplicated (normalized) headers technically unique
                seen = {}
                unique_norm_header = []
                for h in norm_header:
                    n = seen.get(h, 0)
                    unique_norm_header.append(h if n == 0 else f"{h}__dup{n}")
                    seen[h] = n + 1

                # Pad data rows to header length and cast them straight to strings
                fixed_rows = []
                target_len = len(unique_norm_header)
                for r in data_rows:
                    if len(r) < target_len:
                        r = r + [''] * (target_len - len(r))
                    else:
                        r = r[:target_len]
                    fixed_rows.append([str(v) for v in r])

                temp_df = pd.DataFrame(fixed_rows, columns=unique_norm_header)

                # Prepare the canonical names (COLUMN_ORDER): normalized -> original
                canon_map = {_norm_header(c): c for c in COLUMN_ORDER}

                # Rename columns (normalized -> canonical names) and log the unmappable ones
                rename_map = {}
                unmapped_cols = []
                for col in list(temp_df.columns):
                    base = col.split("__dup")[0]  # strip the duplicate suffix
                    if base in canon_map:
                        rename_map[col] = canon_map[base]
                    else:
                        unmapped_cols.append(col)

                if rename_map:
                    temp_df.rename(columns=rename_map, inplace=True)

                if unmapped_cols:
                    self.logger.warning(
                        "The following GSheet columns could NOT be mapped to COLUMN_ORDER "
                        "(probably foreign/old/deviating headers): "
                        + ", ".join([f"{c!r}" for c in unmapped_cols])
                    )

                # Add columns that are missing relative to COLUMN_ORDER
                for col_name in COLUMN_ORDER:
                    if col_name not in temp_df.columns:
                        temp_df[col_name] = ""

                # Finally bring everything into the desired order
                self.gsheet_df = temp_df[COLUMN_ORDER]

                # Sanity check for the reported case (info log only)
                try:
                    if "CRM Anzahl Techniker" in self.gsheet_df.columns and "CRM ID" in self.gsheet_df.columns:
                        probe_guid = "0f68a69d-e330-ec11-b6e6-000d3adbc80e"
                        probe_row = self.gsheet_df[self.gsheet_df["CRM ID"].str.lower() == probe_guid]
                        if not probe_row.empty:
                            val = probe_row.iloc[0]["CRM Anzahl Techniker"]
                            self.logger.info(
                                f"Sanity check: GSheet['CRM Anzahl Techniker'] for {probe_guid} -> {val!r} (type: {type(val)})"
                            )
                except Exception:
                    # Just to be safe; the sync must not fail because of this check
                    pass

        except Exception as e:
            self.logger.critical(f"Error while loading/transforming the GSheet data: {e}", exc_info=True)
            return False
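
The __dup suffix exists only so pandas can construct the intermediate DataFrame with unique column names; col.split("__dup")[0] strips it again before the canonical mapping. A quick sketch of the suffixing step with invented headers:

    # Invented example: the sheet contains "CRM ID" twice after normalization
    norm_header = ["CRM ID", "CRM Website", "CRM ID"]

    seen = {}
    unique_norm_header = []
    for h in norm_header:
        n = seen.get(h, 0)
        unique_norm_header.append(h if n == 0 else f"{h}__dup{n}")
        seen[h] = n + 1

    print(unique_norm_header)  # ['CRM ID', 'CRM Website', 'CRM ID__dup1']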

The third block determines the target sheet and the sync basis:

        # ----------------------------
        # DETERMINE TARGET SHEET & SYNC BASIS
        # ----------------------------
        self.target_sheet_name = self.sheet_handler.get_main_sheet_name()
        if not self.target_sheet_name:
            self.logger.critical("Could not determine the name of the target sheet. Aborting.")
            return False

        # Determine the IDs (only over populated CRM IDs)
        d365_ids = set(self.d365_df['CRM ID'].dropna()) if 'CRM ID' in self.d365_df.columns else set()
        gsheet_ids = set(self.gsheet_df['CRM ID'].dropna()) if 'CRM ID' in self.gsheet_df.columns else set()

        new_ids = d365_ids - gsheet_ids
        existing_ids = d365_ids.intersection(gsheet_ids)

        # Archiving is skipped (as before); a partial export is assumed
        deleted_ids = set()
        self.logger.info("Archiving step is skipped (partial export assumed).")

        self.logger.info(
            f"Sync basis: {len(new_ids)} new, {len(existing_ids)} existing, {len(deleted_ids)} deleted (skipped)."
        )

        # Store the results on the object
        self.new_ids = new_ids
        self.existing_ids = existing_ids
        self.deleted_ids = deleted_ids

        return True
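
The sync basis is plain set arithmetic over the two ID collections; a minimal sketch with made-up stand-ins for the GUIDs:

    d365_ids = {"a", "b", "c"}
    gsheet_ids = {"b", "c", "d"}

    new_ids = d365_ids - gsheet_ids                    # {'a'}: in the export, not yet in the sheet
    existing_ids = d365_ids.intersection(gsheet_ids)   # {'b', 'c'}: present on both sides
    deleted_ids = set()  # 'd' would be a deletion candidate, but archiving is skipped for partial exports

    print(len(new_ids), len(existing_ids), len(deleted_ids))  # 1 2 0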

Unchanged context at the bottom of the hunk:

    def run_sync(self):
        """Runs the entire synchronization process."""
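
Since the new _load_data() reports failures by returning False (and True on success), the caller presumably gates on that result. run_sync() itself is outside this hunk, so the following is only a sketch of that assumption, not the actual implementation:

    def run_sync(self):
        """Runs the entire synchronization process."""
        # Hypothetical gate: assumes run_sync() checks the boolean result of _load_data()
        if not self._load_data():
            self.logger.critical("Loading data failed; aborting sync.")
            return False
        # ... continue with new_ids / existing_ids ...
        return True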