# duplicate_checker_v6.0.py
"""Duplicate checker v6.0.

Checks account records from a Google Sheet against a local CRM extract and
flags likely duplicates using a Treelite-compiled XGBoost model.
"""
import os
import sys
import re
import argparse
import json
import logging
from datetime import datetime
from collections import Counter

import pandas as pd
import numpy as np
import joblib
import treelite_runtime
from thefuzz import fuzz

from helpers import normalize_company_name, simple_normalize_url
from config import Config
from google_sheet_handler import GoogleSheetHandler

STATUS_DIR = "job_status"
LOG_DIR = "Log"
MODEL_FILE = 'xgb_model.json'  # No longer used directly, kept for reference
TERM_WEIGHTS_FILE = 'term_weights.joblib'
CRM_DATA_FILE = 'crm_for_prediction.pkl'
TREELITE_MODEL_FILE = 'xgb_model.treelite'
# Name of the Google Sheet tab holding the records to check. Assumed to be
# defined in Config; the fallback value is a hypothetical default.
MATCHING_SHEET_NAME = getattr(Config, 'MATCHING_SHEET_NAME', 'Matching')
PREDICTION_THRESHOLD = 0.5
PREFILTER_MIN_PARTIAL = 65
PREFILTER_LIMIT = 50

# --- Logging setup ---
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v6.0.txt"
os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
for h in list(root.handlers):
    root.removeHandler(h)
formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s")
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
root.addHandler(ch)
fh = logging.FileHandler(log_path, mode='a', encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
root.addHandler(fh)
logger = logging.getLogger(__name__)

# --- Stop/city tokens ---
STOP_TOKENS_BASE = {
    'gmbh', 'mbh', 'ag', 'kg', 'ug', 'ohg', 'se', 'co', 'kgaa', 'inc', 'llc', 'ltd', 'sarl', 'b.v', 'bv',
    'holding', 'gruppe', 'group', 'international', 'solutions', 'solution', 'service', 'services',
}
CITY_TOKENS = set()  # filled in main() from the city columns of both datasets


# --- Helper functions ---
def update_status(job_id, status, progress_message):
    """Write the current status/progress for a job into its JSON status file."""
    if not job_id:
        return
    status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
    try:
        os.makedirs(STATUS_DIR, exist_ok=True)  # make sure the status directory exists
        try:
            with open(status_file, 'r') as f:
                data = json.load(f)
        except FileNotFoundError:
            data = {}
        data.update({"status": status, "progress": progress_message})
        with open(status_file, 'w') as f:
            json.dump(data, f)
    except Exception as e:
        logger.error(f"Could not write status file for job {job_id}: {e}")


def _tokenize(s: str):
    if not s:
        return []
    return re.split(r"[^a-z0-9äöüß]+", str(s).lower())


def clean_name_for_scoring(norm_name: str):
    """Strip legal forms, generic words and city names from a normalized company name."""
    if not norm_name:
        return "", set()
    tokens = [t for t in _tokenize(norm_name) if len(t) >= 3]
    stop_union = STOP_TOKENS_BASE | CITY_TOKENS
    final_tokens = [t for t in tokens if t not in stop_union]
    return " ".join(final_tokens), set(final_tokens)


def get_rarest_tokens(norm_name: str, term_weights: dict, count=3):
    """Return the `count` tokens with the highest term weight (i.e. the rarest ones)."""
    _, toks = clean_name_for_scoring(norm_name)
    if not toks:
        return []
    return sorted(list(toks), key=lambda t: term_weights.get(t, 0), reverse=True)[:count]
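
# Illustrative behaviour of the name-cleaning helpers (hypothetical inputs and
# term weights, not taken from the real data):
#   clean_name_for_scoring("acme holding gmbh")  ->  ("acme", {"acme"})
#   get_rarest_tokens("acme maschinenbau gmbh",
#                     {"acme": 8.2, "maschinenbau": 3.1}, count=1)  ->  ["acme"]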

def create_features(mrec: dict, crec: dict, term_weights: dict, feature_names: list):
    """Build the feature vector for one (matching record, CRM candidate) pair."""
    features = {}
    n1_raw = mrec.get('normalized_name', '')
    n2_raw = crec.get('normalized_name', '')
    clean1, toks1 = clean_name_for_scoring(n1_raw)
    clean2, toks2 = clean_name_for_scoring(n2_raw)
    # Fuzzy string similarities on the raw and the cleaned names
    features['fuzz_ratio'] = fuzz.ratio(n1_raw, n2_raw)
    features['fuzz_partial_ratio'] = fuzz.partial_ratio(n1_raw, n2_raw)
    features['fuzz_token_set_ratio'] = fuzz.token_set_ratio(clean1, clean2)
    features['fuzz_token_sort_ratio'] = fuzz.token_sort_ratio(clean1, clean2)
    # Exact matches on domain, city and country
    features['domain_match'] = 1 if mrec.get('normalized_domain') and mrec.get('normalized_domain') == crec.get('normalized_domain') else 0
    features['city_match'] = 1 if mrec.get('CRM Ort') and crec.get('CRM Ort') and mrec.get('CRM Ort') == crec.get('CRM Ort') else 0
    features['country_match'] = 1 if mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') == crec.get('CRM Land') else 0
    features['country_mismatch'] = 1 if (mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') != crec.get('CRM Land')) else 0
    # Token overlap features weighted by term rarity
    overlapping_tokens = toks1 & toks2
    rarest_mrec_tokens = get_rarest_tokens(n1_raw, term_weights, 1)
    rarest_token_mrec = rarest_mrec_tokens[0] if rarest_mrec_tokens else None
    features['rarest_token_overlap'] = 1 if rarest_token_mrec and rarest_token_mrec in toks2 else 0
    features['weighted_token_score'] = sum(term_weights.get(t, 0) for t in overlapping_tokens)
    features['jaccard_similarity'] = len(overlapping_tokens) / len(toks1 | toks2) if len(toks1 | toks2) > 0 else 0
    features['name_len_diff'] = abs(len(n1_raw) - len(n2_raw))
    features['candidate_is_shorter'] = 1 if len(n2_raw) < len(n1_raw) else 0
    # Ensure the array has the same column order as during training
    return [features.get(name, 0) for name in feature_names]


def build_indexes(crm_df: pd.DataFrame):
    """Build lookup structures over the CRM records: by domain and by cleaned name token."""
    records = list(crm_df.to_dict('records'))
    # Both indexes map to positional indices into `records`, so candidates from
    # the domain and token paths can be combined directly.
    domain_index = {}
    token_index = {}
    for idx, r in enumerate(records):
        d = r.get('normalized_domain')
        if d:
            domain_index.setdefault(d, []).append(idx)
        _, toks = clean_name_for_scoring(r.get('normalized_name', ''))
        for t in set(toks):
            token_index.setdefault(t, []).append(idx)
    return records, domain_index, token_index
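
# Shape of the lookup structures returned by build_indexes() (illustrative,
# hypothetical data):
#   records      -> [{'CRM Name': 'ACME GmbH', 'normalized_domain': 'acme.de', ...}, ...]
#   domain_index -> {'acme.de': [0], ...}    # normalized domain -> positions in `records`
#   token_index  -> {'acme': [0, 17], ...}   # cleaned name token -> positions in `records`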

def main(job_id=None):
    logger.info(f"Starting duplicate_checker.py v6.0 | Build: {now}")
    try:
        predictor = treelite_runtime.Predictor(TREELITE_MODEL_FILE, nthread=4)
        term_weights = joblib.load(TERM_WEIGHTS_FILE)
        crm_df = pd.read_pickle(CRM_DATA_FILE)
        logger.info("Treelite model, term weights and local CRM dataset loaded successfully.")
    except Exception as e:
        logger.critical(f"Could not load model files/CRM data. Error: {e}")
        sys.exit(1)

    try:
        sheet = GoogleSheetHandler()
        match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    except Exception as e:
        logger.critical(f"Error loading matching data from Google Sheets: {e}")
        sys.exit(1)

    total = len(match_df) if match_df is not None else 0
    logger.info(f"{len(crm_df)} CRM records (local) | {total} matching records")
    if total == 0:
        logger.warning("No matching records to check – exiting.")
        if job_id:
            update_status(job_id, "Completed", "No records to check.")
        return

    # Normalize the sheet data the same way the CRM extract was normalized
    match_df['normalized_name'] = match_df['CRM Name'].astype(str).apply(normalize_company_name)
    match_df['normalized_domain'] = match_df['CRM Website'].astype(str).apply(simple_normalize_url)
    match_df['CRM Ort'] = match_df['CRM Ort'].astype(str).str.lower().str.strip()
    match_df['CRM Land'] = match_df['CRM Land'].astype(str).str.lower().str.strip()

    # Treat city names from both datasets as stop tokens for name cleaning
    global CITY_TOKENS
    CITY_TOKENS = {
        t
        for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']]).dropna().unique()
        for t in _tokenize(s) if len(t) >= 3
    }

    crm_records, domain_index, token_index = build_indexes(crm_df)
    results = []
    logger.info("Starting matching process with the ML model…")
    for processed, mrow in enumerate(match_df.to_dict('records'), start=1):
        progress_message = f"Checking {processed}/{total}: '{mrow.get('CRM Name', '')}'"
        logger.info(progress_message)
        if processed % 10 == 0 or processed == total:
            update_status(job_id, "Running", progress_message)

        # Candidate retrieval: exact domain match first, then rare name tokens,
        # then a fuzzy prefilter over all CRM records as a last resort.
        candidate_indices = set()
        if mrow.get('normalized_domain'):
            candidate_indices.update(domain_index.get(mrow['normalized_domain'], []))
        if len(candidate_indices) < 5:
            top_tokens = get_rarest_tokens(mrow.get('normalized_name', ''), term_weights, count=3)
            for token in top_tokens:
                candidate_indices.update(token_index.get(token, []))
        if len(candidate_indices) < 5:
            clean1, _ = clean_name_for_scoring(mrow.get('normalized_name', ''))
            pf = sorted(
                [(fuzz.partial_ratio(clean1, clean_name_for_scoring(r.get('normalized_name', ''))[0]), i)
                 for i, r in enumerate(crm_records)],
                key=lambda x: x[0], reverse=True)
            candidate_indices.update([i for score, i in pf if score >= PREFILTER_MIN_PARTIAL][:PREFILTER_LIMIT])

        candidates = [crm_records[i] for i in candidate_indices]
        if not candidates:
            results.append({'Match': '', 'Score': 0, 'Match_Grund': 'no candidates'})
            continue

        # Score all candidates in one batch; predict() is expected to return
        # per-class probabilities, column 1 being the duplicate class.
        feature_list = [create_features(mrow, cr, term_weights, predictor.feature_names) for cr in candidates]
        dmatrix = treelite_runtime.DMatrix(np.array(feature_list, dtype='float32'))
        probabilities = predictor.predict(dmatrix)[:, 1]
        scored_candidates = sorted(
            [{'name': candidates[i].get('CRM Name', ''), 'score': prob} for i, prob in enumerate(probabilities)],
            key=lambda x: x['score'], reverse=True)
        best_match = scored_candidates[0] if scored_candidates else None

        if best_match and best_match['score'] >= PREDICTION_THRESHOLD:
            results.append({'Match': best_match['name'],
                            'Score': round(best_match['score'] * 100),
                            'Match_Grund': f"ML Confidence: {round(best_match['score'] * 100)}%"})
            logger.info(f" --> Match: '{best_match['name']}' (Confidence: {round(best_match['score'] * 100)}%)")
        else:
            score_val = round(best_match['score'] * 100) if best_match else 0
            results.append({'Match': '', 'Score': score_val,
                            'Match_Grund': f"Below Threshold ({int(PREDICTION_THRESHOLD * 100)}%)"})
            logger.info(f" --> No Match (Confidence: {score_val}%)")
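
    # Example of a single entry in `results` (illustrative values only):
    #   {'Match': 'ACME GmbH', 'Score': 97, 'Match_Grund': 'ML Confidence: 97%'}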
    logger.info("Matching process finished. Writing results...")
    result_df = pd.DataFrame(results)
    final_df = pd.concat([match_df.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1)
    # Drop the helper columns before uploading
    cols_to_drop = ['normalized_name', 'normalized_domain']
    final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore')
    upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})
    data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()

    ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
    if ok:
        logger.info("Results successfully written to the Google Sheet.")
        if job_id:
            update_status(job_id, "Completed", f"{total} accounts checked successfully.")
    else:
        logger.error("Error writing the results to the Google Sheet.")
        if job_id:
            update_status(job_id, "Failed", "Error writing to the Google Sheet.")


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Duplicate Checker v6.0 (Treelite Model)")
    parser.add_argument("--job-id", type=str, help="Unique ID for the job status.")
    args = parser.parse_args()
    main(job_id=args.job_id)
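
# Example invocation (assumes the model artifacts, crm_for_prediction.pkl and the
# Google Sheets credentials are in place; the job id below is a hypothetical value):
#   python duplicate_checker_v6.0.py --job-id 2024-05-17-run-01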