# duplicate_checker.py v5.0
# Build timestamp is injected into logfile name.
# --- FEATURES v5.0 ---
# - Integration of a trained machine-learning model (XGBoost) for the match decision.
# - The old heuristic scoring logic has been fully replaced by the ML model.
# - The result is a data-driven, high-precision duplicate detection.

import os
import sys
import re
import argparse
import json
import logging
import pandas as pd
import math
import joblib
import xgboost as xgb
from datetime import datetime
from collections import Counter
from thefuzz import fuzz
from helpers import normalize_company_name, simple_normalize_url
from config import Config
from google_sheet_handler import GoogleSheetHandler

STATUS_DIR = "job_status"


def update_status(job_id, status, progress_message):
    """Merge ``status`` and ``progress`` into ``<STATUS_DIR>/<job_id>.json``.

    Does nothing when ``job_id`` is falsy. Existing keys in the status file
    are kept; only ``"status"`` and ``"progress"`` are overwritten. Any failure
    is logged and swallowed so that status reporting never aborts a run.

    Args:
        job_id: Identifier used as the status file's base name.
        status: Value stored under the ``"status"`` key.
        progress_message: Value stored under the ``"progress"`` key.
    """
    if not job_id:
        return
    status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
    try:
        # Without the directory the write below can only fail.
        os.makedirs(STATUS_DIR, exist_ok=True)
        try:
            with open(status_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (FileNotFoundError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError:
            # an unreadable file must not block all later status updates.
            data = {}
        if not isinstance(data, dict):
            data = {}
        data.update({"status": status, "progress": progress_message})
        with open(status_file, 'w', encoding='utf-8') as f:
            json.dump(data, f)
    except Exception as e:
        logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")


# --- Configuration v5.0 ---
CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
LOG_DIR = "Log"
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v5.0.txt"
MODEL_FILE = 'xgb_model.json'
TERM_WEIGHTS_FILE = 'term_weights.joblib'
PREDICTION_THRESHOLD = 0.5  # probability at or above which a match counts as "certain"
PREFILTER_MIN_PARTIAL = 65
PREFILTER_LIMIT = 50
# Below this many candidates the next, broader candidate source is consulted.
MIN_CANDIDATES_BEFORE_FALLBACK = 5
# Columns this script appends to the matching sheet.
RESULT_COLUMNS = ('Match', 'Score', 'Match_Grund')
# Helper columns that are computed for matching but never written back.
HELPER_COLUMNS = ('normalized_name', 'normalized_domain')

# --- Logging setup ---
os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
# Remove handlers installed earlier so messages are not emitted twice.
for h in list(root.handlers):
    root.removeHandler(h)
formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s")
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
root.addHandler(ch)
fh = logging.FileHandler(log_path, mode='a', encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
root.addHandler(fh)
logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v5.0 | Build: {now}")

# --- Stop / city tokens ---
# NOTE: tokens shorter than three characters (and 'b.v', which the tokenizer
# splits at the dot) can never match, because clean_name_for_scoring() only
# keeps tokens of length >= 3. They are kept for reference.
STOP_TOKENS_BASE = {
    'gmbh', 'mbh', 'ag', 'kg', 'ug', 'ohg', 'se', 'co', 'kgaa', 'inc', 'llc', 'ltd', 'sarl', 'b.v', 'bv',
    'holding', 'gruppe', 'group', 'international', 'solutions', 'solution', 'service', 'services',
    'deutschland', 'austria', 'germany', 'technik', 'technology', 'technologies', 'systems', 'systeme',
    'logistik', 'logistics', 'industries', 'industrie', 'management', 'consulting', 'vertrieb', 'handel',
    'company', 'gesellschaft', 'mbh&co', 'mbhco', 'werke', 'werk',
}
# Filled by main() with the tokens of every city found in the two sheets.
CITY_TOKENS = set()

_TOKEN_SPLIT_RE = re.compile(r"[^a-z0-9äöüß]+")


# --- Utilities ---
def _tokenize(s: str):
    """Lower-case ``s`` and split it on runs of non ``[a-z0-9äöüß]`` characters.

    Returns an empty list for falsy input. Empty fragments produced by
    leading or trailing separators are dropped.
    """
    if not s:
        return []
    return [tok for tok in _TOKEN_SPLIT_RE.split(str(s).lower()) if tok]


def clean_name_for_scoring(norm_name: str):
    """Strip short, stop-word and city tokens from a normalized name.

    Returns:
        A tuple ``(cleaned_name, token_set)``: the remaining tokens joined by
        single spaces (original order) and the same tokens as a set.
        ``("", set())`` for falsy input.
    """
    if not norm_name:
        return "", set()
    # Read CITY_TOKENS at call time: main() rebinds it after loading the sheets.
    ignored = STOP_TOKENS_BASE | CITY_TOKENS
    kept = [tok for tok in _tokenize(norm_name) if len(tok) >= 3 and tok not in ignored]
    return " ".join(kept), set(kept)


def choose_rarest_token(norm_name: str, term_weights: dict):
    """Return the cleaned token of ``norm_name`` with the highest weight.

    Tokens missing from ``term_weights`` count as weight 0. Returns ``None``
    when no token survives cleaning.
    """
    _, toks = clean_name_for_scoring(norm_name)
    if not toks:
        return None
    # Iterate in sorted order: max() keeps the first maximum, and plain set
    # iteration order for strings changes between interpreter runs.
    return max(sorted(toks), key=lambda tok: term_weights.get(tok, 0))


# --- Feature engineering ---
def create_features(mrec: dict, crec: dict, term_weights: dict):
    """Build the model's feature dict for one (matching record, CRM record) pair.

    Args:
        mrec: Record to be checked; read keys are ``normalized_name``,
            ``normalized_domain``, ``CRM Ort`` and ``CRM Land``.
        crec: Candidate CRM record with the same keys.
        term_weights: Mapping token -> weight (missing tokens count as 0).

    Returns:
        Dict with the thirteen numeric features consumed by the classifier.
    """
    n1_raw = mrec.get('normalized_name', '')
    n2_raw = crec.get('normalized_name', '')
    clean1, toks1 = clean_name_for_scoring(n1_raw)
    clean2, toks2 = clean_name_for_scoring(n2_raw)

    m_domain = mrec.get('normalized_domain')
    m_city, c_city = mrec.get('CRM Ort'), crec.get('CRM Ort')
    m_country, c_country = mrec.get('CRM Land'), crec.get('CRM Land')
    # Location features only fire when both sides carry a value.
    both_cities = bool(m_city and c_city)
    both_countries = bool(m_country and c_country)

    shared_tokens = toks1 & toks2
    all_tokens = toks1 | toks2
    rarest_token_mrec = choose_rarest_token(n1_raw, term_weights)

    return {
        # Raw-name similarity.
        'fuzz_ratio': fuzz.ratio(n1_raw, n2_raw),
        'fuzz_partial_ratio': fuzz.partial_ratio(n1_raw, n2_raw),
        # Similarity after stop-word / city removal.
        'fuzz_token_set_ratio': fuzz.token_set_ratio(clean1, clean2),
        'fuzz_token_sort_ratio': fuzz.token_sort_ratio(clean1, clean2),
        'domain_match': 1 if m_domain and m_domain == crec.get('normalized_domain') else 0,
        'city_match': 1 if both_cities and m_city == c_city else 0,
        'country_match': 1 if both_countries and m_country == c_country else 0,
        'country_mismatch': 1 if both_countries and m_country != c_country else 0,
        'rarest_token_overlap': 1 if rarest_token_mrec and rarest_token_mrec in toks2 else 0,
        'weighted_token_score': sum(term_weights.get(tok, 0) for tok in shared_tokens),
        'jaccard_similarity': len(shared_tokens) / len(all_tokens) if all_tokens else 0,
        'name_len_diff': abs(len(n1_raw) - len(n2_raw)),
        'candidate_is_shorter': 1 if len(n2_raw) < len(n1_raw) else 0,
    }


# --- Indexes & main function ---
def build_indexes(crm_df: pd.DataFrame):
    """Build lookup structures over the CRM rows.

    Returns:
        A tuple ``(records, domain_index, token_index)`` where ``records`` is
        the list of row dicts, ``domain_index`` maps a non-empty normalized
        domain to the row dicts carrying it, and ``token_index`` maps each
        cleaned name token to the list positions (into ``records``) of the
        rows containing it.
    """
    records = list(crm_df.to_dict('records'))
    domain_index = {}
    token_index = {}
    for position, rec in enumerate(records):
        domain = rec.get('normalized_domain')
        if domain:
            domain_index.setdefault(domain, []).append(rec)
        _, toks = clean_name_for_scoring(rec.get('normalized_name', ''))
        for tok in toks:
            token_index.setdefault(tok, []).append(position)
    return records, domain_index, token_index


def _blank_missing_lower(series: pd.Series) -> pd.Series:
    """Return ``series`` as stripped lower-case strings, missing cells as ''."""
    # Blank missing cells first: astype(str) alone would yield the literal
    # strings 'nan' / 'None', which then compare equal to each other.
    filled = series.astype(object).where(series.notna(), '')
    return filled.astype(str).str.lower().str.strip()


def _collect_candidate_positions(mrow, term_weights, domain_index, token_index,
                                 record_position, crm_clean_names):
    """Return the sorted positions of the CRM records worth scoring for ``mrow``.

    Sources are consulted from narrow to broad: same normalized domain, then
    records sharing the highest-weighted name token, then a fuzzy prefilter
    over all cleaned CRM names. The latter two only run while fewer than
    ``MIN_CANDIDATES_BEFORE_FALLBACK`` candidates have been found.
    """
    positions = set()

    domain = mrow.get('normalized_domain')
    if domain:
        for rec in domain_index.get(domain, []):
            # domain_index holds the very dict objects of the records list.
            pos = record_position.get(id(rec))
            if pos is not None:
                positions.add(pos)

    name = mrow.get('normalized_name', '')
    rarest_token = choose_rarest_token(name, term_weights)
    if len(positions) < MIN_CANDIDATES_BEFORE_FALLBACK and rarest_token:
        positions.update(token_index.get(rarest_token, []))

    if len(positions) < MIN_CANDIDATES_BEFORE_FALLBACK:
        clean_query, _ = clean_name_for_scoring(name)
        ranked = sorted(
            [(fuzz.partial_ratio(clean_query, crm_name), pos)
             for pos, crm_name in enumerate(crm_clean_names)],
            key=lambda pair: pair[0],
            reverse=True,
        )
        positions.update(
            [pos for score, pos in ranked if score >= PREFILTER_MIN_PARTIAL][:PREFILTER_LIMIT]
        )

    # Sorted so that ties between equally scored candidates are reproducible.
    return sorted(positions)


def main(job_id=None):
    """Run the duplicate check and write the verdicts to the matching sheet.

    Loads the classifier and term weights, reads both sheets, scores every
    matching row against its CRM candidates and replaces the content of
    ``MATCHING_SHEET_NAME`` with the input rows plus the result columns.

    Args:
        job_id: Optional job identifier; when given, progress is reported
            through ``update_status``.

    Exits the process with status 1 when the model files or the sheets cannot
    be loaded; returns early (without exiting) when a sheet is empty.
    """
    global CITY_TOKENS

    logger.info("Starte Duplikats-Check v5.0 (Machine Learning Model)")
    try:
        model = xgb.XGBClassifier()
        model.load_model(MODEL_FILE)
        term_weights = joblib.load(TERM_WEIGHTS_FILE)
        logger.info("Machine-Learning-Modell und Wortgewichte erfolgreich geladen.")
    except Exception as e:
        logger.critical(f"Konnte Modelldateien nicht laden. Fehler: {e}")
        update_status(job_id, "Fehlgeschlagen", f"Modelldateien nicht gefunden: {e}")
        sys.exit(1)

    try:
        sheet = GoogleSheetHandler()
        crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
        match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    except Exception as e:
        logger.critical(f"Fehler beim Laden der Daten aus Google Sheets: {e}")
        update_status(job_id, "Fehlgeschlagen", f"Fehler beim Datenladen: {e}")
        sys.exit(1)

    total = len(match_df) if match_df is not None else 0
    if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
        logger.critical("Leere Daten in einem der Sheets. Abbruch.")
        update_status(job_id, "Fehlgeschlagen", "Leere Daten in einem der Sheets.")
        return
    logger.info(f"{len(crm_df)} CRM-Datensätze | {total} Matching-Datensätze")

    # NOTE(review): 'CRM Ort' / 'CRM Land' are overwritten in place, so the
    # lower-cased values are what gets written back to the matching sheet.
    for df in (crm_df, match_df):
        df['normalized_name'] = df['CRM Name'].astype(str).apply(normalize_company_name)
        df['normalized_domain'] = df['CRM Website'].astype(str).apply(simple_normalize_url)
        df['CRM Ort'] = _blank_missing_lower(df['CRM Ort'])
        df['CRM Land'] = _blank_missing_lower(df['CRM Land'])

    # City names are treated like stop words when names are cleaned.
    CITY_TOKENS = {
        tok
        for city in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']]).dropna().unique()
        for tok in _tokenize(city)
        if len(tok) >= 3
    }

    crm_records, domain_index, token_index = build_indexes(crm_df)
    # Map each record dict (by identity) to its list position; this replaces
    # a per-candidate scan of the whole DataFrame.
    record_position = {id(rec): pos for pos, rec in enumerate(crm_records)}
    # Cleaned once here (after CITY_TOKENS is final) instead of once per row.
    crm_clean_names = [
        clean_name_for_scoring(rec.get('normalized_name', ''))[0] for rec in crm_records
    ]

    results = []
    logger.info("Starte Matching-Prozess mit ML-Modell…")

    # Enumerate the rows instead of relying on the index labels being 0..n-1.
    for processed, mrow in enumerate(match_df.to_dict('records'), start=1):
        progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'"
        logger.info(progress_message)
        if processed % 10 == 0 or processed == total:
            update_status(job_id, "Läuft", progress_message)

        candidate_positions = _collect_candidate_positions(
            mrow, term_weights, domain_index, token_index, record_position, crm_clean_names
        )
        candidates = [crm_records[pos] for pos in candidate_positions]
        if not candidates:
            results.append({'Match': '', 'Score': 0, 'Match_Grund': 'keine Kandidaten'})
            continue

        feature_df = pd.DataFrame(
            [create_features(mrow, cand, term_weights) for cand in candidates]
        )
        # Present the columns in the order the model reports for its features.
        feature_df = feature_df[model.feature_names_in_]
        probabilities = model.predict_proba(feature_df)[:, 1]

        # max() keeps the first maximum, i.e. the lowest CRM position on ties.
        best_pos, best_prob = max(enumerate(probabilities), key=lambda pair: pair[1])
        best_prob = float(best_prob)  # plain float -> plain int after round()
        confidence = round(best_prob * 100)

        if best_prob >= PREDICTION_THRESHOLD:
            best_name = candidates[best_pos].get('CRM Name', '')
            results.append({
                'Match': best_name,
                'Score': confidence,
                'Match_Grund': f"ML Confidence: {confidence}%",
            })
            logger.info(f" --> Match: '{best_name}' (Confidence: {confidence}%)")
        else:
            results.append({
                'Match': '',
                'Score': confidence,
                'Match_Grund': f"Below Threshold ({int(PREDICTION_THRESHOLD*100)}%)",
            })
            logger.info(f" --> No Match (Confidence: {confidence}%)")

    logger.info("Matching-Prozess abgeschlossen. Schreibe Ergebnisse...")

    result_df = pd.DataFrame(results)
    base_df = match_df.reset_index(drop=True)
    # The sheet being written is also the input sheet: drop result columns left
    # by a previous run, otherwise every rerun appends a duplicate set.
    stale_columns = [col for col in RESULT_COLUMNS if col in base_df.columns]
    if stale_columns:
        base_df = base_df.drop(columns=stale_columns)
    final_df = pd.concat([base_df, result_df.reset_index(drop=True)], axis=1)
    final_df = final_df.drop(
        columns=[col for col in HELPER_COLUMNS if col in final_df.columns],
        errors='ignore',
    )

    upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})
    data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()

    ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
    if ok:
        logger.info("Ergebnisse erfolgreich in das Google Sheet geschrieben.")
        update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
    else:
        logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
        update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Duplicate Checker v5.0 (ML Model)")
    parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
    args = parser.parse_args()

    # The Config class is no longer needed here as long as no API keys are used.
    # Config.load_api_keys()

    main(job_id=args.job_id)