# File: Brancheneinstufung2/duplicate_checker.py
#
# 262 lines
# 12 KiB
# Python

# duplicate_checker_v5.1.py
# Build timestamp is injected into logfile name.
# --- FEATURES v5.1 ---
# - NEW: More robust, multi-stage blocking to make sure that relevant candidates are found.
# - Now uses the top-3 rarest tokens when the primary lookup returns too few results.
# - Loads the CRM data set from a local .pkl file to guarantee consistency between training and application.
import os
import sys
import re
import argparse
import json
import logging
import pandas as pd
import math
import joblib
import xgboost as xgb
from datetime import datetime
from collections import Counter
from thefuzz import fuzz
from helpers import normalize_company_name, simple_normalize_url
from config import Config
from google_sheet_handler import GoogleSheetHandler
# Directory (relative to the working directory) holding one JSON status file per job.
STATUS_DIR = "job_status"


def update_status(job_id, status, progress_message):
    """Merge the current status and progress text into a job's JSON status file.

    The file lives at ``STATUS_DIR/<job_id>.json``. Keys other than ``status``
    and ``progress`` that are already stored in the file are preserved.
    Any failure is logged and swallowed so that status reporting can never
    abort the job itself.

    Args:
        job_id: Identifier of the job; a falsy value turns the call into a no-op.
        status: Value stored under the ``"status"`` key.
        progress_message: Value stored under the ``"progress"`` key.
    """
    if not job_id:
        return
    status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
    try:
        # The directory does not necessarily exist yet (e.g. first run).
        os.makedirs(STATUS_DIR, exist_ok=True)
        try:
            with open(status_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (FileNotFoundError, ValueError):
            # Missing, undecodable or corrupt file (JSONDecodeError and
            # UnicodeDecodeError are ValueErrors): start from scratch instead
            # of failing on every subsequent update.
            data = {}
        if not isinstance(data, dict):
            # Valid JSON but not an object; nothing we could merge into.
            data = {}
        data.update({"status": status, "progress": progress_message})
        # Write to a sibling temp file and swap it in, so that anything polling
        # the status file never reads a half-written document.
        tmp_file = f"{status_file}.tmp"
        with open(tmp_file, 'w', encoding='utf-8') as f:
            json.dump(data, f)
        os.replace(tmp_file, status_file)
    except Exception as e:
        logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")
# --- Configuration v5.1 ---
# Sheet names, artefact file names and thresholds used by the duplicate check.
# Only kept for the fallback in case the .pkl is missing.
# NOTE(review): no such fallback is visible in this file; main() exits when the .pkl cannot be loaded.
CRM_SHEET_NAME = "CRM_Accounts" # Only kept for the fallback in case the .pkl is missing
# Google Sheet that is read as input and overwritten with the results by main().
MATCHING_SHEET_NAME = "Matching_Accounts"
# Directory for the log files (created by the logging setup at import time).
LOG_DIR = "Log"
# Build/start timestamp with minute resolution, evaluated once at import time;
# it is part of the log file name and of the start-up log line.
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v5.1.txt"
# Model file loaded via XGBClassifier.load_model() in main().
MODEL_FILE = 'xgb_model.json'
# Token -> weight mapping loaded via joblib.load() in main().
TERM_WEIGHTS_FILE = 'term_weights.joblib'
# Local CRM snapshot loaded via pd.read_pickle() in main().
CRM_DATA_FILE = 'crm_for_prediction.pkl' # IMPORTANT
# Minimum predicted probability of the positive class for a candidate to be reported as a match.
PREDICTION_THRESHOLD = 0.5
# Fallback prefilter: minimum fuzz.partial_ratio score for a CRM record to become a candidate ...
PREFILTER_MIN_PARTIAL = 70
# ... and the maximum number of candidates the fallback prefilter may add.
PREFILTER_LIMIT = 50
# --- Logging setup ---
# Runs at import time: configures the root logger with a console handler (INFO)
# and a file handler (DEBUG) that both share one format.
if not os.path.exists(LOG_DIR): os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE)
root = logging.getLogger()
# The root logger lets everything through; each handler applies its own level.
root.setLevel(logging.DEBUG)
# Remove handlers that were attached before, so messages are not emitted twice.
for h in list(root.handlers): root.removeHandler(h)
formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s")
# Console output: INFO and above, written to stdout.
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
root.addHandler(ch)
# File output: DEBUG and above, appended to the per-run log file as UTF-8.
fh = logging.FileHandler(log_path, mode='a', encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
root.addHandler(fh)
# Module logger used throughout this file; it propagates to the root handlers above.
logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v5.1 | Build: {now}")
# --- Stop / city tokens ---
# Legal forms and generic business words that clean_name_for_scoring() removes
# from a name before it is compared.
# NOTE(review): clean_name_for_scoring() drops tokens shorter than three
# characters before the stop-word check, so the two-letter entries below
# ('ag', 'kg', 'ug', 'se', 'co', 'bv') are never actually compared. 'b.v' and
# 'mbh&co' contain characters that _tokenize() splits on, so those entries can
# never match a token either. 'international' is listed twice, which is
# harmless in a set literal.
STOP_TOKENS_BASE = {
'gmbh','mbh','ag','kg','ug','ohg','se','co','kgaa','inc','llc','ltd','sarl', 'b.v', 'bv',
'holding','gruppe','group','international','solutions','solution','service','services',
'deutschland','austria','germany','technik','technology','technologies','systems','systeme',
'logistik','logistics','industries','industrie','management','consulting','vertrieb','handel',
'international','company','gesellschaft','mbh&co','mbhco','werke','werk'
}
# Additional stop tokens derived from city names; empty at import time and
# replaced by main() with the tokens of all 'CRM Ort' values.
CITY_TOKENS = set()
# --- Utilities ---
def _tokenize(s: str):
if not s: return []
return re.split(r"[^a-z0-9äöüß]+", str(s).lower())
def clean_name_for_scoring(norm_name: str):
    """Reduce a normalized company name to its distinguishing tokens.

    Tokens shorter than three characters and tokens contained in
    ``STOP_TOKENS_BASE`` or ``CITY_TOKENS`` are discarded.

    Args:
        norm_name: Normalized company name; falsy values yield ``("", set())``.

    Returns:
        Tuple ``(cleaned, tokens)``: the kept tokens joined by single spaces
        (order and duplicates preserved) and the same tokens as a set.
    """
    if not norm_name:
        return "", set()
    # Evaluated per call because main() replaces CITY_TOKENS at runtime.
    ignored = STOP_TOKENS_BASE | CITY_TOKENS
    kept = []
    for token in _tokenize(norm_name):
        if len(token) >= 3 and token not in ignored:
            kept.append(token)
    return " ".join(kept), set(kept)
def get_rarest_tokens(norm_name: str, term_weights: dict, count=3):
    """Return the ``count`` highest-weighted scoring tokens of a name.

    "Rarest" means the largest value in ``term_weights``; tokens without an
    entry count as weight 0.

    Args:
        norm_name: Normalized company name.
        term_weights: Mapping token -> weight (presumably an IDF-like rarity
            weight; verify against the training script).
        count: Maximum number of tokens to return.

    Returns:
        List of at most ``count`` tokens ordered by descending weight; tokens
        of equal weight are ordered alphabetically. Empty if the name has no
        scoring tokens.
    """
    _, toks = clean_name_for_scoring(norm_name)
    if not toks:
        return []
    # Sets of strings iterate in hash order, which differs between interpreter
    # runs. Pre-sorting alphabetically and relying on the stability of
    # sorted() (also with reverse=True) makes ties reproducible.
    ranked = sorted(sorted(toks), key=lambda t: term_weights.get(t, 0), reverse=True)
    return ranked[:count]
# --- Feature Engineering Funktion ---
def create_features(mrec: dict, crec: dict, term_weights: dict):
    """Build the feature dict for one (matching record, CRM candidate) pair.

    The feature names and definitions feed the trained model (main() selects
    the columns via ``model.feature_names_in_``), so they must stay in sync
    with whatever produced the model.

    Args:
        mrec: Record to be checked; reads 'normalized_name',
            'normalized_domain', 'CRM Ort' and 'CRM Land'.
        crec: CRM candidate record; same keys as ``mrec``.
        term_weights: Mapping token -> weight used for the token features.

    Returns:
        Dict mapping feature name -> numeric value.
    """
    name_m = mrec.get('normalized_name', '')
    name_c = crec.get('normalized_name', '')
    cleaned_m, tokens_m = clean_name_for_scoring(name_m)
    cleaned_c, tokens_c = clean_name_for_scoring(name_c)

    domain_m = mrec.get('normalized_domain')
    city_m, city_c = mrec.get('CRM Ort'), crec.get('CRM Ort')
    country_m, country_c = mrec.get('CRM Land'), crec.get('CRM Land')

    shared = tokens_m & tokens_c
    combined = tokens_m | tokens_c

    features = {}
    # String similarity: raw names for the character-level ratios, cleaned
    # names for the token-based ratios.
    features['fuzz_ratio'] = fuzz.ratio(name_m, name_c)
    features['fuzz_partial_ratio'] = fuzz.partial_ratio(name_m, name_c)
    features['fuzz_token_set_ratio'] = fuzz.token_set_ratio(cleaned_m, cleaned_c)
    features['fuzz_token_sort_ratio'] = fuzz.token_sort_ratio(cleaned_m, cleaned_c)

    # Exact-match flags; a missing value on either side never counts as a match.
    features['domain_match'] = 1 if domain_m and domain_m == crec.get('normalized_domain') else 0
    features['city_match'] = 1 if city_m and city_c and city_m == city_c else 0
    features['country_match'] = 1 if country_m and country_c and country_m == country_c else 0
    features['country_mismatch'] = 1 if (country_m and country_c and country_m != country_c) else 0

    # Token-based features on the cleaned token sets.
    top_tokens = get_rarest_tokens(name_m, term_weights, 1)
    rarest_m = top_tokens[0] if top_tokens else None
    features['rarest_token_overlap'] = 1 if rarest_m and rarest_m in tokens_c else 0
    features['weighted_token_score'] = sum(term_weights.get(t, 0) for t in shared)
    features['jaccard_similarity'] = len(shared) / len(combined) if len(combined) > 0 else 0

    # Length relation of the raw normalized names.
    features['name_len_diff'] = abs(len(name_m) - len(name_c))
    features['candidate_is_shorter'] = 1 if len(name_c) < len(name_m) else 0
    return features
# --- Indexe & Hauptfunktion ---
def build_indexes(crm_df: pd.DataFrame):
    """Build the lookup structures used for candidate blocking.

    Args:
        crm_df: CRM data; each row is expected to provide 'normalized_name'
            and 'normalized_domain' (missing keys are tolerated).

    Returns:
        Tuple ``(records, domain_index, token_index)``:
        ``records`` is the list of row dicts, ``domain_index`` maps a
        normalized domain to the list of row dicts having it, and
        ``token_index`` maps a scoring token to the list of positions (into
        ``records``) of the rows whose name contains it.
    """
    records = list(crm_df.to_dict('records'))
    domain_index, token_index = {}, {}
    for position, record in enumerate(records):
        domain = record.get('normalized_domain')
        if domain:
            domain_index.setdefault(domain, []).append(record)
        # The token set is already de-duplicated, so each position is listed
        # at most once per token.
        _, name_tokens = clean_name_for_scoring(record.get('normalized_name', ''))
        for token in name_tokens:
            token_index.setdefault(token, []).append(position)
    return records, domain_index, token_index
def _find_candidate_positions(mrow, term_weights, domain_positions, token_index,
                              cleaned_crm_names, min_candidates=5):
    """Collect the positions of CRM records worth scoring for one matching row.

    Multi-stage blocking: each later stage only runs while fewer than
    ``min_candidates`` positions have been found so far.

    Args:
        mrow: Matching record; reads 'normalized_domain' and 'normalized_name'.
        term_weights: Mapping token -> weight, used to pick the rarest tokens.
        domain_positions: Mapping normalized domain -> list of CRM positions.
        token_index: Mapping token -> list of CRM positions.
        cleaned_crm_names: Cleaned scoring name per CRM position.
        min_candidates: Candidate count below which the next stage is tried.

    Returns:
        Sorted list of unique positions into the CRM record list.
    """
    positions = set()

    # Stage 1: precise blocking on an identical normalized domain.
    domain = mrow.get('normalized_domain')
    if domain:
        positions.update(domain_positions.get(domain, []))

    # Stage 2: generous token blocking on the three rarest name tokens.
    if len(positions) < min_candidates:
        for token in get_rarest_tokens(mrow.get('normalized_name', ''), term_weights, count=3):
            positions.update(token_index.get(token, []))

    # Stage 3: fallback prefilter, fuzzy-comparing against every CRM name.
    if len(positions) < min_candidates:
        cleaned_query, _ = clean_name_for_scoring(mrow.get('normalized_name', ''))
        ranked = sorted(
            [(fuzz.partial_ratio(cleaned_query, cleaned), pos)
             for pos, cleaned in enumerate(cleaned_crm_names)],
            key=lambda pair: pair[0],
            reverse=True,
        )
        positions.update([pos for score, pos in ranked if score >= PREFILTER_MIN_PARTIAL][:PREFILTER_LIMIT])

    # Sorted so that candidate order (and thus tie-breaking between equally
    # scored candidates) does not depend on set iteration order.
    return sorted(positions)


def main(job_id=None):
    """Run the duplicate check and write the results back to the matching sheet.

    Loads the model, the term weights and the local CRM snapshot, reads the
    accounts to check from the Google Sheet ``MATCHING_SHEET_NAME``, scores the
    blocked CRM candidates of every account with the model and overwrites the
    sheet with the input columns plus 'Match', 'Score' and 'Match_Grund'.

    Args:
        job_id: Optional job identifier; when given, progress and the final
            state are reported through update_status().

    Raises:
        SystemExit: With exit code 1 when the model artefacts / CRM data or
            the matching sheet cannot be loaded.
    """
    global CITY_TOKENS
    result_columns = ['Match', 'Score', 'Match_Grund']

    logger.info("Starte Duplikats-Check v5.1 (ML Model with Robust Blocking)")
    try:
        model = xgb.XGBClassifier()
        model.load_model(MODEL_FILE)
        # NOTE(review): joblib.load / pd.read_pickle execute arbitrary code
        # contained in the file. Only load artefacts from a trusted source.
        term_weights = joblib.load(TERM_WEIGHTS_FILE)
        crm_df = pd.read_pickle(CRM_DATA_FILE)
        logger.info("ML-Modell, Wortgewichte und lokaler CRM-Datensatz erfolgreich geladen.")
    except Exception as e:
        logger.critical(f"Konnte Modelldateien/CRM-Daten nicht laden. Fehler: {e}")
        update_status(job_id, "Fehlgeschlagen", f"Modelldateien/CRM-Daten nicht gefunden: {e}")
        sys.exit(1)

    try:
        sheet = GoogleSheetHandler()
        match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    except Exception as e:
        logger.critical(f"Fehler beim Laden der Matching-Daten aus Google Sheets: {e}")
        update_status(job_id, "Fehlgeschlagen", f"Fehler beim Matching-Datenladen: {e}")
        sys.exit(1)

    total = len(match_df) if match_df is not None else 0
    if match_df is None or match_df.empty:
        logger.critical("Leere Daten im Matching-Sheet. Abbruch.")
        # Report the abort; otherwise the job would keep its previous state.
        update_status(job_id, "Fehlgeschlagen", "Leere Daten im Matching-Sheet.")
        return
    logger.info(f"{len(crm_df)} CRM-Datensätze (lokal) | {total} Matching-Datensätze")

    # The results are written back to the very sheet that is read here, so a
    # re-run finds the result columns of the previous run. Drop them, otherwise
    # they would be duplicated in the output.
    stale_columns = [col for col in result_columns if col in match_df.columns]
    if stale_columns:
        logger.info(f"Entferne Ergebnisspalten eines früheren Laufs: {stale_columns}")
        match_df = match_df.drop(columns=stale_columns)

    match_df['normalized_name'] = match_df['CRM Name'].astype(str).apply(normalize_company_name)
    match_df['normalized_domain'] = match_df['CRM Website'].astype(str).apply(simple_normalize_url)
    match_df['CRM Ort'] = match_df['CRM Ort'].astype(str).str.lower().str.strip()
    match_df['CRM Land'] = match_df['CRM Land'].astype(str).str.lower().str.strip()

    # City names are treated as stop tokens; this must be set before any name
    # is cleaned (index building and the cleaned-name cache below).
    CITY_TOKENS = {t for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']]).dropna().unique() for t in _tokenize(s) if len(t) >= 3}

    crm_records, _, token_index = build_indexes(crm_df)

    # Position-based domain index. Looking records up again by name (as the
    # domain index of build_indexes would require) can hit a different record
    # with the same name and depends on the DataFrame index being 0..n-1.
    domain_positions = {}
    for pos, rec in enumerate(crm_records):
        rec_domain = rec.get('normalized_domain')
        if rec_domain:
            domain_positions.setdefault(rec_domain, []).append(pos)

    # Cleaned once instead of once per matching row in the fallback prefilter.
    cleaned_crm_names = [clean_name_for_scoring(rec.get('normalized_name', ''))[0] for rec in crm_records]

    results = []
    logger.info("Starte Matching-Prozess mit ML-Modell…")
    # Count rows positionally; the DataFrame index labels need not be 0..n-1.
    for processed, mrow in enumerate(match_df.to_dict('records'), start=1):
        progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'"
        logger.info(progress_message)
        if processed % 10 == 0 or processed == total:
            update_status(job_id, "Läuft", progress_message)

        candidate_positions = _find_candidate_positions(
            mrow, term_weights, domain_positions, token_index, cleaned_crm_names
        )
        candidates = [crm_records[pos] for pos in candidate_positions]
        if not candidates:
            results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'})
            continue

        # Feature engineering and prediction; the columns are put into the
        # order the model was trained with.
        feature_df = pd.DataFrame([create_features(mrow, cr, term_weights) for cr in candidates])
        feature_df = feature_df[model.feature_names_in_]
        probabilities = model.predict_proba(feature_df)[:, 1]

        # argmax returns the first maximum, i.e. the earliest candidate on ties.
        best_i = int(probabilities.argmax())
        best_score = probabilities[best_i]
        best_name = candidates[best_i].get('CRM Name', '')
        confidence = round(best_score * 100)

        if best_score >= PREDICTION_THRESHOLD:
            results.append({'Match': best_name, 'Score': confidence, 'Match_Grund': f"ML Confidence: {confidence}%"})
            logger.info(f" --> Match: '{best_name}' (Confidence: {confidence}%)")
        else:
            results.append({'Match':'', 'Score': confidence, 'Match_Grund': f"Below Threshold ({int(PREDICTION_THRESHOLD*100)}%)"})
            logger.info(f" --> No Match (Confidence: {confidence}%)")

    logger.info("Matching-Prozess abgeschlossen. Schreibe Ergebnisse...")
    result_df = pd.DataFrame(results)
    final_df = pd.concat([match_df.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1)
    # The helper columns are internal and must not end up in the sheet.
    cols_to_drop = ['normalized_name', 'normalized_domain']
    final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore')
    upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})
    data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()

    ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
    if ok:
        logger.info("Ergebnisse erfolgreich in das Google Sheet geschrieben.")
        update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
    else:
        logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
        update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")
# Script entry point: parse the optional job id and run the check.
if __name__ == '__main__':
    cli = argparse.ArgumentParser(description="Duplicate Checker v5.1 (ML Model)")
    cli.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
    main(job_id=cli.parse_args().job_id)