Files
Brancheneinstufung2/duplicate_checker.py
Floke 24e32da53c duplicate_checker.py aktualisiert
- NEU: Mehrstufiges Entscheidungsmodell für höhere Präzision und "Großzügigkeit".
- Stufe 1: "Golden Match" für exakte Treffer.
- Stufe 2: "Kernidentitäts-Bonus & Tie-Breaker" zur korrekten Zuordnung von Konzerngesellschaften.
- Stufe 3: Neu kalibrierter, gewichteter Score für alle anderen Fälle.
- Intelligenter Tie-Breaker, der nur bei wirklich guten und engen Kandidaten greift.
2025-09-05 11:40:52 +00:00

319 lines
15 KiB
Python

# duplicate_checker.py v5.0
# Build timestamp is injected into logfile name.
# --- FEATURES v5.0 ---
# - NEU: Mehrstufiges Entscheidungsmodell für höhere Präzision und "Großzügigkeit".
# - Stufe 1: "Golden Match" für exakte Treffer.
# - Stufe 2: "Kernidentitäts-Bonus & Tie-Breaker" zur korrekten Zuordnung von Konzerngesellschaften.
# - Stufe 3: Neu kalibrierter, gewichteter Score für alle anderen Fälle.
# - Intelligenter Tie-Breaker, der nur bei wirklich guten und engen Kandidaten greift.
import os
import sys
import re
import argparse
import json
import logging
import pandas as pd
import math
from datetime import datetime
from collections import Counter
from thefuzz import fuzz
from helpers import normalize_company_name, simple_normalize_url, serp_website_lookup
from config import Config
from google_sheet_handler import GoogleSheetHandler
STATUS_DIR = "job_status"


def update_status(job_id, status, progress_message):
    """Persist the current job status/progress to job_status/<job_id>.json.

    Merges into the existing status file when present. Best-effort: any
    failure is logged and swallowed so status reporting never kills the run.

    Args:
        job_id: Unique job identifier; a falsy value disables status tracking.
        status: Short state label (e.g. "Läuft", "Fehlgeschlagen").
        progress_message: Human-readable progress description.
    """
    if not job_id:
        return
    status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
    try:
        # Ensure the status directory exists so the very first write of a
        # fresh deployment does not fail silently.
        os.makedirs(STATUS_DIR, exist_ok=True)
        try:
            with open(status_file, 'r') as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # Missing OR corrupted status file: start from a clean dict
            # instead of aborting the whole update (a corrupted file used to
            # escape to the outer handler and drop the update entirely).
            data = {}
        data.update({"status": status, "progress": progress_message})
        with open(status_file, 'w') as f:
            json.dump(data, f)
    except Exception as e:
        logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")
# --- Configuration v5.0 ---
CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
LOG_DIR = "Log"
# Build timestamp; reused in the log file name and the startup banner.
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v5.0.txt"
# Scoring configuration
SCORE_THRESHOLD = 110 # default acceptance threshold
SCORE_THRESHOLD_WEAK= 140 # stricter threshold for matches without domain or location evidence
GOLDEN_MATCH_RATIO = 97 # fuzz.ratio (%) at which two names count as a "Golden Match"
GOLDEN_MATCH_SCORE = 300 # score assigned to a Golden Match (also caps the tie-breaker)
CORE_IDENTITY_BONUS = 50 # bonus when the most significant (rarest) token matches
# Tie-breaker & interactive mode
TRIGGER_SCORE_MIN = 150 # minimum score before tie-breaker / interactive handling kicks in
TIE_SCORE_DIFF = 25 # candidates within this score distance count as "tied"
# Prefilter (fallback fuzzy scan when blocking finds no candidates)
PREFILTER_MIN_PARTIAL = 70 # minimum partial_ratio to keep a candidate
PREFILTER_LIMIT = 30 # maximum number of prefilter candidates
# --- Logging setup ---
# makedirs(..., exist_ok=True) already tolerates an existing directory, so the
# former os.path.exists() pre-check was redundant (and racy under concurrency).
os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
# Remove handlers installed by earlier runs/imports so messages are not duplicated.
for h in list(root.handlers):
    root.removeHandler(h)
formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s")
# Console handler: INFO and above.
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
root.addHandler(ch)
# File handler: full DEBUG log, appended per build-stamped file.
fh = logging.FileHandler(log_path, mode='a', encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
root.addHandler(fh)
logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v5.0 | Build: {now}")
# --- API keys ---
# Best-effort: a missing SerpAPI key only disables the Serp fallback lookup;
# it must not abort the run.
try:
    Config.load_api_keys()
    serp_key = Config.API_KEYS.get('serpapi')
    if not serp_key: logger.warning("SerpAPI Key nicht gefunden; Serp-Fallback deaktiviert.")
except Exception as e:
    logger.warning(f"Fehler beim Laden API-Keys: {e}")
    serp_key = None
# --- Stop/city tokens ---
# Legal forms and generic business words that carry no identity information;
# they are stripped from names before scoring.
# (Fix: 'international' was listed twice; the duplicate was removed — set
# contents are unchanged.)
STOP_TOKENS_BASE = {
    'gmbh','mbh','ag','kg','ug','ohg','se','co','kgaa','inc','llc','ltd','sarl', 'b.v', 'bv',
    'holding','gruppe','group','international','solutions','solution','service','services',
    'deutschland','austria','germany','technik','technology','technologies','systems','systeme',
    'logistik','logistics','industries','industrie','management','consulting','vertrieb','handel',
    'company','gesellschaft','mbh&co','mbhco','werke','werk'
}
# Filled at runtime by main() from the city columns of both sheets.
CITY_TOKENS = set()
# --- Utilities ---
def _tokenize(s: str):
if not s: return []
return re.split(r"[^a-z0-9äöüß]+", str(s).lower())
def clean_name_for_scoring(norm_name: str):
    """Reduce a normalized company name to its identity-bearing tokens.

    Tokens shorter than three characters as well as legal-form, generic
    business and city words are discarded.

    Args:
        norm_name: Pre-normalized company name; falsy input yields ("", set()).
    Returns:
        Tuple of (space-joined token string, set of those tokens).
    """
    if not norm_name:
        return "", set()
    ignored = STOP_TOKENS_BASE | CITY_TOKENS
    kept = []
    for tok in _tokenize(norm_name):
        if len(tok) >= 3 and tok not in ignored:
            kept.append(tok)
    return " ".join(kept), set(kept)
def build_term_weights(crm_df: pd.DataFrame):
    """Compute an IDF-style rarity weight for every name token in the CRM sheet.

    Rare tokens receive high weights, ubiquitous ones low weights.
    NOTE(review): log(total / (count + 1)) can turn slightly negative for
    tokens present in (almost) every document — presumably intentional
    de-emphasis, but worth confirming.

    Args:
        crm_df: CRM dataframe with a 'normalized_name' column.
    Returns:
        dict mapping token -> float weight.
    """
    logger.info("Starte Berechnung der Wortgewichte (TF-IDF)...")
    token_counts = Counter()
    total_docs = len(crm_df)
    for name in crm_df['normalized_name']:
        _, tokens = clean_name_for_scoring(name)
        # clean_name_for_scoring already returns a set, so each token is
        # counted at most once per document (document frequency); the former
        # set(tokens) wrap and inner loop were redundant.
        token_counts.update(tokens)
    term_weights = {token: math.log(total_docs / (count + 1)) for token, count in token_counts.items()}
    logger.info(f"Wortgewichte für {len(term_weights)} Tokens berechnet.")
    return term_weights
# --- Similarity v5.0 ---
def calculate_similarity(mrec: dict, crec: dict, term_weights: dict, rarest_token_mrec: str):
    """Score how likely *mrec* (matching row) and *crec* (CRM row) are the same company.

    Tiered model:
      * Tier 1 "Golden Match": near-identical normalized names short-circuit
        to GOLDEN_MATCH_SCORE.
      * Otherwise: IDF-weighted token-overlap score plus domain/location
        contributions and a core-identity bonus when mrec's rarest token also
        occurs in crec; explicit country/city mismatches subtract penalties.

    Args:
        mrec: Row dict from the matching sheet (normalized fields present).
        crec: Candidate row dict from the CRM sheet.
        term_weights: Token -> IDF weight map from build_term_weights().
        rarest_token_mrec: Highest-weighted token of mrec's name (may be None).
    Returns:
        (score, components): non-negative int score and a dict describing the
        contributing factors (or the golden-match reason).
    """
    n1_raw = mrec.get('normalized_name', '')
    n2_raw = crec.get('normalized_name', '')
    # Tier 1: nearly identical names win outright.
    if fuzz.ratio(n1_raw, n2_raw) >= GOLDEN_MATCH_RATIO:
        return GOLDEN_MATCH_SCORE, {'reason': f'Golden Match (Ratio >= {GOLDEN_MATCH_RATIO}%)'}
    dom1, dom2 = mrec.get('normalized_domain',''), crec.get('normalized_domain','')
    domain_match = 1 if (dom1 and dom1 == dom2) else 0
    # NOTE(review): upstream astype(str) normalization may leave the literal
    # string 'nan' in Ort/Land, which would compare as a real value — confirm.
    city_match = 1 if mrec.get('CRM Ort') and crec.get('CRM Ort') == mrec.get('CRM Ort') else 0
    country_match = 1 if mrec.get('CRM Land') and crec.get('CRM Land') == mrec.get('CRM Land') else 0
    clean1, toks1 = clean_name_for_scoring(n1_raw)
    clean2, toks2 = clean_name_for_scoring(n2_raw)
    name_score = 0
    overlapping_tokens = toks1 & toks2
    if overlapping_tokens:
        # Sum of IDF weights of shared tokens, boosted by the fraction of
        # mrec's tokens that are covered by the overlap.
        name_score = sum(term_weights.get(t, 0) for t in overlapping_tokens)
        if toks1: name_score *= (1 + len(overlapping_tokens) / len(toks1))
    # Tier 2: bonus when mrec's single most significant token reappears in crec.
    core_identity_bonus = CORE_IDENTITY_BONUS if rarest_token_mrec and rarest_token_mrec in toks2 else 0
    score_domain = 0
    if domain_match:
        # A shared domain is strong evidence only if name or location agrees too.
        if name_score > 3.0 or (city_match and country_match): score_domain = 75
        else: score_domain = 20
    score_location = 25 if (city_match and country_match) else 0
    total = name_score * 10 + score_domain + score_location + core_identity_bonus
    # Penalize explicit disagreement (only when BOTH sides carry a value).
    penalties = 0
    if mrec.get('CRM Land') and crec.get('CRM Land') and not country_match: penalties += 40
    if mrec.get('CRM Ort') and crec.get('CRM Ort') and not city_match: penalties += 30
    total -= penalties
    comp = {
        'name_score': round(name_score,1), 'domain': domain_match, 'location': int(city_match and country_match),
        'core_bonus': core_identity_bonus, 'penalties': penalties, 'tokens': list(overlapping_tokens)
    }
    return max(0, round(total)), comp
# --- Indexes & main function ---
def build_indexes(crm_df: pd.DataFrame):
    """Build blocking indexes over the CRM records.

    Args:
        crm_df: Normalized CRM dataframe.
    Returns:
        records: list of CRM row dicts (to_dict('records') already returns a
            list, so the former list() wrap was a redundant copy).
        domain_index: normalized_domain -> list of record dicts.
        token_index: cleaned name token -> list of positions into *records*.
    """
    records = crm_df.to_dict('records')
    domain_index = {}
    for r in records:
        d = r.get('normalized_domain')
        if d:
            domain_index.setdefault(d, []).append(r)
    token_index = {}
    for idx, r in enumerate(records):
        # clean_name_for_scoring already returns a set of unique tokens.
        _, toks = clean_name_for_scoring(r.get('normalized_name',''))
        for t in toks:
            token_index.setdefault(t, []).append(idx)
    return records, domain_index, token_index
def choose_rarest_token(norm_name: str, term_weights: dict):
    """Return the highest-weighted (rarest) cleaned token of *norm_name*, or None.

    Tokens absent from *term_weights* count as weight 0.
    """
    _, toks = clean_name_for_scoring(norm_name)
    if not toks:
        return None
    best_tok = None
    best_weight = float('-inf')
    for tok in toks:
        weight = term_weights.get(tok, 0)
        if weight > best_weight:
            best_weight = weight
            best_tok = tok
    return best_tok
def main(job_id=None, interactive=False):
    """Run the full duplicate check: load sheets, score matches, write results back.

    Args:
        job_id: Optional job id for status-file progress reporting.
        interactive: Flag reserved for manual clarification of tie cases;
            currently only disables the automatic tie-breaker (placeholder,
            no prompt is shown yet).
    """
    logger.info("Starte Duplikats-Check v5.0 (Hybrid Model & Core Identity)")
    # --- Initialisation and data loading ---
    update_status(job_id, "Läuft", "Initialisiere GoogleSheetHandler...")
    try:
        sheet = GoogleSheetHandler()
    except Exception as e:
        logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        update_status(job_id, "Fehlgeschlagen", f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        sys.exit(1)
    update_status(job_id, "Läuft", "Lade CRM- und Matching-Daten...")
    crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
    match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    total = len(match_df) if match_df is not None else 0
    if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
        logger.critical("Leere Daten in einem der Sheets. Abbruch.")
        update_status(job_id, "Fehlgeschlagen", "Leere Daten in einem der Sheets.")
        return
    logger.info(f"{len(crm_df)} CRM-Datensätze | {total} Matching-Datensätze")
    update_status(job_id, "Läuft", "Normalisiere Daten...")
    # Normalize names, domains and locations on both sheets.
    # NOTE(review): astype(str) turns missing cells into the literal 'nan';
    # downstream equality checks may then treat 'nan' as a real city/country —
    # verify against the sheet contents.
    for df in [crm_df, match_df]:
        df['normalized_name'] = df['CRM Name'].astype(str).apply(normalize_company_name)
        df['normalized_domain'] = df['CRM Website'].astype(str).apply(simple_normalize_url)
        df['CRM Ort'] = df['CRM Ort'].astype(str).str.lower().str.strip()
        df['CRM Land'] = df['CRM Land'].astype(str).str.lower().str.strip()
    # City names from both sheets become additional stop tokens for name scoring.
    global CITY_TOKENS
    CITY_TOKENS = {t for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']]).dropna().unique() for t in _tokenize(s) if len(t) >= 3}
    logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")
    term_weights = build_term_weights(crm_df)
    crm_records, domain_index, token_index = build_indexes(crm_df)
    logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")
    results = []
    logger.info("Starte Matching-Prozess…")
    for idx, mrow in match_df.to_dict('index').items():
        processed = idx + 1
        progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'"
        logger.info(progress_message)
        # Throttle status-file writes to every 10th row (plus the final one).
        if processed % 10 == 0 or processed == total: update_status(job_id, "Läuft", progress_message)
        candidate_indices = set()
        # Candidate search: 1) same domain, 2) rarest-token block, 3) fuzzy prefilter.
        if mrow.get('normalized_domain'):
            candidates_from_domain = domain_index.get(mrow['normalized_domain'], [])
            for c in candidates_from_domain:
                try:
                    # Map the record dict from the domain index back to its dataframe index.
                    indices = crm_df.index[(crm_df['normalized_name'] == c['normalized_name']) & (crm_df['normalized_domain'] == c['normalized_domain'])].tolist()
                    if indices: candidate_indices.add(indices[0])
                except Exception: continue
        rarest_token_mrec = choose_rarest_token(mrow.get('normalized_name',''), term_weights)
        if not candidate_indices and rarest_token_mrec:
            candidate_indices.update(token_index.get(rarest_token_mrec, []))
        if not candidate_indices:
            # Last resort: fuzzy partial-ratio scan over ALL CRM records.
            pf = sorted([(fuzz.partial_ratio(clean_name_for_scoring(mrow.get('normalized_name',''))[0], clean_name_for_scoring(r.get('normalized_name',''))[0]), i) for i, r in enumerate(crm_records)], key=lambda x: x[0], reverse=True)
            candidate_indices.update([i for score, i in pf if score >= PREFILTER_MIN_PARTIAL][:PREFILTER_LIMIT])
        candidates = [crm_records[i] for i in candidate_indices]
        if not candidates:
            results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'})
            continue
        # Score every candidate and sort best-first.
        scored = sorted([{'score': s, 'comp': c, 'record': r} for r in candidates for s, c in [calculate_similarity(mrow, r, term_weights, rarest_token_mrec)]], key=lambda x: x['score'], reverse=True)
        for cand in scored[:5]: logger.debug(f" Kandidat: {cand['record']['CRM Name']} | Score={cand['score']} | Comp={cand['comp']}")
        best_match = scored[0] if scored else None
        # --- Tiered decision logic v5.0 ---
        if best_match:
            # Tier 1 (Golden Match, score=300) is already handled inside calculate_similarity.
            # Tier 2: intelligent tie-breaker for corporate-group constellations.
            best_score = best_match['score']
            if len(scored) > 1 and best_score >= TRIGGER_SCORE_MIN and (best_score - scored[1]['score']) < TIE_SCORE_DIFF and best_score < GOLDEN_MATCH_SCORE:
                if interactive: # Tier 4: manual clarification
                    # Placeholder: interactive resolution not implemented yet.
                    pass
                else: # Tier 2 automatic
                    logger.info(f" Tie-Breaker-Situation erkannt. Scores: {best_score} vs {scored[1]['score']}")
                    tie_candidates = [c for c in scored if (best_score - c['score']) < TIE_SCORE_DIFF]
                    original_best = best_match
                    # Among close candidates, prefer the shortest normalized name
                    # (typically the parent company rather than a subsidiary).
                    best_match_by_length = min(tie_candidates, key=lambda x: len(x['record']['normalized_name']))
                    if best_match_by_length['record']['CRM Name'] != original_best['record']['CRM Name']:
                        logger.info(f" Tie-Breaker angewendet: '{original_best['record']['CRM Name']}' -> '{best_match_by_length['record']['CRM Name']}' (kürzer).")
                        best_match = best_match_by_length
        # Final decision and logging: the stricter WEAK threshold applies when
        # neither domain nor location supported the match.
        if best_match and best_match['score'] >= SCORE_THRESHOLD:
            is_weak = best_match['comp'].get('domain', 0) == 0 and not best_match['comp'].get('location', 0)
            applied_threshold = SCORE_THRESHOLD_WEAK if is_weak else SCORE_THRESHOLD
            if best_match['score'] >= applied_threshold:
                results.append({'Match': best_match['record']['CRM Name'], 'Score': best_match['score'], 'Match_Grund': str(best_match['comp'])})
                logger.info(f" --> Match: '{best_match['record']['CRM Name']}' ({best_match['score']})")
            else:
                results.append({'Match':'', 'Score': best_match['score'], 'Match_Grund': f"Below WEAK TH | {str(best_match['comp'])}"})
                logger.info(f" --> No Match (below weak TH): '{best_match['record']['CRM Name']}' ({best_match['score']})")
        elif best_match:
            results.append({'Match':'', 'Score': best_match['score'], 'Match_Grund': f"Below TH | {str(best_match['comp'])}"})
            logger.info(f" --> No Match (below TH): '{best_match['record']['CRM Name']}' ({best_match['score']})")
        else:
            results.append({'Match':'', 'Score':0, 'Match_Grund':'No valid candidates'})
            logger.info(f" --> No Match (no candidates)")
    # --- Write results back ---
    logger.info("Matching-Prozess abgeschlossen. Schreibe Ergebnisse...")
    update_status(job_id, "Läuft", "Schreibe Ergebnisse zurück ins Sheet...")
    result_df = pd.DataFrame(results)
    final_df = pd.concat([match_df.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1)
    # Drop internal helper columns before uploading.
    cols_to_drop = ['normalized_name', 'normalized_domain']
    final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore')
    upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})
    data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()
    ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
    if ok:
        logger.info("Ergebnisse erfolgreich in das Google Sheet geschrieben.")
        update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
    else:
        logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
        update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")
if __name__ == '__main__':
    # CLI entry point: parse arguments and run the duplicate check.
    parser = argparse.ArgumentParser(description="Duplicate Checker v5.0")
    parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
    parser.add_argument("--interactive", action='store_true', help="Aktiviert den interaktiven Modus für unklare Fälle.")
    args = parser.parse_args()
    # Config.load_api_keys() already ran (with exception handling) at import
    # time; the former second, unguarded call here could crash the script even
    # though the import-time failure was deliberately tolerated, so it was removed.
    main(job_id=args.job_id, interactive=args.interactive)