From 2a7d54671396b2a68bc81e3b907a2db22c1f2566 Mon Sep 17 00:00:00 2001
From: Floke
Date: Fri, 8 Aug 2025 06:28:35 +0000
Subject: [PATCH] feat(duplicate-checker): quality-first matching (domain gate, location penalties, smart blocking)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Domain gate: the domain score (100) counts only if the name score is >= MIN_NAME_FOR_DOMAIN (default 70) OR city and country match exactly
- Location penalties: city mismatch -30, country mismatch -40 (applied only when the fields are populated)
- Smart blocking: domain index -> rarest name token (stop words filtered) -> prefilter (partial_ratio >= 60, top 50)
- Name score: max(token_set_ratio, partial_ratio, token_sort_ratio) + name-only bonus (+20) for strong names
- SerpAPI for Matching accounts only: writes "Gefundene Website"; the researched domain is used ONLY when trust is "hoch"
- Serp trust: hoch/mittel/niedrig (token check against the domain)
- Transparency: new columns "Match", "Score", "Match_Grund", "Gefundene Website", "Serp Vertrauen"
- Safe writeback: original columns are preserved; internal working fields are removed before writing
- Logs: Log/{timestamp}_duplicate_check_v2.13.txt, summary metrics at the end
- Backup: Log/{timestamp}_backup_Matching_Accounts.csv

BREAKING CHANGES: none
---
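For review, here is how the pieces of the score interact: a standalone sketch re-implementing the arithmetic of calculate_similarity below with hypothetical inputs (the constants are copied from the patch; score_sketch itself is illustrative and not part of the applied diff):

    MIN_NAME_FOR_DOMAIN = 70
    CITY_MISMATCH_PENALTY = 30
    COUNTRY_MISMATCH_PENALTY = 40

    def score_sketch(name_score, domain_hit, city_match, country_match,
                     city_filled=True, country_filled=True):
        loc_full = city_match and country_match
        gate_ok = name_score >= MIN_NAME_FOR_DOMAIN or loc_full
        domain_flag = 1 if (domain_hit and gate_ok) else 0
        total = domain_flag * 100 + name_score + (20 if loc_full else 0)
        if country_filled and not country_match:
            total -= COUNTRY_MISMATCH_PENALTY
        if city_filled and not city_match:
            total -= CITY_MISMATCH_PENALTY
        if domain_flag == 0 and not loc_full and name_score >= 85:
            total += 20  # name-only bonus
        return round(total)

    # Same domain but a weak name and the wrong city: the gate withholds the
    # 100 domain points and the city penalty drops the pair far below the
    # auto-match threshold of 80 (40 - 30 = 10).
    assert score_sketch(40, domain_hit=True, city_match=False, country_match=True) == 10
    # A strong name alone clears the threshold via the +20 bonus: 88 + 20 = 108.
    assert score_sketch(88, domain_hit=False, city_match=False, country_match=False,
                        city_filled=False, country_filled=False) == 108

The gate keeps a bare domain collision from outscoring contradictory name and location evidence.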
 duplicate_checker.py | 291 ++++++++++++++++++++++++++++++++++----
 1 file changed, 225 insertions(+), 66 deletions(-)

diff --git a/duplicate_checker.py b/duplicate_checker.py
index 37b8cbe6..a1c0fd14 100644
--- a/duplicate_checker.py
+++ b/duplicate_checker.py
@@ -1,23 +1,30 @@
 import os
 import sys
+import re
 import logging
 import pandas as pd
 from datetime import datetime
+from collections import Counter
 from thefuzz import fuzz
 from helpers import normalize_company_name, simple_normalize_url, serp_website_lookup
 from config import Config
 from google_sheet_handler import GoogleSheetHandler

-# duplicate_checker.py v2.12 (Serp-URL als neue Spalte; Matching nutzt sie nur bei Leerwerten)
-# Version: 2025-08-08_10-20
+# duplicate_checker.py v2.13 (quality-first: domain gate, location penalties, smart blocking, Serp trust, metrics)
+# Version-Build: dynamic timestamp below

 # --- Konfiguration ---
 CRM_SHEET_NAME = "CRM_Accounts"
 MATCHING_SHEET_NAME = "Matching_Accounts"
-SCORE_THRESHOLD = 80 # Score-Schwelle
+SCORE_THRESHOLD = 80        # threshold for an auto-match
+MIN_NAME_FOR_DOMAIN = 70    # a domain match counts only if name >= 70 OR the location matches
+CITY_MISMATCH_PENALTY = 30
+COUNTRY_MISMATCH_PENALTY = 40
+PREFILTER_MIN_PARTIAL = 60  # prefilter over the whole CRM list when blocking yields no candidates
+PREFILTER_LIMIT = 50        # max. candidates taken from the prefilter
 LOG_DIR = "Log"
 now = datetime.now().strftime('%Y-%m-%d_%H-%M')
-LOG_FILE = f"{now}_duplicate_check_v2.12.txt"
+LOG_FILE = f"{now}_duplicate_check_v2.13.txt"

 # --- Logging Setup ---
 if not os.path.exists(LOG_DIR):
@@ -38,7 +45,7 @@ fh.setFormatter(formatter)
 root.addHandler(fh)
 logger = logging.getLogger(__name__)
 logger.info(f"Logging to console and file: {log_path}")
-logger.info(f"Starting duplicate_checker.py v2.12 | Version: {now}")
+logger.info(f"Starting duplicate_checker.py v2.13 | Build: {now}")

 # --- SerpAPI Key laden ---
 try:
@@ -50,13 +57,47 @@ except Exception as e:
     logger.warning(f"Fehler beim Laden API-Keys: {e}")
     serp_key = None

+STOP_TOKENS = {
+    'gmbh','mbh','ag','kg','ug','ohg','se','co','kgaa','inc','llc','ltd','sarl',
+    'holding','gruppe','group','international','solutions','solution','service','services',
+    'deutschland','austria','germany','technik','technology','technologies','systems','systeme',
+    'logistik','logistics','industries','industrie','management','consulting','vertrieb','handel'
+}
+
+# --- Utilities ---
+def split_tokens(name: str):
+    if not name:
+        return []
+    return [t for t in str(name).split() if len(t) >= 3 and t not in STOP_TOKENS]
+
+def assess_serp_trust(company_name: str, url: str) -> str:
+    """Simple trust level for a researched URL: hoch/mittel/niedrig."""
+    if not url:
+        return 'n/a'
+    host = simple_normalize_url(url) or ''
+    host = host.replace('www.', '')
+    tokens = [t for t in split_tokens(normalize_company_name(company_name)) if len(t) >= 4]
+    if any(t in host for t in tokens):
+        return 'hoch'
+    tokens3 = [t for t in split_tokens(normalize_company_name(company_name)) if len(t) == 3]
+    if any(t in host for t in tokens3):
+        return 'mittel'
+    return 'niedrig'
+
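The trust rule only checks whether a sufficiently long name token survives into the host. A compact, self-contained replica with stand-in normalizers (the real normalize_company_name and simple_normalize_url live in helpers.py; company names and URLs here are hypothetical):

    STOP = {'gmbh', 'technik', 'deutschland'}  # excerpt of STOP_TOKENS

    def trust(company, url):
        host = url.split('//')[-1].split('/')[0].lower().replace('www.', '')
        toks = [t for t in company.lower().split() if len(t) >= 3 and t not in STOP]
        if any(t in host for t in toks if len(t) >= 4):
            return 'hoch'
        if any(t in host for t in toks if len(t) == 3):
            return 'mittel'
        return 'niedrig'

    print(trust('Mustertechnik GmbH', 'https://www.mustertechnik.de'))      # hoch
    print(trust('Mustertechnik GmbH', 'https://irgendein-verzeichnis.de'))  # niedrig

Only 'hoch' later unlocks the domain for matching (see _domain_use further down), so a directory hit like the second example can never inject a false domain match.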
 # --- Ähnlichkeitsberechnung ---
-def calculate_similarity(record1, record2):
-    dom1 = record1.get('normalized_domain','')
-    dom2 = record2.get('normalized_domain','')
-    domain_flag = 1 if dom1 and dom1 == dom2 else 0
-    loc_flag = 1 if (record1.get('CRM Ort')==record2.get('CRM Ort') and record1.get('CRM Land')==record2.get('CRM Land')) else 0
-    n1, n2 = record1.get('normalized_name',''), record2.get('normalized_name','')
+def calculate_similarity(mrec: dict, crec: dict):
+    # domain component (gated below)
+    dom1 = mrec.get('normalized_domain','')
+    dom2 = crec.get('normalized_domain','')
+    m_domain_use = mrec.get('domain_use_flag', 0)  # 1 only for an original URL or Serp trust 'hoch'
+    domain_flag_raw = 1 if (m_domain_use == 1 and dom1 and dom1 == dom2) else 0
+
+    # location
+    city_match = 1 if (mrec.get('CRM Ort') and crec.get('CRM Ort') and mrec.get('CRM Ort') == crec.get('CRM Ort')) else 0
+    country_match = 1 if (mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') == crec.get('CRM Land')) else 0
+
+    # name
+    n1, n2 = mrec.get('normalized_name',''), crec.get('normalized_name','')
     if n1 and n2:
         ts = fuzz.token_set_ratio(n1,n2)
         pr = fuzz.partial_ratio(n1,n2)
@@ -64,13 +105,76 @@
         name_score = max(ts,pr,ss)
     else:
         name_score = 0
-    bonus_flag = 1 if domain_flag==0 and loc_flag==0 and name_score>=85 else 0
-    total = domain_flag*100 + name_score*1.0 + loc_flag*20 + bonus_flag*20
-    return round(total), domain_flag, name_score, loc_flag, bonus_flag
+
+    # domain gate: the domain counts only if name >= MIN_NAME_FOR_DOMAIN OR city+country match
+    domain_gate_ok = (name_score >= MIN_NAME_FOR_DOMAIN) or (city_match and country_match)
+    domain_flag = 1 if (domain_flag_raw and domain_gate_ok) else 0
+
+    # base score
+    total = domain_flag*100 + name_score*1.0 + (1 if (city_match and country_match) else 0)*20
+
+    # mismatch penalties (applied only when the fields are populated and there is no full location match)
+    penalties = 0
+    if mrec.get('CRM Land') and crec.get('CRM Land') and not country_match:
+        penalties += COUNTRY_MISMATCH_PENALTY
+    if mrec.get('CRM Ort') and crec.get('CRM Ort') and not city_match:
+        penalties += CITY_MISMATCH_PENALTY
+
+    total -= penalties
+
+    # bonus for pure name matches (no domain, no location) when the name is strong
+    bonus_flag = 1 if (domain_flag == 0 and not (city_match and country_match) and name_score >= 85) else 0
+    if bonus_flag:
+        total += 20
+
+    return (
+        round(total),
+        {
+            'domain_raw': domain_flag_raw,
+            'domain_used': domain_flag,
+            'domain_gate_ok': int(domain_gate_ok),
+            'name': round(name_score,1),
+            'city_match': city_match,
+            'country_match': country_match,
+            'penalties': penalties,
+            'name_bonus': bonus_flag
+        }
+    )
+
+# --- Blocking setup ---
+def build_indexes(crm_df: pd.DataFrame):
+    records = list(crm_df.to_dict('records'))
+    # domain index
+    domain_index = {}
+    for r in records:
+        d = r.get('normalized_domain')
+        if d:
+            domain_index.setdefault(d, []).append(r)
+    # token frequencies
+    token_freq = Counter()
+    for r in records:
+        for t in set(split_tokens(r.get('normalized_name',''))):
+            token_freq[t] += 1
+    # token index
+    token_index = {}
+    for r in records:
+        toks = [t for t in set(split_tokens(r.get('normalized_name',''))) if token_freq[t] > 0]
+        for t in toks:
+            token_index.setdefault(t, []).append(r)
+    return records, domain_index, token_freq, token_index
+
+
+def choose_rarest_token(norm_name: str, token_freq: Counter):
+    toks = [t for t in split_tokens(norm_name) if len(t) >= 4 and token_freq.get(t, 0) > 0]
+    if not toks:
+        return None
+    # rarest first (lowest frequency); tie-break prefers the longer token
+    toks.sort(key=lambda x: (token_freq.get(x, 0), -len(x)))
+    return toks[0]
+
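A toy run of the block-key choice, assuming split_tokens and choose_rarest_token from above are in scope (names and frequencies are hypothetical):

    from collections import Counter

    # 'logistik' is a stop word and never indexed; of the remaining tokens,
    # 'acme' (frequency 1) is rarer than 'nordbau' (frequency 7).
    freq = Counter({'acme': 1, 'nordbau': 7})
    print(choose_rarest_token('acme nordbau logistik', freq))  # -> 'acme'

Blocking on the rarest token keeps candidate lists short: a token shared by seven CRM rows would pull in seven candidates, while 'acme' pulls in exactly one.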
Abbruch.") return - # --- SerpAPI-Fallback für leere Domains (nur MATCHING) --- + # SerpAPI nur für Matching (fehlende URLs) → in 'Gefundene Website' speichern if serp_key: empty_mask = match_df['CRM Website'].fillna('').astype(str).str.strip() == '' empty_count = int(empty_mask.sum()) if empty_count > 0: logger.info(f"Serp-Fallback für Matching: {empty_count} Firmen ohne URL") found_cnt = 0 + trust_stats = Counter() for idx, row in match_df[empty_mask].iterrows(): company = row['CRM Name'] try: url = serp_website_lookup(company) if url and 'k.A.' not in url: - # Schema ergänzen, falls nötig if not str(url).startswith(('http://','https://')): url = 'https://' + str(url).lstrip() + trust = assess_serp_trust(company, url) match_df.at[idx, 'Gefundene Website'] = url - logger.info(f" ✓ URL gefunden: '{company}' -> {url}") + match_df.at[idx, 'Serp Vertrauen'] = trust + trust_stats[trust] += 1 + logger.info(f" ✓ URL gefunden: '{company}' -> {url} (Vertrauen: {trust})") found_cnt += 1 else: logger.debug(f" ✗ Keine eindeutige URL: '{company}' -> {url}") except Exception as e: logger.warning(f" ! Serp-Fehler für '{company}': {e}") - logger.info(f"Serp-Fallback beendet: {found_cnt}/{empty_count} URLs ergänzt") + logger.info(f"Serp-Fallback beendet: {found_cnt}/{empty_count} URLs ergänzt | Trust: {dict(trust_stats)}") else: logger.info("Serp-Fallback übersprungen: keine fehlenden Matching-URLs") - # --- Normalisierung --- - # CRM-Daten normalisieren (nutzt ausschließlich CRM Website) + # Normalisierung CRM crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name) crm_df['normalized_domain'] = crm_df['CRM Website'].astype(str).apply(simple_normalize_url) crm_df['CRM Ort'] = crm_df['CRM Ort'].astype(str).str.lower().str.strip() crm_df['CRM Land'] = crm_df['CRM Land'].astype(str).str.lower().str.strip() crm_df['block_key'] = crm_df['normalized_name'].apply(lambda x: x.split()[0] if x else None) + crm_df['domain_use_flag'] = 1 # CRM-Domain gilt immer als vertrauenswürdig - # Matching-Daten normalisieren (nutzt effektive Website = CRM Website oder Gefundene Website) + # Normalisierung Matching (Effektive Website: Original oder Gefundene, aber Domain nur nutzen bei Vertrauen=hoch) match_df['Gefundene Website'] = match_df.get('Gefundene Website', pd.Series(index=match_df.index, dtype=object)) + match_df['Serp Vertrauen'] = match_df.get('Serp Vertrauen', pd.Series(index=match_df.index, dtype=object)) match_df['Effektive Website'] = match_df['CRM Website'].fillna('').astype(str).str.strip() mask_eff = match_df['Effektive Website'] == '' match_df.loc[mask_eff, 'Effektive Website'] = match_df['Gefundene Website'].fillna('').astype(str).str.strip() @@ -134,64 +240,112 @@ def main(): match_df['CRM Land'] = match_df['CRM Land'].astype(str).str.lower().str.strip() match_df['block_key'] = match_df['normalized_name'].apply(lambda x: x.split()[0] if x else None) + # Domain-Vertrauen/Use-Flag + def _domain_use(row): + if str(row.get('CRM Website','')).strip(): + return 1 + trust = str(row.get('Serp Vertrauen','')).lower() + return 1 if trust == 'hoch' else 0 + match_df['domain_use_flag'] = match_df.apply(_domain_use, axis=1) + # Debug-Sample logger.debug(f"CRM-Sample: {crm_df.iloc[0][['normalized_name','normalized_domain','block_key']].to_dict()}") - logger.debug(f"Matching-Sample: {match_df.iloc[0][['normalized_name','normalized_domain','block_key','Effektive Website','Gefundene Website']].to_dict()}") + logger.debug(f"Matching-Sample: 
+
     # Debug-Sample
     logger.debug(f"CRM-Sample: {crm_df.iloc[0][['normalized_name','normalized_domain','block_key']].to_dict()}")
-    logger.debug(f"Matching-Sample: {match_df.iloc[0][['normalized_name','normalized_domain','block_key','Effektive Website','Gefundene Website']].to_dict()}")
+    logger.debug(f"Matching-Sample: {match_df.iloc[0][['normalized_name','normalized_domain','block_key','Effektive Website','Gefundene Website','Serp Vertrauen','domain_use_flag']].to_dict()}")

-    # Blocking-Index erstellen
-    crm_index = {}
-    for _, row in crm_df.iterrows():
-        key = row['block_key']
-        if key:
-            crm_index.setdefault(key,[]).append(row)
-    logger.info(f"Blocking-Index mit {len(crm_index)} Keys erstellt")
+    # blocking indexes
+    crm_records, domain_index, token_freq, token_index = build_indexes(crm_df)
+    logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")

     # Matching
-    results=[]
-    total=len(match_df)
-    logger.info("Starte Matching-Prozess...")
-    for i,mrow in match_df.iterrows():
-        key = mrow['block_key']
-        cands = crm_index.get(key,[])
-        used_src = 'recherchiert' if (str(mrow.get('CRM Website','')).strip()=='' and str(mrow.get('Gefundene Website','')).strip()!='') else 'original'
-        logger.info(f"Prüfe {i+1}/{total}: '{mrow['CRM Name']}' -> {len(cands)} Kandidaten (Website-Quelle: {used_src})")
-        if not cands:
-            results.append({'Match':'','Score':0})
-            continue
-        scored=[]
-        for crow in cands:
-            sc,dm,ns,lm,bf=calculate_similarity(mrow,crow)
-            scored.append((crow['CRM Name'],sc,dm,ns,lm,bf))
-        for name,sc,dm,ns,lm,bf in sorted(scored,key=lambda x:x[1],reverse=True)[:3]:
-            logger.debug(f"  Kandidat: {name}, Score={sc}, Dom={dm}, Name={ns}, Ort={lm}, Bonus={bf}")
-        best_name,best_score,dm,ns,lm,bf=max(scored,key=lambda x:x[1])
-        if best_score>=SCORE_THRESHOLD:
-            results.append({'Match':best_name,'Score':best_score})
-            logger.info(f"  --> Match: '{best_name}' ({best_score}) [Dom={dm},Name={ns},Ort={lm},Bonus={bf}]")
-        else:
-            results.append({'Match':'','Score':best_score})
-            logger.info(f"  --> Kein Match (Score={best_score}) [Dom={dm},Name={ns},Ort={lm},Bonus={bf}]")
+    results = []
+    metrics = Counter()
+    total = len(match_df)
+    logger.info("Starte Matching-Prozess…")

-    # Ergebnisse zurückschreiben (SAFE: alle Originalspalten + neue, ohne interne Normalisierungsfelder)
-    logger.info("Schreibe Ergebnisse ins Sheet (SAFE in-place, keine Spaltenverluste)...")
+    # iterate rows keyed by the sheet index
+    for idx, mrow in match_df.to_dict('index').items():
+        name_disp = mrow.get('CRM Name','')
+        # candidate selection: domain index -> rarest token -> prefilter
+        candidates = []
+        used_block = ''
+        if mrow.get('normalized_domain') and mrow.get('domain_use_flag') == 1:
+            candidates = domain_index.get(mrow['normalized_domain'], [])
+            used_block = f"domain:{mrow['normalized_domain']}"
+        if not candidates:
+            rtok = choose_rarest_token(mrow.get('normalized_name',''), token_freq)
+            if rtok:
+                candidates = token_index.get(rtok, [])
+                used_block = f"token:{rtok}"
+        if not candidates:
+            # prefilter over the whole CRM list
+            pf = []
+            n1 = mrow.get('normalized_name','')
+            for r in crm_records:
+                n2 = r.get('normalized_name','')
+                if not n1 or not n2:
+                    continue
+                pr = fuzz.partial_ratio(n1, n2)
+                if pr >= PREFILTER_MIN_PARTIAL:
+                    pf.append((pr, r))
+            pf.sort(key=lambda x: x[0], reverse=True)
+            candidates = [r for _, r in pf[:PREFILTER_LIMIT]]
+            used_block = f"prefilter:{PREFILTER_MIN_PARTIAL}/{len(pf)}"
+
+        logger.info(f"Prüfe {idx+1}/{total}: '{name_disp}' -> {len(candidates)} Kandidaten (Block={used_block})")
+        if not candidates:
+            results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'})
+            continue
+
+        scored = []
+        for cr in candidates:
+            score, comp = calculate_similarity(mrow, cr)
+            scored.append((cr.get('CRM Name',''), score, comp))
+        scored.sort(key=lambda x: x[1], reverse=True)
+
+        # log top 5
+        for cand_name, sc, comp in scored[:5]:
+            logger.debug(f"  Kandidat: {cand_name} | Score={sc} | Comp={comp}")
+
+        best_name, best_score, best_comp = scored[0]
+
+        # metrics
+        if best_score >= SCORE_THRESHOLD:
+            results.append({'Match': best_name, 'Score': best_score, 'Match_Grund': str(best_comp)})
+            metrics['matches_total'] += 1
+            if best_comp.get('domain_used') == 1:
+                metrics['matches_domain'] += 1
+            if best_comp.get('city_match') and best_comp.get('country_match'):
+                metrics['matches_with_loc'] += 1
+            if best_comp.get('domain_used') == 0 and best_comp.get('name') >= 85 and not (best_comp.get('city_match') and best_comp.get('country_match')):
+                metrics['matches_name_only'] += 1
+            logger.info(f"  --> Match: '{best_name}' ({best_score}) {best_comp}")
+        else:
+            results.append({'Match':'', 'Score': best_score, 'Match_Grund': str(best_comp)})
+            logger.info(f"  --> Kein Match (Score={best_score}) {best_comp}")
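The last-resort prefilter is a linear scan; a self-contained sketch with hypothetical names (thefuzz is the same library the module already imports, not part of the applied diff):

    from thefuzz import fuzz

    crm_names = ['acme nordbau', 'acme logistics', 'beta consulting']
    query = 'acme nord'
    pf = sorted(((fuzz.partial_ratio(query, n), n) for n in crm_names), reverse=True)
    print([n for score, n in pf if score >= 60][:50])  # PREFILTER_MIN_PARTIAL, PREFILTER_LIMIT
    # 'acme nordbau' ranks first: the query is a substring, so partial_ratio is 100

Note that the scan costs one fuzzy comparison per CRM row for every unmatched record, which is why it only runs when both the domain index and the token index come up empty.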

+    # write results back (SAFE: all original columns plus new ones, internal fields removed)
+    logger.info("Schreibe Ergebnisse ins Sheet (SAFE in-place, keine Spaltenverluste)…")
     res_df = pd.DataFrame(results, index=match_df.index)
     write_df = match_df.copy()
-    # Ergebnisse anfügen
     write_df['Match'] = res_df['Match']
     write_df['Score'] = res_df['Score']
-    # Interne Arbeitsfelder entfernen
-    drop_cols = ['normalized_name', 'normalized_domain', 'block_key', 'Effektive Website']
+    write_df['Match_Grund'] = res_df['Match_Grund']
+
+    drop_cols = ['normalized_name','normalized_domain','block_key','Effektive Website','domain_use_flag']
     for c in drop_cols:
         if c in write_df.columns:
             write_df.drop(columns=[c], inplace=True)
-    # Lokales Backup der originalen Matching-Daten inkl. neuer Spalten
+
+    # backup
     backup_path = os.path.join(LOG_DIR, f"{now}_backup_{MATCHING_SHEET_NAME}.csv")
     try:
         write_df.to_csv(backup_path, index=False, encoding='utf-8')
         logger.info(f"Lokales Backup geschrieben: {backup_path}")
     except Exception as e:
         logger.warning(f"Backup fehlgeschlagen: {e}")
-    # Schreiben
+
     data = [write_df.columns.tolist()] + write_df.fillna('').values.tolist()
     ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data)
     if ok:
@@ -199,5 +351,12 @@
     else:
         logger.error("Fehler beim Schreiben ins Google Sheet")

+    # final metrics
+    serp_counts = Counter((str(x).lower() for x in write_df.get('Serp Vertrauen', [])))
+    logger.info("===== Summary =====")
+    logger.info(f"Matches total: {metrics['matches_total']} | mit Domain: {metrics['matches_domain']} | mit Ort: {metrics['matches_with_loc']} | nur Name: {metrics['matches_name_only']}")
+    logger.info(f"Serp Vertrauen: {dict(serp_counts)}")
+    logger.info(f"Config: TH={SCORE_THRESHOLD}, MIN_NAME_FOR_DOMAIN={MIN_NAME_FOR_DOMAIN}, Penalties(city={CITY_MISMATCH_PENALTY},country={COUNTRY_MISMATCH_PENALTY}), Prefilter(partial>={PREFILTER_MIN_PARTIAL}, limit={PREFILTER_LIMIT})")
+
 if __name__=='__main__':
-    main()
+    main()
\ No newline at end of file