246 lines
12 KiB
Python
246 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
sync_manager.py
|
|
|
|
Modul für den Datenabgleich zwischen einem D365 Excel-Export und dem Google Sheet.
|
|
Führt einen intelligenten "Full-Sync" durch, um neue, geänderte und
|
|
gelöschte Datensätze zu identifizieren und zu verarbeiten.
|
|
"""
|
|
|
|
import pandas as pd
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
from config import COLUMN_ORDER, COLUMN_MAP
|
|
|
|
class SyncManager:
    """Encapsulates the reconciliation logic between a D365 export and the
    Google Sheet (full sync over new, changed and deleted records).
    """

    def __init__(self, sheet_handler, d365_export_path):
        """Store collaborators and build the static column configuration.

        Args:
            sheet_handler: Adapter object wrapping the Google-Sheet API.
            d365_export_path: Path to the D365 Excel export file.
        """
        self.logger = logging.getLogger(__name__)
        self.sheet_handler = sheet_handler
        self.d365_export_path = d365_export_path
        # Resolved lazily (via the sheet handler) when a sync run starts.
        self.target_sheet_name = None

        # Maps a D365 export column name to its Google-Sheet column title.
        self.d365_to_gsheet_map = {
            "Account Name": "CRM Name",
            "Parent Account": "Parent Account Name",
            "Website": "CRM Website",
            "City": "CRM Ort",
            "Country": "CRM Land",
            "Description FSM": "CRM Beschreibung",
            "Branch detail": "CRM Branche",
            "No. Service Technicians": "CRM Anzahl Techniker",
            "Annual Revenue (Mio. €)": "CRM Umsatz",
            "Number of Employees": "CRM Anzahl Mitarbeiter",
            "GUID": "CRM ID",
        }

        # Columns where the D365 value always overwrites the sheet value.
        self.d365_wins_cols = [
            "CRM Name",
            "Parent Account Name",
            "CRM Ort",
            "CRM Land",
            "CRM Beschreibung",
            "CRM Branche",
            "CRM Anzahl Techniker",
            "CRM Umsatz",
            "CRM Anzahl Mitarbeiter",
        ]
        # Columns merged field-by-field: fill blanks, flag true conflicts.
        self.smart_merge_cols = ["CRM Website"]
|
|
|
|
def _load_data(self):
|
|
"""Lädt und bereitet die Daten aus D365 und Google Sheets vor."""
|
|
self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...")
|
|
try:
|
|
self.d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('')
|
|
d365_guid_col = next((k for k, v in self.d365_to_gsheet_map.items() if v == "CRM ID"), None)
|
|
|
|
if d365_guid_col and d365_guid_col in self.d365_df.columns:
|
|
self.d365_df.rename(columns={d365_guid_col: "CRM ID"}, inplace=True)
|
|
else:
|
|
raise ValueError(f"GUID-Spalte ('{d365_guid_col}') nicht in D365-Export gefunden.")
|
|
|
|
self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip().str.lower()
|
|
self.d365_df = self.d365_df[self.d365_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)]
|
|
|
|
except Exception as e:
|
|
self.logger.critical(f"Fehler beim Laden der Excel-Datei: {e}", exc_info=True)
|
|
return False
|
|
|
|
self.logger.info("Lade bestehende Daten aus dem Google Sheet...")
|
|
try:
|
|
all_data_with_headers = self.sheet_handler.get_all_data_with_headers()
|
|
if not all_data_with_headers or len(all_data_with_headers) < self.sheet_handler._header_rows:
|
|
self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER).fillna('')
|
|
else:
|
|
actual_header = all_data_with_headers[self.sheet_handler._header_rows - 1]
|
|
data_rows = all_data_with_headers[self.sheet_handler._header_rows:]
|
|
|
|
temp_df = pd.DataFrame(data_rows)
|
|
if temp_df.empty:
|
|
temp_df = pd.DataFrame(columns=actual_header)
|
|
else:
|
|
if temp_df.shape[1] < len(actual_header):
|
|
missing_cols = len(actual_header) - temp_df.shape[1]
|
|
for i in range(missing_cols): temp_df[f'temp_{i}'] = ''
|
|
temp_df.columns = actual_header
|
|
|
|
temp_df = temp_df.fillna('')
|
|
|
|
for col_name in COLUMN_ORDER:
|
|
if col_name not in temp_df.columns:
|
|
temp_df[col_name] = ''
|
|
|
|
self.gsheet_df = temp_df[COLUMN_ORDER]
|
|
except Exception as e:
|
|
self.logger.critical(f"Fehler beim Laden/Umwandeln der GSheet-Daten: {e}", exc_info=True)
|
|
return False
|
|
|
|
self.gsheet_df['CRM ID'] = self.gsheet_df['CRM ID'].str.strip().str.lower()
|
|
|
|
# --- FINALE BEREINIGUNG: Nur echte GUIDs im GSheet DataFrame behalten ---
|
|
initial_row_count = len(self.gsheet_df)
|
|
self.gsheet_df = self.gsheet_df[self.gsheet_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)]
|
|
final_row_count = len(self.gsheet_df)
|
|
self.logger.info(f"GSheet-Daten bereinigt: {initial_row_count - final_row_count} Zeilen ohne gültige GUID entfernt.")
|
|
|
|
self.logger.info(f"{len(self.d365_df)} gültige Datensätze aus D365 geladen, {len(self.gsheet_df)} gültige Datensätze im Google Sheet.")
|
|
return True
|
|
|
|
    def run_sync(self):
        """Run the full synchronization pass (D365 export -> Google Sheet).

        Loads both data sources, partitions the CRM IDs into new / deleted /
        existing sets, then appends rows for new accounts and batches cell
        updates for changed accounts through ``self.sheet_handler``. Writes
        nothing when no differences are found.
        """
        # Abort quietly on load failure; _load_data already logged the cause.
        if not self._load_data():
            return

        self.target_sheet_name = self.sheet_handler.get_main_sheet_name()
        if not self.target_sheet_name:
            self.logger.critical("Konnte Namen des Ziel-Sheets nicht ermitteln. Abbruch.")
            return

        # ID sets drive the three-way diff below.
        d365_ids = set(self.d365_df['CRM ID'].dropna())
        gsheet_ids = set(self.gsheet_df[self.gsheet_df['CRM ID'] != '']['CRM ID'].dropna())

        new_ids = d365_ids - gsheet_ids
        # --- LOGIC FIX START ---
        # The 'deleted_ids' logic is only correct when the D365 export is a
        # COMPLETE export of all active accounts. Since partial samples are
        # used here, the step is disabled to prevent wrongful archiving.
        deleted_ids = set()  # deliberately empty: archiving is skipped
        self.logger.info("Archivierungs-Schritt wird übersprungen (Teil-Export angenommen).")
        # --- LOGIC FIX END ---
        existing_ids = d365_ids.intersection(gsheet_ids)

        self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} zu archivierende, {len(existing_ids)} bestehende Accounts.")

        updates_to_batch, rows_to_append = [], []

        # --- New accounts: build full sheet rows in COLUMN_ORDER layout. ---
        if new_ids:
            # NOTE(review): the membership test below only matches columns of
            # self.d365_df that already carry Google-Sheet names -- confirm
            # _load_data renames all mapped columns, not only the GUID.
            new_accounts_df = self.d365_df[self.d365_df['CRM ID'].isin(new_ids)]
            for _, row in new_accounts_df.iterrows():
                new_row_data = [""] * len(COLUMN_ORDER)
                for gsheet_col in self.d365_to_gsheet_map.values():
                    if gsheet_col in row:
                        col_idx = COLUMN_MAP[gsheet_col]['index']
                        new_row_data[col_idx] = row[gsheet_col]
                rows_to_append.append(new_row_data)

        # --- Archiving: dead code while deleted_ids stays empty (see above). ---
        if deleted_ids:
            for crm_id in deleted_ids:
                row_indices = self.gsheet_df[self.gsheet_df['CRM ID'] == crm_id].index
                if not row_indices.empty:
                    row_idx = row_indices[0]
                    # NOTE(review): "row_idx + 2" assumes exactly one header
                    # row and an unfiltered 0-based frame index -- verify
                    # against sheet_handler._header_rows.
                    updates_to_batch.append({ "range": f"{COLUMN_MAP['Archiviert']['Titel']}{row_idx + 2}", "values": [["TRUE"]] })

        # --- Existing accounts: field-level merge per CRM ID. ---
        if existing_ids:
            d365_indexed = self.d365_df.set_index('CRM ID')
            gsheet_indexed = self.gsheet_df.set_index('CRM ID')

            for crm_id in existing_ids:
                d365_row = d365_indexed.loc[crm_id]
                gsheet_row = gsheet_indexed.loc[crm_id]

                row_updates = {}        # column title -> new cell value
                conflict_messages = []  # human-readable conflict notes
                needs_reeval = False    # any D365-wins field changed?

                # D365 is authoritative for these columns: copy on mismatch.
                for gsheet_col in self.d365_wins_cols:
                    d365_val = str(d365_row[gsheet_col])
                    gsheet_val = str(gsheet_row[gsheet_col])
                    if d365_val != gsheet_val:
                        row_updates[gsheet_col] = d365_val
                        needs_reeval = True
                        self.logger.debug(f"ReEval für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'")

                # Smart merge: fill blanks from D365, flag true conflicts.
                for gsheet_col in self.smart_merge_cols:
                    d365_val = str(d365_row.get(gsheet_col, ''))
                    gsheet_val = str(gsheet_row.get(gsheet_col, ''))

                    if d365_val and not gsheet_val:
                        row_updates[gsheet_col] = d365_val
                        needs_reeval = True
                        self.logger.debug(f"ReEval für {crm_id} durch '{gsheet_col}' (GSheet war leer): D365='{d365_val}'")
                    elif d365_val and gsheet_val and d365_val != gsheet_val:
                        conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'")

                if conflict_messages: row_updates["SyncConflict"] = "; ".join(conflict_messages)
                if needs_reeval: row_updates["ReEval Flag"] = "x"

                if row_updates:
                    # NOTE(review): positional index + 2 assumes one header
                    # row and that _load_data filtered out no rows -- confirm
                    # before trusting the written ranges.
                    row_idx = gsheet_indexed.index.get_loc(crm_id)
                    for col_name, value in row_updates.items():
                        updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{row_idx + 2}", "values": [[value]] })

        # --- Flush accumulated changes to the sheet. ---
        if rows_to_append:
            self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...")
            self.sheet_handler.append_rows(sheet_name=self.target_sheet_name, values=rows_to_append)

        if updates_to_batch:
            self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...")
            self.sheet_handler.batch_update_cells(updates_to_batch)

        if not rows_to_append and not updates_to_batch:
            self.logger.info("Keine Änderungen festgestellt. Das Google Sheet ist bereits auf dem neuesten Stand.")

        self.logger.info("Synchronisation erfolgreich abgeschlossen.")
|
|
|
|
def debug_sync(self):
|
|
"""
|
|
Führt eine reine Analyse des Sync-Prozesses durch, ohne Daten zu schreiben.
|
|
Gibt detaillierte Debug-Informationen im Log aus.
|
|
"""
|
|
self.logger.info("========== START SYNC-DEBUG-MODUS ==========")
|
|
|
|
if not self._load_data():
|
|
self.logger.error("Debug abgebrochen, da das Laden der Daten fehlschlug.")
|
|
return
|
|
|
|
# 1. Analyse des D365 DataFrames
|
|
self.logger.info("\n--- D365 DataFrame Analyse ---")
|
|
d365_ids_series = self.d365_df['CRM ID'].dropna()
|
|
d365_ids = set(d365_ids_series)
|
|
self.logger.info(f"Anzahl Zeilen im D365 DataFrame: {len(self.d365_df)}")
|
|
self.logger.info(f"Anzahl nicht-leerer GUIDs in D365: {len(d365_ids)}")
|
|
self.logger.info(f"Erste 5 D365 GUIDs:\n{d365_ids_series.head().to_string()}")
|
|
self.logger.info(f"Letzte 5 D365 GUIDs:\n{d365_ids_series.tail().to_string()}")
|
|
|
|
# 2. Analyse des Google Sheet DataFrames
|
|
self.logger.info("\n--- Google Sheet DataFrame Analyse ---")
|
|
gsheet_ids_series = self.gsheet_df[self.gsheet_df['CRM ID'] != '']['CRM ID'].dropna()
|
|
gsheet_ids = set(gsheet_ids_series)
|
|
self.logger.info(f"Anzahl Zeilen im GSheet DataFrame: {len(self.gsheet_df)}")
|
|
self.logger.info(f"Anzahl nicht-leerer GUIDs im GSheet: {len(gsheet_ids)}")
|
|
self.logger.info(f"Erste 5 GSheet GUIDs:\n{gsheet_ids_series.head().to_string()}")
|
|
self.logger.info(f"Letzte 5 GSheet GUIDs:\n{gsheet_ids_series.tail().to_string()}")
|
|
|
|
# 3. Analyse der Set-Operationen
|
|
self.logger.info("\n--- Set-Analyse (Vergleich) ---")
|
|
new_ids = d365_ids - gsheet_ids
|
|
deleted_ids = gsheet_ids - d365_ids
|
|
existing_ids = d365_ids.intersection(gsheet_ids)
|
|
|
|
self.logger.info(f"Anzahl neuer IDs (in D365, nicht in GSheet): {len(new_ids)}")
|
|
self.logger.info(f"Anzahl zu archivierender IDs (in GSheet, nicht in D365): {len(deleted_ids)}")
|
|
self.logger.info(f"Größe der Schnittmenge (in beiden vorhanden): {len(existing_ids)}")
|
|
|
|
if len(existing_ids) < 90 and len(d365_ids) > 90:
|
|
self.logger.warning("WARNUNG: Die Schnittmenge ist unerwartet klein. Dies bestätigt den Verdacht eines Matching-Problems!")
|
|
# Zeige ein paar IDs, die hätten übereinstimmen sollen
|
|
if gsheet_ids:
|
|
self.logger.info(f"Beispiel-GUID aus GSheet, die nicht gefunden wurde: {next(iter(gsheet_ids))}")
|
|
|
|
self.logger.info("========== ENDE SYNC-DEBUG-MODUS ==========") |