duplicate_checker.py aktualisiert

This commit is contained in:
2025-08-06 09:08:12 +00:00
parent 42e09ed7ad
commit 555d9dcbfc

View File

@@ -1,107 +1,114 @@
# duplicate_checker.py (v2.0 - with blocking strategy)
import logging
from collections import defaultdict

import pandas as pd
from thefuzz import fuzz

from config import Config
from google_sheet_handler import GoogleSheetHandler
from helpers import normalize_company_name, simple_normalize_url
# --- Configuration ---
# Worksheet holding the CRM master records that candidates are compared against.
CRM_SHEET_NAME = "CRM_Accounts"
# Worksheet holding the records to check; the results are written back to it.
MATCHING_SHEET_NAME = "Matching_Accounts"
# Similarity score from which a best match counts as a likely duplicate.
SCORE_THRESHOLD = 80

# Root logger: INFO level, "<timestamp> - <level> - <message>" lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def calculate_similarity(record1, record2):
    """Return a weighted similarity score between two account records.

    Both records are indexed by key (dict or pandas Series) and must provide
    'normalized_domain', 'normalized_name', 'CRM Ort' and 'CRM Land'.

    Scoring:
      * +100 if record1 has a non-empty domain equal to record2's domain.
      * + 0.7 * fuzz.token_set_ratio(name1, name2) if both names are non-empty.
      * +20 if record1 has a non-empty city AND a non-empty country and both
        equal record2's values (a city match alone earns nothing).

    Returns:
        The summed score rounded to the nearest integer (0 if nothing matches).
    """

    def _same_nonempty(key):
        # Only a non-empty value on record1 can count as a match.
        value = record1[key]
        return bool(value) and value == record2[key]

    total_score = 0

    # Identical domains are the strongest duplicate signal.
    if _same_nonempty('normalized_domain'):
        total_score += 100

    # Fuzzy name comparison is only meaningful when both names are present.
    if record1['normalized_name'] and record2['normalized_name']:
        name_similarity = fuzz.token_set_ratio(record1['normalized_name'], record2['normalized_name'])
        total_score += name_similarity * 0.7

    # Location bonus requires city and country to match together; the country
    # is only looked at once the city already matches.
    if _same_nonempty('CRM Ort') and _same_nonempty('CRM Land'):
        total_score += 20

    return round(total_score)
# Columns every input sheet must provide; they are also echoed back in the output.
_REQUIRED_COLUMNS = ['CRM Name', 'CRM Website', 'CRM Ort', 'CRM Land']


def _as_text(value):
    """Return *value* as a string, mapping missing scalars (None/NaN/NA) to ''."""
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ""
    return str(value)


def _block_key(normalized_name):
    """Return the first whitespace-separated token of a normalised name, or ''."""
    tokens = normalized_name.split() if isinstance(normalized_name, str) else []
    return tokens[0] if tokens else ""


def _prepare_frame(df):
    """Return a copy of *df* with the comparison columns added and normalised."""
    prepared = df.copy()
    prepared['normalized_name'] = prepared['CRM Name'].map(_as_text).apply(normalize_company_name)
    # NOTE(review): if simple_normalize_url returns a non-empty placeholder for
    # missing URLs, two records without a website would score as a domain
    # match - confirm against helpers.
    prepared['normalized_domain'] = prepared['CRM Website'].map(_as_text).apply(simple_normalize_url)
    for column in ('CRM Ort', 'CRM Land'):
        prepared[column] = prepared[column].map(_as_text).str.lower().str.strip()
    # Blocking key: the first word of the normalised name.
    prepared['block_key'] = prepared['normalized_name'].apply(_block_key)
    return prepared


def _build_block_index(crm_df):
    """Group the prepared CRM records by block key; records without a key are skipped."""
    index = defaultdict(list)
    for record in crm_df.to_dict('records'):
        key = record['block_key']
        if key:
            index[key].append(record)
    return index


def _find_best_match(match_record, candidates):
    """Return (CRM name, score) of the best-scoring candidate, or ('', 0) if none scores above 0."""
    best_score = 0
    best_match_name = ""
    for crm_record in candidates:
        score = calculate_similarity(match_record, crm_record)
        if score > best_score:
            best_score = score
            best_match_name = crm_record['CRM Name']
    return best_match_name, best_score


