sync_manager.py aktualisiert

This commit is contained in:
2025-08-28 18:22:42 +00:00
parent 303eb71bf2
commit 44fbb618fa

View File

@@ -1,4 +1,4 @@
#!/usr/-bin/env python3
#!/usr/bin/env python3
"""
sync_manager.py
@@ -9,6 +9,7 @@ gelöschte Datensätze zu identifizieren und zu verarbeiten.
import pandas as pd
import logging
import re
from collections import defaultdict
from config import COLUMN_ORDER, COLUMN_MAP, Config
@@ -37,7 +38,6 @@ class SyncStatistics:
]
if self.field_updates:
report.append("| Feld-Updates im Detail:")
# Sortiert die Feld-Updates nach Häufigkeit
sorted_updates = sorted(self.field_updates.items(), key=lambda item: item[1], reverse=True)
for field, count in sorted_updates:
report.append(f"| - {field:<25} | {count} mal")
@@ -81,6 +81,17 @@ class SyncManager:
"CRM Anzahl Techniker", "CRM Branche", "CRM Umsatz",
"CRM Anzahl Mitarbeiter", "CRM Beschreibung"]
self.smart_merge_cols = ["CRM Website"]
def _normalize_header(self, header_str: str) -> str:
"""Bereinigt einen Header-String von unsichtbaren Zeichen und normalisiert Whitespace."""
if not isinstance(header_str, str): return ""
# 1. Ersetze Non-Breaking-Spaces
normalized = header_str.replace('\u00A0', ' ')
# 2. Entferne Zero-Width-Spaces und Byte Order Mark (BOM)
normalized = re.sub(r'[\u200B\u200E\u200F\ufeff]', '', normalized)
# 3. Fasse mehrere Leerzeichen zusammen und entferne führende/nachfolgende
normalized = re.sub(r'\s+', ' ', normalized).strip()
return normalized
def _load_data(self):
"""Lädt und bereitet die Daten aus D365 und Google Sheets vor."""
@@ -104,30 +115,37 @@ class SyncManager:
if not all_data_with_headers or len(all_data_with_headers) < self.sheet_handler._header_rows:
self.gsheet_df = pd.DataFrame(columns=COLUMN_ORDER)
else:
actual_header = all_data_with_headers[self.sheet_handler._header_rows - 1]
# --- HIER IST DER FINALE FIX ---
header_raw = all_data_with_headers[self.sheet_handler._header_rows - 1]
data_rows = all_data_with_headers[self.sheet_handler._header_rows:]
# --- FINALER DATAFRAME-ERSTELLUNGS-FIX ---
# Wir erstellen das DataFrame aus einem Dictionary, das ist am robustesten.
data_dict = {header_col: [] for header_col in actual_header}
for row in data_rows:
for i, header_col in enumerate(actual_header):
# Fülle fehlende Werte auf, wenn eine Zeile zu kurz ist
value = row[i] if i < len(row) else ''
data_dict[header_col].append(str(value)) # Sofort zu String konvertieren
# 1. Normalisiere die gelesenen Header
header_normalized = [self._normalize_header(h) for h in header_raw]
# 2. Härtung: Logge Abweichungen für zukünftige Analysen
for raw, norm in zip(header_raw, header_normalized):
if raw != norm:
self.logger.debug(f"Header normalisiert: {repr(raw)} -> '{norm}'")
temp_df = pd.DataFrame(data_dict)
# 3. Erstelle das DataFrame mit den normalisierten Headern
temp_df = pd.DataFrame(data_rows, columns=header_normalized)
# 4. Stelle sicher, dass alle Spalten aus unserer Config existieren
for col_name in COLUMN_ORDER:
if col_name not in temp_df.columns:
self.logger.warning(f"Spalte '{col_name}' fehlt im GSheet und wird als leere Spalte hinzugefügt.")
temp_df[col_name] = ''
self.gsheet_df = temp_df[COLUMN_ORDER]
# 5. Reduziere auf die korrekte Reihenfolge und fülle leere Zellen
self.gsheet_df = temp_df[COLUMN_ORDER].fillna('')
except Exception as e:
self.logger.critical(f"Fehler beim Laden/Umwandeln der GSheet-Daten: {e}", exc_info=True)
return False
# Konvertiere ALLES im finalen DataFrame zu Strings, um Typenkonflikte zu vermeiden
self.gsheet_df = self.gsheet_df.astype(str)
self.gsheet_df['CRM ID'] = self.gsheet_df['CRM ID'].str.strip().str.lower()
initial_row_count = len(self.gsheet_df)
self.gsheet_df = self.gsheet_df[self.gsheet_df['CRM ID'].str.match(r'^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', na=False)]
@@ -139,8 +157,10 @@ class SyncManager:
def run_sync(self):
"""Führt den gesamten Synchronisationsprozess aus."""
# Diese Methode bleibt exakt wie in der letzten funktionierenden Version.
# Der Fix fand ausschließlich in _load_data() statt.
if not self._load_data(): return
self.target_sheet_name = self.sheet_handler.get_main_sheet_name()
if not self.target_sheet_name:
self.logger.critical("Konnte Namen des Ziel-Sheets nicht ermitteln. Abbruch.")
@@ -154,10 +174,13 @@ class SyncManager:
self.logger.info("Archivierungs-Schritt wird übersprungen (Teil-Export angenommen).")
existing_ids = d365_ids.intersection(gsheet_ids)
self.logger.info(f"Sync-Analyse: {len(new_ids)} neue, {len(deleted_ids)} zu archivierende, {len(existing_ids)} bestehende Accounts.")
self.stats.new_accounts = len(new_ids)
self.stats.archived_accounts = len(deleted_ids)
self.stats.existing_accounts = len(existing_ids)
self.logger.info(f"Sync-Analyse: {self.stats.new_accounts} neue, {self.stats.archived_accounts} zu archivierende, {self.stats.existing_accounts} bestehende Accounts.")
updates_to_batch, rows_to_append = [], []
if new_ids:
new_accounts_df = self.d365_df[self.d365_df['CRM ID'].isin(new_ids)]
for _, row in new_accounts_df.iterrows():
@@ -170,83 +193,70 @@ class SyncManager:
if existing_ids:
d365_indexed = self.d365_df.set_index('CRM ID')
# --- KORREKTE DATENQUELLE VERWENDEN ---
gsheet_to_update_df = self.gsheet_df[self.gsheet_df['CRM ID'].isin(existing_ids)]
for original_row_index, gsheet_row in gsheet_to_update_df.iterrows():
crm_id = gsheet_row['CRM ID']
if crm_id not in d365_indexed.index: continue
d365_row = d365_indexed.loc[crm_id]
row_updates, conflict_messages, needs_reeval = {}, [], False
for gsheet_col in self.d365_wins_cols:
d365_val = str(d365_row[gsheet_col]).strip()
gsheet_val = str(gsheet_row[gsheet_col]).strip()
trigger_update = False
if gsheet_col == 'CRM Land':
d365_code_lower = d365_val.lower()
gsheet_val_lower = gsheet_val.lower()
d365_code_lower, gsheet_val_lower = d365_val.lower(), gsheet_val.lower()
d365_translated_lower = Config.COUNTRY_CODE_MAP.get(d365_code_lower, d365_code_lower).lower()
if gsheet_val_lower != d365_code_lower and gsheet_val_lower != d365_translated_lower:
trigger_update = True
elif gsheet_col == 'CRM Anzahl Techniker':
if (d365_val == '-1' or d365_val == '0') and gsheet_val == '': pass
elif d365_val != gsheet_val: trigger_update = True
elif gsheet_col == 'CRM Branche':
if gsheet_row['Chat Vorschlag Branche'] == '' and d365_val != gsheet_val:
trigger_update = True
elif gsheet_col == 'CRM Umsatz':
if gsheet_row['Wiki Umsatz'] == '' and d365_val != gsheet_val:
trigger_update = True
elif gsheet_col == 'CRM Anzahl Mitarbeiter':
if gsheet_row['Wiki Mitarbeiter'] == '' and d365_val != gsheet_val:
trigger_update = True
elif gsheet_col == 'CRM Beschreibung':
if gsheet_row['Website Zusammenfassung'] == '' and d365_val != gsheet_val:
trigger_update = True
else:
if d365_val != gsheet_val: trigger_update = True
if trigger_update:
row_updates[gsheet_col] = d365_val
needs_reeval = True
self.logger.debug(f"ReEval für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'")
row_updates[gsheet_col] = d365_val; needs_reeval = True
self.logger.debug(f"Update für {crm_id} durch '{gsheet_col}': D365='{d365_val}' | GSheet='{gsheet_val}'")
for gsheet_col in self.smart_merge_cols:
d365_val = str(d365_row.get(gsheet_col, '')).strip()
gsheet_val = str(gsheet_row.get(gsheet_col, '')).strip()
if d365_val and not gsheet_val:
row_updates[gsheet_col] = d365_val
needs_reeval = True
row_updates[gsheet_col] = d365_val; needs_reeval = True
elif d365_val and gsheet_val and d365_val != gsheet_val:
conflict_messages.append(f"{gsheet_col}_CONFLICT: D365='{d365_val}' | GSHEET='{gsheet_val}'")
if conflict_messages: row_updates["SyncConflict"] = "; ".join(conflict_messages)
if conflict_messages:
row_updates["SyncConflict"] = "; ".join(conflict_messages)
self.stats.conflict_accounts.add(crm_id)
for msg in conflict_messages: self.stats.field_conflicts[msg.split('_CONFLICT')[0]] += 1
if needs_reeval: row_updates["ReEval Flag"] = "x"
if row_updates:
self.stats.accounts_to_update.add(crm_id)
for field in row_updates.keys(): self.stats.field_updates[field] += 1
sheet_row_number = original_row_index + self.sheet_handler._header_rows + 1
for col_name, value in row_updates.items():
updates_to_batch.append({ "range": f"{COLUMN_MAP[col_name]['Titel']}{sheet_row_number}", "values": [[value]] })
if rows_to_append:
self.logger.info(f"Füge {len(rows_to_append)} neue Zeilen zum Google Sheet hinzu...")
self.sheet_handler.append_rows(sheet_name=self.target_sheet_name, values=rows_to_append)
if updates_to_batch:
self.logger.info(f"Sende {len(updates_to_batch)} Zell-Updates an das Google Sheet...")
self.sheet_handler.batch_update_cells(updates_to_batch)
if not rows_to_append and not updates_to_batch:
self.logger.info("Keine Änderungen festgestellt. Das Google Sheet ist bereits auf dem neuesten Stand.")
report = self.stats.generate_report()
self.logger.info(report)
print(report)
self.logger.info("Synchronisation erfolgreich abgeschlossen.")
def debug_sync(self, debug_id=None):