sync_manager.py aktualisiert

This commit is contained in:
2025-08-28 18:37:36 +00:00
parent 44fbb618fa
commit 968438abcf

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
sync_manager.py sync_manager.py
@@ -9,7 +9,7 @@ gelöschte Datensätze zu identifizieren und zu verarbeiten.
import pandas as pd import pandas as pd
import logging import logging
import re import re, unicodedata
from collections import defaultdict from collections import defaultdict
from config import COLUMN_ORDER, COLUMN_MAP, Config from config import COLUMN_ORDER, COLUMN_MAP, Config
@@ -38,6 +38,7 @@ class SyncStatistics:
] ]
if self.field_updates: if self.field_updates:
report.append("| Feld-Updates im Detail:") report.append("| Feld-Updates im Detail:")
# Sortiert die Feld-Updates nach Häufigkeit
sorted_updates = sorted(self.field_updates.items(), key=lambda item: item[1], reverse=True) sorted_updates = sorted(self.field_updates.items(), key=lambda item: item[1], reverse=True)
for field, count in sorted_updates: for field, count in sorted_updates:
report.append(f"| - {field:<25} | {count} mal") report.append(f"| - {field:<25} | {count} mal")
@@ -82,83 +83,117 @@ class SyncManager:
"CRM Anzahl Mitarbeiter", "CRM Beschreibung"] "CRM Anzahl Mitarbeiter", "CRM Beschreibung"]
self.smart_merge_cols = ["CRM Website"] self.smart_merge_cols = ["CRM Website"]
def _normalize_header(self, header_str: str) -> str:
    """Return *header_str* without invisible characters and with collapsed whitespace.

    Spreadsheet headers frequently carry characters that render as nothing
    (zero-width spaces/joiners, directional marks, BOM, soft hyphens) or as a
    plain space (NBSP, tabs, line breaks). Two such headers look identical but
    compare unequal, so they are reduced to one canonical spelling here.

    Args:
        header_str: Raw header cell value.

    Returns:
        The cleaned header, or ``""`` when *header_str* is not a string.
    """
    import unicodedata  # local import: keeps this method independent of the file's import block

    if not isinstance(header_str, str):
        return ""

    visible = []
    for ch in header_str:
        category = unicodedata.category(ch)
        # "Cf" covers every invisible format character, not just a hand-picked
        # few (U+200B/U+200E/U+200F/U+FEFF as well as U+200C/U+200D/U+2060/U+00AD).
        if category == "Cf":
            continue
        # Control characters are dropped unless they are whitespace: tabs and
        # line breaks separate words and must survive until the collapse below.
        if category == "Cc" and not ch.isspace():
            continue
        visible.append(ch)

    # In str patterns \s matches all Unicode whitespace, NBSP included, so a
    # single pass both replaces exotic spaces and collapses runs of them.
    return re.sub(r"\s+", " ", "".join(visible)).strip()
