Updated sync_manager.py

2025-08-28 18:22:42 +00:00
parent 0273174a57
commit edec30652d


@@ -1,4 +1,4 @@
-#!/usr/-bin/env python3
+#!/usr/bin/env python3
 """
 sync_manager.py
@@ -9,6 +9,7 @@ to identify and process deleted records.
 import pandas as pd
 import logging
+import re
 from collections import defaultdict
 from config import COLUMN_ORDER, COLUMN_MAP, Config
@@ -37,7 +38,6 @@ class SyncStatistics:
         ]
         if self.field_updates:
             report.append("| Field updates in detail:")
-            # Sort the field updates by frequency
             sorted_updates = sorted(self.field_updates.items(), key=lambda item: item[1], reverse=True)
             for field, count in sorted_updates:
                 report.append(f"| - {field:<25} | {count} times")
@@ -82,6 +82,17 @@ class SyncManager:
                                "CRM Anzahl Mitarbeiter", "CRM Beschreibung"]
         self.smart_merge_cols = ["CRM Website"]

+    def _normalize_header(self, header_str: str) -> str:
+        """Cleans a header string of invisible characters and normalizes whitespace."""
+        if not isinstance(header_str, str): return ""
+        # 1. Replace non-breaking spaces
+        normalized = header_str.replace('\u00A0', ' ')
+        # 2. Remove zero-width spaces and the byte order mark (BOM)
+        normalized = re.sub(r'[\u200B\u200E\u200F\ufeff]', '', normalized)
+        # 3. Collapse multiple spaces and strip leading/trailing whitespace
+        normalized = re.sub(r'\s+', ' ', normalized).strip()
+        return normalized
+
     def _load_data(self):
         """Loads and prepares the data from D365 and Google Sheets."""
         self.logger.info(f"Loading data from D365 export: '{self.d365_export_path}'...")
@@ -104,30 +115,37 @@ class SyncManager:
             if not all_data_with_headers or len(all_data_with_headers) < self.sheet_handler._header_rows:
                 self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER)
             else:
-                actual_header = all_data_with_headers[self.sheet_handler._header_rows - 1]
+                # --- THIS IS THE FINAL FIX ---
+                header_raw = all_data_with_headers[self.sheet_handler._header_rows - 1]
                 data_rows = all_data_with_headers[self.sheet_handler._header_rows:]
-                # --- FINAL DATAFRAME CONSTRUCTION FIX ---
-                # We build the DataFrame from a dictionary; that is the most robust approach.
-                data_dict = {header_col: [] for header_col in actual_header}
-                for row in data_rows:
-                    for i, header_col in enumerate(actual_header):
-                        # Fill in missing values when a row is too short
-                        value = row[i] if i < len(row) else ''
-                        data_dict[header_col].append(str(value))  # Convert to string immediately
-                temp_df = pd.DataFrame(data_dict)
+                # 1. Normalize the headers that were read
+                header_normalized = [self._normalize_header(h) for h in header_raw]
+                # 2. Hardening: log deviations for future analysis
+                for raw, norm in zip(header_raw, header_normalized):
+                    if raw != norm:
+                        self.logger.debug(f"Header normalized: {repr(raw)} -> '{norm}'")
+                # 3. Build the DataFrame with the normalized headers
+                temp_df = pd.DataFrame(data_rows, columns=header_normalized)
+                # 4. Make sure all columns from our config exist
                 for col_name in COLUMN_ORDER:
                     if col_name not in temp_df.columns:
+                        self.logger.warning(f"Column '{col_name}' is missing from the GSheet and is added as an empty column.")
                         temp_df[col_name] = ''
-                self.gsheet_df = temp_df[COLUMN_ORDER]
+                # 5. Reduce to the correct column order and fill empty cells
+                self.gsheet_df = temp_df[COLUMN_ORDER].fillna('')
         except Exception as e:
             self.logger.critical(f"Error while loading/converting the GSheet data: {e}", exc_info=True)
             return False
+        # Convert EVERYTHING in the final DataFrame to strings to avoid type conflicts
+        self.gsheet_df = self.gsheet_df.astype(str)
         self.gsheet_df['CRM ID'] = self.gsheet_df['CRM ID'].str.strip().str.lower()
         initial_row_count = len(self.gsheet_df)
         self.gsheet_df = self.gsheet_df[self.gsheet_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)]
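
Editor's note: a self-contained sketch of the new loading path with invented sample data. It illustrates two behaviors the diff relies on: pandas pads short rows with NaN when columns= is given (which fillna('') then clears), and the UUID regex drops every row whose 'CRM ID' is not a well-formed GUID:

    import pandas as pd

    header = ['CRM ID', 'CRM Land']  # assume already normalized
    rows = [
        ['019C1DE4-2B6E-4F3A-9C1D-0A1B2C3D4E5F', 'de'],
        ['Summe'],  # short junk row: padded with NaN, then filtered out
    ]

    df = pd.DataFrame(rows, columns=header).fillna('').astype(str)
    df['CRM ID'] = df['CRM ID'].str.strip().str.lower()
    uuid_re = r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$'
    df = df[df['CRM ID'].str.match(uuid_re, na=False)]
    print(len(df))  # 1 -- only the GUID row survives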
@@ -139,6 +157,8 @@ class SyncManager:

     def run_sync(self):
         """Runs the entire synchronization process."""
+        # This method stays exactly as in the last working version.
+        # The fix happened exclusively in _load_data().
         if not self._load_data(): return
         self.target_sheet_name = self.sheet_handler.get_main_sheet_name()
@@ -154,7 +174,10 @@ class SyncManager:
             self.logger.info("Skipping archiving step (partial export assumed).")
         existing_ids = d365_ids.intersection(gsheet_ids)
-        self.logger.info(f"Sync analysis: {len(new_ids)} new, {len(deleted_ids)} to archive, {len(existing_ids)} existing accounts.")
+        self.stats.new_accounts = len(new_ids)
+        self.stats.archived_accounts = len(deleted_ids)
+        self.stats.existing_accounts = len(existing_ids)
+        self.logger.info(f"Sync analysis: {self.stats.new_accounts} new, {self.stats.archived_accounts} to archive, {self.stats.existing_accounts} existing accounts.")
         updates_to_batch, rows_to_append = [], []
@@ -170,68 +193,56 @@ class SyncManager:
         if existing_ids:
             d365_indexed = self.d365_df.set_index('CRM ID')
-            # --- USE THE CORRECT DATA SOURCE ---
             gsheet_to_update_df = self.gsheet_df[self.gsheet_df['CRM ID'].isin(existing_ids)]
             for original_row_index, gsheet_row in gsheet_to_update_df.iterrows():
                 crm_id = gsheet_row['CRM ID']
                 if crm_id not in d365_indexed.index: continue
                 d365_row = d365_indexed.loc[crm_id]
                 row_updates, conflict_messages, needs_reeval = {}, [], False
                 for gsheet_col in self.d365_wins_cols:
                     d365_val = str(d365_row[gsheet_col]).strip()
                     gsheet_val = str(gsheet_row[gsheet_col]).strip()
                     trigger_update = False
                     if gsheet_col == 'CRM Land':
-                        d365_code_lower = d365_val.lower()
-                        gsheet_val_lower = gsheet_val.lower()
+                        d365_code_lower, gsheet_val_lower = d365_val.lower(), gsheet_val.lower()
                         d365_translated_lower = Config.COUNTRY_CODE_MAP.get(d365_code_lower, d365_code_lower).lower()
                         if gsheet_val_lower != d365_code_lower and gsheet_val_lower != d365_translated_lower:
                             trigger_update = True
                     elif gsheet_col == 'CRM Anzahl Techniker':
                         if (d365_val == '-1' or d365_val == '0') and gsheet_val == '': pass
                         elif d365_val != gsheet_val: trigger_update = True
                     elif gsheet_col == 'CRM Branche':
                         if gsheet_row['Chat Vorschlag Branche'] == '' and d365_val != gsheet_val:
                             trigger_update = True
                     elif gsheet_col == 'CRM Umsatz':
                         if gsheet_row['Wiki Umsatz'] == '' and d365_val != gsheet_val:
                             trigger_update = True
                     elif gsheet_col == 'CRM Anzahl Mitarbeiter':
                         if gsheet_row['Wiki Mitarbeiter'] == '' and d365_val != gsheet_val:
                             trigger_update = True
-                    elif gsheet_col == 'CRM Beschreibung':
-                        if gsheet_row['Website Zusammenfassung'] == '' and d365_val != gsheet_val:
-                            trigger_update = True
                     else:
                         if d365_val != gsheet_val: trigger_update = True
                     if trigger_update:
-                        row_updates[gsheet_col] = d365_val
-                        needs_reeval = True
-                        self.logger.debug(f"ReEval for {crm_id} via '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'")
+                        row_updates[gsheet_col] = d365_val; needs_reeval = True
+                        self.logger.debug(f"Update for {crm_id} via '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'")
                 for gsheet_col in self.smart_merge_cols:
                     d365_val = str(d365_row.get(gsheet_col, '')).strip()
                     gsheet_val = str(gsheet_row.get(gsheet_col, '')).strip()
                     if d365_val and not gsheet_val:
-                        row_updates[gsheet_col] = d365_val
-                        needs_reeval = True
+                        row_updates[gsheet_col] = d365_val; needs_reeval = True
                     elif d365_val and gsheet_val and d365_val != gsheet_val:
                         conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'")
-                if conflict_messages: row_updates["SyncConflict"] = "; ".join(conflict_messages)
+                if conflict_messages:
+                    row_updates["SyncConflict"] = "; ".join(conflict_messages)
+                    self.stats.conflict_accounts.add(crm_id)
+                    for msg in conflict_messages: self.stats.field_conflicts[msg.split('_CONFLICT')[0]] += 1
                 if needs_reeval: row_updates["ReEval Flag"] = "x"
                 if row_updates:
+                    self.stats.accounts_to_update.add(crm_id)
+                    for field in row_updates.keys(): self.stats.field_updates[field] += 1
                     sheet_row_number = original_row_index + self.sheet_handler._header_rows + 1
                     for col_name, value in row_updates.items():
                         updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{sheet_row_number}", "values": [[value]] })
@@ -239,14 +250,13 @@ class SyncManager:
         if rows_to_append:
             self.logger.info(f"Appending {len(rows_to_append)} new rows to the Google Sheet...")
             self.sheet_handler.append_rows(sheet_name=self.target_sheet_name, values=rows_to_append)
         if updates_to_batch:
             self.logger.info(f"Sending {len(updates_to_batch)} cell updates to the Google Sheet...")
             self.sheet_handler.batch_update_cells(updates_to_batch)
-        if not rows_to_append and not updates_to_batch:
-            self.logger.info("No changes detected. The Google Sheet is already up to date.")
+        report = self.stats.generate_report()
+        self.logger.info(report)
+        print(report)
         self.logger.info("Synchronization completed successfully.")

     def debug_sync(self, debug_id=None):