sync_manager.py aktualisiert

This commit is contained in:
2025-08-28 14:08:08 +00:00
parent 27155526a3
commit 184d0fb880

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
sync_manager.py sync_manager.py
@@ -9,10 +9,54 @@ gelöschte Datensätze zu identifizieren und zu verarbeiten.
import pandas as pd import pandas as pd
import logging import logging
from datetime import datetime from collections import defaultdict
from config import COLUMN_ORDER, COLUMN_MAP, Config from config import COLUMN_ORDER, COLUMN_MAP, Config
class SyncStatistics:
    """Collect statistics during a sync run and render them as a text report.

    All attributes are plain public fields that the caller fills in directly;
    this class performs no validation and only formats what it was given.
    """

    def __init__(self):
        # Plain counters shown in the head of the report.
        self.new_accounts = 0
        self.existing_accounts = 0
        self.archived_accounts = 0
        # Sets, so an account added several times is reported only once.
        self.accounts_to_update = set()
        self.conflict_accounts = set()
        # field name -> number of occurrences; defaultdict lets callers
        # use ``counter[field] += 1`` without initialising the key first.
        self.field_updates = defaultdict(int)
        self.field_conflicts = defaultdict(int)

    @staticmethod
    def _detail_lines(counts, heading, empty_message):
        """Return the report lines for one per-field counter section.

        Args:
            counts: Mapping of field name to occurrence count.
            heading: Line placed above the per-field entries.
            empty_message: Single line returned when ``counts`` is empty.

        Returns:
            A list of report lines, most frequent field first. ``sorted`` is
            stable, so fields with equal counts keep their insertion order.
        """
        if not counts:
            return [empty_message]
        ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
        return [heading] + [
            f"| - {field:<25} | {count} mal" for field, count in ranked
        ]

    def generate_report(self):
        """Build the final multi-line summary of the sync run.

        Returns:
            The report as one string with lines joined by newlines. The
            string starts with a blank line, so it stands apart when written
            to a log.
        """
        heavy_rule = "=" * 50
        light_rule = "-" * 50

        report = [
            "\n" + heavy_rule,
            " Sync-Prozess Abschlussbericht",
            heavy_rule,
            f"| Neue Accounts hinzugefügt: | {self.new_accounts}",
            f"| Bestehende Accounts analysiert: | {self.existing_accounts}",
            f"| Accounts für Archivierung markiert:| {self.archived_accounts}",
            light_rule,
            f"| Accounts mit Updates gesamt: | {len(self.accounts_to_update)}",
        ]
        report.extend(
            self._detail_lines(
                self.field_updates,
                "| Feld-Updates im Detail:",
                "| Keine Feld-Updates durchgeführt.",
            )
        )

        report.append(light_rule)
        report.append(f"| Accounts mit Konflikten: | {len(self.conflict_accounts)}")
        report.extend(
            self._detail_lines(
                self.field_conflicts,
                "| Feld-Konflikte im Detail:",
                "| Keine Konflikte festgestellt.",
            )
        )

        report.append(heavy_rule)
        return "\n".join(report)
