sync_manager.py hinzugefügt
This commit is contained in:
185
sync_manager.py
Normal file
185
sync_manager.py
Normal file
@@ -0,0 +1,185 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
sync_manager.py
|
||||
|
||||
Modul für den Datenabgleich zwischen einem D365 Excel-Export und dem Google Sheet.
|
||||
Führt einen intelligenten "Full-Sync" durch, um neue, geänderte und
|
||||
gelöschte Datensätze zu identifizieren und zu verarbeiten.
|
||||
|
||||
Enthält Logik für:
|
||||
- Smart-Merging von Feldern (z.B. Website).
|
||||
- Automatisches Setzen des Re-Eval-Flags bei Stammdatenänderungen.
|
||||
- Markieren von archivierten Datensätzen.
|
||||
- Protokollieren von Datenkonflikten.
|
||||
"""
|
||||
|
||||
import pandas as pd
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
# Importiere die benötigten Konfigurationen
|
||||
from config import COLUMN_ORDER, COLUMN_MAP
|
||||
|
||||
class SyncManager:
|
||||
"""
|
||||
Kapselt die Logik für den Abgleich zwischen D365-Export und Google Sheet.
|
||||
"""
|
||||
def __init__(self, sheet_handler, d365_export_path):
|
||||
"""
|
||||
Initialisiert den SyncManager.
|
||||
|
||||
Args:
|
||||
sheet_handler: Eine instanziierte GoogleSheetHandler-Klasse.
|
||||
d365_export_path (str): Der Dateipfad zur D365 Excel-Exportdatei.
|
||||
"""
|
||||
self.sheet_handler = sheet_handler
|
||||
self.d365_export_path = d365_export_path
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
# Definiert, welche D365-Spalten welchen GSheet-Spalten entsprechen
|
||||
self.d365_to_gsheet_map = {
|
||||
"Account Name": "CRM Name",
|
||||
"Parent Account": "Parent Account Name",
|
||||
"Website": "CRM Website",
|
||||
"City": "CRM Ort",
|
||||
"Country": "CRM Land",
|
||||
"Description FSM": "CRM Beschreibung",
|
||||
"Branch detail": "CRM Branche",
|
||||
"No. Service Technicians": "CRM Anzahl Techniker",
|
||||
"Annual Revenue (Mio. €)": "CRM Umsatz",
|
||||
"Number of Employees": "CRM Anzahl Mitarbeiter",
|
||||
"Account": "CRM ID" # Annahme: Die GUID-Spalte in D365 heißt 'Account'
|
||||
}
|
||||
|
||||
# Definiert die Merge-Strategien für GSheet-Spalten
|
||||
self.d365_wins_cols = ["CRM Name", "Parent Account Name", "CRM Ort", "CRM Land",
|
||||
"CRM Beschreibung", "CRM Branche", "CRM Anzahl Techniker",
|
||||
"CRM Umsatz", "CRM Anzahl Mitarbeiter"]
|
||||
self.smart_merge_cols = ["CRM Website"]
|
||||
|
||||
def _load_data(self):
|
||||
"""Lädt Daten aus der D365-Exportdatei und dem Google Sheet."""
|
||||
self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...")
|
||||
try:
|
||||
# Lade alle Daten als String, um Formatierungsfehler (besonders bei GUIDs) zu vermeiden
|
||||
self.d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('')
|
||||
# Umbenennen der GUID-Spalte basierend auf dem Mapping
|
||||
d365_guid_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == "CRM ID"), None)
|
||||
if d365_guid_col and d365_guid_col in self.d365_df.columns:
|
||||
self.d365_df.rename(columns={d365_guid_col: "CRM ID"}, inplace=True)
|
||||
else:
|
||||
raise ValueError("GUID-Spalte ('Account' oder entsprechend) nicht in der D365-Exportdatei gefunden.")
|
||||
|
||||
self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip()
|
||||
|
||||
except FileNotFoundError:
|
||||
self.logger.critical(f"FEHLER: D365-Exportdatei nicht gefunden unter: {self.d365_export_path}")
|
||||
return False
|
||||
|
||||
self.logger.info("Lade bestehende Daten aus dem Google Sheet...")
|
||||
self.gsheet_df = self.sheet_handler.get_all_data_as_dataframe()
|
||||
if self.gsheet_df is None:
|
||||
self.logger.error("Konnte keine Daten aus dem Google Sheet laden.")
|
||||
return False
|
||||
|
||||
self.gsheet_df['CRM ID'] = self.gsheet_df['CRM ID'].str.strip()
|
||||
|
||||
self.logger.info(f"{len(self.d365_df)} Datensätze aus D365 geladen, {len(self.gsheet_df)} im Google Sheet vorhanden.")
|
||||
return True
|
||||
|
||||
def run_sync(self):
|
||||
"""Führt den gesamten Synchronisationsprozess aus."""
|
||||
if not self._load_data():
|
||||
return
|
||||
|
||||
d365_ids = set(self.d365_df[self.d365_df['CRM ID'] != '']['CRM ID'])
|
||||
gsheet_ids = set(self.gsheet_df[self.gsheet_df['CRM ID'] != '']['CRM ID'])
|
||||
|
||||
new_ids = d365_ids - gsheet_ids
|
||||
deleted_ids = gsheet_ids - d365_ids
|
||||
existing_ids = d365_ids.intersection(gsheet_ids)
|
||||
|
||||
self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} archivierte, {len(existing_ids)} bestehende Accounts.")
|
||||
|
||||
updates_to_batch = []
|
||||
rows_to_append = []
|
||||
|
||||
# 1. Neue Accounts verarbeiten
|
||||
if new_ids:
|
||||
new_accounts_df = self.d365_df[self.d365_df['CRM ID'].isin(new_ids)]
|
||||
for _, row in new_accounts_df.iterrows():
|
||||
new_row = [""] * len(COLUMN_ORDER)
|
||||
for d365_col, gsheet_col in self.d365_to_gsheet_map.items():
|
||||
if d365_col in row:
|
||||
col_idx = COLUMN_MAP[gsheet_col]['index']
|
||||
new_row[col_idx] = row[d365_col]
|
||||
rows_to_append.append(new_row)
|
||||
|
||||
# 2. Gelöschte/Archivierte Accounts verarbeiten
|
||||
if deleted_ids:
|
||||
for crm_id in deleted_ids:
|
||||
row_idx = self.gsheet_df[self.gsheet_df['CRM ID'] == crm_id].index[0]
|
||||
updates_to_batch.append({
|
||||
"range": f"{COLUMN_MAP['Archiviert']['Titel']}{row_idx + 2}", # +2 weil 1-basiert und Header
|
||||
"values": [["TRUE"]]
|
||||
})
|
||||
|
||||
# 3. Bestehende Accounts intelligent mergen
|
||||
if existing_ids:
|
||||
# Setze 'CRM ID' als Index für schnellen Zugriff
|
||||
self.d365_df.set_index('CRM ID', inplace=True)
|
||||
self.gsheet_df.set_index('CRM ID', inplace=True)
|
||||
|
||||
for crm_id in existing_ids:
|
||||
d365_row = self.d365_df.loc[crm_id]
|
||||
gsheet_row = self.gsheet_df.loc[crm_id]
|
||||
|
||||
row_updates = {}
|
||||
conflict_messages = []
|
||||
needs_reeval = False
|
||||
|
||||
# Strategie 1: D365 gewinnt immer
|
||||
for gsheet_col in self.d365_wins_cols:
|
||||
d365_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == gsheet_col), None)
|
||||
if d365_col and str(d365_row[d365_col]) != str(gsheet_row[gsheet_col]):
|
||||
row_updates[gsheet_col] = str(d365_row[d365_col])
|
||||
needs_reeval = True
|
||||
|
||||
# Strategie 2: Smart-Merge
|
||||
for gsheet_col in self.smart_merge_cols:
|
||||
d365_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == gsheet_col), None)
|
||||
d365_val = str(d365_row.get(d365_col, ''))
|
||||
gsheet_val = str(gsheet_row.get(gsheet_col, ''))
|
||||
|
||||
if d365_val and not gsheet_val:
|
||||
row_updates[gsheet_col] = d365_val
|
||||
needs_reeval = True
|
||||
elif d365_val and gsheet_val and d365_val != gsheet_val:
|
||||
conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'")
|
||||
|
||||
# Updates und Flags zusammenstellen
|
||||
if conflict_messages:
|
||||
row_updates["SyncConflict"] = "; ".join(conflict_messages)
|
||||
|
||||
if needs_reeval:
|
||||
row_updates["ReEval Flag"] = "x"
|
||||
|
||||
# Batch-Update-Objekte erstellen
|
||||
if row_updates:
|
||||
row_idx = self.gsheet_df.index.get_loc(crm_id)
|
||||
for col_name, value in row_updates.items():
|
||||
updates_to_batch.append({
|
||||
"range": f"{COLUMN_MAP[col_name]['Titel']}{row_idx + 2}",
|
||||
"values": [[value]]
|
||||
})
|
||||
|
||||
# 4. Änderungen ins Google Sheet schreiben
|
||||
if rows_to_append:
|
||||
self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...")
|
||||
self.sheet_handler.append_rows(rows_to_append)
|
||||
|
||||
if updates_to_batch:
|
||||
self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...")
|
||||
self.sheet_handler.batch_update_cells(updates_to_batch)
|
||||
|
||||
self.logger.info("Synchronisation erfolgreich abgeschlossen.")
|
||||
Reference in New Issue
Block a user