# duplicate_checker_v5.1.py
# Build timestamp is injected into the logfile name.
# --- FEATURES v5.1 ---
# - NEW: More robust, multi-stage blocking to make sure relevant candidates are found.
#   - Now uses the top-3 rarest tokens whenever the primary lookup yields too few results.
# - Loads the CRM data set from a local .pkl file to guarantee consistency between training and prediction.

import os
import sys
import re
import argparse
import json
import logging
import math
from collections import Counter
from datetime import datetime

import joblib
import pandas as pd
import xgboost as xgb
from thefuzz import fuzz

from helpers import normalize_company_name, simple_normalize_url
from config import Config
from google_sheet_handler import GoogleSheetHandler

STATUS_DIR = "job_status"


def update_status(job_id, status, progress_message):
    if not job_id:
        return
    status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
    try:
        try:
            with open(status_file, 'r') as f:
                data = json.load(f)
        except FileNotFoundError:
            data = {}
        data.update({"status": status, "progress": progress_message})
        with open(status_file, 'w') as f:
            json.dump(data, f)
    except Exception as e:
        logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")


# --- Configuration v5.1 ---
CRM_SHEET_NAME = "CRM_Accounts"  # only used as a fallback if the .pkl file is missing
MATCHING_SHEET_NAME = "Matching_Accounts"
LOG_DIR = "Log"
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v5.1.txt"
MODEL_FILE = 'xgb_model.json'
TERM_WEIGHTS_FILE = 'term_weights.joblib'
CRM_DATA_FILE = 'crm_for_prediction.pkl'  # IMPORTANT: local snapshot, kept consistent with training
PREDICTION_THRESHOLD = 0.5
PREFILTER_MIN_PARTIAL = 70
PREFILTER_LIMIT = 50

# --- Logging setup ---
os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
for h in list(root.handlers):
    root.removeHandler(h)
formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s")
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
root.addHandler(ch)
fh = logging.FileHandler(log_path, mode='a', encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
root.addHandler(fh)
logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v5.1 | Build: {now}")

# --- Stop/city tokens ---
STOP_TOKENS_BASE = {
    'gmbh', 'mbh', 'ag', 'kg', 'ug', 'ohg', 'se', 'co', 'kgaa', 'inc', 'llc', 'ltd', 'sarl', 'b.v', 'bv',
    'holding', 'gruppe', 'group', 'international', 'solutions', 'solution', 'service', 'services',
    'deutschland', 'austria', 'germany', 'technik', 'technology', 'technologies', 'systems', 'systeme',
    'logistik', 'logistics', 'industries', 'industrie', 'management', 'consulting', 'vertrieb', 'handel',
    'company', 'gesellschaft', 'mbh&co', 'mbhco', 'werke', 'werk'
}
CITY_TOKENS = set()  # filled at runtime from the city columns of both data sets


# --- Utilities ---
def _tokenize(s: str):
    if not s:
        return []
    return re.split(r"[^a-z0-9äöüß]+", str(s).lower())


def clean_name_for_scoring(norm_name: str):
    if not norm_name:
        return "", set()
    tokens = [t for t in _tokenize(norm_name) if len(t) >= 3]
    stop_union = STOP_TOKENS_BASE | CITY_TOKENS
    final_tokens = [t for t in tokens if t not in stop_union]
    return " ".join(final_tokens), set(final_tokens)


def get_rarest_tokens(norm_name: str, term_weights: dict, count=3):
    _, toks = clean_name_for_scoring(norm_name)
    if not toks:
        return []
    return sorted(list(toks), key=lambda t: term_weights.get(t, 0), reverse=True)[:count]

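
# Illustrative sketch of the helpers above. The values are hypothetical: the real output depends
# on the CITY_TOKENS built at runtime and on the weights loaded from term_weights.joblib.
#   clean_name_for_scoring("mueller logistik gmbh")
#       -> ("mueller", {"mueller"})                      # 'logistik' and 'gmbh' are stop tokens
#   get_rarest_tokens("mueller maschinenbau nord",
#                     {"maschinenbau": 4.7, "mueller": 2.1, "nord": 0.9}, count=2)
#       -> ["maschinenbau", "mueller"]                   # sorted by descending term weight
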
# --- Feature engineering ---
def create_features(mrec: dict, crec: dict, term_weights: dict):
    # ... (this function is identical to the previous version)
    features = {}
    n1_raw = mrec.get('normalized_name', '')
    n2_raw = crec.get('normalized_name', '')
    clean1, toks1 = clean_name_for_scoring(n1_raw)
    clean2, toks2 = clean_name_for_scoring(n2_raw)

    # Fuzzy string similarities on raw and stop-word-cleaned names
    features['fuzz_ratio'] = fuzz.ratio(n1_raw, n2_raw)
    features['fuzz_partial_ratio'] = fuzz.partial_ratio(n1_raw, n2_raw)
    features['fuzz_token_set_ratio'] = fuzz.token_set_ratio(clean1, clean2)
    features['fuzz_token_sort_ratio'] = fuzz.token_sort_ratio(clean1, clean2)

    # Exact matches on domain, city and country
    features['domain_match'] = 1 if mrec.get('normalized_domain') and mrec.get('normalized_domain') == crec.get('normalized_domain') else 0
    features['city_match'] = 1 if mrec.get('CRM Ort') and crec.get('CRM Ort') and mrec.get('CRM Ort') == crec.get('CRM Ort') else 0
    features['country_match'] = 1 if mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') == crec.get('CRM Land') else 0
    features['country_mismatch'] = 1 if (mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') != crec.get('CRM Land')) else 0

    # Token-overlap features weighted by term rarity
    overlapping_tokens = toks1 & toks2
    rarest = get_rarest_tokens(n1_raw, term_weights, 1)
    rarest_token_mrec = rarest[0] if rarest else None
    features['rarest_token_overlap'] = 1 if rarest_token_mrec and rarest_token_mrec in toks2 else 0
    features['weighted_token_score'] = sum(term_weights.get(t, 0) for t in overlapping_tokens)
    features['jaccard_similarity'] = len(overlapping_tokens) / len(toks1 | toks2) if len(toks1 | toks2) > 0 else 0
    features['name_len_diff'] = abs(len(n1_raw) - len(n2_raw))
    features['candidate_is_shorter'] = 1 if len(n2_raw) < len(n1_raw) else 0
    return features


# --- Indexes & main routine ---
def build_indexes(crm_df: pd.DataFrame):
    records = list(crm_df.to_dict('records'))
    domain_index = {}
    for r in records:
        d = r.get('normalized_domain')
        if d:
            domain_index.setdefault(d, []).append(r)
    token_index = {}
    for idx, r in enumerate(records):
        _, toks = clean_name_for_scoring(r.get('normalized_name', ''))
        for t in set(toks):
            token_index.setdefault(t, []).append(idx)
    return records, domain_index, token_index


def main(job_id=None):
    logger.info("Starte Duplikats-Check v5.1 (ML Model with Robust Blocking)")
    try:
        model = xgb.XGBClassifier()
        model.load_model(MODEL_FILE)
        term_weights = joblib.load(TERM_WEIGHTS_FILE)
        crm_df = pd.read_pickle(CRM_DATA_FILE)
        # Ensure positional indexing: the blocking stages treat index labels as list positions.
        crm_df = crm_df.reset_index(drop=True)
        logger.info("ML-Modell, Wortgewichte und lokaler CRM-Datensatz erfolgreich geladen.")
    except Exception as e:
        logger.critical(f"Konnte Modelldateien/CRM-Daten nicht laden. Fehler: {e}")
        update_status(job_id, "Fehlgeschlagen", f"Modelldateien/CRM-Daten nicht gefunden: {e}")
        sys.exit(1)

    try:
        sheet = GoogleSheetHandler()
        match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    except Exception as e:
        logger.critical(f"Fehler beim Laden der Matching-Daten aus Google Sheets: {e}")
        update_status(job_id, "Fehlgeschlagen", f"Fehler beim Matching-Datenladen: {e}")
        sys.exit(1)

    total = len(match_df) if match_df is not None else 0
    if match_df is None or match_df.empty:
        logger.critical("Leere Daten im Matching-Sheet. Abbruch.")
        return

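    # Log the data set sizes, then prepare for blocking: normalise names and domains of the
    # matching records with the shared helpers, lowercase city/country fields, derive CITY_TOKENS
    # from the city columns of both data sets, and build the domain/token indexes over the CRM rows.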
    logger.info(f"{len(crm_df)} CRM-Datensätze (lokal) | {total} Matching-Datensätze")

    match_df['normalized_name'] = match_df['CRM Name'].astype(str).apply(normalize_company_name)
    match_df['normalized_domain'] = match_df['CRM Website'].astype(str).apply(simple_normalize_url)
    match_df['CRM Ort'] = match_df['CRM Ort'].astype(str).str.lower().str.strip()
    match_df['CRM Land'] = match_df['CRM Land'].astype(str).str.lower().str.strip()

    global CITY_TOKENS
    CITY_TOKENS = {
        t
        for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']]).dropna().unique()
        for t in _tokenize(s)
        if len(t) >= 3
    }

    crm_records, domain_index, token_index = build_indexes(crm_df)
    results = []
    logger.info("Starte Matching-Prozess mit ML-Modell…")

    for idx, mrow in match_df.to_dict('index').items():
        processed = idx + 1
        progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name', '')}'"
        logger.info(progress_message)
        if processed % 10 == 0 or processed == total:
            update_status(job_id, "Läuft", progress_message)

        # --- NEW: more robust, multi-stage blocking ---
        candidate_indices = set()

        # Stage 1: precise blocking via exact domain match
        if mrow.get('normalized_domain'):
            # The domain index stores full records, so map each hit back to its row position.
            candidates_from_domain = domain_index.get(mrow['normalized_domain'], [])
            for c in candidates_from_domain:
                try:
                    indices = crm_df.index[crm_df['normalized_name'] == c['normalized_name']].tolist()
                    if indices:
                        candidate_indices.add(indices[0])
                except Exception:
                    continue

        # Stage 2: generous token blocking via the rarest name tokens
        if len(candidate_indices) < 5:
            top_tokens = get_rarest_tokens(mrow.get('normalized_name', ''), term_weights, count=3)
            for token in top_tokens:
                candidate_indices.update(token_index.get(token, []))

        # Stage 3: fallback prefilter via fuzzy partial ratio over all CRM records
        if len(candidate_indices) < 5:
            clean1, _ = clean_name_for_scoring(mrow.get('normalized_name', ''))
            pf = sorted(
                [(fuzz.partial_ratio(clean1, clean_name_for_scoring(r.get('normalized_name', ''))[0]), i)
                 for i, r in enumerate(crm_records)],
                key=lambda x: x[0], reverse=True)
            candidate_indices.update([i for score, i in pf if score >= PREFILTER_MIN_PARTIAL][:PREFILTER_LIMIT])

        candidates = [crm_records[i] for i in candidate_indices]
        if not candidates:
            results.append({'Match': '', 'Score': 0, 'Match_Grund': 'keine Kandidaten'})
            continue

        # Feature engineering and prediction
        feature_list = [create_features(mrow, cr, term_weights) for cr in candidates]
        feature_df = pd.DataFrame(feature_list)
        feature_df = feature_df[model.feature_names_in_]
        probabilities = model.predict_proba(feature_df)[:, 1]

        scored_candidates = sorted(
            [{'name': candidates[i].get('CRM Name', ''), 'score': prob} for i, prob in enumerate(probabilities)],
            key=lambda x: x['score'], reverse=True)
        best_match = scored_candidates[0] if scored_candidates else None

        if best_match and best_match['score'] >= PREDICTION_THRESHOLD:
            results.append({'Match': best_match['name'], 'Score': round(best_match['score'] * 100),
                            'Match_Grund': f"ML Confidence: {round(best_match['score'] * 100)}%"})
            logger.info(f" --> Match: '{best_match['name']}' (Confidence: {round(best_match['score'] * 100)}%)")
        else:
            score_val = round(best_match['score'] * 100) if best_match else 0
            results.append({'Match': '', 'Score': score_val,
                            'Match_Grund': f"Below Threshold ({int(PREDICTION_THRESHOLD * 100)}%)"})
            logger.info(f" --> No Match (Confidence: {score_val}%)")

    logger.info("Matching-Prozess abgeschlossen. Schreibe Ergebnisse...")

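    # Each entry in `results` corresponds to one row of the matching sheet; hypothetical examples:
    #   {'Match': 'Mueller Maschinenbau GmbH', 'Score': 87, 'Match_Grund': 'ML Confidence: 87%'}
    #   {'Match': '', 'Score': 42, 'Match_Grund': 'Below Threshold (50%)'}   # best candidate below 0.5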
    result_df = pd.DataFrame(results)
    final_df = pd.concat([match_df.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1)
    cols_to_drop = ['normalized_name', 'normalized_domain']
    final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore')

    upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})
    data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()

    ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
    if ok:
        logger.info("Ergebnisse erfolgreich in das Google Sheet geschrieben.")
        if job_id:
            update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
    else:
        logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
        if job_id:
            update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Duplicate Checker v5.1 (ML Model)")
    parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
    args = parser.parse_args()
    main(job_id=args.job_id)
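
# Example invocations (the job id is an arbitrary, caller-chosen identifier; progress is written
# to job_status/<job-id>.json and can be polled by the calling process):
#   python duplicate_checker_v5.1.py --job-id 2024-07-01_manual_run
#   python duplicate_checker_v5.1.py                 # without --job-id, status updates are skipped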