Files
Brancheneinstufung2/sync_manager.py
2025-08-27 18:59:02 +00:00

214 lines
10 KiB
Python

#!/usr/bin/env python3
"""
sync_manager.py
Modul für den Datenabgleich zwischen einem D365 Excel-Export und dem Google Sheet.
Führt einen intelligenten "Full-Sync" durch, um neue, geänderte und
gelöschte Datensätze zu identifizieren und zu verarbeiten.
Enthält Logik für:
- Smart-Merging von Feldern (z.B. Website).
- Automatisches Setzen des Re-Eval-Flags bei Stammdatenänderungen.
- Markieren von archivierten Datensätzen.
- Protokollieren von Datenkonflikten.
"""
import pandas as pd
import logging
from datetime import datetime
# Importiere die benötigten Konfigurationen
from config import COLUMN_ORDER, COLUMN_MAP
class SyncManager:
"""
Kapselt die Logik für den Abgleich zwischen D365-Export und Google Sheet.
"""
def __init__(self, sheet_handler, d365_export_path):
"""
Initialisiert den SyncManager.
Args:
sheet_handler: Eine instanziierte GoogleSheetHandler-Klasse.
d365_export_path (str): Der Dateipfad zur D365 Excel-Exportdatei.
"""
self.sheet_handler = sheet_handler
self.d365_export_path = d365_export_path
self.logger = logging.getLogger(__name__)
# Definiert, welche D365-Spalten welchen GSheet-Spalten entsprechen.
# Dies ist das zentrale Mapping, das auf deiner D365-View basiert.
self.d365_to_gsheet_map = {
"Account Name": "CRM Name",
"Parent Account": "Parent Account Name",
"Website": "CRM Website",
"City": "CRM Ort",
"Country": "CRM Land",
"Description FSM": "CRM Beschreibung",
"Branch detail": "CRM Branche",
"No. Service Technicians": "CRM Anzahl Techniker",
"Annual Revenue (Mio. €)": "CRM Umsatz",
"Number of Employees": "CRM Anzahl Mitarbeiter",
# Dies ist die wichtigste Zeile! Annahme: Die GUID-Spalte im Export heißt 'Account'.
# Falls sie anders heißt (z.B. 'Account ID'), muss nur dieser String angepasst werden.
"GUID": "CRM ID"
}
# Definiert die Merge-Strategien für GSheet-Spalten
self.d365_wins_cols = ["CRM Name", "Parent Account Name", "CRM Ort", "CRM Land",
"CRM Beschreibung", "CRM Branche", "CRM Anzahl Techniker",
"CRM Umsatz", "CRM Anzahl Mitarbeiter"]
self.smart_merge_cols = ["CRM Website"]
def _load_data(self):
"""Lädt Daten aus der D365-Exportdatei und dem Google Sheet."""
self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...")
try:
# Lade alle Daten als String, um Formatierungsfehler (besonders bei GUIDs) zu vermeiden
self.d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('')
# Finde den Spaltennamen für die GUID in der Export-Datei basierend auf unserem Mapping
d365_guid_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == "CRM ID"), None)
if d365_guid_col and d365_guid_col in self.d365_df.columns:
self.d365_df.rename(columns={d365_guid_col: "CRM ID"}, inplace=True)
else:
self.logger.critical(f"FEHLER: Die erwartete GUID-Spalte '{d365_guid_col}' wurde in der D365-Exportdatei nicht gefunden.")
raise ValueError(f"GUID-Spalte ('{d365_guid_col}') nicht in der D365-Exportdatei gefunden.")
# Bereinige CRM IDs und entferne Zeilen ohne gültige ID
self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip()
self.d365_df.dropna(subset=['CRM ID'], inplace=True)
self.d365_df = self.d365_df[self.d365_df['CRM ID'] != '']
except FileNotFoundError:
self.logger.critical(f"FEHLER: D365-Exportdatei nicht gefunden unter: {self.d365_export_path}")
return False
except Exception as e:
self.logger.critical(f"Ein unerwarteter Fehler ist beim Laden der Excel-Datei aufgetreten: {e}", exc_info=True)
return False
self.logger.info("Lade bestehende Daten aus dem Google Sheet...")
# --- KORREKTURBLOCK START ---
try:
# 1. Rufe die korrekte, existierende Methode auf. Annahme: sie gibt [header, zeile1, zeile2, ...] zurück
all_data_with_headers = self.sheet_handler.get_all_data_with_headers()
if not all_data_with_headers or len(all_data_with_headers) < 2:
self.logger.warning("Google Sheet scheint leer zu sein oder enthält nur Header. Erstelle leeres DataFrame.")
self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER)
else:
# 2. Wandle die Liste von Listen in ein DataFrame um.
# Wir nutzen COLUMN_ORDER aus der config, um die Spaltennamen zu garantieren.
self.gsheet_df = pd.DataFrame(all_data_with_headers[1:], columns=COLUMN_ORDER)
except Exception as e:
self.logger.critical(f"Fehler beim Laden oder Umwandeln der Google Sheet Daten: {e}", exc_info=True)
return False
# --- KORREKTURBLOCK ENDE ---
self.gsheet_df['CRM ID'] = self.gsheet_df['CRM ID'].str.strip()
self.logger.info(f"{len(self.d365_df)} gültige Datensätze aus D365 geladen, {len(self.gsheet_df)} im Google Sheet vorhanden.")
return True
def run_sync(self):
"""Führt den gesamten Synchronisationsprozess aus."""
if not self._load_data():
return
d365_ids = set(self.d365_df['CRM ID'])
gsheet_ids = set(self.gsheet_df[self.gsheet_df['CRM ID'] != '']['CRM ID'])
new_ids = d365_ids - gsheet_ids
deleted_ids = gsheet_ids - d365_ids
existing_ids = d365_ids.intersection(gsheet_ids)
self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} zu archivierende, {len(existing_ids)} bestehende Accounts.")
updates_to_batch = []
rows_to_append = []
# 1. Neue Accounts verarbeiten
if new_ids:
new_accounts_df = self.d365_df[self.d365_df['CRM ID'].isin(new_ids)]
for _, row in new_accounts_df.iterrows():
new_row_data = [""] * len(COLUMN_ORDER)
for d365_col, gsheet_col in self.d365_to_gsheet_map.items():
if d365_col in row:
col_idx = COLUMN_MAP[gsheet_col]['index']
new_row_data[col_idx] = row[d365_col]
rows_to_append.append(new_row_data)
# 2. Gelöschte/Archivierte Accounts verarbeiten
if deleted_ids:
for crm_id in deleted_ids:
row_indices = self.gsheet_df[self.gsheet_df['CRM ID'] == crm_id].index
if not row_indices.empty:
row_idx = row_indices[0]
updates_to_batch.append({
"range": f"{COLUMN_MAP['Archiviert']['Titel']}{row_idx + 2}",
"values": [["TRUE"]]
})
# 3. Bestehende Accounts intelligent mergen
if existing_ids:
d365_indexed = self.d365_df.set_index('CRM ID')
gsheet_indexed = self.gsheet_df.set_index('CRM ID')
for crm_id in existing_ids:
d365_row = d365_indexed.loc[crm_id]
gsheet_row = gsheet_indexed.loc[crm_id]
row_updates = {}
conflict_messages = []
needs_reeval = False
# Strategie 1: D365 gewinnt immer
for gsheet_col in self.d365_wins_cols:
d365_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == gsheet_col), None)
if d365_col and d365_col in d365_row and str(d365_row[d365_col]) != str(gsheet_row[gsheet_col]):
row_updates[gsheet_col] = str(d365_row[d365_col])
needs_reeval = True
# Strategie 2: Smart-Merge für spezielle Spalten
for gsheet_col in self.smart_merge_cols:
d365_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == gsheet_col), None)
d365_val = str(d365_row.get(d365_col, ''))
gsheet_val = str(gsheet_row.get(gsheet_col, ''))
if d365_val and not gsheet_val:
row_updates[gsheet_col] = d365_val
needs_reeval = True
elif d365_val and gsheet_val and d365_val != gsheet_val:
conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'")
# Updates und Flags zusammenstellen
if conflict_messages:
row_updates["SyncConflict"] = "; ".join(conflict_messages)
if needs_reeval:
row_updates["ReEval Flag"] = "x"
# Batch-Update-Objekte für die aktuelle Zeile erstellen
if row_updates:
row_idx = gsheet_indexed.index.get_loc(crm_id)
for col_name, value in row_updates.items():
updates_to_batch.append({
"range": f"{COLUMN_MAP[col_name]['Titel']}{row_idx + 2}",
"values": [[value]]
})
# 4. Änderungen ins Google Sheet schreiben
if rows_to_append:
self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...")
# --- KORREKTUR HIER ---
self.sheet_handler.append_rows(values=rows_to_append)
if updates_to_batch:
self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...")
self.sheet_handler.batch_update_cells(updates_to_batch)
if not rows_to_append and not updates_to_batch:
self.logger.info("Keine Änderungen festgestellt. Das Google Sheet ist bereits auf dem neuesten Stand.")
self.logger.info("Synchronisation erfolgreich abgeschlossen.")