diff --git a/duplicate_checker.py b/duplicate_checker.py
index 468c6e4c..36a269e3 100644
--- a/duplicate_checker.py
+++ b/duplicate_checker.py
@@ -1,3 +1,12 @@
+# duplicate_checker.py v3.0
+# Build timestamp is injected into logfile name.
+
+# --- NEW FEATURES v3.0 ---
+# - Golden rule: near-exact name matches (ratio >= 98%) always count as a hit.
+# - Weighted scoring (TF-IDF): distinctive words in a company name carry more
+#   weight than common filler words.
+# - Interactive mode: for ambiguous cases, the user can pick the best candidate manually.
+# - Thoroughly reworked scoring logic for higher precision.
+
 import os
 import sys
 import re
@@ -5,6 +14,7 @@ import argparse
 import json
 import logging
 import pandas as pd
+import math
 from datetime import datetime
 from collections import Counter
 from thefuzz import fuzz
@@ -18,7 +28,6 @@ def update_status(job_id, status, progress_message):
     if not job_id:
         return
     status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
     try:
-        # Lese alte Daten, um Action und Startzeit zu erhalten
         try:
             with open(status_file, 'r') as f:
                 data = json.load(f)
@@ -32,25 +41,26 @@ def update_status(job_id, status, progress_message):
     except Exception as e:
         logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")
 
-
-# duplicate_checker.py v2.15
-# Quality-first ++: Domain-Gate, Location-Penalties, Smart Blocking (IDF-light),
-# Serp-Trust, Weak-Threshold, City-Bias-Guard, Prefilter tightened, Metrics
-# Build timestamp is injected into logfile name.
-
 # --- Konfiguration ---
 CRM_SHEET_NAME = "CRM_Accounts"
 MATCHING_SHEET_NAME = "Matching_Accounts"
-SCORE_THRESHOLD = 80            # Standard-Schwelle
-SCORE_THRESHOLD_WEAK= 95        # Schwelle, wenn weder Domain noch (City&Country) matchen
-MIN_NAME_FOR_DOMAIN = 70        # Domain-Score nur, wenn Name >= 70 ODER Ort+Land matchen
-CITY_MISMATCH_PENALTY = 30
-COUNTRY_MISMATCH_PENALTY = 40
-PREFILTER_MIN_PARTIAL = 70      # (vorher 60)
-PREFILTER_LIMIT = 30            # (vorher 50)
 LOG_DIR = "Log"
 now = datetime.now().strftime('%Y-%m-%d_%H-%M')
-LOG_FILE = f"{now}_duplicate_check_v2.15.txt"
+LOG_FILE = f"{now}_duplicate_check_v3.0.txt"
+
+# Scoring configuration
+SCORE_THRESHOLD = 100        # default threshold for a match
+SCORE_THRESHOLD_WEAK = 130   # threshold for matches without domain or location support
+GOLDEN_MATCH_RATIO = 98      # name ratio at or above which a pair counts as a "golden match"
+GOLDEN_MATCH_SCORE = 300     # score awarded for a golden match
+CITY_MISMATCH_PENALTY = 30
+COUNTRY_MISMATCH_PENALTY = 40
+
+# Interactive mode configuration
+INTERACTIVE_SCORE_MIN = 100  # minimum score of the best candidate to trigger interactive mode
+INTERACTIVE_SCORE_DIFF = 20  # maximum score gap to the runner-up to trigger interactive mode
+
+# Prefilter configuration
+PREFILTER_MIN_PARTIAL = 70
+PREFILTER_LIMIT = 30
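+
+# Rough scale of the weighted score (illustrative sketch, not measured values):
+# a single shared name token with IDF weight ~6 contributes about
+# 6 * (1 + 1.0) * 15 = 180 points (weight * overlap bonus * name factor);
+# a domain match adds a flat 100 and city+country add 20. So a domain hit
+# alone reaches SCORE_THRESHOLD = 100, while SCORE_THRESHOLD_WEAK = 130
+# requires a reasonably distinctive name overlap.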
 
 # --- Logging Setup ---
 if not os.path.exists(LOG_DIR):
@@ -71,7 +81,7 @@ fh.setFormatter(formatter)
 root.addHandler(fh)
 logger = logging.getLogger(__name__)
 logger.info(f"Logging to console and file: {log_path}")
-logger.info(f"Starting duplicate_checker.py v2.15 | Build: {now}")
+logger.info(f"Starting duplicate_checker.py v3.0 | Build: {now}")
 
 # --- SerpAPI Key laden ---
 try:
@@ -89,119 +99,122 @@ STOP_TOKENS_BASE = {
     'holding','gruppe','group','international','solutions','solution','service','services',
     'deutschland','austria','germany','technik','technology','technologies','systems','systeme',
     'logistik','logistics','industries','industrie','management','consulting','vertrieb','handel',
-    'international','company','gesellschaft','mbh&co','mbhco','werke','werk','renkhoff','sonnenschutztechnik'
+    'international','company','gesellschaft','mbh&co','mbhco','werke','werk'
 }
-CITY_TOKENS = set()  # dynamisch befüllt nach Datennormalisierung
+CITY_TOKENS = set()  # filled dynamically after data normalization
 
 # --- Utilities ---
 def _tokenize(s: str):
-    if not s:
-        return []
-    return re.split(r"[^a-z0-9]+", str(s).lower())
+    if not s: return []
+    return re.split(r"[^a-z0-9äöüß]+", str(s).lower())
 
-def split_tokens(name: str):
-    """Tokens für Indexing/Scoring (Basis-Stop + dynamische City-Tokens)."""
-    if not name:
-        return []
-    tokens = [t for t in _tokenize(name) if len(t) >= 3]
-    stop_union = STOP_TOKENS_BASE | CITY_TOKENS
-    return [t for t in tokens if t not in stop_union]
+def clean_name_for_scoring(norm_name: str, dynamic_stopwords: set):
+    """Removes base stop tokens, city tokens and dynamic stopwords."""
+    if not norm_name: return "", set()
+
+    tokens = [t for t in _tokenize(norm_name) if len(t) >= 3]
+    stop_union = STOP_TOKENS_BASE | CITY_TOKENS | dynamic_stopwords
+
+    final_tokens = [t for t in tokens if t not in stop_union]
+    return " ".join(final_tokens), set(final_tokens)
 
-def clean_name_for_scoring(norm_name: str):
-    """Entfernt Stop- & City-Tokens. Leerer Output => kein sinnvoller Namevergleich."""
-    toks = split_tokens(norm_name)
-    return " ".join(toks), set(toks)
+# --- NEW: TF-IDF logic (simplified) ---
+def build_term_weights(crm_df: pd.DataFrame, dynamic_stopwords: set):
+    """Builds a weight dictionary based on how rare each word is (IDF)."""
+    logger.info("Starte Berechnung der Wortgewichte (TF-IDF)...")
+    token_counts = Counter()
+    total_docs = len(crm_df)
+
+    for name in crm_df['normalized_name']:
+        _, tokens = clean_name_for_scoring(name, dynamic_stopwords)
+        for token in tokens:  # tokens is already a set, so each word counts once per company name
+            token_counts[token] += 1
+
+    term_weights = {}
+    for token, count in token_counts.items():
+        # Smoothed IDF: log((N + 1) / (df + 1)); the rarer the word, the higher
+        # the weight. Unlike plain log(N / (df + 1)), this can never go negative.
+        idf = math.log((total_docs + 1) / (count + 1))
+        term_weights[token] = idf
+
+    logger.info(f"Wortgewichte für {len(term_weights)} Tokens berechnet.")
+    return term_weights
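+
+# Worked example (hypothetical corpus of 1,000 CRM names): a token that appears
+# in exactly one name gets log(1001/2) ≈ 6.2, while a token that appears in
+# 300 names gets only log(1001/301) ≈ 1.2, so rare, distinctive tokens
+# dominate the name score.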
 
-def assess_serp_trust(company_name: str, url: str) -> str:
-    """Vertrauen 'hoch/mittel/niedrig' anhand Token-Vorkommen in Domain."""
-    if not url:
-        return 'n/a'
-    host = simple_normalize_url(url) or ''
-    host = host.replace('www.', '')
-    name_toks = [t for t in split_tokens(normalize_company_name(company_name)) if len(t) >= 3]
-    if any(t in host for t in name_toks if len(t) >= 4):
-        return 'hoch'
-    if any(t in host for t in name_toks if len(t) == 3):
-        return 'mittel'
-    return 'niedrig'
+def get_dynamic_stopwords(crm_df: pd.DataFrame, threshold_percent=0.01):
+    """Identifies frequent words in the CRM data set that should be treated as stopwords."""
+    logger.info("Sammle dynamische Stopwords...")
+    token_counts = Counter()
+    for name in crm_df['normalized_name']:
+        tokens = [t for t in _tokenize(name) if len(t) >= 3 and t not in (STOP_TOKENS_BASE | CITY_TOKENS)]
+        for token in set(tokens):
+            token_counts[token] += 1
+
+    limit = len(crm_df) * threshold_percent
+    stopwords = {token for token, count in token_counts.items() if count > limit}
+    # Log a real sample instead of hardcoded example words
+    logger.info(f"{len(stopwords)} dynamische Stopwords identifiziert (Beispiele: {sorted(stopwords)[:5]})")
+    return stopwords
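+
+# Example (hypothetical): with 5,000 CRM rows and threshold_percent=0.01, every
+# token that appears in more than 50 distinct normalized names (e.g. 'stadtwerke')
+# becomes a stopword and is ignored for both scoring and blocking.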
 
 # --- Similarity ---
-def calculate_similarity(mrec: dict, crec: dict, token_freq: Counter):
+def calculate_similarity(mrec: dict, crec: dict, term_weights: dict, dynamic_stopwords: set):
+
+    # --- NEW: golden rule for (near-)exact name matches ---
+    n1_raw = mrec.get('normalized_name', '')
+    n2_raw = crec.get('normalized_name', '')
+    if fuzz.ratio(n1_raw, n2_raw) >= GOLDEN_MATCH_RATIO:
+        return GOLDEN_MATCH_SCORE, {'reason': f'Golden Match (Name Ratio >= {GOLDEN_MATCH_RATIO}%)'}
+
-    # Domain (mit Gate)
+    # Domain (the v2.15 gate is gone; a domain match always counts)
     dom1 = mrec.get('normalized_domain','')
     dom2 = crec.get('normalized_domain','')
-    m_domain_use = mrec.get('domain_use_flag', 0)
-    domain_flag_raw = 1 if (m_domain_use == 1 and dom1 and dom1 == dom2) else 0
+    domain_match = 1 if (dom1 and dom1 == dom2) else 0
 
-    # Location flags
+    # Location
     city_match = 1 if (mrec.get('CRM Ort') and crec.get('CRM Ort') and mrec.get('CRM Ort') == crec.get('CRM Ort')) else 0
     country_match = 1 if (mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') == crec.get('CRM Land')) else 0
+
+    # Name (cleaned)
+    clean1, toks1 = clean_name_for_scoring(n1_raw, dynamic_stopwords)
+    clean2, toks2 = clean_name_for_scoring(n2_raw, dynamic_stopwords)
 
-    # Name (nur sinnvolle Tokens)
-    n1 = mrec.get('normalized_name','')
-    n2 = crec.get('normalized_name','')
-    clean1, toks1 = clean_name_for_scoring(n1)
-    clean2, toks2 = clean_name_for_scoring(n2)
+    # --- NEW: weighted name score based on TF-IDF ---
+    name_score = 0
+    overlapping_tokens = toks1 & toks2
+    if overlapping_tokens:
+        # The score is the sum of the weights of the shared words
+        name_score = sum(term_weights.get(token, 0) for token in overlapping_tokens)
+        # Bonus for a high percentage overlap of the distinctive words
+        if toks1:
+            overlap_percentage = len(overlapping_tokens) / len(toks1)
+            name_score *= (1 + overlap_percentage)
 
-    # Overlaps
-    overlap_clean = toks1 & toks2
-    # city-only overlap check (wenn nach Clean nichts übrig, aber Roh-Overlap evtl. Städte; wir cappen Score)
-    raw_overlap = set(_tokenize(n1)) & set(_tokenize(n2))
-    city_only_overlap = (not overlap_clean) and any(t in CITY_TOKENS for t in raw_overlap)
+    # --- NEW: reworked total-score calculation ---
+    # Base score components
+    score_domain = 100 if domain_match else 0
+    score_location = 20 if (city_match and country_match) else 0
 
-    # Name-Score
-    if clean1 and clean2:
-        ts = fuzz.token_set_ratio(clean1, clean2)
-        pr = fuzz.partial_ratio(clean1, clean2)
-        ss = fuzz.token_sort_ratio(clean1, clean2)
-        name_score = max(ts, pr, ss)
-    else:
-        name_score = 0
+    # Total score: the name now has far more influence than in v2.15
+    total = name_score * 15 + score_domain + score_location
 
-    if city_only_overlap and name_score > 70:
-        name_score = 70  # cap
-
-    # Rare-token-overlap (IDF-light): benutze seltensten Token aus mrec
-    rtoks_sorted = sorted(list(toks1), key=lambda t: (token_freq.get(t, 10**9), -len(t)))
-    rare_token = rtoks_sorted[0] if rtoks_sorted else None
-    rare_overlap = 1 if (rare_token and rare_token in toks2) else 0
-
-    # Domain Gate
-    domain_gate_ok = (name_score >= MIN_NAME_FOR_DOMAIN) or (city_match and country_match)
-    domain_used = 1 if (domain_flag_raw and domain_gate_ok) else 0
-
-    # Basisscore
-    total = domain_used*100 + name_score*1.0 + (1 if (city_match and country_match) else 0)*20
-
     # Penalties
     penalties = 0
     if mrec.get('CRM Land') and crec.get('CRM Land') and not country_match:
         penalties += COUNTRY_MISMATCH_PENALTY
     if mrec.get('CRM Ort') and crec.get('CRM Ort') and not city_match:
         penalties += CITY_MISMATCH_PENALTY
     total -= penalties
-
-    # Bonus für starke Name-only Fälle
-    name_bonus = 1 if (domain_used == 0 and not (city_match and country_match) and name_score >= 85 and rare_overlap==1) else 0
-    if name_bonus:
-        total += 20
-
     comp = {
-        'domain_raw': domain_flag_raw,
-        'domain_used': domain_used,
-        'domain_gate_ok': int(domain_gate_ok),
-        'name': round(name_score,1),
+        'name_score': round(name_score,1),
+        'domain_match': domain_match,
         'city_match': city_match,
         'country_match': country_match,
         'penalties': penalties,
-        'name_bonus': name_bonus,
-        'rare_overlap': rare_overlap,
-        'city_only_overlap': int(city_only_overlap)
+        'overlapping_tokens': list(overlapping_tokens)
     }
-    return round(total), comp
+
+    return max(0, round(total)), comp
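+
+# Worked example (hypothetical weights): two shared tokens weighing 5.0 and 3.0
+# with full overlap give name_score = (5.0 + 3.0) * (1 + 1.0) = 16; with a
+# matching domain and matching city+country the total is
+#   16 * 15 + 100 + 20 = 360,
+# while a lone city mismatch would subtract CITY_MISMATCH_PENALTY = 30.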
 
 # --- Indexe ---
-def build_indexes(crm_df: pd.DataFrame):
+def build_indexes(crm_df: pd.DataFrame, dynamic_stopwords: set):
     records = list(crm_df.to_dict('records'))
     # Domain-Index
     domain_index = {}
@@ -209,178 +222,126 @@
         d = r.get('normalized_domain')
         if d:
             domain_index.setdefault(d, []).append(r)
-    # Token-Frequenzen (auf gereinigten Tokens)
-    token_freq = Counter()
-    for r in records:
-        _, toks = clean_name_for_scoring(r.get('normalized_name',''))
-        for t in set(toks):
-            token_freq[t] += 1
+    # Token-Index (stores the same record dicts as domain_index, so main()
+    # never needs to map candidates back to positions)
     token_index = {}
     for r in records:
-        _, toks = clean_name_for_scoring(r.get('normalized_name',''))
+        _, toks = clean_name_for_scoring(r.get('normalized_name',''), dynamic_stopwords)
         for t in set(toks):
             token_index.setdefault(t, []).append(r)
-    return records, domain_index, token_freq, token_index
+
+    return records, domain_index, token_index
 
-
-def choose_rarest_token(norm_name: str, token_freq: Counter):
-    _, toks = clean_name_for_scoring(norm_name)
-    if not toks:
-        return None
-    lst = sorted(list(toks), key=lambda x: (token_freq.get(x, 10**9), -len(x)))
-    return lst[0] if lst else None
+def choose_rarest_token(norm_name: str, term_weights: dict, dynamic_stopwords: set):
+    _, toks = clean_name_for_scoring(norm_name, dynamic_stopwords)
+    if not toks: return None
+    # The rarest token is the one with the highest weight (highest IDF score)
+    rarest = max(toks, key=lambda t: term_weights.get(t, 0))
+    return rarest if term_weights.get(rarest, 0) > 0 else None
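+
+# Example (hypothetical): for the normalized name "mueller praezisionstechnik",
+# the highest-IDF token (say 'praezisionstechnik') becomes the blocking key, so
+# only CRM records sharing that token get scored at all.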
 
 # --- Hauptfunktion ---
-def main(job_id=None):
-    logger.info("Starte Duplikats-Check v2.15 (Quality-first++)")
-    # NEU: Status-Update
+def main(job_id=None, interactive=False):
+    logger.info("Starte Duplikats-Check v3.0 (Weighted Scoring & Interactive Mode)")
     update_status(job_id, "Läuft", "Initialisiere GoogleSheetHandler...")
     try:
         sheet = GoogleSheetHandler()
         logger.info("GoogleSheetHandler initialisiert")
     except Exception as e:
         logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
-        # NEU: Status-Update bei Fehler
         update_status(job_id, "Fehlgeschlagen", f"Init GoogleSheetHandler fehlgeschlagen: {e}")
         sys.exit(1)
 
     # Daten laden
-    # NEU: Status-Update
     update_status(job_id, "Läuft", "Lade CRM- und Matching-Daten...")
     crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
     match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
-    total = len(match_df) if match_df is not None else 0  # NEU: total hier definieren
+    total = len(match_df) if match_df is not None else 0
     logger.info(f"{0 if crm_df is None else len(crm_df)} CRM-Datensätze | {total} Matching-Datensätze")
 
     if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
         logger.critical("Leere Daten in einem der Sheets. Abbruch.")
-        # NEU: Status-Update bei Fehler
         update_status(job_id, "Fehlgeschlagen", "Leere Daten in einem der Sheets.")
         return
 
-    # SerpAPI nur für Matching (B und E leer)
-    if Config.API_KEYS.get('serpapi'):  # Sicherer Zugriff auf den Key
-        if 'Gefundene Website' not in match_df.columns:
-            match_df['Gefundene Website'] = ''
-        b_empty = match_df['CRM Website'].fillna('').astype(str).str.strip().str.lower().isin(['','k.a.','k.a','n/a','na'])
-        e_empty = match_df['Gefundene Website'].fillna('').astype(str).str.strip().str.lower().isin(['','k.a.','k.a','n/a','na'])
-        empty_mask = b_empty & e_empty
-        empty_count = int(empty_mask.sum())
-        if empty_count > 0:
-            # NEU: Status-Update
-            update_status(job_id, "Läuft", f"Suche Websites für {empty_count} Firmen via SerpAPI...")
-            logger.info(f"Serp-Fallback für Matching: {empty_count} Firmen ohne URL in B/E")
-            found_cnt = 0
-            trust_stats = Counter()
-            for idx, row in match_df[empty_mask].iterrows():
-                company = row['CRM Name']
-                try:
-                    url = serp_website_lookup(company)
-                    if url and 'k.A.' not in url:
-                        if not str(url).startswith(('http://','https://')):
-                            url = 'https://' + str(url).lstrip()
-                        trust = assess_serp_trust(company, url)
-                        match_df.at[idx, 'Gefundene Website'] = url
-                        match_df.at[idx, 'Serp Vertrauen'] = trust
-                        trust_stats[trust] += 1
-                        logger.info(f"  ✓ URL gefunden: '{company}' -> {url} (Vertrauen: {trust})")
-                        found_cnt += 1
-                    else:
-                        logger.debug(f"  ✗ Keine eindeutige URL: '{company}' -> {url}")
-                except Exception as e:
-                    logger.warning(f"  ! Serp-Fehler für '{company}': {e}")
-            logger.info(f"Serp-Fallback beendet: {found_cnt}/{empty_count} URLs ergänzt | Trust: {dict(trust_stats)}")
-        else:
-            logger.info("Serp-Fallback übersprungen: B oder E bereits befüllt (keine fehlenden Matching-URLs)")
-
-    # Normalisierung CRM
+    # Normalization
     update_status(job_id, "Läuft", "Normalisiere Daten...")
     crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name)
     crm_df['normalized_domain'] = crm_df['CRM Website'].astype(str).apply(simple_normalize_url)
     crm_df['CRM Ort'] = crm_df['CRM Ort'].astype(str).str.lower().str.strip()
     crm_df['CRM Land'] = crm_df['CRM Land'].astype(str).str.lower().str.strip()
-    crm_df['block_key'] = crm_df['normalized_name'].apply(lambda x: x.split()[0] if x else None)
-    crm_df['domain_use_flag'] = 1
-
-    # Normalisierung Matching
-    match_df['Gefundene Website'] = match_df.get('Gefundene Website', pd.Series(index=match_df.index, dtype=object))
-    match_df['Serp Vertrauen'] = match_df.get('Serp Vertrauen', pd.Series(index=match_df.index, dtype=object))
-    match_df['Effektive Website'] = match_df['CRM Website'].fillna('').astype(str).str.strip()
-    mask_eff = match_df['Effektive Website'] == ''
-    match_df.loc[mask_eff, 'Effektive Website'] = match_df['Gefundene Website'].fillna('').astype(str).str.strip()
-
     match_df['normalized_name'] = match_df['CRM Name'].astype(str).apply(normalize_company_name)
-    match_df['normalized_domain'] = match_df['Effektive Website'].astype(str).apply(simple_normalize_url)
+    match_df['normalized_domain'] = match_df['CRM Website'].astype(str).apply(simple_normalize_url)
     match_df['CRM Ort'] = match_df['CRM Ort'].astype(str).str.lower().str.strip()
     match_df['CRM Land'] = match_df['CRM Land'].astype(str).str.lower().str.strip()
-    match_df['block_key'] = match_df['normalized_name'].apply(lambda x: x.split()[0] if x else None)
-
-    def _domain_use(row):
-        if str(row.get('CRM Website','')).strip():
-            return 1
-        trust = str(row.get('Serp Vertrauen','')).lower()
-        return 1 if trust == 'hoch' else 0
-    match_df['domain_use_flag'] = match_df.apply(_domain_use, axis=1)
-
+
     def build_city_tokens(crm_df, match_df):
         cities = set()
         for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']], ignore_index=True).dropna().unique():
             for t in _tokenize(s):
                 if len(t) >= 3: cities.add(t)
         return cities
 
     global CITY_TOKENS
     CITY_TOKENS = build_city_tokens(crm_df, match_df)
     logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")
 
-    # Blocking-Indizes
     update_status(job_id, "Läuft", "Erstelle Blocking-Indizes...")
-    crm_records, domain_index, token_freq, token_index = build_indexes(crm_df)
+    # --- NEW: TF-IDF weights and index creation ---
+    dynamic_stopwords = get_dynamic_stopwords(crm_df)
+    term_weights = build_term_weights(crm_df, dynamic_stopwords)
+    crm_records, domain_index, token_index = build_indexes(crm_df, dynamic_stopwords)
     logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")
 
-    # Matching
+    # --- Matching ---
     results = []
-    metrics = Counter()
    logger.info("Starte Matching-Prozess…")
     for idx, mrow in match_df.to_dict('index').items():
         processed = idx + 1
         progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'"
         logger.info(progress_message)
-        # NEU: Status-Update in der Schleife
         if processed % 5 == 0 or processed == total:
             update_status(job_id, "Läuft", progress_message)
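+
+        # Candidate search is a cascade (sketch): exact domain match first, then
+        # the rarest (highest-IDF) name token, then a fuzzy prefilter over all
+        # CRM records as a last resort. Each stage only runs when the previous
+        # stage produced no candidates.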
         candidates = []
         used_block = ''
-        if mrow.get('normalized_domain') and mrow.get('domain_use_flag') == 1:
+
+        # Blocking via domain (the domain_use_flag gate was removed in v3.0)
+        if mrow.get('normalized_domain'):
             candidates = domain_index.get(mrow['normalized_domain'], [])
-            used_block = f"domain:{mrow['normalized_domain']}"
+            if candidates: used_block = f"domain:{mrow['normalized_domain']}"
+
+        # Blocking via rarest token
         if not candidates:
-            rtok = choose_rarest_token(mrow.get('normalized_name',''), token_freq)
+            rtok = choose_rarest_token(mrow.get('normalized_name',''), term_weights, dynamic_stopwords)
             if rtok:
                 candidates = token_index.get(rtok, [])
                 used_block = f"token:{rtok}"
+
+        # Prefilter as a fallback
         if not candidates:
             pf = []
             n1 = mrow.get('normalized_name','')
-            rtok = choose_rarest_token(n1, token_freq)
-            clean1, toks1 = clean_name_for_scoring(n1)
+            clean1, _ = clean_name_for_scoring(n1, dynamic_stopwords)
             if clean1:
                 for r in crm_records:
                     n2 = r.get('normalized_name','')
-                    clean2, toks2 = clean_name_for_scoring(n2)
-                    if not clean2:
-                        continue
-                    if rtok and rtok not in toks2:
-                        continue
+                    clean2, _ = clean_name_for_scoring(n2, dynamic_stopwords)
+                    if not clean2: continue
                     pr = fuzz.partial_ratio(clean1, clean2)
                     if pr >= PREFILTER_MIN_PARTIAL:
                         pf.append((pr, r))
             pf.sort(key=lambda x: x[0], reverse=True)
             candidates = [r for _, r in pf[:PREFILTER_LIMIT]]
             used_block = f"prefilter:{PREFILTER_MIN_PARTIAL}/{len(pf)}"
 
         logger.info(f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}' -> {len(candidates)} Kandidaten (Block={used_block})")
         if not candidates:
             results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'})
@@ -388,31 +349,65 @@ def main(job_id=None):
 
         scored = []
         for cr in candidates:
-            score, comp = calculate_similarity(mrow, cr, token_freq)
-            scored.append((cr.get('CRM Name',''), score, comp))
-        scored.sort(key=lambda x: x[1], reverse=True)
+            score, comp = calculate_similarity(mrow, cr, term_weights, dynamic_stopwords)
+            scored.append({'name': cr.get('CRM Name',''), 'score': score, 'comp': comp, 'record': cr})
+        scored.sort(key=lambda x: x['score'], reverse=True)
+
+        for cand in scored[:5]:
+            logger.debug(f"  Kandidat: {cand['name']} | Score={cand['score']} | Comp={cand['comp']}")
 
-        for cand_name, sc, comp in scored[:5]:
-            logger.debug(f"  Kandidat: {cand_name} | Score={sc} | Comp={comp}")
+        best_match = scored[0] if scored else None
+
+        # --- NEW: interactive mode ---
+        if interactive and best_match and len(scored) > 1:
+            best_score = best_match['score']
+            second_best_score = scored[1]['score']
+            if best_score > INTERACTIVE_SCORE_MIN and (best_score - second_best_score) < INTERACTIVE_SCORE_DIFF:
+                top = scored[:5]
+                print("\n" + "="*50)
+                print(f"AMBIGUOUS MATCH for '{mrow['CRM Name']}'")
+                print("Top candidates have very similar scores.")
+                print(f"  - Match: '{mrow['CRM Name']}' | {mrow['normalized_domain']} | {mrow['CRM Ort']}, {mrow['CRM Land']}")
+                print("-"*50)
+                for i, item in enumerate(top):
+                    cr = item['record']
+                    print(f"[{i+1}] Candidate: '{cr['CRM Name']}' | {cr['normalized_domain']} | {cr['CRM Ort']}, {cr['CRM Land']}")
+                    print(f"    Score: {item['score']} | Details: {item['comp']}")
+                print("[0] No match")
+
+                choice = -1
+                while choice < 0 or choice > len(top):
+                    try:
+                        choice = int(input(f"Please select the best match (1-{len(top)}) or 0 for no match: "))
+                    except ValueError:
+                        choice = -1
+
+                if choice > 0:
+                    best_match = top[choice-1]
+                    logger.info(f"User selected candidate {choice}: '{best_match['name']}'")
+                elif choice == 0:
+                    best_match = None  # user decided: no match
+                    logger.info("User selected no match.")
+                print("="*50 + "\n")
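+
+        # Illustrative console session (hypothetical data):
+        #   AMBIGUOUS MATCH for 'Muster Metallbau'
+        #   [1] Candidate: 'Muster Metallbau GmbH' | muster-metallbau.de | berlin, deutschland
+        #       Score: 112 | Details: {...}
+        #   [2] Candidate: 'Muster Metall AG' | muster-metall.ch | zuerich, schweiz
+        #       Score: 98 | Details: {...}
+        #   [0] No match
+        #   Please select the best match (1-2) or 0 for no match: 1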
Land']}") + print("-"*50) + for i, item in enumerate(scored[:5]): + cr = item['record'] + print(f"[{i+1}] Candidate: '{cr['CRM Name']}' | {cr['normalized_domain']} | {cr['CRM Ort']}, {cr['CRM Land']}") + print(f" Score: {item['score']} | Details: {item['comp']}") + print("[0] No match") + + choice = -1 + while choice < 0 or choice > len(scored[:5]): + try: + choice = int(input(f"Please select the best match (1-{len(scored[:5])}) or 0 for no match: ")) + except ValueError: + choice = -1 + + if choice > 0: + best_match = scored[choice-1] + logger.info(f"User selected candidate {choice}: '{best_match['name']}'") + elif choice == 0: + best_match = None # User decided no match + logger.info("User selected no match.") + print("="*50 + "\n") - best_name, best_score, best_comp = scored[0] - weak = (best_comp.get('domain_used') == 0 and not (best_comp.get('city_match') and best_comp.get('country_match'))) - applied_threshold = SCORE_THRESHOLD_WEAK if weak else SCORE_THRESHOLD - weak_guard_fail = (weak and best_comp.get('rare_overlap') == 0) + if best_match and best_match['score'] >= SCORE_THRESHOLD: + # Schwache Matches (ohne Domain/Ort) brauchen höheren Threshold + is_weak = best_match['comp'].get('domain_match', 0) == 0 and not (best_match['comp'].get('city_match', 0) and best_match['comp'].get('country_match', 0)) + applied_threshold = SCORE_THRESHOLD_WEAK if is_weak else SCORE_THRESHOLD - if not weak_guard_fail and best_score >= applied_threshold: - results.append({'Match': best_name, 'Score': best_score, 'Match_Grund': str(best_comp)}) - metrics['matches_total'] += 1 - if best_comp.get('domain_used') == 1: metrics['matches_domain'] += 1 - if best_comp.get('city_match') and best_comp.get('country_match'): metrics['matches_with_loc'] += 1 - if best_comp.get('domain_used') == 0 and best_comp.get('name') >= 85 and not (best_comp.get('city_match') and best_comp.get('country_match')): - metrics['matches_name_only'] += 1 - logger.info(f" --> Match: '{best_name}' ({best_score}) {best_comp} | TH={applied_threshold}{' weak' if weak else ''}") + if best_match['score'] >= applied_threshold: + results.append({'Match': best_match['name'], 'Score': best_match['score'], 'Match_Grund': str(best_match['comp'])}) + logger.info(f" --> Match: '{best_match['name']}' ({best_match['score']}) | TH={applied_threshold}{' (weak)' if is_weak else ''}") + else: + results.append({'Match':'', 'Score': best_match['score'], 'Match_Grund': f"Below WEAK threshold | {str(best_match['comp'])}"}) + logger.info(f" --> No Match (below weak TH): '{best_match['name']}' ({best_match['score']}) | TH={applied_threshold}") + elif best_match: + results.append({'Match':'', 'Score': best_match['score'], 'Match_Grund': f"Below threshold | {str(best_match['comp'])}"}) + logger.info(f" --> No Match (below TH): '{best_match['name']}' ({best_match['score']})") else: - reason = 'weak_guard_no_rare' if weak_guard_fail else 'below_threshold' - results.append({'Match':'', 'Score': best_score, 'Match_Grund': f"{best_comp} | {reason} TH={applied_threshold}"}) - logger.info(f" --> Kein Match (Score={best_score}) {best_comp} | {reason} TH={applied_threshold}") + results.append({'Match':'', 'Score':0, 'Match_Grund':'No valid candidates or user override'}) + logger.info(f" --> No Match (no candidates)") + # --- Ergebnisse zurückschreiben --- logger.info("Matching-Prozess abgeschlossen. 
 
+    # --- Write results back ---
     logger.info("Matching-Prozess abgeschlossen. Bereite Ergebnisse für den Upload vor...")
@@ -420,27 +415,20 @@ def main(job_id=None):
     result_df = pd.DataFrame(results)
 
-    # KORREKTUR: Füge die Ergebnisspalten zum MODIFIZIERTEN match_df hinzu,
-    # der die neuen URLs aus der SerpAPI-Suche enthält.
-    # Wir benutzen die Original-Indizes, um sicherzustellen, dass alles passt.
-    final_df = match_df.join(result_df)
+    # Drop the result columns first in case they already exist in the sheet
+    cols_to_drop_from_match = ['Match', 'Score', 'Match_Grund']
+    match_df_clean = match_df.drop(columns=[col for col in cols_to_drop_from_match if col in match_df.columns], errors='ignore')
+
+    final_df = pd.concat([match_df_clean, result_df], axis=1)
 
-    # Bereinige die temporären Spalten für eine saubere Ausgabe
-    # KORREKTUR: 'block_key' statt 'block_keys'
-    cols_to_drop = ['normalized_name', 'normalized_domain', 'block_key', 'Effektive Website', 'domain_use_flag']
+    cols_to_drop = ['normalized_name', 'normalized_domain']
     final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore')
 
-    # NEU: Robuster Schreibprozess zur Vermeidung von Typ-Fehlern
-    # 1. Alle Spalten explizit in String konvertieren, um Inkompatibilitäten mit der API (z.B. numpy-Typen) zu vermeiden.
-    # 2. NaN/None-Werte mit einem leeren String füllen.
     upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})
-
-    # Konvertiere in Liste von Listen für den Upload
     data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()
 
     logger.info(f"Versuche, {len(data_to_write) - 1} Ergebniszeilen in das Sheet '{MATCHING_SHEET_NAME}' zu schreiben...")
-    # KORREKTUR: 'sheet' statt 'sheet_handler' verwenden
     ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
 
     if ok:
@@ -450,20 +438,12 @@ def main(job_id=None):
         logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
         update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")
 
-    # Summary
-    # KORREKTUR: 'final_df' statt 'write_df' verwenden
-    serp_counts = Counter((str(x).lower() for x in final_df.get('Serp Vertrauen', [])))
-    logger.info("===== Summary =====")
-    logger.info(f"Matches total: {metrics['matches_total']} | mit Domain: {metrics['matches_domain']} | mit Ort: {metrics['matches_with_loc']} | nur Name: {metrics['matches_name_only']}")
-    logger.info(f"Serp Vertrauen: {dict(serp_counts)}")
-    logger.info(f"Config: TH={SCORE_THRESHOLD}, TH_WEAK={SCORE_THRESHOLD_WEAK}, MIN_NAME_FOR_DOMAIN={MIN_NAME_FOR_DOMAIN}, Penalties(city={CITY_MISMATCH_PENALTY},country={COUNTRY_MISMATCH_PENALTY}), Prefilter(partial>={PREFILTER_MIN_PARTIAL}, limit={PREFILTER_LIMIT})")
-
 if __name__=='__main__':
-    parser = argparse.ArgumentParser()
+    parser = argparse.ArgumentParser(description="Duplicate Checker v3.0")
     parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
+    parser.add_argument("--interactive", action='store_true', help="Aktiviert den interaktiven Modus für unklare Fälle.")
     args = parser.parse_args()
-    # Lade API-Keys, bevor die main-Funktion startet
     Config.load_api_keys()
-    main(job_id=args.job_id)
\ No newline at end of file
+    main(job_id=args.job_id, interactive=args.interactive)
\ No newline at end of file