# duplicate_checker.py v4.0
# Build timestamp is injected into logfile name.
# --- FEATURES v4.0 ---
# - NEW: "Core identity bonus": a high bonus is awarded when the rarest (most
#   important) token matches. This encourages "generous matching" based on the
#   core brand (e.g. "ANDRITZ AG" vs. "ANDRITZ HYDRO").
# - NEW: Smart "shortest name tie-breaker": only applied when scores are very
#   high and very close to each other.
# - Final calibration of the score computation and thresholds.
# - Golden rule for exact matches and interactive mode are retained.
import os
import sys
import re
import argparse
import json
import logging
import math
from datetime import datetime
from collections import Counter

import pandas as pd
from thefuzz import fuzz

from helpers import normalize_company_name, simple_normalize_url, serp_website_lookup
from config import Config
from google_sheet_handler import GoogleSheetHandler

STATUS_DIR = "job_status"


def update_status(job_id, status, progress_message):
    """Merge ``status`` and ``progress`` into the JSON status file of a job.

    Best effort: any failure is logged and swallowed so that status reporting
    can never abort the matching run.

    Args:
        job_id: Identifier of the job; when falsy nothing is written.
        status: Value stored under the ``"status"`` key.
        progress_message: Value stored under the ``"progress"`` key.
    """
    if not job_id:
        return
    status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
    try:
        # Without the directory every write would fail.
        os.makedirs(STATUS_DIR, exist_ok=True)
        try:
            with open(status_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (FileNotFoundError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError:
            # a missing or corrupt file is replaced instead of blocking all
            # further status updates.
            data = {}
        if not isinstance(data, dict):
            data = {}
        data.update({"status": status, "progress": progress_message})
        with open(status_file, 'w', encoding='utf-8') as f:
            json.dump(data, f)
    except Exception as e:
        logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")


# --- Configuration ---
CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
LOG_DIR = "Log"
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v4.0.txt"

# --- Scoring configuration v4.0 ---
SCORE_THRESHOLD = 100        # default threshold
SCORE_THRESHOLD_WEAK = 130   # threshold for matches without domain or location
GOLDEN_MATCH_RATIO = 97
GOLDEN_MATCH_SCORE = 300
CORE_IDENTITY_BONUS = 60     # bonus when the most important token matches

# Tie-breaker & interactive mode configuration
TRIGGER_SCORE_MIN = 150      # minimum score for tie-breaker / interactive mode
TIE_SCORE_DIFF = 20
# The interactive branch in main() refers to these names; they were never
# defined, which raised NameError as soon as --interactive was used. They
# share the tie-breaker values, as the TRIGGER_SCORE_MIN comment describes.
INTERACTIVE_SCORE_MIN = TRIGGER_SCORE_MIN
INTERACTIVE_SCORE_DIFF = TIE_SCORE_DIFF

# Prefilter configuration
PREFILTER_MIN_PARTIAL = 70
PREFILTER_LIMIT = 30

# --- Logging setup ---
os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
# Drop handlers installed earlier so messages are not emitted twice.
for h in list(root.handlers):
    root.removeHandler(h)
formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s")
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
root.addHandler(ch)
fh = logging.FileHandler(log_path, mode='a', encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
root.addHandler(fh)
logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v4.0 | Build: {now}")

# --- Load SerpAPI key ---
try:
    Config.load_api_keys()
    serp_key = Config.API_KEYS.get('serpapi')
    if not serp_key:
        logger.warning("SerpAPI Key nicht gefunden; Serp-Fallback deaktiviert.")
except Exception as e:
    logger.warning(f"Fehler beim Laden API-Keys: {e}")
    serp_key = None

# --- Stop / city tokens ---
# NOTE(review): clean_name_for_scoring() only keeps tokens of length >= 3 and
# the tokenizer splits on '.', so the two-letter entries and 'b.v' can never
# match; they are kept unchanged.
STOP_TOKENS_BASE = {
    'gmbh', 'mbh', 'ag', 'kg', 'ug', 'ohg', 'se', 'co', 'kgaa', 'inc', 'llc', 'ltd', 'sarl', 'b.v', 'bv',
    'holding', 'gruppe', 'group', 'international', 'solutions', 'solution', 'service', 'services',
    'deutschland', 'austria', 'germany', 'technik', 'technology', 'technologies', 'systems', 'systeme',
    'logistik', 'logistics', 'industries', 'industrie', 'management', 'consulting', 'vertrieb', 'handel',
    'international', 'company', 'gesellschaft', 'mbh&co', 'mbhco', 'werke', 'werk',
}
# Filled by main() with the tokens of all city names found in both sheets.
CITY_TOKENS = set()

_TOKEN_SPLIT_RE = re.compile(r"[^a-z0-9äöüß]+")


# --- Utilities ---
def _tokenize(s: str):
    """Lower-case ``s`` and split it on every run of non ``[a-z0-9äöüß]`` chars.

    Returns an empty list for falsy input. The result may contain empty
    strings (leading/trailing separators); callers filter by length.
    """
    if not s:
        return []
    return _TOKEN_SPLIT_RE.split(str(s).lower())


def clean_name_for_scoring(norm_name: str):
    """Reduce a normalized company name to its scoring-relevant tokens.

    Tokens shorter than three characters, stop tokens and city tokens are
    removed.

    Returns:
        Tuple ``(cleaned_name, token_set)``: the remaining tokens joined by a
        single space (original order) and the same tokens as a set.
    """
    if not norm_name:
        return "", set()
    # Two membership tests instead of building the union set on every call;
    # this function sits in the hot path of indexing and scoring.
    final_tokens = [
        t for t in _tokenize(norm_name)
        if len(t) >= 3 and t not in STOP_TOKENS_BASE and t not in CITY_TOKENS
    ]
    return " ".join(final_tokens), set(final_tokens)


def build_term_weights(crm_df: pd.DataFrame):
    """Compute an IDF-style weight per token over all CRM names.

    Args:
        crm_df: CRM data with a ``normalized_name`` column.

    Returns:
        Dict mapping token -> ``log(total_docs / (document_frequency + 1))``.
    """
    logger.info("Starte Berechnung der Wortgewichte (TF-IDF)...")
    token_counts = Counter()
    total_docs = len(crm_df)
    for name in crm_df['normalized_name']:
        _, tokens = clean_name_for_scoring(name)
        # tokens is a set, so each name counts a token at most once.
        token_counts.update(tokens)
    # NOTE(review): the weight is <= 0 for tokens present in (nearly) all
    # records; the formula is kept because thresholds are calibrated on it.
    term_weights = {
        token: math.log(total_docs / (count + 1))
        for token, count in token_counts.items()
    }
    logger.info(f"Wortgewichte für {len(term_weights)} Tokens berechnet.")
    return term_weights


# --- Similarity v4.0 ---
def calculate_similarity(mrec: dict, crec: dict, term_weights: dict):
    """Score how likely ``mrec`` (matching row) and ``crec`` (CRM row) are duplicates.

    Args:
        mrec: Matching record; reads ``normalized_name``, ``normalized_domain``,
            ``CRM Ort`` and ``CRM Land``.
        crec: CRM record with the same keys.
        term_weights: Token weights from :func:`build_term_weights`.

    Returns:
        Tuple ``(score, components)``; ``score`` is a non-negative int and
        ``components`` a dict describing how it was derived.
    """
    n1_raw = mrec.get('normalized_name', '')
    n2_raw = crec.get('normalized_name', '')

    # Golden rule: (almost) identical names win immediately. Empty names are
    # excluded so that two nameless records can never become a golden match.
    if n1_raw and n2_raw and fuzz.ratio(n1_raw, n2_raw) >= GOLDEN_MATCH_RATIO:
        return GOLDEN_MATCH_SCORE, {'reason': f'Golden Match (Ratio >= {GOLDEN_MATCH_RATIO}%)', 'name_score': 100}

    dom1 = mrec.get('normalized_domain', '')
    dom2 = crec.get('normalized_domain', '')
    m_city, c_city = mrec.get('CRM Ort'), crec.get('CRM Ort')
    m_country, c_country = mrec.get('CRM Land'), crec.get('CRM Land')

    domain_match = 1 if (dom1 and dom1 == dom2) else 0
    city_match = 1 if (m_city and c_city and m_city == c_city) else 0
    country_match = 1 if (m_country and c_country and m_country == c_country) else 0
    location_match = bool(city_match and country_match)

    _, toks1 = clean_name_for_scoring(n1_raw)
    _, toks2 = clean_name_for_scoring(n2_raw)

    name_score = 0
    overlapping_tokens = toks1 & toks2
    if overlapping_tokens:
        name_score = sum(term_weights.get(token, 0) for token in overlapping_tokens)
        # A non-empty overlap implies toks1 is non-empty. Boost by the share
        # of the matching record's tokens that were found.
        name_score *= 1 + len(overlapping_tokens) / len(toks1)

    # Core identity bonus: the rarest token of the matching record is present
    # in the CRM record.
    core_identity_bonus = 0
    rarest_token_mrec = choose_rarest_token(n1_raw, term_weights)
    if rarest_token_mrec and rarest_token_mrec in toks2:
        core_identity_bonus = CORE_IDENTITY_BONUS

    # Domain gate: a shared domain only counts fully when backed by name
    # evidence or a matching location.
    score_domain = 0
    if domain_match:
        score_domain = 70 if (name_score > 2.0 or location_match) else 20
    score_location = 25 if location_match else 0

    # Final score calibration v4.0
    total = name_score * 10 + score_domain + score_location + core_identity_bonus

    # Penalise only when both sides carry a value and the values differ.
    penalties = 0
    if m_country and c_country and not country_match:
        penalties += 40
    if m_city and c_city and not city_match:
        penalties += 30
    total -= penalties

    comp = {
        'name_score': round(name_score, 1),
        'domain_match': domain_match,
        'location_match': int(location_match),
        'core_bonus': core_identity_bonus,
        'penalties': penalties,
        'overlapping_tokens': list(overlapping_tokens),
    }
    return max(0, round(total)), comp


# --- Indexes & main function ---
def build_indexes(crm_df: pd.DataFrame):
    """Build the blocking indexes over the CRM data.

    Returns:
        Tuple ``(records, domain_index, token_index)``: the CRM rows as a list
        of dicts, a dict domain -> list of records, and a dict token -> list
        of positions into ``records``.
    """
    records = list(crm_df.to_dict('records'))
    domain_index = {}
    token_index = {}
    for idx, r in enumerate(records):
        d = r.get('normalized_domain')
        if d:
            domain_index.setdefault(d, []).append(r)
        _, toks = clean_name_for_scoring(r.get('normalized_name', ''))
        for t in toks:
            token_index.setdefault(t, []).append(idx)
    return records, domain_index, token_index


def choose_rarest_token(norm_name: str, term_weights: dict):
    """Return the highest-weighted token of ``norm_name``.

    Returns ``None`` when the name has no scoring tokens or when the best
    token has no positive weight.
    """
    _, toks = clean_name_for_scoring(norm_name)
    if not toks:
        return None
    rarest = max(toks, key=lambda t: term_weights.get(t, 0))
    return rarest if term_weights.get(rarest, 0) > 0 else None


def _normalize_location(series: pd.Series) -> pd.Series:
    """Return the column as stripped, lower-cased text with missing values as ''.

    Missing values are filled before the string cast; otherwise they would
    become the literal texts 'nan'/'none', which compare equal to each other
    (a false city match) and would end up in the city stop-token list.
    """
    return series.fillna('').astype(str).str.lower().str.strip()


def _build_city_tokens(crm_df: pd.DataFrame, match_df: pd.DataFrame):
    """Collect all tokens (length >= 3) of the city names in both frames."""
    cities = set()
    all_cities = pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']], ignore_index=True)
    for s in all_cities.dropna().unique():
        cities.update(t for t in _tokenize(s) if len(t) >= 3)
    return cities


def _collect_candidates(mrow, term_weights, domain_index, token_index, first_position, crm_clean_names):
    """Blocking step: pick the CRM positions worth scoring for one matching row.

    Strategies are tried in order and the first one that yields candidates
    wins: identical domain, rarest name token, fuzzy prefilter over all names.

    Returns:
        Tuple ``(positions, used_block)`` with the sorted CRM positions and a
        label describing the strategy that was used last.
    """
    candidate_indices = set()
    used_block = ''

    domain = mrow.get('normalized_domain')
    if domain:
        for c in domain_index.get(domain, []):
            # Map the record back to the first CRM position with the same
            # name and domain (dict lookup instead of a full DataFrame scan).
            pos = first_position.get((c.get('normalized_name'), c.get('normalized_domain')))
            if pos is not None:
                candidate_indices.add(pos)
        if candidate_indices:
            used_block = f"domain:{domain}"

    if not candidate_indices:
        rtok = choose_rarest_token(mrow.get('normalized_name', ''), term_weights)
        if rtok:
            candidate_indices.update(token_index.get(rtok, []))
            used_block = f"token:{rtok}"

    if not candidate_indices:
        pf = []
        clean1, _ = clean_name_for_scoring(mrow.get('normalized_name', ''))
        if clean1:
            for i, clean2 in enumerate(crm_clean_names):
                if not clean2:
                    continue
                pr = fuzz.partial_ratio(clean1, clean2)
                if pr >= PREFILTER_MIN_PARTIAL:
                    pf.append((pr, i))
        pf.sort(key=lambda x: x[0], reverse=True)
        candidate_indices.update(i for _, i in pf[:PREFILTER_LIMIT])
        used_block = f"prefilter:{PREFILTER_MIN_PARTIAL}/{len(pf)}"

    # Sorted so that equal scores are ordered deterministically by position.
    return sorted(candidate_indices), used_block


def main(job_id=None, interactive=False):
    """Run the duplicate check and write the results back to the matching sheet.

    Args:
        job_id: Optional job identifier used for the status file.
        interactive: Enables the interactive mode for ambiguous cases.
    """
    global CITY_TOKENS

    logger.info("Starte Duplikats-Check v4.0 (Core Identity Bonus)")
    update_status(job_id, "Läuft", "Initialisiere GoogleSheetHandler...")
    try:
        sheet = GoogleSheetHandler()
        logger.info("GoogleSheetHandler initialisiert")
    except Exception as e:
        logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        update_status(job_id, "Fehlgeschlagen", f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        sys.exit(1)

    update_status(job_id, "Läuft", "Lade CRM- und Matching-Daten...")
    crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
    match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    total = len(match_df) if match_df is not None else 0
    logger.info(f"{0 if crm_df is None else len(crm_df)} CRM-Datensätze | {total} Matching-Datensätze")
    if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
        logger.critical("Leere Daten in einem der Sheets. Abbruch.")
        update_status(job_id, "Fehlgeschlagen", "Leere Daten in einem der Sheets.")
        return

    update_status(job_id, "Läuft", "Normalisiere Daten...")
    # NOTE(review): 'CRM Ort' / 'CRM Land' are overwritten in place, so the
    # lower-cased values are also what gets written back to the sheet below.
    for df in (crm_df, match_df):
        df['normalized_name'] = df['CRM Name'].astype(str).apply(normalize_company_name)
        df['normalized_domain'] = df['CRM Website'].astype(str).apply(simple_normalize_url)
        df['CRM Ort'] = _normalize_location(df['CRM Ort'])
        df['CRM Land'] = _normalize_location(df['CRM Land'])

    # Must be set before any name is cleaned: city tokens act as stop tokens.
    CITY_TOKENS = _build_city_tokens(crm_df, match_df)
    logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")

    term_weights = build_term_weights(crm_df)
    crm_records, domain_index, token_index = build_indexes(crm_df)
    logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")

    # Loop-invariant lookups, computed once instead of once per matching row.
    first_position = {}
    crm_clean_names = []
    for pos, rec in enumerate(crm_records):
        first_position.setdefault((rec.get('normalized_name'), rec.get('normalized_domain')), pos)
        crm_clean_names.append(clean_name_for_scoring(rec.get('normalized_name', ''))[0])

    results = []
    logger.info("Starte Matching-Prozess…")
    # Enumerate the rows instead of deriving the counter from index labels,
    # which need not be unique consecutive integers.
    for processed, mrow in enumerate(match_df.to_dict('records'), start=1):
        progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'"
        logger.info(progress_message)
        if processed % 5 == 0 or processed == total:
            update_status(job_id, "Läuft", progress_message)

        candidate_positions, used_block = _collect_candidates(
            mrow, term_weights, domain_index, token_index, first_position, crm_clean_names
        )
        candidates = [crm_records[i] for i in candidate_positions]
        logger.info(f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}' -> {len(candidates)} Kandidaten (Block={used_block})")
        if not candidates:
            results.append({'Match': '', 'Score': 0, 'Match_Grund': 'keine Kandidaten'})
            continue

        scored = []
        for cr in candidates:
            score, comp = calculate_similarity(mrow, cr, term_weights)
            scored.append({'name': cr.get('CRM Name', ''), 'score': score, 'comp': comp, 'record': cr})
        scored.sort(key=lambda x: x['score'], reverse=True)
        for cand in scored[:5]:
            logger.debug(f" Kandidat: {cand['name']} | Score={cand['score']} | Comp={cand['comp']}")
        best_match = scored[0] if scored else None

        # --- Smart tie-breaker v4.0 ---
        # Only for high, nearly identical, non-golden scores: prefer the
        # candidate with the shortest name.
        if best_match and len(scored) > 1:
            best_score = best_match['score']
            second_best_score = scored[1]['score']
            if best_score >= TRIGGER_SCORE_MIN and (best_score - second_best_score) < TIE_SCORE_DIFF and best_score < GOLDEN_MATCH_SCORE:
                logger.info(f" Tie-Breaker-Situation erkannt für '{mrow['CRM Name']}'. Scores: {best_score} vs {second_best_score}")
                tie_candidates = [c for c in scored if (best_score - c['score']) < TIE_SCORE_DIFF]
                # str() guards against non-text cell values in 'CRM Name'.
                best_match_by_length = min(tie_candidates, key=lambda x: len(str(x['name'])))
                if best_match_by_length['name'] != best_match['name']:
                    logger.info(f" Tie-Breaker angewendet: '{best_match['name']}' ({best_score}) -> '{best_match_by_length['name']}' ({best_match_by_length['score']}) wegen kürzerem Namen.")
                    best_match = best_match_by_length

        # Interactive mode
        if interactive and best_match and len(scored) > 1:
            best_score = best_match['score']
            second_best_score = scored[1]['score']
            if best_score > INTERACTIVE_SCORE_MIN and (best_score - second_best_score) < INTERACTIVE_SCORE_DIFF and best_score < GOLDEN_MATCH_SCORE:
                # ... (interactive logic is elided in this file and unchanged) ...
                print("\n" + "="*50)
                # ...

        if best_match and best_match['score'] >= SCORE_THRESHOLD:
            # Without domain and location evidence the stricter threshold applies.
            is_weak = best_match['comp'].get('domain_match', 0) == 0 and not (best_match['comp'].get('location_match', 0))
            applied_threshold = SCORE_THRESHOLD_WEAK if is_weak else SCORE_THRESHOLD
            if best_match['score'] >= applied_threshold:
                results.append({'Match': best_match['name'], 'Score': best_match['score'], 'Match_Grund': str(best_match['comp'])})
                logger.info(f" --> Match: '{best_match['name']}' ({best_match['score']}) | TH={applied_threshold}{' (weak)' if is_weak else ''}")
            else:
                results.append({'Match': '', 'Score': best_match['score'], 'Match_Grund': f"Below WEAK threshold | {str(best_match['comp'])}"})
                logger.info(f" --> No Match (below weak TH): '{best_match['name']}' ({best_match['score']}) | TH={applied_threshold}")
        elif best_match:
            results.append({'Match': '', 'Score': best_match['score'], 'Match_Grund': f"Below threshold | {str(best_match['comp'])}"})
            logger.info(f" --> No Match (below TH): '{best_match['name']}' ({best_match['score']})")
        else:
            results.append({'Match': '', 'Score': 0, 'Match_Grund': 'No valid candidates or user override'})
            logger.info(" --> No Match (no candidates)")

    # --- Write results back ---
    logger.info("Matching-Prozess abgeschlossen. Bereite Ergebnisse für den Upload vor...")
    update_status(job_id, "Läuft", "Schreibe Ergebnisse zurück ins Sheet...")
    result_df = pd.DataFrame(results)
    # Result columns from a previous run are replaced by the fresh ones.
    cols_to_drop_from_match = ['Match', 'Score', 'Match_Grund']
    match_df_clean = match_df.drop(columns=[col for col in cols_to_drop_from_match if col in match_df.columns], errors='ignore')
    final_df = pd.concat([match_df_clean.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1)
    # Helper columns are internal and are not uploaded.
    cols_to_drop = ['normalized_name', 'normalized_domain']
    final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore')
    upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})
    data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()
    logger.info(f"Versuche, {len(data_to_write) - 1} Ergebniszeilen in das Sheet '{MATCHING_SHEET_NAME}' zu schreiben...")
    ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
    if ok:
        logger.info("Ergebnisse erfolgreich in das Google Sheet geschrieben.")
        update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
    else:
        logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
        update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Duplicate Checker v4.0")
    parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
    parser.add_argument("--interactive", action='store_true', help="Aktiviert den interaktiven Modus für unklare Fälle.")
    args = parser.parse_args()
    Config.load_api_keys()
    main(job_id=args.job_id, interactive=args.interactive)