def _load_data(self):
    """Load the D365 export and the Google Sheet into DataFrames.

    Populates ``self.d365_df`` and ``self.gsheet_df``. Both frames keep only
    rows whose ``CRM ID`` is a lower-case GUID. Google Sheet headers are
    normalized (invisible characters removed, whitespace collapsed) and mapped
    onto the canonical names in ``COLUMN_ORDER`` so that headers which merely
    look identical cannot create shadow columns.

    Returns:
        bool: ``True`` when both sources were loaded, ``False`` when either
        load failed (the error is logged). ``run_sync`` aborts on a falsy
        result, so every exit path returns an explicit boolean.
    """
    import unicodedata  # local import: only the header normalization needs it

    guid_pattern = r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$'

    def normalize(header) -> str:
        """Strip invisible characters from a header and collapse its whitespace."""
        if header is None:
            return ""
        kept = []
        for ch in str(header):
            category = unicodedata.category(ch)
            if category in ("Cf", "Cs"):
                continue
            # Tabs and line breaks are category "Cc" too; they must survive
            # until the whitespace collapse or adjacent words get glued together.
            if category == "Cc" and not ch.isspace():
                continue
            kept.append(ch)
        # \s matches NBSP and other Unicode spaces in str patterns.
        return re.sub(r"\s+", " ", "".join(kept)).strip()

    # --- 1) D365 export ---------------------------------------------------
    self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...")
    try:
        temp_d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('')
        d365_cols = list(self.d365_to_gsheet_map)
        for d365_col in d365_cols:
            if d365_col not in temp_d365_df.columns:
                raise ValueError(f"Erwartete Spalte '{d365_col}' nicht in der D365-Exportdatei gefunden.")
        self.d365_df = temp_d365_df[d365_cols].copy()
        self.d365_df.rename(columns=self.d365_to_gsheet_map, inplace=True)
        self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip().str.lower()
        self.d365_df = self.d365_df[self.d365_df['CRM ID'].str.match(guid_pattern, na=False)]
    except Exception as e:
        self.logger.critical(f"Fehler beim Laden der Excel-Datei: {e}", exc_info=True)
        return False

    # --- 2) Google Sheet --------------------------------------------------
    self.logger.info("Lade bestehende Daten aus dem Google Sheet...")
    try:
        all_data_with_headers = self.sheet_handler.get_all_data_with_headers()
        header_rows = self.sheet_handler._header_rows
        if not all_data_with_headers or len(all_data_with_headers) < header_rows:
            # An empty sheet is not fatal: continue with an empty frame.
            self.logger.warning("Google Sheet enthält keine gültige Header-Zeile.")
            self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER)
        else:
            header_raw = all_data_with_headers[header_rows - 1]
            data_rows = all_data_with_headers[header_rows:]
            # repr() makes invisible characters in the raw headers visible in the log.
            self.logger.debug("Roh-Header (repr): " + " | ".join(repr(h) for h in header_raw))

            # Normalized spelling -> canonical column name.
            canon_map = {normalize(c): c for c in COLUMN_ORDER}

            columns, unmapped_cols, seen = [], [], {}
            for raw in header_raw:
                norm = normalize(raw)
                name = canon_map.get(norm, norm)
                count = seen.get(name, 0)
                seen[name] = count + 1
                if count:
                    # Only the first occurrence keeps the canonical name; later
                    # ones stay suffixed so column labels remain unique and
                    # df['CRM ID'] is always a Series.
                    name = f"{name}__dup{count}"
                if norm and (count or norm not in canon_map):
                    unmapped_cols.append(name)
                columns.append(name)

            # Pad/truncate every row to the header width; None becomes '' rather
            # than the literal text 'None'.
            width = len(columns)
            fixed_rows = [
                ["" if v is None else str(v)
                 for v in (list(row) + [""] * (width - len(row)))[:width]]
                for row in data_rows
            ]
            temp_df = pd.DataFrame(fixed_rows, columns=columns)

            if unmapped_cols:
                self.logger.warning(
                    "Folgende GSheet-Spalten konnten NICHT auf COLUMN_ORDER gemappt werden "
                    "(vermutlich fremde/alte/abweichende Header): "
                    + ", ".join([f"{c!r}" for c in unmapped_cols])
                )

            for col_name in COLUMN_ORDER:
                if col_name not in temp_df.columns:
                    self.logger.warning(f"Spalte '{col_name}' fehlt im GSheet und wird als leere Spalte hinzugefügt.")
                    temp_df[col_name] = ''

            self.gsheet_df = temp_df[COLUMN_ORDER].fillna('')

        # Everything is handled as text to avoid type conflicts when comparing.
        self.gsheet_df = self.gsheet_df.astype(str)
        self.gsheet_df['CRM ID'] = self.gsheet_df['CRM ID'].str.strip().str.lower()
        initial_row_count = len(self.gsheet_df)
        # Boolean filtering keeps the original index; run_sync derives the
        # sheet row number from it, so the index must not be reset here.
        self.gsheet_df = self.gsheet_df[self.gsheet_df['CRM ID'].str.match(guid_pattern, na=False)]
        if initial_row_count > len(self.gsheet_df):
            self.logger.info(f"GSheet-Daten bereinigt: {initial_row_count - len(self.gsheet_df)} Zeilen ohne gültige GUID entfernt.")
    except Exception as e:
        self.logger.critical(f"Fehler beim Laden/Umwandeln der GSheet-Daten: {e}", exc_info=True)
        return False

    self.logger.info(f"{len(self.d365_df)} gültige Datensätze aus D365 geladen, {len(self.gsheet_df)} gültige Datensätze im Google Sheet.")
    return True
