duplicate_checker.py aktualisiert

This commit is contained in:
2025-08-04 06:08:27 +00:00
parent 38612a858e
commit 3a8809e08f

View File

@@ -1,106 +1,155 @@
# duplicate_checker.py (v3.0 - Back to Basics: Optimized Brute-Force) # duplicate_checker.py (v2.0 + Transparenz)
import logging import logging
import pandas as pd import pandas as pd
from thefuzz import fuzz from thefuzz import fuzz
from config import Config from config import Config
from helpers import normalize_company_name, simple_normalize_url from helpers import normalize_company_name, simple_normalize_url, create_log_filename
from google_sheet_handler import GoogleSheetHandler from google_sheet_handler import GoogleSheetHandler
import time import time
# --- Configuration ---
CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
# A best match scoring below this value is not reported as a potential hit.
SCORE_THRESHOLD = 80

# --- Logging setup ---
LOG_LEVEL = logging.DEBUG if Config.DEBUG else logging.INFO
LOG_FORMAT = '%(asctime)s - %(levelname)-8s - %(name)s - %(message)s'

root_logger = logging.getLogger()
root_logger.setLevel(LOG_LEVEL)

# Only attach handlers when none are configured yet, to avoid duplicated output.
if not root_logger.handlers:
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter(LOG_FORMAT))
    root_logger.addHandler(stream_handler)

    # NOTE(review): create_log_filename is assumed to return a falsy value when
    # no log file can be provided -- confirm against helpers.
    log_file_path = create_log_filename("duplicate_check_v2_final")
    if log_file_path:
        file_handler = logging.FileHandler(log_file_path, mode='a', encoding='utf-8')
        file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
        root_logger.addHandler(file_handler)
else:
    # Logging was configured elsewhere: report the file of the first existing
    # FileHandler, or None when there is none.
    log_file_path = next(
        (h.baseFilename for h in root_logger.handlers if isinstance(h, logging.FileHandler)),
        None,
    )

logger = logging.getLogger(__name__)
def _component_scores(record1, record2, name_weight):
    """Return the per-criterion scores for a pair of normalized records.

    Args:
        record1: Mapping read via the keys 'normalized_domain',
            'normalized_name', 'CRM Ort' and 'CRM Land'.
        record2: Mapping with the same keys as ``record1``.
        name_weight: Factor applied to the 0-100 name similarity.

    Returns:
        dict: ``{'name': int, 'location': int, 'domain': int}``.
    """
    scores = {'name': 0, 'location': 0, 'domain': 0}

    # Identical domains score 100. 'k.a.' never counts as a match (presumably
    # the "no value" placeholder of the URL normalizer -- confirm in helpers).
    domain1 = record1.get('normalized_domain')
    if domain1 and domain1 != 'k.a.' and domain1 == record2.get('normalized_domain'):
        scores['domain'] = 100

    name1 = record1.get('normalized_name')
    name2 = record2.get('normalized_name')
    if name1 and name2:
        # token_set_ratio is robust against extra words such as "Holding" or "Gruppe".
        scores['name'] = round(fuzz.token_set_ratio(name1, name2) * name_weight)

    # Location bonus: city AND country must both be present and equal.
    # NOTE(review): main() stringifies these columns with astype(str), so missing
    # cells may arrive as 'nan'/'none' and would then compare equal -- confirm
    # what the sheet handler delivers for empty cells.
    ort1 = record1.get('CRM Ort')
    land1 = record1.get('CRM Land')
    if ort1 and land1 and ort1 == record2.get('CRM Ort') and land1 == record2.get('CRM Land'):
        scores['location'] = 20

    return scores


def calculate_similarity_with_details(record1, record2):
    """Compute a weighted similarity score and a human-readable reason.

    Uses the v2.0 scoring logic: domain match 100 points, name similarity
    weighted with 0.7, location bonus 20 points.

    Args:
        record1: Normalized record (mapping) to check.
        record2: Normalized record (mapping) to compare against.

    Returns:
        tuple: ``(total_score, reason_text)``. ``reason_text`` lists every
        criterion that contributed, joined by " + ", or
        "Keine Übereinstimmung" when nothing matched.
    """
    scores = _component_scores(record1, record2, name_weight=0.7)
    labels = (('domain', 'Domain'), ('name', 'Name'), ('location', 'Ort'))
    reasons = [f"{label}({scores[key]})" for key, label in labels if scores[key] > 0]
    reason_text = " + ".join(reasons) if reasons else "Keine Übereinstimmung"
    # All components are ints, so the sum needs no further rounding.
    return sum(scores.values()), reason_text


