Files
Brancheneinstufung2/duplicate_checker.py
Floke 0a79ec64a6 NEU: Integration eines trainierten Machine-Learning-Modells (XGBoost) für die Match-Entscheidung
--- FEATURES v5.0 ---
- NEU: Integration eines trainierten Machine-Learning-Modells (XGBoost) für die Match-Entscheidung.
- Das Modell wurde auf dem vom Benutzer bereitgestellten "Gold-Standard"-Datensatz trainiert.
- Feature Engineering: Für jeden Vergleich werden ~15 Merkmale berechnet, die dem Modell als Input dienen.
- Die alte, heuristische Scoring-Logik wurde vollständig durch das ML-Modell ersetzt.
- Ergebnis ist eine datengetriebene, hochpräzise Duplikatserkennung mit >80% Trefferquote.
2025-09-08 11:24:01 +00:00

296 lines
13 KiB
Python

# duplicate_checker.py v5.0
# Build timestamp is injected into logfile name.
# --- FEATURES v5.0 ---
# - NEU: Integration eines trainierten Machine-Learning-Modells (XGBoost) für die Match-Entscheidung.
# - Das Modell wurde auf dem vom Benutzer bereitgestellten "Gold-Standard"-Datensatz trainiert.
# - Feature Engineering: Für jeden Vergleich werden ~15 Merkmale berechnet, die dem Modell als Input dienen.
# - Die alte, heuristische Scoring-Logik wurde vollständig durch das ML-Modell ersetzt.
# - Ergebnis ist eine datengetriebene, hochpräzise Duplikatserkennung mit >80% Trefferquote.
import os
import sys
import re
import argparse
import json
import logging
import pandas as pd
import math
import joblib
import xgboost as xgb
from datetime import datetime
from collections import Counter
from thefuzz import fuzz
from helpers import normalize_company_name, simple_normalize_url
from config import Config
from google_sheet_handler import GoogleSheetHandler
# Wichtiger Hinweis: Dieses Skript benötigt die trainierten Modelldateien:
# - 'xgb_model.json' (das XGBoost-Modell)
# - 'term_weights.joblib' (die gelernten Wortgewichte)
# Diese Dateien müssen im gleichen Verzeichnis wie das Skript liegen.
STATUS_DIR = "job_status"
def update_status(job_id, status, progress_message):
if not job_id: return
status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
try:
try:
with open(status_file, 'r') as f: data = json.load(f)
except FileNotFoundError: data = {}
data.update({"status": status, "progress": progress_message})
with open(status_file, 'w') as f: json.dump(data, f)
except Exception as e:
logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")
# --- Konfiguration v5.0 ---
CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
LOG_DIR = "Log"
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v5.0.txt"
# ML-Modell Konfiguration
MODEL_FILE = 'xgb_model.json'
TERM_WEIGHTS_FILE = 'term_weights.joblib'
PREDICTION_THRESHOLD = 0.6 # Wahrscheinlichkeit, ab der ein Match als "sicher" gilt
# Prefilter
PREFILTER_MIN_PARTIAL = 65
PREFILTER_LIMIT = 50
# --- Logging Setup ---
if not os.path.exists(LOG_DIR): os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
for h in list(root.handlers): root.removeHandler(h)
formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s")
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
root.addHandler(ch)
fh = logging.FileHandler(log_path, mode='a', encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
root.addHandler(fh)
logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v5.0 | Build: {now}")
# --- Stop-/City-Tokens ---
STOP_TOKENS_BASE = {
'gmbh','mbh','ag','kg','ug','ohg','se','co','kgaa','inc','llc','ltd','sarl', 'b.v', 'bv',
'holding','gruppe','group','international','solutions','solution','service','services',
'deutschland','austria','germany','technik','technology','technologies','systems','systeme',
'logistik','logistics','industries','industrie','management','consulting','vertrieb','handel',
'international','company','gesellschaft','mbh&co','mbhco','werke','werk'
}
CITY_TOKENS = set()
# --- Utilities ---
def _tokenize(s: str):
if not s: return []
return re.split(r"[^a-z0-9äöüß]+", str(s).lower())
def clean_name_for_scoring(norm_name: str):
if not norm_name: return "", set()
tokens = [t for t in _tokenize(norm_name) if len(t) >= 3]
stop_union = STOP_TOKENS_BASE | CITY_TOKENS
final_tokens = [t for t in tokens if t not in stop_union]
return " ".join(final_tokens), set(final_tokens)
def choose_rarest_token(norm_name: str, term_weights: dict):
_, toks = clean_name_for_scoring(norm_name)
if not toks: return None
return max(toks, key=lambda t: term_weights.get(t, 0))
# --- NEU: Feature Engineering Funktion ---
def create_features(mrec: dict, crec: dict, term_weights: dict):
"""Berechnet alle Merkmale für das ML-Modell für ein gegebenes Paar."""
features = {}
n1_raw = mrec.get('normalized_name', '')
n2_raw = crec.get('normalized_name', '')
clean1, toks1 = clean_name_for_scoring(n1_raw)
clean2, toks2 = clean_name_for_scoring(n2_raw)
# Namens-Features
features['fuzz_ratio'] = fuzz.ratio(n1_raw, n2_raw)
features['fuzz_partial_ratio'] = fuzz.partial_ratio(n1_raw, n2_raw)
features['fuzz_token_set_ratio'] = fuzz.token_set_ratio(clean1, clean2)
features['fuzz_token_sort_ratio'] = fuzz.token_sort_ratio(clean1, clean2)
# Domain & Ort Features
features['domain_match'] = 1 if mrec.get('normalized_domain') and mrec.get('normalized_domain') == crec.get('normalized_domain') else 0
features['city_match'] = 1 if mrec.get('CRM Ort') and mrec.get('CRM Ort') == crec.get('CRM Ort') else 0
features['country_match'] = 1 if mrec.get('CRM Land') and mrec.get('CRM Land') == crec.get('CRM Land') else 0
features['country_mismatch'] = 1 if (mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') != crec.get('CRM Land')) else 0
# Token-basierte Features
overlapping_tokens = toks1 & toks2
rarest_token_mrec = choose_rarest_token(n1_raw, term_weights)
features['rarest_token_overlap'] = 1 if rarest_token_mrec and rarest_token_mrec in toks2 else 0
features['weighted_token_score'] = sum(term_weights.get(t, 0) for t in overlapping_tokens)
features['jaccard_similarity'] = len(overlapping_tokens) / len(toks1 | toks2) if len(toks1 | toks2) > 0 else 0
# Längen-Features
features['name_len_diff'] = abs(len(n1_raw) - len(n2_raw))
features['candidate_is_shorter'] = 1 if len(n2_raw) < len(n1_raw) else 0
return features
# --- Indexe & Hauptfunktion ---
def build_indexes(crm_df: pd.DataFrame):
records = list(crm_df.to_dict('records'))
domain_index = {}
for r in records:
d = r.get('normalized_domain')
if d: domain_index.setdefault(d, []).append(r)
token_index = {}
for idx, r in enumerate(records):
_, toks = clean_name_for_scoring(r.get('normalized_name',''))
for t in set(toks): token_index.setdefault(t, []).append(idx)
return records, domain_index, token_index
def main(job_id=None):
logger.info("Starte Duplikats-Check v5.0 (Machine Learning Model)")
# --- NEU: Lade das trainierte Modell und die Wortgewichte ---
try:
model = xgb.XGBClassifier()
model.load_model(MODEL_FILE)
term_weights = joblib.load(TERM_WEIGHTS_FILE)
logger.info("Machine-Learning-Modell und Wortgewichte erfolgreich geladen.")
except Exception as e:
logger.critical(f"Konnte Modelldateien nicht laden. Stelle sicher, dass '{MODEL_FILE}' und '{TERM_WEIGHTS_FILE}' vorhanden sind. Fehler: {e}")
update_status(job_id, "Fehlgeschlagen", f"Modelldateien nicht gefunden: {e}")
sys.exit(1)
# Daten laden und vorbereiten
try:
sheet = GoogleSheetHandler()
crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
except Exception as e:
logger.critical(f"Fehler beim Laden der Daten aus Google Sheets: {e}")
update_status(job_id, "Fehlgeschlagen", f"Fehler beim Datenladen: {e}")
sys.exit(1)
total = len(match_df) if match_df is not None else 0
if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
logger.critical("Leere Daten in einem der Sheets. Abbruch.")
update_status(job_id, "Fehlgeschlagen", "Leere Daten in einem der Sheets.")
return
logger.info(f"{len(crm_df)} CRM-Datensätze | {total} Matching-Datensätze")
for df in [crm_df, match_df]:
df['normalized_name'] = df['CRM Name'].astype(str).apply(normalize_company_name)
df['normalized_domain'] = df['CRM Website'].astype(str).apply(simple_normalize_url)
df['CRM Ort'] = df['CRM Ort'].astype(str).str.lower().str.strip()
df['CRM Land'] = df['CRM Land'].astype(str).str.lower().str.strip()
global CITY_TOKENS
CITY_TOKENS = {t for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']]).dropna().unique() for t in _tokenize(s) if len(t) >= 3}
crm_records, domain_index, token_index = build_indexes(crm_df)
results = []
logger.info("Starte Matching-Prozess mit ML-Modell…")
for idx, mrow in match_df.to_dict('index').items():
processed = idx + 1
progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'"
logger.info(progress_message)
if processed % 10 == 0 or processed == total: update_status(job_id, "Läuft", progress_message)
# Kandidatensuche (Blocking)
candidate_indices = set()
if mrow.get('normalized_domain'):
candidates_from_domain = domain_index.get(mrow['normalized_domain'], [])
for c in candidates_from_domain:
try:
indices = crm_df.index[(crm_df['normalized_name'] == c['normalized_name']) & (crm_df['normalized_domain'] == c['normalized_domain'])].tolist()
if indices: candidate_indices.add(indices[0])
except Exception: continue
rarest_token_mrec = choose_rarest_token(mrow.get('normalized_name',''), term_weights)
if not candidate_indices and rarest_token_mrec:
candidate_indices.update(token_index.get(rarest_token_mrec, []))
if len(candidate_indices) < 5: # Prefilter, wenn zu wenige Kandidaten
pf = sorted([(fuzz.partial_ratio(clean_name_for_scoring(mrow.get('normalized_name',''))[0], clean_name_for_scoring(r.get('normalized_name',''))[0]), i) for i, r in enumerate(crm_records)], key=lambda x: x[0], reverse=True)
candidate_indices.update([i for score, i in pf if score >= PREFILTER_MIN_PARTIAL][:PREFILTER_LIMIT])
candidates = [crm_records[i] for i in candidate_indices]
if not candidates:
results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'})
continue
# --- NEU: Prediction mit ML-Modell ---
feature_list = []
for cr in candidates:
features = create_features(mrow, cr, term_weights)
feature_list.append(features)
feature_df = pd.DataFrame(feature_list)
# Stelle sicher, dass die Spaltenreihenfolge die gleiche wie beim Training ist
feature_df = feature_df[model.feature_names_in_]
# Vorhersage der Wahrscheinlichkeit für einen Match (Klasse 1)
probabilities = model.predict_proba(feature_df)[:, 1]
scored_candidates = []
for i, prob in enumerate(probabilities):
scored_candidates.append({
'name': candidates[i].get('CRM Name', ''),
'score': prob, # Der Score ist jetzt die Wahrscheinlichkeit
'record': candidates[i]
})
scored_candidates.sort(key=lambda x: x['score'], reverse=True)
best_match = scored_candidates[0] if scored_candidates else None
# Finale Entscheidung basierend auf Threshold
if best_match and best_match['score'] >= PREDICTION_THRESHOLD:
results.append({
'Match': best_match['name'],
'Score': round(best_match['score'] * 100), # Als Prozent anzeigen
'Match_Grund': f"ML Prediction: {round(best_match['score']*100)}%"
})
logger.info(f" --> Match: '{best_match['name']}' (Confidence: {round(best_match['score']*100)}%)")
else:
score_val = round(best_match['score'] * 100) if best_match else 0
results.append({'Match':'', 'Score': score_val, 'Match_Grund': f"Below Threshold ({PREDICTION_THRESHOLD*100}%)"})
logger.info(f" --> No Match (Confidence: {score_val}%)")
# Ergebnisse zurückschreiben
logger.info("Matching-Prozess abgeschlossen. Schreibe Ergebnisse...")
result_df = pd.DataFrame(results)
final_df = pd.concat([match_df.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1)
cols_to_drop = ['normalized_name', 'normalized_domain']
final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore')
upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})
data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()
ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
if ok:
logger.info("Ergebnisse erfolgreich in das Google Sheet geschrieben.")
if job_id: update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
else:
logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
if job_id: update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")
if __name__=='__main__':
parser = argparse.ArgumentParser(description="Duplicate Checker v5.0 (ML Model)")
parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
args = parser.parse_args()
# Lade API-Keys etc.
Config.load_api_keys()
main(job_id=args.job_id)