class SyncManager: class SyncManager:
""" """
Kapselt die Logik für den Abgleich zwischen D365-Export und Google Sheet. Kapselt die Logik für den Abgleich zwischen D365-Export und Google Sheet.
@@ -21,46 +65,35 @@ class SyncManager:
self.sheet_handler = sheet_handler self.sheet_handler = sheet_handler
self.d365_export_path = d365_export_path self.d365_export_path = d365_export_path
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.stats = SyncStatistics()
self.target_sheet_name = None self.target_sheet_name = None
self.d365_to_gsheet_map = { self.d365_to_gsheet_map = {
"Account Name": "CRM Name", "Account Name": "CRM Name", "Parent Account": "Parent Account Name",
"Parent Account": "Parent Account Name", "Website": "CRM Website", "City": "CRM Ort", "Country": "CRM Land",
"Website": "CRM Website", "Description FSM": "CRM Beschreibung", "Branch detail": "CRM Branche",
"City": "CRM Ort",
"Country": "CRM Land",
"Description FSM": "CRM Beschreibung",
"Branch detail": "CRM Branche",
"No. Service Technicians": "CRM Anzahl Techniker", "No. Service Technicians": "CRM Anzahl Techniker",
"Annual Revenue (Mio. €)": "CRM Umsatz", "Annual Revenue (Mio. €)": "CRM Umsatz",
"Number of Employees": "CRM Anzahl Mitarbeiter", "Number of Employees": "CRM Anzahl Mitarbeiter", "GUID": "CRM ID"
"GUID": "CRM ID"
} }
# --- ANGEPASSTE LOGIK-LISTEN ---
# Spalten, bei denen D365 (fast) immer gewinnt
self.d365_wins_cols = ["CRM Name", "Parent Account Name", "CRM Ort", "CRM Land", self.d365_wins_cols = ["CRM Name", "Parent Account Name", "CRM Ort", "CRM Land",
"CRM Anzahl Techniker", "CRM Branche", "CRM Umsatz", "CRM Anzahl Techniker", "CRM Branche", "CRM Umsatz",
"CRM Anzahl Mitarbeiter", "CRM Beschreibung"] "CRM Anzahl Mitarbeiter", "CRM Beschreibung"]
# Spalten, bei denen unsere angereicherten Daten im GSheet Vorrang haben
self.smart_merge_cols = ["CRM Website"] self.smart_merge_cols = ["CRM Website"]
def _load_data(self): def _load_data(self):
"""Lädt und bereitet die Daten aus D365 und Google Sheets vor.""" # (Diese Funktion bleibt exakt wie in der letzten Version)
self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...") self.logger.info(f"Lade Daten aus D365-Export: '{self.d365_export_path}'...")
try: try:
temp_d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('') temp_d365_df = pd.read_excel(self.d365_export_path, dtype=str).fillna('')
for d365_col in self.d365_to_gsheet_map.keys(): for d365_col in self.d365_to_gsheet_map.keys():
if d365_col not in temp_d365_df.columns: if d365_col not in temp_d365_df.columns:
raise ValueError(f"Erwartete Spalte '{d365_col}' nicht in der D365-Exportdatei gefunden.") raise ValueError(f"Erwartete Spalte '{d365_col}' nicht in der D365-Exportdatei gefunden.")
self.d365_df = temp_d365_df[list(self.d365_to_gsheet_map.keys())].copy() self.d365_df = temp_d365_df[list(self.d365_to_gsheet_map.keys())].copy()
self.d365_df.rename(columns=self.d365_to_gsheet_map, inplace=True) self.d365_df.rename(columns=self.d365_to_gsheet_map, inplace=True)
self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip().str.lower() self.d365_df['CRM ID'] = self.d365_df['CRM ID'].str.strip().str.lower()
self.d365_df = self.d365_df[self.d365_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)] self.d365_df = self.d365_df[self.d365_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)]
except Exception as e: except Exception as e:
self.logger.critical(f"Fehler beim Laden der Excel-Datei: {e}", exc_info=True) self.logger.critical(f"Fehler beim Laden der Excel-Datei: {e}", exc_info=True)
return False return False
@@ -74,14 +107,12 @@ class SyncManager:
actual_header = all_data_with_headers[self.sheet_handler._header_rows - 1] actual_header = all_data_with_headers[self.sheet_handler._header_rows - 1]
data_rows = all_data_with_headers[self.sheet_handler._header_rows:] data_rows = all_data_with_headers[self.sheet_handler._header_rows:]
temp_df = pd.DataFrame(data_rows) temp_df = pd.DataFrame(data_rows)
if not temp_df.empty: if not temp_df.empty:
if temp_df.shape[1] > len(actual_header): if temp_df.shape[1] > len(actual_header):
temp_df = temp_df.iloc[:, :len(actual_header)] temp_df = temp_df.iloc[:, :len(actual_header)]
temp_df.columns = actual_header temp_df.columns = actual_header
else: else:
temp_df = pd.DataFrame(columns=actual_header) temp_df = pd.DataFrame(columns=actual_header)
temp_df = temp_df.fillna('') temp_df = temp_df.fillna('')
for col_name in COLUMN_ORDER: for col_name in COLUMN_ORDER:
if col_name not in temp_df.columns: if col_name not in temp_df.columns:
@@ -117,7 +148,12 @@ class SyncManager:
self.logger.info("Archivierungs-Schritt wird übersprungen (Teil-Export angenommen).") self.logger.info("Archivierungs-Schritt wird übersprungen (Teil-Export angenommen).")
existing_ids = d365_ids.intersection(gsheet_ids) existing_ids = d365_ids.intersection(gsheet_ids)
self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} zu archivierende, {len(existing_ids)} bestehende Accounts.") # Statistik befüllen
self.stats.new_accounts = len(new_ids)
self.stats.archived_accounts = len(deleted_ids)
self.stats.existing_accounts = len(existing_ids)
self.logger.info(f"Sync-Analyse: {self.stats.new_accounts} neue, {self.stats.archived_accounts} zu archivierende, {self.stats.existing_accounts} bestehende Accounts.")
updates_to_batch, rows_to_append = [], [] updates_to_batch, rows_to_append = [], []
@@ -147,56 +183,52 @@ class SyncManager:
gsheet_val = str(gsheet_row[gsheet_col]).strip() gsheet_val = str(gsheet_row[gsheet_col]).strip()
trigger_update = False trigger_update = False
if gsheet_col == 'CRM Land': if gsheet_col == 'CRM Land':
d365_code_lower = d365_val.lower() d365_code_lower, gsheet_val_lower = d365_val.lower(), gsheet_val.lower()
gsheet_val_lower = gsheet_val.lower()
d365_translated_lower = Config.COUNTRY_CODE_MAP.get(d365_code_lower, d365_code_lower).lower() d365_translated_lower = Config.COUNTRY_CODE_MAP.get(d365_code_lower, d365_code_lower).lower()
if gsheet_val_lower != d365_code_lower and gsheet_val_lower != d365_translated_lower: if gsheet_val_lower != d365_code_lower and gsheet_val_lower != d365_translated_lower:
trigger_update = True trigger_update = True
elif gsheet_col == 'CRM Anzahl Techniker': elif gsheet_col == 'CRM Anzahl Techniker':
if (d365_val == '-1' or d365_val == '0') and gsheet_val == '': pass if (d365_val == '-1' or d365_val == '0') and gsheet_val == '': pass
elif d365_val != gsheet_val: trigger_update = True elif d365_val != gsheet_val: trigger_update = True
elif gsheet_col == 'CRM Branche': elif gsheet_col == 'CRM Branche':
if gsheet_row['Chat Vorschlag Branche'] == '' and d365_val != gsheet_val: if gsheet_row['Chat Vorschlag Branche'] == '' and d365_val != gsheet_val:
trigger_update = True trigger_update = True
elif gsheet_col == 'CRM Umsatz': elif gsheet_col == 'CRM Umsatz':
if gsheet_row['Wiki Umsatz'] == '' and d365_val != gsheet_val: if gsheet_row['Wiki Umsatz'] == '' and d365_val != gsheet_val:
trigger_update = True trigger_update = True
elif gsheet_col == 'CRM Anzahl Mitarbeiter': elif gsheet_col == 'CRM Anzahl Mitarbeiter':
if gsheet_row['Wiki Mitarbeiter'] == '' and d365_val != gsheet_val: if gsheet_row['Wiki Mitarbeiter'] == '' and d365_val != gsheet_val:
trigger_update = True trigger_update = True
elif gsheet_col == 'CRM Beschreibung': elif gsheet_col == 'CRM Beschreibung':
if gsheet_row['Website Zusammenfassung'] == '' and d365_val != gsheet_val: if gsheet_row['Website Zusammenfassung'] == '' and d365_val != gsheet_val:
trigger_update = True trigger_update = True
else:
else: # Gilt nur noch für Name, Ort, Parent if d365_val != gsheet_val: trigger_update = True
if d365_val != gsheet_val:
trigger_update = True
if trigger_update: if trigger_update:
row_updates[gsheet_col] = d365_val row_updates[gsheet_col] = d365_val; needs_reeval = True
needs_reeval = True self.logger.debug(f"Update für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'")
self.logger.debug(f"ReEval für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'")
for gsheet_col in self.smart_merge_cols: for gsheet_col in self.smart_merge_cols:
d365_val = str(d365_row.get(gsheet_col, '')).strip() d365_val = str(d365_row.get(gsheet_col, '')).strip()
gsheet_val = str(gsheet_row.get(gsheet_col, '')).strip() gsheet_val = str(gsheet_row.get(gsheet_col, '')).strip()
if d365_val and not gsheet_val: if d365_val and not gsheet_val:
row_updates[gsheet_col] = d365_val row_updates[gsheet_col] = d365_val; needs_reeval = True
needs_reeval = True
elif d365_val and gsheet_val and d365_val != gsheet_val: elif d365_val and gsheet_val and d365_val != gsheet_val:
conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'") conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'")
if conflict_messages: row_updates["SyncConflict"] = "; ".join(conflict_messages) if conflict_messages:
row_updates["SyncConflict"] = "; ".join(conflict_messages)
self.stats.conflict_accounts.add(crm_id)
for msg in conflict_messages: self.stats.field_conflicts[msg.split('_CONFLICT')[0]] += 1
if needs_reeval: row_updates["ReEval Flag"] = "x" if needs_reeval: row_updates["ReEval Flag"] = "x"
if row_updates: if row_updates:
self.stats.accounts_to_update.add(crm_id)
for field in row_updates.keys(): self.stats.field_updates[field] += 1
sheet_row_number = original_row_index + self.sheet_handler._header_rows + 1 sheet_row_number = original_row_index + self.sheet_handler._header_rows + 1
for col_name, value in row_updates.items(): for col_name, value in row_updates.items():
updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{sheet_row_number}", "values": [[value]] }) updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{sheet_row_number}", "values": [[value]] })
@@ -209,8 +241,10 @@ class SyncManager:
self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...") self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...")
self.sheet_handler.batch_update_cells(updates_to_batch) self.sheet_handler.batch_update_cells(updates_to_batch)
if not rows_to_append and not updates_to_batch: # Generiere und logge den Abschlussbericht
self.logger.info("Keine Änderungen festgestellt. Das Google Sheet ist bereits auf dem neuesten Stand.") report = self.stats.generate_report()
self.logger.info(report)
print(report)
self.logger.info("Synchronisation erfolgreich abgeschlossen.") self.logger.info("Synchronisation erfolgreich abgeschlossen.")