def run_sync(self): def run_sync(self):
"""Führt den gesamten Synchronisationsprozess aus.""" """Führt den gesamten Synchronisationsprozess aus."""
# Diese Methode bleibt exakt wie in der letzten funktionierenden Version.
# Der Fix fand ausschließlich in _load_data() statt.
if not self._load_data(): return if not self._load_data(): return
self.target_sheet_name = self.sheet_handler.get_main_sheet_name() self.target_sheet_name = self.sheet_handler.get_main_sheet_name()
@@ -174,10 +209,7 @@ class SyncManager:
self.logger.info("Archivierungs-Schritt wird übersprungen (Teil-Export angenommen).") self.logger.info("Archivierungs-Schritt wird übersprungen (Teil-Export angenommen).")
existing_ids = d365_ids.intersection(gsheet_ids) existing_ids = d365_ids.intersection(gsheet_ids)
self.stats.new_accounts = len(new_ids) self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} zu archivierende, {len(existing_ids)} bestehende Accounts.")
self.stats.archived_accounts = len(deleted_ids)
self.stats.existing_accounts = len(existing_ids)
self.logger.info(f"Sync-Analyse: {self.stats.new_accounts} neue, {self.stats.archived_accounts} zu archivierende, {self.stats.existing_accounts} bestehende Accounts.")
updates_to_batch, rows_to_append = [], [] updates_to_batch, rows_to_append = [], []
@@ -193,56 +225,68 @@ class SyncManager:
if existing_ids: if existing_ids:
d365_indexed = self.d365_df.set_index('CRM ID') d365_indexed = self.d365_df.set_index('CRM ID')
# --- KORREKTE DATENQUELLE VERWENDEN ---
gsheet_to_update_df = self.gsheet_df[self.gsheet_df['CRM ID'].isin(existing_ids)] gsheet_to_update_df = self.gsheet_df[self.gsheet_df['CRM ID'].isin(existing_ids)]
for original_row_index, gsheet_row in gsheet_to_update_df.iterrows(): for original_row_index, gsheet_row in gsheet_to_update_df.iterrows():
crm_id = gsheet_row['CRM ID'] crm_id = gsheet_row['CRM ID']
if crm_id not in d365_indexed.index: continue if crm_id not in d365_indexed.index: continue
d365_row = d365_indexed.loc[crm_id] d365_row = d365_indexed.loc[crm_id]
row_updates, conflict_messages, needs_reeval = {}, [], False row_updates, conflict_messages, needs_reeval = {}, [], False
for gsheet_col in self.d365_wins_cols: for gsheet_col in self.d365_wins_cols:
d365_val = str(d365_row[gsheet_col]).strip() d365_val = str(d365_row[gsheet_col]).strip()
gsheet_val = str(gsheet_row[gsheet_col]).strip() gsheet_val = str(gsheet_row[gsheet_col]).strip()
trigger_update = False trigger_update = False
if gsheet_col == 'CRM Land': if gsheet_col == 'CRM Land':
d365_code_lower, gsheet_val_lower = d365_val.lower(), gsheet_val.lower() d365_code_lower = d365_val.lower()
gsheet_val_lower = gsheet_val.lower()
d365_translated_lower = Config.COUNTRY_CODE_MAP.get(d365_code_lower, d365_code_lower).lower() d365_translated_lower = Config.COUNTRY_CODE_MAP.get(d365_code_lower, d365_code_lower).lower()
if gsheet_val_lower != d365_code_lower and gsheet_val_lower != d365_translated_lower: if gsheet_val_lower != d365_code_lower and gsheet_val_lower != d365_translated_lower:
trigger_update = True trigger_update = True
elif gsheet_col == 'CRM Anzahl Techniker': elif gsheet_col == 'CRM Anzahl Techniker':
if (d365_val == '-1' or d365_val == '0') and gsheet_val == '': pass if (d365_val == '-1' or d365_val == '0') and gsheet_val == '': pass
elif d365_val != gsheet_val: trigger_update = True elif d365_val != gsheet_val: trigger_update = True
elif gsheet_col == 'CRM Branche': elif gsheet_col == 'CRM Branche':
if gsheet_row['Chat Vorschlag Branche'] == '' and d365_val != gsheet_val: if gsheet_row['Chat Vorschlag Branche'] == '' and d365_val != gsheet_val:
trigger_update = True trigger_update = True
elif gsheet_col == 'CRM Umsatz': elif gsheet_col == 'CRM Umsatz':
if gsheet_row['Wiki Umsatz'] == '' and d365_val != gsheet_val: if gsheet_row['Wiki Umsatz'] == '' and d365_val != gsheet_val:
trigger_update = True trigger_update = True
elif gsheet_col == 'CRM Anzahl Mitarbeiter': elif gsheet_col == 'CRM Anzahl Mitarbeiter':
if gsheet_row['Wiki Mitarbeiter'] == '' and d365_val != gsheet_val: if gsheet_row['Wiki Mitarbeiter'] == '' and d365_val != gsheet_val:
trigger_update = True trigger_update = True
elif gsheet_col == 'CRM Beschreibung':
if gsheet_row['Website Zusammenfassung'] == '' and d365_val != gsheet_val:
trigger_update = True
else: else:
if d365_val != gsheet_val: trigger_update = True if d365_val != gsheet_val: trigger_update = True
if trigger_update: if trigger_update:
row_updates[gsheet_col] = d365_val; needs_reeval = True row_updates[gsheet_col] = d365_val
self.logger.debug(f"Update für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'") needs_reeval = True
self.logger.debug(f"ReEval für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'")
for gsheet_col in self.smart_merge_cols: for gsheet_col in self.smart_merge_cols:
d365_val = str(d365_row.get(gsheet_col, '')).strip() d365_val = str(d365_row.get(gsheet_col, '')).strip()
gsheet_val = str(gsheet_row.get(gsheet_col, '')).strip() gsheet_val = str(gsheet_row.get(gsheet_col, '')).strip()
if d365_val and not gsheet_val: if d365_val and not gsheet_val:
row_updates[gsheet_col] = d365_val; needs_reeval = True row_updates[gsheet_col] = d365_val
needs_reeval = True
elif d365_val and gsheet_val and d365_val != gsheet_val: elif d365_val and gsheet_val and d365_val != gsheet_val:
conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'") conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'")
if conflict_messages:
row_updates["SyncConflict"] = "; ".join(conflict_messages) if conflict_messages: row_updates["SyncConflict"] = "; ".join(conflict_messages)
self.stats.conflict_accounts.add(crm_id)
for msg in conflict_messages: self.stats.field_conflicts[msg.split('_CONFLICT')[0]] += 1
if needs_reeval: row_updates["ReEval Flag"] = "x" if needs_reeval: row_updates["ReEval Flag"] = "x"
if row_updates: if row_updates:
self.stats.accounts_to_update.add(crm_id)
for field in row_updates.keys(): self.stats.field_updates[field] += 1
sheet_row_number = original_row_index + self.sheet_handler._header_rows + 1 sheet_row_number = original_row_index + self.sheet_handler._header_rows + 1
for col_name, value in row_updates.items(): for col_name, value in row_updates.items():
updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{sheet_row_number}", "values": [[value]] }) updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{sheet_row_number}", "values": [[value]] })
@@ -250,13 +294,14 @@ class SyncManager:
if rows_to_append: if rows_to_append:
self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...") self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...")
self.sheet_handler.append_rows(sheet_name=self.target_sheet_name, values=rows_to_append) self.sheet_handler.append_rows(sheet_name=self.target_sheet_name, values=rows_to_append)
if updates_to_batch: if updates_to_batch:
self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...") self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...")
self.sheet_handler.batch_update_cells(updates_to_batch) self.sheet_handler.batch_update_cells(updates_to_batch)
report = self.stats.generate_report() if not rows_to_append and not updates_to_batch:
self.logger.info(report) self.logger.info("Keine Änderungen festgestellt. Das Google Sheet ist bereits auf dem neuesten Stand.")
print(report)
self.logger.info("Synchronisation erfolgreich abgeschlossen.") self.logger.info("Synchronisation erfolgreich abgeschlossen.")
def debug_sync(self, debug_id=None): def debug_sync(self, debug_id=None):