# duplicate_checker.py v2.15
# Quality-first ++: Domain-Gate, Location-Penalties, Smart Blocking (IDF-light),
# Serp-Trust, Weak-Threshold, City-Bias-Guard, Prefilter tightened, Metrics
# Build timestamp is injected into logfile name.

import os
import sys
import re
import argparse
import json
import logging

import pandas as pd
from datetime import datetime
from collections import Counter
from thefuzz import fuzz

from helpers import normalize_company_name, simple_normalize_url, serp_website_lookup
from config import Config
from google_sheet_handler import GoogleSheetHandler

STATUS_DIR = "job_status"


def update_status(job_id, status, progress_message):
    if not job_id:
        return
    status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
    try:
        # Read any existing data so the action and start time are preserved
        try:
            with open(status_file, 'r') as f:
                data = json.load(f)
        except FileNotFoundError:
            data = {}
        data.update({"status": status, "progress": progress_message})
        with open(status_file, 'w') as f:
            json.dump(data, f)
    except Exception as e:
        logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")


# --- Configuration ---
CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
SCORE_THRESHOLD = 80            # default threshold
SCORE_THRESHOLD_WEAK = 95       # threshold when neither domain nor (city & country) match
MIN_NAME_FOR_DOMAIN = 70        # count the domain only if name >= 70 OR city+country match
CITY_MISMATCH_PENALTY = 30
COUNTRY_MISMATCH_PENALTY = 40
PREFILTER_MIN_PARTIAL = 70      # (previously 60)
PREFILTER_LIMIT = 30            # (previously 50)

LOG_DIR = "Log"
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v2.15.txt"

# --- Logging setup ---
os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE)

root = logging.getLogger()
root.setLevel(logging.DEBUG)
for h in list(root.handlers):
    root.removeHandler(h)

formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s")

ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
root.addHandler(ch)

fh = logging.FileHandler(log_path, mode='a', encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
root.addHandler(fh)

logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v2.15 | Build: {now}")

# --- Load SerpAPI key ---
try:
    Config.load_api_keys()
    serp_key = Config.API_KEYS.get('serpapi')
    if not serp_key:
        logger.warning("SerpAPI Key nicht gefunden; Serp-Fallback deaktiviert.")
except Exception as e:
    logger.warning(f"Fehler beim Laden API-Keys: {e}")
    serp_key = None

# --- Stop / city tokens ---
STOP_TOKENS_BASE = {
    'gmbh', 'mbh', 'ag', 'kg', 'ug', 'ohg', 'se', 'co', 'kgaa', 'inc', 'llc', 'ltd', 'sarl',
    'holding', 'gruppe', 'group', 'international', 'solutions', 'solution', 'service', 'services',
    'deutschland', 'austria', 'germany', 'technik', 'technology', 'technologies', 'systems', 'systeme',
    'logistik', 'logistics', 'industries', 'industrie', 'management', 'consulting', 'vertrieb', 'handel',
    'company', 'gesellschaft', 'mbh&co', 'mbhco', 'werke', 'werk', 'renkhoff', 'sonnenschutztechnik'
}
CITY_TOKENS = set()  # filled dynamically after data normalization


# --- Utilities ---
def _tokenize(s: str):
    if not s:
        return []
    return re.split(r"[^a-z0-9]+", str(s).lower())


def split_tokens(name: str):
    """Tokens for indexing/scoring (base stop list + dynamic city tokens)."""
    if not name:
        return []
    tokens = [t for t in _tokenize(name) if len(t) >= 3]
    stop_union = STOP_TOKENS_BASE | CITY_TOKENS
    return [t for t in tokens if t not in stop_union]
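
# Illustrative example (made-up company name; at import time CITY_TOKENS is still
# empty, so only the base stop list applies):
#   split_tokens("Acme Logistik GmbH")  ->  ['acme']
# 'logistik' and 'gmbh' are in STOP_TOKENS_BASE, and tokens shorter than three
# characters would be dropped as well.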

def clean_name_for_scoring(norm_name: str):
    """Removes stop and city tokens. Empty output => no meaningful name comparison."""
    toks = split_tokens(norm_name)
    return " ".join(toks), set(toks)


def assess_serp_trust(company_name: str, url: str) -> str:
    """Trust rating 'hoch'/'mittel'/'niedrig' based on name-token occurrence in the domain."""
    if not url:
        return 'n/a'
    host = simple_normalize_url(url) or ''
    host = host.replace('www.', '')
    name_toks = [t for t in split_tokens(normalize_company_name(company_name)) if len(t) >= 3]
    if any(t in host for t in name_toks if len(t) >= 4):
        return 'hoch'
    if any(t in host for t in name_toks if len(t) == 3):
        return 'mittel'
    return 'niedrig'


# --- Similarity ---
def calculate_similarity(mrec: dict, crec: dict, token_freq: Counter):
    # Domain (with gate)
    dom1 = mrec.get('normalized_domain', '')
    dom2 = crec.get('normalized_domain', '')
    m_domain_use = mrec.get('domain_use_flag', 0)
    domain_flag_raw = 1 if (m_domain_use == 1 and dom1 and dom1 == dom2) else 0

    # Location flags
    city_match = 1 if (mrec.get('CRM Ort') and crec.get('CRM Ort') and mrec.get('CRM Ort') == crec.get('CRM Ort')) else 0
    country_match = 1 if (mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') == crec.get('CRM Land')) else 0

    # Name (meaningful tokens only)
    n1 = mrec.get('normalized_name', '')
    n2 = crec.get('normalized_name', '')
    clean1, toks1 = clean_name_for_scoring(n1)
    clean2, toks2 = clean_name_for_scoring(n2)

    # Overlaps
    overlap_clean = toks1 & toks2
    # City-only overlap check (nothing left after cleaning, but the raw overlap may be
    # city names; in that case the score is capped)
    raw_overlap = set(_tokenize(n1)) & set(_tokenize(n2))
    city_only_overlap = (not overlap_clean) and any(t in CITY_TOKENS for t in raw_overlap)

    # Name score
    if clean1 and clean2:
        ts = fuzz.token_set_ratio(clean1, clean2)
        pr = fuzz.partial_ratio(clean1, clean2)
        ss = fuzz.token_sort_ratio(clean1, clean2)
        name_score = max(ts, pr, ss)
    else:
        name_score = 0
    if city_only_overlap and name_score > 70:
        name_score = 70  # cap

    # Rare-token overlap (IDF-light): use the rarest token from mrec
    rtoks_sorted = sorted(list(toks1), key=lambda t: (token_freq.get(t, 10**9), -len(t)))
    rare_token = rtoks_sorted[0] if rtoks_sorted else None
    rare_overlap = 1 if (rare_token and rare_token in toks2) else 0

    # Domain gate
    domain_gate_ok = (name_score >= MIN_NAME_FOR_DOMAIN) or (city_match and country_match)
    domain_used = 1 if (domain_flag_raw and domain_gate_ok) else 0

    # Base score
    total = domain_used * 100 + name_score * 1.0 + (1 if (city_match and country_match) else 0) * 20

    # Penalties
    penalties = 0
    if mrec.get('CRM Land') and crec.get('CRM Land') and not country_match:
        penalties += COUNTRY_MISMATCH_PENALTY
    if mrec.get('CRM Ort') and crec.get('CRM Ort') and not city_match:
        penalties += CITY_MISMATCH_PENALTY
    total -= penalties

    # Bonus for strong name-only cases
    name_bonus = 1 if (domain_used == 0 and not (city_match and country_match) and name_score >= 85 and rare_overlap == 1) else 0
    if name_bonus:
        total += 20

    comp = {
        'domain_raw': domain_flag_raw,
        'domain_used': domain_used,
        'domain_gate_ok': int(domain_gate_ok),
        'name': round(name_score, 1),
        'city_match': city_match,
        'country_match': country_match,
        'penalties': penalties,
        'name_bonus': name_bonus,
        'rare_overlap': rare_overlap,
        'city_only_overlap': int(city_only_overlap)
    }
    return round(total), comp
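
# Worked example for the formula above (illustrative values, not real data):
# a matching record with domain_use_flag=1 that shares its normalized domain with
# a CRM record, a name score of 82 and identical Ort/Land yields
#   total = 100 (domain; gate passes since 82 >= MIN_NAME_FOR_DOMAIN)
#         +  82 (name) + 20 (city & country) = 202
# with no penalties and no name bonus -- well above SCORE_THRESHOLD = 80.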

# --- Indexes ---
def build_indexes(crm_df: pd.DataFrame):
    records = list(crm_df.to_dict('records'))

    # Domain index
    domain_index = {}
    for r in records:
        d = r.get('normalized_domain')
        if d:
            domain_index.setdefault(d, []).append(r)

    # Token frequencies (on cleaned tokens)
    token_freq = Counter()
    for r in records:
        _, toks = clean_name_for_scoring(r.get('normalized_name', ''))
        for t in set(toks):
            token_freq[t] += 1

    # Token index
    token_index = {}
    for r in records:
        _, toks = clean_name_for_scoring(r.get('normalized_name', ''))
        for t in set(toks):
            token_index.setdefault(t, []).append(r)

    return records, domain_index, token_freq, token_index


def choose_rarest_token(norm_name: str, token_freq: Counter):
    _, toks = clean_name_for_scoring(norm_name)
    if not toks:
        return None
    lst = sorted(list(toks), key=lambda x: (token_freq.get(x, 10**9), -len(x)))
    return lst[0] if lst else None
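
# Illustrative example of the IDF-light choice (invented frequencies):
# given token_freq = Counter({'maschinenbau': 40, 'acme': 1}),
#   choose_rarest_token('acme maschinenbau gmbh', token_freq)  ->  'acme'
# 'gmbh' is dropped as a stop token and 'acme' wins because it occurs in far fewer
# CRM names; tokens unknown to the CRM corpus sort last (frequency default 10**9).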

# --- Main ---
def main(job_id=None):
    logger.info("Starte Duplikats-Check v2.15 (Quality-first++)")
    update_status(job_id, "Läuft", "Initialisiere GoogleSheetHandler...")

    try:
        sheet = GoogleSheetHandler()
        logger.info("GoogleSheetHandler initialisiert")
    except Exception as e:
        logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        update_status(job_id, "Fehlgeschlagen", f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        sys.exit(1)

    # Load the data
    update_status(job_id, "Läuft", "Lade CRM- und Matching-Daten...")
    crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
    match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    total = len(match_df) if match_df is not None else 0  # used for progress messages below
    logger.info(f"{0 if crm_df is None else len(crm_df)} CRM-Datensätze | {total} Matching-Datensätze")

    if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
        logger.critical("Leere Daten in einem der Sheets. Abbruch.")
        update_status(job_id, "Fehlgeschlagen", "Leere Daten in einem der Sheets.")
        return

    # SerpAPI fallback only for matching rows where both B (CRM Website) and E (Gefundene Website) are empty
    if Config.API_KEYS.get('serpapi'):  # safe access to the key
        # Make sure both target columns exist before the .at-assignments below
        if 'Gefundene Website' not in match_df.columns:
            match_df['Gefundene Website'] = ''
        if 'Serp Vertrauen' not in match_df.columns:
            match_df['Serp Vertrauen'] = ''

        b_empty = match_df['CRM Website'].fillna('').astype(str).str.strip().str.lower().isin(['', 'k.a.', 'k.a', 'n/a', 'na'])
        e_empty = match_df['Gefundene Website'].fillna('').astype(str).str.strip().str.lower().isin(['', 'k.a.', 'k.a', 'n/a', 'na'])
        empty_mask = b_empty & e_empty
        empty_count = int(empty_mask.sum())

        if empty_count > 0:
            update_status(job_id, "Läuft", f"Suche Websites für {empty_count} Firmen via SerpAPI...")
            logger.info(f"Serp-Fallback für Matching: {empty_count} Firmen ohne URL in B/E")
            found_cnt = 0
            trust_stats = Counter()
            for idx, row in match_df[empty_mask].iterrows():
                company = row['CRM Name']
                try:
                    url = serp_website_lookup(company)
                    if url and 'k.A.' not in url:
                        if not str(url).startswith(('http://', 'https://')):
                            url = 'https://' + str(url).lstrip()
                        trust = assess_serp_trust(company, url)
                        match_df.at[idx, 'Gefundene Website'] = url
                        match_df.at[idx, 'Serp Vertrauen'] = trust
                        trust_stats[trust] += 1
                        logger.info(f" ✓ URL gefunden: '{company}' -> {url} (Vertrauen: {trust})")
                        found_cnt += 1
                    else:
                        logger.debug(f" ✗ Keine eindeutige URL: '{company}' -> {url}")
                except Exception as e:
                    logger.warning(f" ! Serp-Fehler für '{company}': {e}")
            logger.info(f"Serp-Fallback beendet: {found_cnt}/{empty_count} URLs ergänzt | Trust: {dict(trust_stats)}")
        else:
            logger.info("Serp-Fallback übersprungen: B oder E bereits befüllt (keine fehlenden Matching-URLs)")

    # CRM normalization
    # fillna('') keeps missing cells from becoming the literal string 'nan',
    # which would otherwise compare equal across records.
    update_status(job_id, "Läuft", "Normalisiere Daten...")
    crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name)
    crm_df['normalized_domain'] = crm_df['CRM Website'].fillna('').astype(str).apply(simple_normalize_url)
    crm_df['CRM Ort'] = crm_df['CRM Ort'].fillna('').astype(str).str.lower().str.strip()
    crm_df['CRM Land'] = crm_df['CRM Land'].fillna('').astype(str).str.lower().str.strip()
    crm_df['block_key'] = crm_df['normalized_name'].apply(lambda x: x.split()[0] if x else None)
    crm_df['domain_use_flag'] = 1

    # Matching normalization
    match_df['Gefundene Website'] = match_df.get('Gefundene Website', pd.Series(index=match_df.index, dtype=object))
    match_df['Serp Vertrauen'] = match_df.get('Serp Vertrauen', pd.Series(index=match_df.index, dtype=object))
    match_df['Effektive Website'] = match_df['CRM Website'].fillna('').astype(str).str.strip()
    mask_eff = match_df['Effektive Website'] == ''
    match_df.loc[mask_eff, 'Effektive Website'] = match_df['Gefundene Website'].fillna('').astype(str).str.strip()
    match_df['normalized_name'] = match_df['CRM Name'].astype(str).apply(normalize_company_name)
    match_df['normalized_domain'] = match_df['Effektive Website'].astype(str).apply(simple_normalize_url)
    match_df['CRM Ort'] = match_df['CRM Ort'].fillna('').astype(str).str.lower().str.strip()
    match_df['CRM Land'] = match_df['CRM Land'].fillna('').astype(str).str.lower().str.strip()
    match_df['block_key'] = match_df['normalized_name'].apply(lambda x: x.split()[0] if x else None)

    def _domain_use(row):
        if str(row.get('CRM Website', '')).strip():
            return 1
        trust = str(row.get('Serp Vertrauen', '')).lower()
        return 1 if trust == 'hoch' else 0

    match_df['domain_use_flag'] = match_df.apply(_domain_use, axis=1)

    def build_city_tokens(crm_df, match_df):
        cities = set()
        for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']], ignore_index=True).dropna().unique():
            for t in _tokenize(s):
                if len(t) >= 3:
                    cities.add(t)
        return cities

    global CITY_TOKENS
    CITY_TOKENS = build_city_tokens(crm_df, match_df)
    logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")

    # Blocking indexes
    update_status(job_id, "Läuft", "Erstelle Blocking-Indizes...")
    crm_records, domain_index, token_freq, token_index = build_indexes(crm_df)
    logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")
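
    # Candidate generation below works as a cascade, cheapest block first:
    #   1. exact lookup on the normalized domain (only if domain_use_flag == 1)
    #   2. bucket of the rarest cleaned name token from token_index
    #   3. brute-force partial_ratio prefilter over all CRM records
    #      (score >= PREFILTER_MIN_PARTIAL, capped at PREFILTER_LIMIT candidates)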

    # Matching
    results = []
    metrics = Counter()
    logger.info("Starte Matching-Prozess…")

    for idx, mrow in match_df.to_dict('index').items():
        processed = idx + 1
        progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'"
        logger.info(progress_message)
        if processed % 5 == 0 or processed == total:  # throttled status update
            update_status(job_id, "Läuft", progress_message)

        candidates = []
        used_block = ''
        if mrow.get('normalized_domain') and mrow.get('domain_use_flag') == 1:
            candidates = domain_index.get(mrow['normalized_domain'], [])
            used_block = f"domain:{mrow['normalized_domain']}"

        if not candidates:
            rtok = choose_rarest_token(mrow.get('normalized_name', ''), token_freq)
            if rtok:
                candidates = token_index.get(rtok, [])
                used_block = f"token:{rtok}"

        if not candidates:
            pf = []
            n1 = mrow.get('normalized_name', '')
            rtok = choose_rarest_token(n1, token_freq)
            clean1, toks1 = clean_name_for_scoring(n1)
            if clean1:
                for r in crm_records:
                    n2 = r.get('normalized_name', '')
                    clean2, toks2 = clean_name_for_scoring(n2)
                    if not clean2:
                        continue
                    if rtok and rtok not in toks2:
                        continue
                    pr = fuzz.partial_ratio(clean1, clean2)
                    if pr >= PREFILTER_MIN_PARTIAL:
                        pf.append((pr, r))
                pf.sort(key=lambda x: x[0], reverse=True)
                candidates = [r for _, r in pf[:PREFILTER_LIMIT]]
                used_block = f"prefilter:{PREFILTER_MIN_PARTIAL}/{len(pf)}"

        logger.info(f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}' -> {len(candidates)} Kandidaten (Block={used_block})")

        if not candidates:
            results.append({'Match': '', 'Score': 0, 'Match_Grund': 'keine Kandidaten'})
            continue

        scored = []
        for cr in candidates:
            score, comp = calculate_similarity(mrow, cr, token_freq)
            scored.append((cr.get('CRM Name', ''), score, comp))
        scored.sort(key=lambda x: x[1], reverse=True)

        for cand_name, sc, comp in scored[:5]:
            logger.debug(f" Kandidat: {cand_name} | Score={sc} | Comp={comp}")

        best_name, best_score, best_comp = scored[0]
        weak = (best_comp.get('domain_used') == 0 and not (best_comp.get('city_match') and best_comp.get('country_match')))
        applied_threshold = SCORE_THRESHOLD_WEAK if weak else SCORE_THRESHOLD
        weak_guard_fail = (weak and best_comp.get('rare_overlap') == 0)

        if not weak_guard_fail and best_score >= applied_threshold:
            results.append({'Match': best_name, 'Score': best_score, 'Match_Grund': str(best_comp)})
            metrics['matches_total'] += 1
            if best_comp.get('domain_used') == 1:
                metrics['matches_domain'] += 1
            if best_comp.get('city_match') and best_comp.get('country_match'):
                metrics['matches_with_loc'] += 1
            if best_comp.get('domain_used') == 0 and best_comp.get('name') >= 85 and not (best_comp.get('city_match') and best_comp.get('country_match')):
                metrics['matches_name_only'] += 1
            logger.info(f" --> Match: '{best_name}' ({best_score}) {best_comp} | TH={applied_threshold}{' weak' if weak else ''}")
        else:
            reason = 'weak_guard_no_rare' if weak_guard_fail else 'below_threshold'
            results.append({'Match': '', 'Score': best_score, 'Match_Grund': f"{best_comp} | {reason} TH={applied_threshold}"})
            logger.info(f" --> Kein Match (Score={best_score}) {best_comp} | {reason} TH={applied_threshold}")
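
    # Illustrative weak-path example (made-up numbers; Ort/Land empty on both sides,
    # so no mismatch penalties apply): a name-only candidate needs
    # SCORE_THRESHOLD_WEAK = 95. With name_score 96 and rare_overlap = 1 it scores
    # 96 + 20 (name bonus) = 116 and is accepted; with rare_overlap = 0 the weak
    # guard rejects it regardless of the score.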

    # --- Write results back ---
    logger.info("Matching-Prozess abgeschlossen. Bereite Ergebnisse für den Upload vor...")
    update_status(job_id, "Läuft", "Schreibe Ergebnisse zurück ins Sheet...")

    result_df = pd.DataFrame(results)

    # Append the result columns to the modified match_df, which already contains the
    # URLs found via SerpAPI; the original indexes keep the rows aligned.
    final_df = match_df.join(result_df)

    # Drop the temporary helper columns for a clean output
    cols_to_drop = ['normalized_name', 'normalized_domain', 'block_key', 'Effektive Website', 'domain_use_flag']
    final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore')

    # Robust write-back to avoid type errors:
    # 1. Convert all columns to strings to avoid API incompatibilities (e.g. numpy types).
    # 2. Replace NaN/None values with empty strings.
    upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})

    # Convert to a list of lists for the upload
    data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()

    logger.info(f"Versuche, {len(data_to_write) - 1} Ergebniszeilen in das Sheet '{MATCHING_SHEET_NAME}' zu schreiben...")
    ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
    if ok:
        logger.info("Ergebnisse erfolgreich in das Google Sheet geschrieben.")
        update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
    else:
        logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
        update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")

    # Summary
    serp_counts = Counter(str(x).lower() for x in final_df.get('Serp Vertrauen', []))
    logger.info("===== Summary =====")
    logger.info(f"Matches total: {metrics['matches_total']} | mit Domain: {metrics['matches_domain']} | mit Ort: {metrics['matches_with_loc']} | nur Name: {metrics['matches_name_only']}")
    logger.info(f"Serp Vertrauen: {dict(serp_counts)}")
    logger.info(f"Config: TH={SCORE_THRESHOLD}, TH_WEAK={SCORE_THRESHOLD_WEAK}, MIN_NAME_FOR_DOMAIN={MIN_NAME_FOR_DOMAIN}, Penalties(city={CITY_MISMATCH_PENALTY},country={COUNTRY_MISMATCH_PENALTY}), Prefilter(partial>={PREFILTER_MIN_PARTIAL}, limit={PREFILTER_LIMIT})")


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
    args = parser.parse_args()

    # Load the API keys before main() starts
    Config.load_api_keys()
    main(job_id=args.job_id)
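
# Typical invocation (assumed; the job id is any identifier the calling process
# wants its progress written under, mirrored to job_status/<job-id>.json):
#   python duplicate_checker.py --job-id 2024-01-31_1530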