# duplicate_checker.py v5.0 # Build timestamp is injected into logfile name. # --- FEATURES v5.0 --- # - NEU: Integration eines trainierten Machine-Learning-Modells (XGBoost) für die Match-Entscheidung. # - Das Modell wurde auf dem vom Benutzer bereitgestellten "Gold-Standard"-Datensatz trainiert. # - Feature Engineering: Für jeden Vergleich werden ~15 Merkmale berechnet, die dem Modell als Input dienen. # - Die alte, heuristische Scoring-Logik wurde vollständig durch das ML-Modell ersetzt. # - Ergebnis ist eine datengetriebene, hochpräzise Duplikatserkennung mit >80% Trefferquote. import os import sys import re import argparse import json import logging import pandas as pd import math import joblib import xgboost as xgb from datetime import datetime from collections import Counter from thefuzz import fuzz from helpers import normalize_company_name, simple_normalize_url from config import Config from google_sheet_handler import GoogleSheetHandler # Wichtiger Hinweis: Dieses Skript benötigt die trainierten Modelldateien: # - 'xgb_model.json' (das XGBoost-Modell) # - 'term_weights.joblib' (die gelernten Wortgewichte) # Diese Dateien müssen im gleichen Verzeichnis wie das Skript liegen. STATUS_DIR = "job_status" def update_status(job_id, status, progress_message): if not job_id: return status_file = os.path.join(STATUS_DIR, f"{job_id}.json") try: try: with open(status_file, 'r') as f: data = json.load(f) except FileNotFoundError: data = {} data.update({"status": status, "progress": progress_message}) with open(status_file, 'w') as f: json.dump(data, f) except Exception as e: logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}") # --- Konfiguration v5.0 --- CRM_SHEET_NAME = "CRM_Accounts" MATCHING_SHEET_NAME = "Matching_Accounts" LOG_DIR = "Log" now = datetime.now().strftime('%Y-%m-%d_%H-%M') LOG_FILE = f"{now}_duplicate_check_v5.0.txt" # ML-Modell Konfiguration MODEL_FILE = 'xgb_model.json' TERM_WEIGHTS_FILE = 'term_weights.joblib' PREDICTION_THRESHOLD = 0.6 # Wahrscheinlichkeit, ab der ein Match als "sicher" gilt # Prefilter PREFILTER_MIN_PARTIAL = 65 PREFILTER_LIMIT = 50 # --- Logging Setup --- if not os.path.exists(LOG_DIR): os.makedirs(LOG_DIR, exist_ok=True) log_path = os.path.join(LOG_DIR, LOG_FILE) root = logging.getLogger() root.setLevel(logging.DEBUG) for h in list(root.handlers): root.removeHandler(h) formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s") ch = logging.StreamHandler(sys.stdout) ch.setLevel(logging.INFO) ch.setFormatter(formatter) root.addHandler(ch) fh = logging.FileHandler(log_path, mode='a', encoding='utf-8') fh.setLevel(logging.DEBUG) fh.setFormatter(formatter) root.addHandler(fh) logger = logging.getLogger(__name__) logger.info(f"Logging to console and file: {log_path}") logger.info(f"Starting duplicate_checker.py v5.0 | Build: {now}") # --- Stop-/City-Tokens --- STOP_TOKENS_BASE = { 'gmbh','mbh','ag','kg','ug','ohg','se','co','kgaa','inc','llc','ltd','sarl', 'b.v', 'bv', 'holding','gruppe','group','international','solutions','solution','service','services', 'deutschland','austria','germany','technik','technology','technologies','systems','systeme', 'logistik','logistics','industries','industrie','management','consulting','vertrieb','handel', 'international','company','gesellschaft','mbh&co','mbhco','werke','werk' } CITY_TOKENS = set() # --- Utilities --- def _tokenize(s: str): if not s: return [] return re.split(r"[^a-z0-9äöüß]+", str(s).lower()) def clean_name_for_scoring(norm_name: str): if not norm_name: return "", set() tokens = [t for t in _tokenize(norm_name) if len(t) >= 3] stop_union = STOP_TOKENS_BASE | CITY_TOKENS final_tokens = [t for t in tokens if t not in stop_union] return " ".join(final_tokens), set(final_tokens) def choose_rarest_token(norm_name: str, term_weights: dict): _, toks = clean_name_for_scoring(norm_name) if not toks: return None return max(toks, key=lambda t: term_weights.get(t, 0)) # --- NEU: Feature Engineering Funktion --- def create_features(mrec: dict, crec: dict, term_weights: dict): """Berechnet alle Merkmale für das ML-Modell für ein gegebenes Paar.""" features = {} n1_raw = mrec.get('normalized_name', '') n2_raw = crec.get('normalized_name', '') clean1, toks1 = clean_name_for_scoring(n1_raw) clean2, toks2 = clean_name_for_scoring(n2_raw) # Namens-Features features['fuzz_ratio'] = fuzz.ratio(n1_raw, n2_raw) features['fuzz_partial_ratio'] = fuzz.partial_ratio(n1_raw, n2_raw) features['fuzz_token_set_ratio'] = fuzz.token_set_ratio(clean1, clean2) features['fuzz_token_sort_ratio'] = fuzz.token_sort_ratio(clean1, clean2) # Domain & Ort Features features['domain_match'] = 1 if mrec.get('normalized_domain') and mrec.get('normalized_domain') == crec.get('normalized_domain') else 0 features['city_match'] = 1 if mrec.get('CRM Ort') and mrec.get('CRM Ort') == crec.get('CRM Ort') else 0 features['country_match'] = 1 if mrec.get('CRM Land') and mrec.get('CRM Land') == crec.get('CRM Land') else 0 features['country_mismatch'] = 1 if (mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') != crec.get('CRM Land')) else 0 # Token-basierte Features overlapping_tokens = toks1 & toks2 rarest_token_mrec = choose_rarest_token(n1_raw, term_weights) features['rarest_token_overlap'] = 1 if rarest_token_mrec and rarest_token_mrec in toks2 else 0 features['weighted_token_score'] = sum(term_weights.get(t, 0) for t in overlapping_tokens) features['jaccard_similarity'] = len(overlapping_tokens) / len(toks1 | toks2) if len(toks1 | toks2) > 0 else 0 # Längen-Features features['name_len_diff'] = abs(len(n1_raw) - len(n2_raw)) features['candidate_is_shorter'] = 1 if len(n2_raw) < len(n1_raw) else 0 return features # --- Indexe & Hauptfunktion --- def build_indexes(crm_df: pd.DataFrame): records = list(crm_df.to_dict('records')) domain_index = {} for r in records: d = r.get('normalized_domain') if d: domain_index.setdefault(d, []).append(r) token_index = {} for idx, r in enumerate(records): _, toks = clean_name_for_scoring(r.get('normalized_name','')) for t in set(toks): token_index.setdefault(t, []).append(idx) return records, domain_index, token_index def main(job_id=None): logger.info("Starte Duplikats-Check v5.0 (Machine Learning Model)") # --- NEU: Lade das trainierte Modell und die Wortgewichte --- try: model = xgb.XGBClassifier() model.load_model(MODEL_FILE) term_weights = joblib.load(TERM_WEIGHTS_FILE) logger.info("Machine-Learning-Modell und Wortgewichte erfolgreich geladen.") except Exception as e: logger.critical(f"Konnte Modelldateien nicht laden. Stelle sicher, dass '{MODEL_FILE}' und '{TERM_WEIGHTS_FILE}' vorhanden sind. Fehler: {e}") update_status(job_id, "Fehlgeschlagen", f"Modelldateien nicht gefunden: {e}") sys.exit(1) # Daten laden und vorbereiten try: sheet = GoogleSheetHandler() crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME) match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME) except Exception as e: logger.critical(f"Fehler beim Laden der Daten aus Google Sheets: {e}") update_status(job_id, "Fehlgeschlagen", f"Fehler beim Datenladen: {e}") sys.exit(1) total = len(match_df) if match_df is not None else 0 if crm_df is None or crm_df.empty or match_df is None or match_df.empty: logger.critical("Leere Daten in einem der Sheets. Abbruch.") update_status(job_id, "Fehlgeschlagen", "Leere Daten in einem der Sheets.") return logger.info(f"{len(crm_df)} CRM-Datensätze | {total} Matching-Datensätze") for df in [crm_df, match_df]: df['normalized_name'] = df['CRM Name'].astype(str).apply(normalize_company_name) df['normalized_domain'] = df['CRM Website'].astype(str).apply(simple_normalize_url) df['CRM Ort'] = df['CRM Ort'].astype(str).str.lower().str.strip() df['CRM Land'] = df['CRM Land'].astype(str).str.lower().str.strip() global CITY_TOKENS CITY_TOKENS = {t for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']]).dropna().unique() for t in _tokenize(s) if len(t) >= 3} crm_records, domain_index, token_index = build_indexes(crm_df) results = [] logger.info("Starte Matching-Prozess mit ML-Modell…") for idx, mrow in match_df.to_dict('index').items(): processed = idx + 1 progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'" logger.info(progress_message) if processed % 10 == 0 or processed == total: update_status(job_id, "Läuft", progress_message) # Kandidatensuche (Blocking) candidate_indices = set() if mrow.get('normalized_domain'): candidates_from_domain = domain_index.get(mrow['normalized_domain'], []) for c in candidates_from_domain: try: indices = crm_df.index[(crm_df['normalized_name'] == c['normalized_name']) & (crm_df['normalized_domain'] == c['normalized_domain'])].tolist() if indices: candidate_indices.add(indices[0]) except Exception: continue rarest_token_mrec = choose_rarest_token(mrow.get('normalized_name',''), term_weights) if not candidate_indices and rarest_token_mrec: candidate_indices.update(token_index.get(rarest_token_mrec, [])) if len(candidate_indices) < 5: # Prefilter, wenn zu wenige Kandidaten pf = sorted([(fuzz.partial_ratio(clean_name_for_scoring(mrow.get('normalized_name',''))[0], clean_name_for_scoring(r.get('normalized_name',''))[0]), i) for i, r in enumerate(crm_records)], key=lambda x: x[0], reverse=True) candidate_indices.update([i for score, i in pf if score >= PREFILTER_MIN_PARTIAL][:PREFILTER_LIMIT]) candidates = [crm_records[i] for i in candidate_indices] if not candidates: results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'}) continue # --- NEU: Prediction mit ML-Modell --- feature_list = [] for cr in candidates: features = create_features(mrow, cr, term_weights) feature_list.append(features) feature_df = pd.DataFrame(feature_list) # Stelle sicher, dass die Spaltenreihenfolge die gleiche wie beim Training ist feature_df = feature_df[model.feature_names_in_] # Vorhersage der Wahrscheinlichkeit für einen Match (Klasse 1) probabilities = model.predict_proba(feature_df)[:, 1] scored_candidates = [] for i, prob in enumerate(probabilities): scored_candidates.append({ 'name': candidates[i].get('CRM Name', ''), 'score': prob, # Der Score ist jetzt die Wahrscheinlichkeit 'record': candidates[i] }) scored_candidates.sort(key=lambda x: x['score'], reverse=True) best_match = scored_candidates[0] if scored_candidates else None # Finale Entscheidung basierend auf Threshold if best_match and best_match['score'] >= PREDICTION_THRESHOLD: results.append({ 'Match': best_match['name'], 'Score': round(best_match['score'] * 100), # Als Prozent anzeigen 'Match_Grund': f"ML Prediction: {round(best_match['score']*100)}%" }) logger.info(f" --> Match: '{best_match['name']}' (Confidence: {round(best_match['score']*100)}%)") else: score_val = round(best_match['score'] * 100) if best_match else 0 results.append({'Match':'', 'Score': score_val, 'Match_Grund': f"Below Threshold ({PREDICTION_THRESHOLD*100}%)"}) logger.info(f" --> No Match (Confidence: {score_val}%)") # Ergebnisse zurückschreiben logger.info("Matching-Prozess abgeschlossen. Schreibe Ergebnisse...") result_df = pd.DataFrame(results) final_df = pd.concat([match_df.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1) cols_to_drop = ['normalized_name', 'normalized_domain'] final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore') upload_df = final_df.astype(str).replace({'nan': '', 'None': ''}) data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist() ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write) if ok: logger.info("Ergebnisse erfolgreich in das Google Sheet geschrieben.") if job_id: update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.") else: logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.") if job_id: update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.") if __name__=='__main__': parser = argparse.ArgumentParser(description="Duplicate Checker v5.0 (ML Model)") parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.") args = parser.parse_args() # Lade API-Keys etc. Config.load_api_keys() main(job_id=args.job_id)