# duplicate_checker.py v3.0
# The build timestamp is injected into the logfile name.
# --- NEW FEATURES v3.0 ---
# - Golden rule: near-exact name matches (ratio >= 98%) are always counted as a hit.
# - Weighted scoring (TF-IDF): rare words in a company name carry more weight than common filler words.
# - Interactive mode: in ambiguous cases the user can manually pick the best candidate.
# - Extensively reworked scoring logic for higher precision.

import os
import sys
import re
import argparse
import json
import logging
import math
from datetime import datetime
from collections import Counter

import pandas as pd
from thefuzz import fuzz

from helpers import normalize_company_name, simple_normalize_url, serp_website_lookup
from config import Config
from google_sheet_handler import GoogleSheetHandler

STATUS_DIR = "job_status"


def update_status(job_id, status, progress_message):
    """Merge the current status and progress message into the job's JSON status file."""
    if not job_id:
        return
    status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
    try:
        try:
            with open(status_file, 'r') as f:
                data = json.load(f)
        except FileNotFoundError:
            data = {}
        data.update({"status": status, "progress": progress_message})
        with open(status_file, 'w') as f:
            json.dump(data, f)
    except Exception as e:
        logging.error(f"Could not write status file for job {job_id}: {e}")


# --- Configuration ---
CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
LOG_DIR = "Log"
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v3.0.txt"

# Scoring configuration
SCORE_THRESHOLD = 100       # default threshold for a match
SCORE_THRESHOLD_WEAK = 130  # threshold for matches without domain or location evidence
GOLDEN_MATCH_RATIO = 98     # name ratio at or above which a match counts as a "golden match"
GOLDEN_MATCH_SCORE = 300    # score awarded for a golden match

# Interactive mode configuration
INTERACTIVE_SCORE_MIN = 100   # minimum score of the best candidate to trigger interactive mode
INTERACTIVE_SCORE_DIFF = 20   # maximum score gap to the runner-up to trigger interactive mode

# Prefilter configuration
PREFILTER_MIN_PARTIAL = 70
PREFILTER_LIMIT = 30
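
# Illustrative interplay of these thresholds (all numbers hypothetical): two shared
# rare tokens with IDF weights 6.2 and 4.8 give a raw name_score of 11.0; a full
# token overlap doubles it to 22.0, and 22.0 * 15 = 330 plus 100 for a domain match
# easily clears SCORE_THRESHOLD. The same name-only evidence without domain and
# city/country agreement is judged against SCORE_THRESHOLD_WEAK (130) instead.
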
# --- Logging setup ---
os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE)

root = logging.getLogger()
root.setLevel(logging.DEBUG)
for h in list(root.handlers):
    root.removeHandler(h)

formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s")

ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
root.addHandler(ch)

fh = logging.FileHandler(log_path, mode='a', encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
root.addHandler(fh)

logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v3.0 | Build: {now}")

# --- Load SerpAPI key ---
try:
    Config.load_api_keys()
    serp_key = Config.API_KEYS.get('serpapi')
    if not serp_key:
        logger.warning("SerpAPI key not found; Serp fallback disabled.")
except Exception as e:
    logger.warning(f"Error loading API keys: {e}")
    serp_key = None

# --- Stop/city tokens ---
STOP_TOKENS_BASE = {
    'gmbh', 'mbh', 'ag', 'kg', 'ug', 'ohg', 'se', 'co', 'kgaa', 'inc', 'llc', 'ltd', 'sarl',
    'holding', 'gruppe', 'group', 'international', 'solutions', 'solution', 'service', 'services',
    'deutschland', 'austria', 'germany', 'technik', 'technology', 'technologies', 'systems', 'systeme',
    'logistik', 'logistics', 'industries', 'industrie', 'management', 'consulting', 'vertrieb', 'handel',
    'company', 'gesellschaft', 'mbh&co', 'mbhco', 'werke', 'werk'
}
CITY_TOKENS = set()  # filled dynamically in main()


# --- Utilities ---
def _tokenize(s: str):
    if not s:
        return []
    return re.split(r"[^a-z0-9äöüß]+", str(s).lower())


def clean_name_for_scoring(norm_name: str, dynamic_stopwords: set):
    """Remove base stop tokens, city tokens and dynamic stopwords from a normalized name."""
    if not norm_name:
        return "", set()
    tokens = [t for t in _tokenize(norm_name) if len(t) >= 3]
    stop_union = STOP_TOKENS_BASE | CITY_TOKENS | dynamic_stopwords
    final_tokens = [t for t in tokens if t not in stop_union]
    return " ".join(final_tokens), set(final_tokens)


# --- NEW: TF-IDF logic (simplified) ---
def build_term_weights(crm_df: pd.DataFrame, dynamic_stopwords: set):
    """Build a weight dictionary based on how rare each word is (IDF)."""
    logger.info("Starting term weight calculation (TF-IDF)...")
    token_counts = Counter()
    total_docs = len(crm_df)

    for name in crm_df['normalized_name']:
        _, tokens = clean_name_for_scoring(name, dynamic_stopwords)
        for token in tokens:  # tokens is a set, so each word counts only once per company name
            token_counts[token] += 1

    term_weights = {}
    for token, count in token_counts.items():
        # IDF formula: log(N / df) - the rarer the word, the higher its weight
        idf = math.log(total_docs / (count + 1))  # +1 for smoothing
        term_weights[token] = idf

    logger.info(f"Term weights calculated for {len(term_weights)} tokens.")
    return term_weights


def get_dynamic_stopwords(crm_df: pd.DataFrame, threshold_percent=0.01):
    """Identify frequent words in the CRM data that should be treated as stopwords."""
    logger.info("Collecting dynamic stopwords...")
    token_counts = Counter()
    for name in crm_df['normalized_name']:
        tokens = [t for t in _tokenize(name) if len(t) >= 3 and t not in (STOP_TOKENS_BASE | CITY_TOKENS)]
        for token in set(tokens):
            token_counts[token] += 1

    limit = len(crm_df) * threshold_percent
    stopwords = {token for token, count in token_counts.items() if count > limit}
    logger.info(f"{len(stopwords)} dynamic stopwords identified (e.g. 'stadtwerke', 'werke', ...)")
    return stopwords
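
# Worked IDF example (hypothetical corpus of 10,000 CRM rows): a token that appears in
# 5 company names gets weight log(10000 / 6) ≈ 7.4, one that appears in 60 names gets
# log(10000 / 61) ≈ 5.1, and anything occurring in more than 1% of rows has already been
# dropped as a dynamic stopword, so only genuinely distinctive words drive the name score.
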
# --- Similarity ---
def calculate_similarity(mrec: dict, crec: dict, term_weights: dict, dynamic_stopwords: set):
    # --- NEW: golden rule for a near-exact name match ---
    n1_raw = mrec.get('normalized_name', '')
    n2_raw = crec.get('normalized_name', '')
    if fuzz.ratio(n1_raw, n2_raw) >= GOLDEN_MATCH_RATIO:
        return GOLDEN_MATCH_SCORE, {'reason': f'Golden Match (Name Ratio >= {GOLDEN_MATCH_RATIO}%)'}

    # Domain (only counts when both records have a domain)
    dom1 = mrec.get('normalized_domain', '')
    dom2 = crec.get('normalized_domain', '')
    domain_match = 1 if (dom1 and dom1 == dom2) else 0

    # Location
    city_match = 1 if (mrec.get('CRM Ort') and crec.get('CRM Ort') and mrec.get('CRM Ort') == crec.get('CRM Ort')) else 0
    country_match = 1 if (mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') == crec.get('CRM Land')) else 0

    # Name (cleaned)
    _, toks1 = clean_name_for_scoring(n1_raw, dynamic_stopwords)
    _, toks2 = clean_name_for_scoring(n2_raw, dynamic_stopwords)

    # --- NEW: weighted name score based on TF-IDF ---
    name_score = 0
    overlapping_tokens = toks1 & toks2
    if overlapping_tokens:
        # the score is the sum of the weights of the overlapping words
        name_score = sum(term_weights.get(token, 0) for token in overlapping_tokens)
        # bonus for a high percentage overlap of the rare words
        if toks1:
            overlap_percentage = len(overlapping_tokens) / len(toks1)
            name_score *= (1 + overlap_percentage)

    # --- NEW: reworked total score calculation ---
    # base score components
    score_domain = 100 if domain_match else 0
    score_location = 20 if (city_match and country_match) else 0

    # total score; the name now carries much more weight
    total = name_score * 15 + score_domain + score_location

    # penalties
    penalties = 0
    if mrec.get('CRM Land') and crec.get('CRM Land') and not country_match:
        penalties += 40
    if mrec.get('CRM Ort') and crec.get('CRM Ort') and not city_match:
        penalties += 30
    total -= penalties

    comp = {
        'name_score': round(name_score, 1),
        'domain_match': domain_match,
        'city_match': city_match,
        'country_match': country_match,
        'penalties': penalties,
        'overlapping_tokens': list(overlapping_tokens)
    }
    return max(0, round(total)), comp


# --- Indexes ---
def build_indexes(crm_df: pd.DataFrame, dynamic_stopwords: set):
    records = list(crm_df.to_dict('records'))

    # Domain index (stores record indices instead of whole records)
    domain_index = {}
    for idx, r in enumerate(records):
        d = r.get('normalized_domain')
        if d:
            domain_index.setdefault(d, []).append(idx)

    # Token index (stores record indices instead of whole records)
    token_index = {}
    for idx, r in enumerate(records):
        _, toks = clean_name_for_scoring(r.get('normalized_name', ''), dynamic_stopwords)
        for t in toks:
            token_index.setdefault(t, []).append(idx)

    return records, domain_index, token_index


def choose_rarest_token(norm_name: str, term_weights: dict, dynamic_stopwords: set):
    _, toks = clean_name_for_scoring(norm_name, dynamic_stopwords)
    if not toks:
        return None
    # the rarest token has the highest weight (highest IDF score)
    rarest = max(toks, key=lambda t: term_weights.get(t, 0))
    return rarest if term_weights.get(rarest, 0) > 0 else None
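
# Candidate blocking in main() is a three-stage cascade, illustrated on a hypothetical
# matching row: (1) if the row has a normalized domain, all CRM rows sharing that domain
# become candidates; (2) otherwise the rarest (highest-IDF) name token, e.g. "acme" for a
# row named "ACME Pumpen GmbH", is looked up in token_index; (3) only if both fail, a fuzzy
# partial-ratio prefilter over the cleaned names keeps the top PREFILTER_LIMIT rows whose
# ratio reaches PREFILTER_MIN_PARTIAL.
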
# --- Main function ---
def main(job_id=None, interactive=False):
    logger.info("Starting duplicate check v3.0 (weighted scoring & interactive mode)")
    update_status(job_id, "Läuft", "Initializing GoogleSheetHandler...")
    try:
        sheet = GoogleSheetHandler()
        logger.info("GoogleSheetHandler initialized")
    except Exception as e:
        logger.critical(f"GoogleSheetHandler init failed: {e}")
        update_status(job_id, "Fehlgeschlagen", f"GoogleSheetHandler init failed: {e}")
        sys.exit(1)

    # Load data
    update_status(job_id, "Läuft", "Loading CRM and matching data...")
    crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
    match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    total = len(match_df) if match_df is not None else 0
    logger.info(f"{0 if crm_df is None else len(crm_df)} CRM records | {total} matching records")

    if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
        logger.critical("Empty data in one of the sheets. Aborting.")
        update_status(job_id, "Fehlgeschlagen", "Empty data in one of the sheets.")
        return

    # Normalization
    update_status(job_id, "Läuft", "Normalizing data...")
    crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name)
    crm_df['normalized_domain'] = crm_df['CRM Website'].astype(str).apply(simple_normalize_url)
    crm_df['CRM Ort'] = crm_df['CRM Ort'].astype(str).str.lower().str.strip()
    crm_df['CRM Land'] = crm_df['CRM Land'].astype(str).str.lower().str.strip()

    match_df['normalized_name'] = match_df['CRM Name'].astype(str).apply(normalize_company_name)
    match_df['normalized_domain'] = match_df['CRM Website'].astype(str).apply(simple_normalize_url)
    match_df['CRM Ort'] = match_df['CRM Ort'].astype(str).str.lower().str.strip()
    match_df['CRM Land'] = match_df['CRM Land'].astype(str).str.lower().str.strip()

    def build_city_tokens(crm_df, match_df):
        cities = set()
        for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']], ignore_index=True).dropna().unique():
            for t in _tokenize(s):
                if len(t) >= 3:
                    cities.add(t)
        return cities

    global CITY_TOKENS
    CITY_TOKENS = build_city_tokens(crm_df, match_df)
    logger.info(f"City tokens collected: {len(CITY_TOKENS)}")

    # --- NEW: TF-IDF and index creation ---
    dynamic_stopwords = get_dynamic_stopwords(crm_df)
    term_weights = build_term_weights(crm_df, dynamic_stopwords)
    crm_records, domain_index, token_index = build_indexes(crm_df, dynamic_stopwords)
    logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")

    # --- Matching ---
    results = []
    logger.info("Starting matching process...")

    for idx, mrow in match_df.to_dict('index').items():
        processed = idx + 1
        progress_message = f"Checking {processed}/{total}: '{mrow.get('CRM Name', '')}'"
        logger.info(progress_message)
        if processed % 5 == 0 or processed == total:
            update_status(job_id, "Läuft", progress_message)

        candidate_indices = set()
        used_block = ''

        # Blocking via domain (domain_index stores record indices, see build_indexes)
        if mrow.get('normalized_domain'):
            candidate_indices.update(domain_index.get(mrow['normalized_domain'], []))
            if candidate_indices:
                used_block = f"domain:{mrow['normalized_domain']}"

        # Blocking via rarest token
        if not candidate_indices:
            rtok = choose_rarest_token(mrow.get('normalized_name', ''), term_weights, dynamic_stopwords)
            if rtok:
                candidate_indices.update(token_index.get(rtok, []))
                used_block = f"token:{rtok}"

        # Prefilter as fallback
        if not candidate_indices:
            pf = []
            n1 = mrow.get('normalized_name', '')
            clean1, _ = clean_name_for_scoring(n1, dynamic_stopwords)
            if clean1:
                for i, r in enumerate(crm_records):
                    n2 = r.get('normalized_name', '')
                    clean2, _ = clean_name_for_scoring(n2, dynamic_stopwords)
                    if not clean2:
                        continue
                    pr = fuzz.partial_ratio(clean1, clean2)
                    if pr >= PREFILTER_MIN_PARTIAL:
                        pf.append((pr, i))
                pf.sort(key=lambda x: x[0], reverse=True)
                candidate_indices.update([i for _, i in pf[:PREFILTER_LIMIT]])
                used_block = f"prefilter:{PREFILTER_MIN_PARTIAL}/{len(pf)}"

        candidates = [crm_records[i] for i in candidate_indices]
        logger.info(f"Checking {processed}/{total}: '{mrow.get('CRM Name', '')}' -> {len(candidates)} candidates (block={used_block})")

        if not candidates:
            results.append({'Match': '', 'Score': 0, 'Match_Grund': 'no candidates'})
            continue

        scored = []
        for cr in candidates:
            score, comp = calculate_similarity(mrow, cr, term_weights, dynamic_stopwords)
            scored.append({'name': cr.get('CRM Name', ''), 'score': score, 'comp': comp, 'record': cr})

        scored.sort(key=lambda x: x['score'], reverse=True)
        for cand in scored[:5]:
            logger.debug(f"  Candidate: {cand['name']} | Score={cand['score']} | Comp={cand['comp']}")

        best_match = scored[0] if scored else None

        # --- NEW: interactive mode ---
        if interactive and best_match and len(scored) > 1:
            best_score = best_match['score']
            second_best_score = scored[1]['score']
            if best_score > INTERACTIVE_SCORE_MIN and (best_score - second_best_score) < INTERACTIVE_SCORE_DIFF:
                print("\n" + "=" * 50)
                print(f"AMBIGUOUS MATCH for '{mrow['CRM Name']}'")
                print("Top candidates have very similar scores.")
                print(f"  - Match: '{mrow['CRM Name']}' | {mrow['normalized_domain']} | {mrow['CRM Ort']}, {mrow['CRM Land']}")
                print("-" * 50)
                for i, item in enumerate(scored[:5]):
                    cr = item['record']
                    print(f"[{i + 1}] Candidate: '{cr['CRM Name']}' | {cr['normalized_domain']} | {cr['CRM Ort']}, {cr['CRM Land']}")
                    print(f"    Score: {item['score']} | Details: {item['comp']}")
                print("[0] No match")

                choice = -1
                while choice < 0 or choice > len(scored[:5]):
                    try:
                        choice = int(input(f"Please select the best match (1-{len(scored[:5])}) or 0 for no match: "))
                    except ValueError:
                        choice = -1

                if choice > 0:
                    best_match = scored[choice - 1]
                    logger.info(f"User selected candidate {choice}: '{best_match['name']}'")
                elif choice == 0:
                    best_match = None  # user decided there is no match
                    logger.info("User selected no match.")
                print("=" * 50 + "\n")

        if best_match and best_match['score'] >= SCORE_THRESHOLD:
            # weak matches (no domain/location evidence) need the higher threshold
            is_weak = best_match['comp'].get('domain_match', 0) == 0 and not (best_match['comp'].get('city_match', 0) and best_match['comp'].get('country_match', 0))
            applied_threshold = SCORE_THRESHOLD_WEAK if is_weak else SCORE_THRESHOLD
            if best_match['score'] >= applied_threshold:
                results.append({'Match': best_match['name'], 'Score': best_match['score'], 'Match_Grund': str(best_match['comp'])})
                logger.info(f"  --> Match: '{best_match['name']}' ({best_match['score']}) | TH={applied_threshold}{' (weak)' if is_weak else ''}")
            else:
                results.append({'Match': '', 'Score': best_match['score'], 'Match_Grund': f"Below WEAK threshold | {str(best_match['comp'])}"})
                logger.info(f"  --> No Match (below weak TH): '{best_match['name']}' ({best_match['score']}) | TH={applied_threshold}")
        elif best_match:
            results.append({'Match': '', 'Score': best_match['score'], 'Match_Grund': f"Below threshold | {str(best_match['comp'])}"})
            logger.info(f"  --> No Match (below TH): '{best_match['name']}' ({best_match['score']})")
        else:
            results.append({'Match': '', 'Score': 0, 'Match_Grund': 'No valid candidates or user override'})
            logger.info("  --> No Match (no candidates)")
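
    # Example of a single results entry as appended above (values purely illustrative):
    #   {'Match': 'Acme Pumpen GmbH', 'Score': 312,
    #    'Match_Grund': "{'name_score': 12.8, 'domain_match': 1, 'city_match': 1, 'country_match': 1, 'penalties': 0, ...}"}
    # Each entry is positionally aligned with its row in match_df and concatenated below.
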

    # --- Write results back ---
    logger.info("Matching process finished. Preparing results for upload...")
    update_status(job_id, "Läuft", "Writing results back to the sheet...")
    result_df = pd.DataFrame(results)

    # Drop the result columns if they already exist in the sheet
    cols_to_drop_from_match = ['Match', 'Score', 'Match_Grund']
    match_df_clean = match_df.drop(columns=[col for col in cols_to_drop_from_match if col in match_df.columns], errors='ignore')

    final_df = pd.concat([match_df_clean, result_df], axis=1)

    cols_to_drop = ['normalized_name', 'normalized_domain']
    final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore')

    upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})
    data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()

    logger.info(f"Attempting to write {len(data_to_write) - 1} result rows to sheet '{MATCHING_SHEET_NAME}'...")
    ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
    if ok:
        logger.info("Results successfully written to the Google Sheet.")
        update_status(job_id, "Abgeschlossen", f"{total} accounts checked successfully.")
    else:
        logger.error("Error writing results to the Google Sheet.")
        update_status(job_id, "Fehlgeschlagen", "Error writing to the Google Sheet.")


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Duplicate Checker v3.0")
    parser.add_argument("--job-id", type=str, help="Unique ID for the job status.")
    parser.add_argument("--interactive", action='store_true', help="Enables interactive mode for ambiguous cases.")
    args = parser.parse_args()

    Config.load_api_keys()
    main(job_id=args.job_id, interactive=args.interactive)
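
# Usage (illustrative; the job id is an arbitrary label chosen by the caller):
#   python duplicate_checker.py --job-id nightly-crm-check --interactive
#   python duplicate_checker.py --job-id nightly-crm-check          # unattended batch run
# Progress is written to job_status/<job-id>.json and to the timestamped logfile in Log/.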