duplicate_checker.py updated

2025-08-06 09:06:07 +00:00
parent 6b4c8295c7
commit dfcb270a7f


@@ -1,169 +1,107 @@
-import os
-import re
+# duplicate_checker.py (v2.0 - mit Blocking-Strategie)
 import logging
 import pandas as pd
-import numpy as np
-import recordlinkage
-from rapidfuzz import fuzz
+from thefuzz import fuzz
+from config import Config
+from helpers import normalize_company_name, simple_normalize_url
 from google_sheet_handler import GoogleSheetHandler

 # --- Konfiguration ---
 CRM_SHEET_NAME = "CRM_Accounts"
 MATCHING_SHEET_NAME = "Matching_Accounts"
-# Threshold gesenkt und konfigurierbar im Code
-SCORE_THRESHOLD = 0.75
-WEIGHTS = {
-    'domain': 0.5,
-    'name': 0.4,
-    'city': 0.1,
-}
-# Relativer Log-Ordner
-LOG_DIR = 'log'
-LOG_FILENAME = 'duplicate_check.log'
+SCORE_THRESHOLD = 80

-# --- Logging Setup ---
-if not os.path.exists(LOG_DIR):
-    try:
-        os.makedirs(LOG_DIR)
-    except Exception as e:
-        print(f"Warnung: Konnte Log-Ordner nicht anlegen: {e}")
-log_path = os.path.join(LOG_DIR, LOG_FILENAME)
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.DEBUG)
-formatter = logging.Formatter('%(asctime)s - %(levelname)-8s - %(name)s - %(message)s')
-# Console handler
-console_handler = logging.StreamHandler()
-console_handler.setLevel(logging.INFO)
-console_handler.setFormatter(formatter)
-logger.addHandler(console_handler)
-# File handler
-try:
-    file_handler = logging.FileHandler(log_path, mode='a', encoding='utf-8')
-    file_handler.setLevel(logging.DEBUG)
-    file_handler.setFormatter(formatter)
-    logger.addHandler(file_handler)
-    logger.info(f"Logging auch in Datei: {log_path}")
-except Exception as e:
-    logger.warning(f"Konnte keine Log-Datei schreiben: {e}")
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

-# --- Hilfsfunktionen ---
-def normalize_company_name(name: str) -> str:
-    s = str(name).casefold()
-    for src, dst in [('ä','ae'), ('ö','oe'), ('ü','ue'), ('ß','ss')]:
-        s = s.replace(src, dst)
-    s = re.sub(r'[^a-z0-9\s]', ' ', s)
-    stops = ['gmbh','ag','kg','ug','ohg','holding','group','international']
-    tokens = [t for t in s.split() if t and t not in stops]
-    return ' '.join(tokens)
-
-def normalize_domain(url: str) -> str:
-    s = str(url).casefold().strip()
-    s = re.sub(r'^https?://', '', s)
-    s = s.split('/')[0]
-    if s.startswith('www.'):
-        s = s[4:]
-    return s
+def calculate_similarity(record1, record2):
+    """Berechnet einen gewichteten Ähnlichkeits-Score zwischen zwei Datensätzen."""
+    total_score = 0
+    if record1['normalized_domain'] and record1['normalized_domain'] == record2['normalized_domain']:
+        total_score += 100
+    if record1['normalized_name'] and record2['normalized_name']:
+        name_similarity = fuzz.token_set_ratio(record1['normalized_name'], record2['normalized_name'])
+        total_score += name_similarity * 0.7
+    if record1['CRM Ort'] and record1['CRM Ort'] == record2['CRM Ort']:
+        if record1['CRM Land'] and record1['CRM Land'] == record2['CRM Land']:
+            total_score += 20
+    return round(total_score)
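Note on the removed helpers: v2.0 now imports normalize_company_name and simple_normalize_url from a separate helpers module that is not part of this commit. A minimal sketch of what that module presumably contains, reconstructed from the v1 inline helpers deleted above (the real helpers.py may differ, and simple_normalize_url is assumed to behave like the old normalize_domain):

# helpers.py - hypothetical sketch, reconstructed from the v1 inline helpers above
import re

STOPWORDS = ['gmbh', 'ag', 'kg', 'ug', 'ohg', 'holding', 'group', 'international']

def normalize_company_name(name: str) -> str:
    # Lower-case, fold German umlauts, strip punctuation and legal-form stopwords
    s = str(name).casefold()
    for src, dst in [('ä', 'ae'), ('ö', 'oe'), ('ü', 'ue'), ('ß', 'ss')]:
        s = s.replace(src, dst)
    s = re.sub(r'[^a-z0-9\s]', ' ', s)
    return ' '.join(t for t in s.split() if t not in STOPWORDS)

def simple_normalize_url(url: str) -> str:
    # Reduce a URL to its bare domain (assumed to match v1's normalize_domain)
    s = str(url).casefold().strip()
    s = re.sub(r'^https?://', '', s).split('/')[0]
    return s[4:] if s.startswith('www.') else s

Stripping legal forms such as "gmbh" or "ag" before comparing names is what later lets fuzz.token_set_ratio treat "ACME GmbH" and "Acme" as the same company.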
 def main():
-    logger.info("Starte den Duplikats-Check mit Fallback-Blocking...")
-    # GoogleSheetHandler initialisieren
+    """Hauptfunktion zum Laden, Vergleichen und Schreiben der Daten."""
+    logging.info("Starte den Duplikats-Check (v2.0 mit Blocking)...")
     try:
         sheet_handler = GoogleSheetHandler()
-        logger.info("GoogleSheetHandler initialisiert")
     except Exception as e:
-        logger.critical(f"FEHLER bei Initialisierung des GoogleSheetHandler: {e}")
+        logging.critical(f"FEHLER bei Initialisierung des GoogleSheetHandler: {e}")
         return

-    # Daten laden
+    logging.info(f"Lade Master-Daten aus '{CRM_SHEET_NAME}'...")
     crm_df = sheet_handler.get_sheet_as_dataframe(CRM_SHEET_NAME)
-    match_df = sheet_handler.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
-    if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
-        logger.critical("CRM- oder Matching-Daten leer. Abbruch.")
-        return
-    logger.info(f"{len(crm_df)} CRM-Zeilen, {len(match_df)} Matching-Zeilen geladen")
+    if crm_df is None or crm_df.empty:
+        logging.critical(f"Konnte keine Daten aus '{CRM_SHEET_NAME}' laden. Breche ab.")
+        return
+
+    logging.info(f"Lade zu prüfende Daten aus '{MATCHING_SHEET_NAME}'...")
+    matching_df = sheet_handler.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
+    if matching_df is None or matching_df.empty:
+        logging.critical(f"Konnte keine Daten aus '{MATCHING_SHEET_NAME}' laden. Breche ab.")
+        return

-    # Normalisierung
-    for df, label in [(crm_df, 'CRM'), (match_df, 'Matching')]:
-        df['norm_name'] = df['CRM Name'].fillna('').apply(normalize_company_name)
-        df['norm_domain'] = df['CRM Website'].fillna('').apply(normalize_domain)
-        df['city'] = df['CRM Ort'].fillna('').apply(lambda x: str(x).casefold().strip())
-        df['name_token'] = df['norm_name'].apply(lambda x: x.split()[0] if x else np.nan)
-        # Leere Werte als NaN markieren
-        df['norm_domain'].replace('', np.nan, inplace=True)
-        df['city'].replace('', np.nan, inplace=True)
-        logger.debug(f"{label}-Normalisierung: norm_domain={df.iloc[0]['norm_domain']}, name_token={df.iloc[0]['name_token']}")
+    logging.info("Normalisiere Daten für den Vergleich...")
+    for df in [crm_df, matching_df]:
+        df['normalized_name'] = df['CRM Name'].astype(str).apply(normalize_company_name)
+        df['normalized_domain'] = df['CRM Website'].astype(str).apply(simple_normalize_url)
+        df['CRM Ort'] = df['CRM Ort'].astype(str).str.lower().str.strip()
+        df['CRM Land'] = df['CRM Land'].astype(str).str.lower().str.strip()
+        # Blocking Key: Das erste Wort des normalisierten Namens
+        df['block_key'] = df['normalized_name'].apply(lambda x: x.split()[0] if x else None)

-    # Blocking: Domain und Name-Token
-    index_dom = recordlinkage.Index()
-    index_dom.block('norm_domain')
-    pairs_dom = index_dom.index(crm_df, match_df)
-    index_name = recordlinkage.Index()
-    index_name.block('name_token')
-    pairs_name = index_name.index(crm_df, match_df)
-    # Union der Kandidatenpaare
-    candidate_pairs = pairs_dom.append(pairs_name).drop_duplicates()
-    logger.info(f"Blocking abgeschlossen: Dom-Paare={len(pairs_dom)}, Name-Paare={len(pairs_name)}, Gesamt={len(candidate_pairs)}")
-    # Vergleichsregeln definieren
-    compare = recordlinkage.Compare()
-    compare.exact('norm_domain', 'norm_domain', label='domain', missing_value=0)
-    compare.string('norm_name', 'norm_name', method='jarowinkler', label='name_sim')
-    compare.exact('city', 'city', label='city', missing_value=0)
-    features = compare.compute(candidate_pairs, crm_df, match_df)
-    logger.debug(f"Features berechnet: {features.head()}...")
+    # --- NEUE, SCHNELLE BLOCKING-STRATEGIE ---
+    logging.info("Erstelle Index für CRM-Daten zur Beschleunigung...")
+    crm_index = {}
+    for index, row in crm_df.iterrows():
+        key = row['block_key']
+        if key:
+            if key not in crm_index:
+                crm_index[key] = []
+            crm_index[key].append(row)
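The dictionary built above replaces the recordlinkage candidate index from v1: every CRM row is filed under its first normalized name token, and an incoming record is later compared only against the rows in its own bucket. A condensed, self-contained illustration of that blocking idea (sample rows invented; collections.defaultdict merely shortens the index construction):

from collections import defaultdict

crm_rows = [
    {'CRM Name': 'Acme Werkzeugbau GmbH', 'block_key': 'acme'},
    {'CRM Name': 'Acme Holding', 'block_key': 'acme'},
    {'CRM Name': 'Beta Logistik AG', 'block_key': 'beta'},
]

# Build the block index: first name token -> list of candidate rows
crm_index = defaultdict(list)
for row in crm_rows:
    if row['block_key']:
        crm_index[row['block_key']].append(row)

# An incoming record is only compared against rows in its own block
incoming = {'CRM Name': 'ACME Werkzeugbau', 'block_key': 'acme'}
candidates = crm_index.get(incoming['block_key'], [])
print([c['CRM Name'] for c in candidates])  # ['Acme Werkzeugbau GmbH', 'Acme Holding']

The trade-off shows up against the v1 code removed above: v1 additionally blocked on the normalized domain, so two records whose names start with different tokens could still be paired via their website; with the first-token key alone they never land in the same block.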
-    # Score berechnen
-    features['score'] = (
-        WEIGHTS['domain'] * features['domain'] +
-        WEIGHTS['name'] * features['name_sim'] +
-        WEIGHTS['city'] * features['city']
-    )
-    logger.info("Scores berechnet")
-    # Per-Match Logging und Auswahl
-    results = []
-    crm_map = crm_df.reset_index()
-    for match_idx, group in features.reset_index().groupby('level_1'):
-        logger.info(f"--- Prüfe Matching-Zeile {match_idx} ---")
-        df_block = group.sort_values('score', ascending=False).copy()
-        # CRM-Daten für Log
-        df_block['CRM Name'] = df_block['level_0'].map(crm_map.set_index('index')['CRM Name'])
-        # Log der Top-Kandidaten
-        for _, row in df_block.head(5).iterrows():
-            logger.debug(f"Candidate [{int(row['level_0'])}]: score={row['score']:.3f}, name_sim={row['name_sim']:.3f}, dom={row['domain']}, city={row['city']} => {row['CRM Name']}")
-        top = df_block.iloc[0]
-        crm_idx = top['level_0'] if top['score'] >= SCORE_THRESHOLD else None
-        if crm_idx is not None:
-            logger.info(f" --> Match: {int(crm_idx)} ({top['CRM Name']}) mit Score {top['score']:.2f}")
-        else:
-            logger.info(f" --> Kein Match (höchster Score {top['score']:.2f})")
-        results.append((crm_idx, match_idx, top['score']))
-    # Ausgabe zusammenstellen
-    match_map = match_df.reset_index()
-    output = match_map[['CRM Name','CRM Website','CRM Ort','CRM Land']].copy()
-    output[['Matched CRM Name','Matched CRM Website','Matched CRM Ort','Matched CRM Land','Score']] = ''
-    for crm_idx, match_idx, score in results:
-        if crm_idx is not None:
-            crm_row = crm_map[crm_map['index']==crm_idx].iloc[0]
-            output.at[match_idx, 'Matched CRM Name'] = crm_row['CRM Name']
-            output.at[match_idx, 'Matched CRM Website'] = crm_row['CRM Website']
-            output.at[match_idx, 'Matched CRM Ort'] = crm_row['CRM Ort']
-            output.at[match_idx, 'Matched CRM Land'] = crm_row['CRM Land']
-            output.at[match_idx, 'Score'] = round(score,3)
-    # Zurückschreiben ins Google Sheet
-    data = [output.columns.tolist()] + output.values.tolist()
-    success = sheet_handler.clear_and_write_data(MATCHING_SHEET_NAME, data)
-    if success:
-        logger.info(f"Erfolgreich geschrieben: {len([r for r in results if r[0] is not None])} Matches")
-    else:
-        logger.error("Fehler beim Schreiben ins Google Sheet.")
-
-if __name__ == '__main__':
-    main()
+    logging.info("Starte Matching-Prozess...")
+    results = []
+    total_matches = len(matching_df)
+    for index, match_row in matching_df.iterrows():
+        best_score = 0
+        best_match_name = ""
+        logging.info(f"Prüfe {index + 1}/{total_matches}: {match_row['CRM Name']}...")
+        # Finde den Block von Kandidaten
+        block_key = match_row['block_key']
+        candidates = crm_index.get(block_key, [])
+        # Führe den teuren Vergleich nur für die Kandidaten in diesem Block durch
+        for crm_row in candidates:
+            score = calculate_similarity(match_row, crm_row)
+            if score > best_score:
+                best_score = score
+                best_match_name = crm_row['CRM Name']
+        if best_score >= SCORE_THRESHOLD:
+            results.append({'Potenzieller Treffer im CRM': best_match_name, 'Ähnlichkeits-Score': best_score})
+        else:
+            # Wenn nichts im Block gefunden wurde, trotzdem den besten Treffer (kann 0 sein) anzeigen
+            results.append({'Potenzieller Treffer im CRM': '' if not best_match_name else best_match_name, 'Ähnlichkeits-Score': best_score})
+
+    logging.info("Matching abgeschlossen. Schreibe Ergebnisse zurück ins Sheet...")
+    result_df = pd.DataFrame(results)
+    # Die ursprünglichen Spalten aus matching_df für die Ausgabe nehmen
+    output_df = matching_df[['CRM Name', 'CRM Website', 'CRM Ort', 'CRM Land']].copy()
+    output_df = pd.concat([output_df.reset_index(drop=True), result_df], axis=1)
+    data_to_write = [output_df.columns.values.tolist()] + output_df.values.tolist()
+    success = sheet_handler.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
+    if success:
+        logging.info(f"Ergebnisse erfolgreich in das Tabellenblatt '{MATCHING_SHEET_NAME}' geschrieben.")
+    else:
+        logging.error("FEHLER beim Schreiben der Ergebnisse ins Google Sheet.")
+
+if __name__ == "__main__":
+    main()
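For reference, a worked example of how calculate_similarity reaches the new SCORE_THRESHOLD = 80, with the arithmetic repeated inline so the snippet runs on its own (record values are invented; thefuzz must be installed):

from thefuzz import fuzz

a = {'normalized_name': 'acme werkzeugbau', 'normalized_domain': 'acme-tools.de',
     'CRM Ort': 'muenchen', 'CRM Land': 'deutschland'}
b = {'normalized_name': 'acme werkzeugbau muenchen', 'normalized_domain': '',
     'CRM Ort': 'muenchen', 'CRM Land': 'deutschland'}

name_sim = fuzz.token_set_ratio(a['normalized_name'], b['normalized_name'])  # 100 when one token set is a subset of the other
score = 0
score += 100 if a['normalized_domain'] and a['normalized_domain'] == b['normalized_domain'] else 0  # 0 here: domain missing on one side
score += name_sim * 0.7  # 70
score += 20 if a['CRM Ort'] == b['CRM Ort'] and a['CRM Land'] == b['CRM Land'] else 0  # 20
print(round(score))  # 90 -> above the threshold of 80, reported as a potential duplicate

Because a missing domain contributes nothing, a pair needs either an exact domain hit or a near-perfect name match plus matching city and country to clear the threshold of 80.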