def main():
    """Load both sheets, find the best CRM match per record and write the results back.

    Steps:
      1. Create the GoogleSheetHandler and load CRM_SHEET_NAME and
         MATCHING_SHEET_NAME as DataFrames; abort (log + return) if the handler
         cannot be created, a sheet is missing/empty or a required column is absent.
      2. Normalise names, domains, city and country on working copies, so the
         values written back keep their original spelling.
      3. Index the CRM records by block key (first word of the normalised name)
         and score each record to check only against the CRM records in its block.
      4. Write the four input columns plus 'Potenzieller Treffer im CRM' and
         'Ähnlichkeits-Score' to MATCHING_SHEET_NAME, replacing its content.

    Returns:
        None. All outcomes are reported through logging.
    """
    logging.info("Starte den Duplikats-Check (v2.0 mit Blocking)...")
    try:
        sheet_handler = GoogleSheetHandler()
    except Exception as e:
        # Top-level boundary: report and stop instead of crashing the script.
        logging.critical(f"FEHLER bei Initialisierung des GoogleSheetHandler: {e}")
        return

    logging.info(f"Lade Master-Daten aus '{CRM_SHEET_NAME}'...")
    crm_df = sheet_handler.get_sheet_as_dataframe(CRM_SHEET_NAME)
    if crm_df is None or crm_df.empty:
        logging.critical(f"Konnte keine Daten aus '{CRM_SHEET_NAME}' laden. Breche ab.")
        return

    logging.info(f"Lade zu prüfende Daten aus '{MATCHING_SHEET_NAME}'...")
    matching_df = sheet_handler.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    if matching_df is None or matching_df.empty:
        logging.critical(f"Konnte keine Daten aus '{MATCHING_SHEET_NAME}' laden. Breche ab.")
        return

    # Fail with a clear message instead of a KeyError deep inside the matching.
    for sheet_name, frame in ((CRM_SHEET_NAME, crm_df), (MATCHING_SHEET_NAME, matching_df)):
        missing = [column for column in _REQUIRED_COLUMNS if column not in frame.columns]
        if missing:
            logging.critical(f"Im Tabellenblatt '{sheet_name}' fehlen Spalten: {missing}. Breche ab.")
            return

    logging.info("Normalisiere Daten für den Vergleich...")
    crm_prepared = _prepare_frame(crm_df)
    matching_prepared = _prepare_frame(matching_df)

    # --- Blocking strategy: only compare records that share a block key ---
    logging.info("Erstelle Index für CRM-Daten zur Beschleunigung...")
    crm_index = _build_block_index(crm_prepared)

    logging.info("Starte Matching-Prozess...")
    results = []
    total_matches = len(matching_prepared)
    # enumerate() keeps the progress counter correct for any DataFrame index.
    for position, match_record in enumerate(matching_prepared.to_dict('records'), start=1):
        logging.info(f"Prüfe {position}/{total_matches}: {match_record['CRM Name']}...")

        # Run the expensive comparison only for the candidates in this block.
        candidates = crm_index.get(match_record['block_key'], [])
        best_match_name, best_score = _find_best_match(match_record, candidates)

        # The best match is reported for every row, even below the threshold
        # (name '' and score 0 when the block produced nothing).
        results.append({'Potenzieller Treffer im CRM': best_match_name, 'Ähnlichkeits-Score': best_score})

    hits = sum(1 for entry in results if entry['Ähnlichkeits-Score'] >= SCORE_THRESHOLD)
    logging.info(f"{hits} von {total_matches} Einträgen erreichen den Schwellenwert von {SCORE_THRESHOLD}.")

    logging.info("Matching abgeschlossen. Schreibe Ergebnisse zurück ins Sheet...")
    result_df = pd.DataFrame(results)
    # Output the original (un-normalised) input columns; city and country are
    # only converted to text so that missing cells are written as ''.
    output_df = matching_df[_REQUIRED_COLUMNS].reset_index(drop=True)
    for column in ('CRM Ort', 'CRM Land'):
        output_df[column] = output_df[column].map(_as_text)
    output_df = pd.concat([output_df, result_df], axis=1)

    # First row is the header, followed by one row per checked record.
    data_to_write = [output_df.columns.values.tolist()] + output_df.values.tolist()
    success = sheet_handler.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
    if success:
        logging.info(f"Ergebnisse erfolgreich in das Tabellenblatt '{MATCHING_SHEET_NAME}' geschrieben.")
    else:
        logging.error("FEHLER beim Schreiben der Ergebnisse ins Google Sheet.")


if __name__ == "__main__":
    main()