duplicate_checker.py updated

2025-09-24 14:23:45 +00:00
parent b74116c821
commit 4eea3f0f80

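v6.0 replaces the in-process XGBoost classifier with a precompiled Treelite predictor (TREELITE_MODEL_FILE = 'xgb_model.treelite'), moves the update_status() helper and the startup log, and loosens the fuzzy prefilter (PREFILTER_MIN_PARTIAL 70 -> 65). The compilation step that produces the .treelite artifact is not part of this commit; the following is only a minimal sketch of how the existing xgb_model.json could be compiled for treelite_runtime, assuming treelite 2.x/3.x, where export_lib() emits a shared library that treelite_runtime.Predictor can load:

    # Hypothetical offline step (not in this commit): compile the XGBoost JSON
    # model into a native shared library for treelite_runtime.Predictor.
    import treelite

    model = treelite.Model.load('xgb_model.json', model_format='xgboost_json')
    model.export_lib(
        toolchain='gcc',                 # compiler used to build the library
        libpath='./xgb_model.treelite',  # path referenced by TREELITE_MODEL_FILE
        params={'parallel_comp': 4},     # split code generation across 4 units
        verbose=True,
    )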

@@ -1,10 +1,4 @@
-# duplicate_checker_v5.1.py
-# Build timestamp is injected into logfile name.
-# --- FEATURES v5.1 ---
-# - NEW: More robust, multi-stage blocking to make sure relevant candidates are found.
-# - Now uses the top 3 rarest tokens when the primary search returns too few results.
-# - Loads the CRM data set from a local .pkl file to guarantee consistency between training and prediction.
+# duplicate_checker_v6.0.py

 import os
 import sys
@@ -13,9 +7,9 @@ import argparse
 import json
 import logging
 import pandas as pd
-import math
+import numpy as np
 import joblib
-import xgboost as xgb
+import treelite_runtime
 from datetime import datetime
 from collections import Counter
 from thefuzz import fuzz
@@ -24,35 +18,18 @@ from config import Config
 from google_sheet_handler import GoogleSheetHandler

 STATUS_DIR = "job_status"
-
-def update_status(job_id, status, progress_message):
-    if not job_id: return
-    status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
-    try:
-        try:
-            with open(status_file, 'r') as f: data = json.load(f)
-        except FileNotFoundError: data = {}
-        data.update({"status": status, "progress": progress_message})
-        with open(status_file, 'w') as f: json.dump(data, f)
-    except Exception as e:
-        logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")
-
-# --- Configuration v5.1 ---
-CRM_SHEET_NAME = "CRM_Accounts" # Only used as a fallback if the .pkl file is missing
-MATCHING_SHEET_NAME = "Matching_Accounts"
-LOG_DIR = "Log"
-now = datetime.now().strftime('%Y-%m-%d_%H-%M')
-LOG_FILE = f"{now}_duplicate_check_v5.1.txt"
-MODEL_FILE = 'xgb_model.json'
-TERM_WEIGHTS_FILE = 'term_weights.joblib'
-CRM_DATA_FILE = 'crm_for_prediction.pkl' # IMPORTANT
+LOG_DIR = "Log"
+MODEL_FILE = 'xgb_model.json' # No longer used directly, but kept as a reference
+TERM_WEIGHTS_FILE = 'term_weights.joblib'
+CRM_DATA_FILE = 'crm_for_prediction.pkl'
+TREELITE_MODEL_FILE = 'xgb_model.treelite'
 PREDICTION_THRESHOLD = 0.5
-PREFILTER_MIN_PARTIAL = 70
+PREFILTER_MIN_PARTIAL = 65
 PREFILTER_LIMIT = 50

 # --- Logging Setup ---
+now = datetime.now().strftime('%Y-%m-%d_%H-%M')
+LOG_FILE = f"{now}_duplicate_check_v6.0.txt"
 if not os.path.exists(LOG_DIR): os.makedirs(LOG_DIR, exist_ok=True)
 log_path = os.path.join(LOG_DIR, LOG_FILE)
 root = logging.getLogger()
@@ -68,20 +45,27 @@ fh.setLevel(logging.DEBUG)
 fh.setFormatter(formatter)
 root.addHandler(fh)
 logger = logging.getLogger(__name__)
-logger.info(f"Logging to console and file: {log_path}")
-logger.info(f"Starting duplicate_checker.py v5.1 | Build: {now}")

 # --- Stop-/City-Tokens ---
 STOP_TOKENS_BASE = {
     'gmbh','mbh','ag','kg','ug','ohg','se','co','kgaa','inc','llc','ltd','sarl', 'b.v', 'bv',
     'holding','gruppe','group','international','solutions','solution','service','services',
-    'deutschland','austria','germany','technik','technology','technologies','systems','systeme',
-    'logistik','logistics','industries','industrie','management','consulting','vertrieb','handel',
-    'international','company','gesellschaft','mbh&co','mbhco','werke','werk'
 }
 CITY_TOKENS = set()

-# --- Utilities ---
+# --- Helper functions ---
+def update_status(job_id, status, progress_message):
+    if not job_id: return
+    status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
+    try:
+        try:
+            with open(status_file, 'r') as f: data = json.load(f)
+        except FileNotFoundError: data = {}
+        data.update({"status": status, "progress": progress_message})
+        with open(status_file, 'w') as f: json.dump(data, f)
+    except Exception as e:
+        logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")
+
 def _tokenize(s: str):
     if not s: return []
     return re.split(r"[^a-z0-9äöüß]+", str(s).lower())
@@ -98,9 +82,7 @@ def get_rarest_tokens(norm_name: str, term_weights: dict, count=3):
     if not toks: return []
     return sorted(list(toks), key=lambda t: term_weights.get(t, 0), reverse=True)[:count]

-# --- Feature engineering function ---
-def create_features(mrec: dict, crec: dict, term_weights: dict):
-    # ... (this function stays exactly identical to the previous version)
+def create_features(mrec: dict, crec: dict, term_weights: dict, feature_names: list):
     features = {}
     n1_raw = mrec.get('normalized_name', '')
     n2_raw = crec.get('normalized_name', '')
@@ -127,9 +109,9 @@ def create_features(mrec: dict, crec: dict, term_weights: dict):
     features['name_len_diff'] = abs(len(n1_raw) - len(n2_raw))
     features['candidate_is_shorter'] = 1 if len(n2_raw) < len(n1_raw) else 0
-    return features
+    # Make sure the array has the same column order as during training
+    return [features.get(name, 0) for name in feature_names]

-# --- Indexes & main function ---
 def build_indexes(crm_df: pd.DataFrame):
     records = list(crm_df.to_dict('records'))
     domain_index = {}
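The list-based return value exists to pin the column order: the trained model expects features in exactly the order used at training time, so the feature dict is flattened along feature_names and anything missing falls back to 0. A tiny illustration ('token_sort_ratio' is an invented placeholder name; 'name_len_diff' and 'candidate_is_shorter' are real features from this file):

    # Illustration of the ordered-output contract of create_features().
    feature_names = ['token_sort_ratio', 'name_len_diff', 'candidate_is_shorter']
    features = {'name_len_diff': 3, 'token_sort_ratio': 87}
    row = [features.get(name, 0) for name in feature_names]
    assert row == [87, 3, 0]  # fixed column order, 0 for missing features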
@@ -143,17 +125,15 @@ def build_indexes(crm_df: pd.DataFrame):
     return records, domain_index, token_index

 def main(job_id=None):
-    logger.info("Starte Duplikats-Check v5.1 (ML Model with Robust Blocking)")
+    logger.info(f"Starting duplicate_checker.py v6.0 | Build: {now}")
     try:
-        model = xgb.XGBClassifier()
-        model.load_model(MODEL_FILE)
+        predictor = treelite_runtime.Predictor(TREELITE_MODEL_FILE, nthread=4)
         term_weights = joblib.load(TERM_WEIGHTS_FILE)
         crm_df = pd.read_pickle(CRM_DATA_FILE)
-        logger.info("ML-Modell, Wortgewichte und lokaler CRM-Datensatz erfolgreich geladen.")
+        logger.info("Treelite-Modell, Gewichte und lokaler CRM-Datensatz erfolgreich geladen.")
     except Exception as e:
         logger.critical(f"Konnte Modelldateien/CRM-Daten nicht laden. Fehler: {e}")
-        update_status(job_id, "Fehlgeschlagen", f"Modelldateien/CRM-Daten nicht gefunden: {e}")
         sys.exit(1)

     try:
@@ -161,13 +141,9 @@ def main(job_id=None):
         match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
     except Exception as e:
         logger.critical(f"Fehler beim Laden der Matching-Daten aus Google Sheets: {e}")
-        update_status(job_id, "Fehlgeschlagen", f"Fehler beim Matching-Datenladen: {e}")
         sys.exit(1)

     total = len(match_df) if match_df is not None else 0
-    if match_df is None or match_df.empty:
-        logger.critical("Leere Daten im Matching-Sheet. Abbruch.")
-        return
     logger.info(f"{len(crm_df)} CRM-Datensätze (lokal) | {total} Matching-Datensätze")

     match_df['normalized_name'] = match_df['CRM Name'].astype(str).apply(normalize_company_name)
@@ -189,27 +165,20 @@ def main(job_id=None):
         logger.info(progress_message)
         if processed % 10 == 0 or processed == total: update_status(job_id, "Läuft", progress_message)

-        # --- NEW: More robust, multi-stage blocking ---
         candidate_indices = set()
-        # Stage 1: precise blocking
         if mrow.get('normalized_domain'):
-            # Use the records directly here, since building an index would be more complex
            candidates_from_domain = domain_index.get(mrow['normalized_domain'], [])
             for c in candidates_from_domain:
                 try:
-                    # Find the index of the record
                     indices = crm_df.index[crm_df['normalized_name'] == c['normalized_name']].tolist()
                     if indices: candidate_indices.add(indices[0])
                 except Exception: continue
-        # Stage 2: generous token blocking
         if len(candidate_indices) < 5:
             top_tokens = get_rarest_tokens(mrow.get('normalized_name',''), term_weights, count=3)
             for token in top_tokens:
                 candidate_indices.update(token_index.get(token, []))
-        # Stage 3: fallback prefilter
         if len(candidate_indices) < 5:
             clean1, _ = clean_name_for_scoring(mrow.get('normalized_name',''))
             pf = sorted([(fuzz.partial_ratio(clean1, clean_name_for_scoring(r.get('normalized_name',''))[0]), i) for i, r in enumerate(crm_records)], key=lambda x: x[0], reverse=True)
@@ -220,15 +189,12 @@ def main(job_id=None):
             results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'})
             continue

-        # Feature engineering and prediction
-        feature_list = [create_features(mrow, cr, term_weights) for cr in candidates]
-        feature_df = pd.DataFrame(feature_list)
-        feature_df = feature_df[model.feature_names_in_]
-        probabilities = model.predict_proba(feature_df)[:, 1]
+        feature_list = [create_features(mrow, cr, term_weights, predictor.feature_names) for cr in candidates]
+        dmatrix = treelite_runtime.DMatrix(np.array(feature_list, dtype='float32'))
+        probabilities = predictor.predict(dmatrix)[:, 1]

         scored_candidates = sorted([{'name': candidates[i].get('CRM Name', ''), 'score': prob} for i, prob in enumerate(probabilities)], key=lambda x: x['score'], reverse=True)
         best_match = scored_candidates[0] if scored_candidates else None
         if best_match and best_match['score'] >= PREDICTION_THRESHOLD:
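To sanity-check the new artifacts outside of main(), a minimal smoke test using only the calls v6.0 itself makes; the zero-filled row is a stand-in for a real create_features() output, and num_feature is assumed to be exposed by the runtime (as in treelite_runtime 2.x/3.x):

    # Standalone smoke test for the Treelite scoring path.
    import numpy as np
    import treelite_runtime

    predictor = treelite_runtime.Predictor('xgb_model.treelite', nthread=4)
    dmat = treelite_runtime.DMatrix(np.zeros((1, predictor.num_feature), dtype='float32'))
    print(predictor.predict(dmat))  # main() reads column 1 of this output as the match probability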
@@ -256,7 +222,7 @@ def main(job_id=None):
         if job_id: update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")

 if __name__=='__main__':
-    parser = argparse.ArgumentParser(description="Duplicate Checker v5.1 (ML Model)")
+    parser = argparse.ArgumentParser(description="Duplicate Checker v6.0 (Treelite Model)")
     parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
     args = parser.parse_args()
     main(job_id=args.job_id)
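Invocation is unchanged from v5.1:

    python duplicate_checker.py --job-id <some-id>

--job-id is optional; when it is set, update_status() writes progress snapshots to job_status/<some-id>.json, which is how a run can be monitored externally.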