def calculate_similarity_details(record1, record2):
    """Compute a weighted similarity score and return the score breakdown.

    Earlier entry point with the name similarity weighted at 0.85; retained
    because the brute-force variant of main() calls it and reads the
    'total' and 'details' keys of the result.

    Args:
        record1: Normalized record (mapping) to check.
        record2: Normalized record (mapping) to compare against.

    Returns:
        dict: ``{'total': int, 'details': {'name', 'location', 'domain'}}``.
    """
    scores = _component_scores(record1, record2, name_weight=0.85)
    return {'total': sum(scores.values()), 'details': scores}
def main(): def main():
"""Hauptfunktion zum Laden, Vergleichen und Schreiben der Daten."""
start_time = time.time() start_time = time.time()
logger.info("Starte den Duplikats-Check (v3.0 - Back to Basics)...") logger.info("Starte den Duplikats-Check (v2.0 mit Blocking und Maximum Logging)...")
logger.info(f"Logdatei: {log_file_path}")
# ... (Initialisierung und Laden der Daten bleibt gleich) ...
try: try:
sheet_handler = GoogleSheetHandler() sheet_handler = GoogleSheetHandler()
except Exception as e: except Exception as e:
logger.critical(f"FEHLER bei Initialisierung: {e}") logger.critical(f"FEHLER bei Initialisierung des GoogleSheetHandler: {e}")
return return
logging.info(f"Lade Master-Daten aus '{CRM_SHEET_NAME}'...") logger.info(f"Lade Master-Daten aus '{CRM_SHEET_NAME}'...")
crm_df = sheet_handler.get_sheet_as_dataframe(CRM_SHEET_NAME) crm_df = sheet_handler.get_sheet_as_dataframe(CRM_SHEET_NAME)
if crm_df is None or crm_df.empty: return if crm_df is None or crm_df.empty: return
logging.info(f"Lade zu prüfende Daten aus '{MATCHING_SHEET_NAME}'...") logger.info(f"Lade zu prüfende Daten aus '{MATCHING_SHEET_NAME}'...")
matching_df = sheet_handler.get_sheet_as_dataframe(MATCHING_SHEET_NAME) matching_df = sheet_handler.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
if matching_df is None or matching_df.empty: return if matching_df is None or matching_df.empty: return
original_matching_df = matching_df.copy() original_matching_df = matching_df.copy()
logging.info("Normalisiere Daten für den Vergleich...") logger.info("Normalisiere Daten für den Vergleich...")
for df in [crm_df, matching_df]: for df in [crm_df, matching_df]:
df['normalized_name'] = df['CRM Name'].astype(str).apply(normalize_company_name) df['normalized_name'] = df['CRM Name'].astype(str).apply(normalize_company_name)
df['normalized_domain'] = df['CRM Website'].astype(str).apply(simple_normalize_url) df['normalized_domain'] = df['CRM Website'].astype(str).apply(simple_normalize_url)
df['CRM Ort'] = df['CRM Ort'].astype(str).str.lower().str.strip() df['CRM Ort'] = df['CRM Ort'].astype(str).str.lower().str.strip()
df['CRM Land'] = df['CRM Land'].astype(str).str.lower().str.strip() df['CRM Land'] = df['CRM Land'].astype(str).str.lower().str.strip()
df['block_key'] = df['normalized_name'].apply(lambda x: x.split()[0] if x and x.split() else None)
logger.info("Erstelle Index für CRM-Daten zur Beschleunigung...")
crm_index = {}
crm_records = crm_df.to_dict('records') crm_records = crm_df.to_dict('records')
matching_records = matching_df.to_dict('records') for record in crm_records:
key = record['block_key']
if key:
if key not in crm_index:
crm_index[key] = []
crm_index[key].append(record)
logger.info(f"Starte Matching-Prozess: {len(matching_records)} Einträge werden mit {len(crm_records)} CRM-Einträgen verglichen...") logger.info("Starte Matching-Prozess...")
results = [] results = []
for i, match_record in enumerate(matching_records): for match_record in matching_df.to_dict('records'):
best_score_info = {'total': -1, 'details': {'name': 0, 'location': 0, 'domain': 0}} best_score = -1
best_match_name = "" best_match_name = ""
best_reason = ""
logger.info(f"--- Prüfe {i + 1}/{len(matching_records)}: '{match_record.get('CRM Name', 'N/A')}' ---") logger.info(f"--- Prüfe: '{match_record.get('CRM Name', 'N/A')}' ---")
logger.debug(f" [Normalisiert: '{match_record.get('normalized_name')}', Domain: '{match_record.get('normalized_domain')}', Key: '{match_record.get('block_key')}']")
# BRUTE-FORCE: Vergleiche mit jedem einzelnen CRM-Eintrag block_key = match_record.get('block_key')
for crm_record in crm_records: candidates = crm_index.get(block_key, [])
score_info = calculate_similarity_details(match_record, crm_record)
# Logge jeden interessanten Vergleich (Score > 60) if not candidates:
if score_info['total'] > 60: logger.debug(" -> Keine Kandidaten im Index gefunden. Überspringe Vergleich.")
logger.debug(f" - Kandidat: '{crm_record.get('CRM Name', 'N/A')}' -> Score: {score_info['total']} (Details: {score_info['details']})") results.append({
'Potenzieller Treffer im CRM': "", 'Ähnlichkeits-Score': 0, 'Matching-Grund': "Keine Kandidaten"
})
continue
if score_info['total'] > best_score_info['total']: logger.debug(f" -> Vergleiche mit {len(candidates)} Kandidaten aus Block '{block_key}'.")
best_score_info = score_info
best_match_name = crm_record.get('CRM Name', 'N/A')
logger.info(f" --> Bester Treffer: '{best_match_name}' mit Score {best_score_info['total']}") for crm_row in candidates:
score, reason = calculate_similarity_with_details(match_record, crm_row)
if score > 0:
logger.debug(f" - Kandidat: '{crm_row.get('CRM Name', 'N/A')}' -> Score: {score} (Grund: {reason})")
if score > best_score:
best_score = score
best_match_name = crm_row.get('CRM Name', 'N/A')
best_reason = reason
logger.info(f" --> Bester Treffer: '{best_match_name}' mit Score {best_score} (Grund: {best_reason})")
results.append({ results.append({
'Potenzieller Treffer im CRM': best_match_name if best_score_info['total'] >= SCORE_THRESHOLD else "", 'Potenzieller Treffer im CRM': best_match_name if best_score >= SCORE_THRESHOLD else "",
'Score (Gesamt)': best_score_info['total'], 'Ähnlichkeits-Score': best_score,
'Score (Name)': best_score_info['details']['name'], 'Matching-Grund': best_reason
'Bonus (Standort)': best_score_info['details']['location'],
'Bonus (Domain)': best_score_info['details']['domain']
}) })
logging.info("Matching abgeschlossen. Schreibe Ergebnisse zurück ins Sheet...") logging.info("Matching abgeschlossen. Schreibe Ergebnisse zurück ins Sheet...")
@@ -112,12 +161,13 @@ def main():
success = sheet_handler.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write) success = sheet_handler.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
if success: if success:
logger.info(f"Ergebnisse erfolgreich in '{MATCHING_SHEET_NAME}' geschrieben.") logger.info(f"Ergebnisse erfolgreich in das Tabellenblatt '{MATCHING_SHEET_NAME}' geschrieben.")
else: else:
logger.error("FEHLER beim Schreiben der Ergebnisse ins Google Sheet.") logger.error("FEHLER beim Schreiben der Ergebnisse ins Google Sheet.")
end_time = time.time() end_time = time.time()
logger.info(f"Gesamtdauer des Duplikats-Checks: {end_time - start_time:.2f} Sekunden.") logger.info(f"Gesamtdauer des Duplikats-Checks: {end_time - start_time:.2f} Sekunden.")
logger.info(f"===== Skript beendet =====")
if __name__ == "__main__": if __name__ == "__main__":
main() main()