duplicate_checker_old.py aktualisiert

This commit is contained in:
2025-11-06 14:00:44 +00:00
parent a2e4e26fcb
commit e0a491f3c9

View File

@@ -1,21 +1,8 @@
# duplicate_checker.py v4.0
# Build timestamp is injected into logfile name.
# --- FEATURES v4.0 ---
# - NEU: "Kernidentitäts-Bonus": Ein hoher Bonus wird vergeben, wenn das seltenste (wichtigste) Token übereinstimmt.
# Dies fördert das "großzügige Matchen" auf Basis der Kernmarke (z.B. "ANDRITZ AG" vs. "ANDRITZ HYDRO").
# - NEU: Intelligenter "Shortest Name Tie-Breaker": Wird nur noch bei sehr hohen und sehr ähnlichen Scores angewendet.
# - Finale Kalibrierung der Score-Berechnung und Schwellenwerte für optimale Balance.
# - Golden-Rule für exakte Matches und Interaktiver Modus beibehalten.
import os import os
import sys import sys
import re import re
import argparse
import json
import logging import logging
import pandas as pd import pandas as pd
import math
from datetime import datetime from datetime import datetime
from collections import Counter from collections import Counter
from thefuzz import fuzz from thefuzz import fuzz
@@ -23,49 +10,26 @@ from helpers import normalize_company_name, simple_normalize_url, serp_website_l
from config import Config from config import Config
from google_sheet_handler import GoogleSheetHandler from google_sheet_handler import GoogleSheetHandler
STATUS_DIR = "job_status" # duplicate_checker.py v2.15
# Quality-first ++: Domain-Gate, Location-Penalties, Smart Blocking (IDF-light),
def update_status(job_id, status, progress_message): # Serp-Trust, Weak-Threshold, City-Bias-Guard, Prefilter tightened, Metrics
if not job_id: return # Build timestamp is injected into logfile name.
status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
try:
try:
with open(status_file, 'r') as f:
data = json.load(f)
except FileNotFoundError:
data = {}
data.update({"status": status, "progress": progress_message})
with open(status_file, 'w') as f:
json.dump(data, f)
except Exception as e:
logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")
# --- Konfiguration --- # --- Konfiguration ---
CRM_SHEET_NAME = "CRM_Accounts" CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts" MATCHING_SHEET_NAME = "Matching_Accounts"
LOG_DIR = "Log" SCORE_THRESHOLD = 80 # Standard-Schwelle
SCORE_THRESHOLD_WEAK= 95 # Schwelle, wenn weder Domain noch (City&Country) matchen
MIN_NAME_FOR_DOMAIN = 70 # Domain-Score nur, wenn Name >= 70 ODER Ort+Land matchen
CITY_MISMATCH_PENALTY = 30
COUNTRY_MISMATCH_PENALTY = 40
PREFILTER_MIN_PARTIAL = 70 # (vorher 60)
PREFILTER_LIMIT = 30 # (vorher 50)
LOG_DIR = "Log"
now = datetime.now().strftime('%Y-%m-%d_%H-%M') now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v4.0.txt" LOG_FILE = f"{now}_duplicate_check_v2.15.txt"
# --- Scoring-Konfiguration v4.0 ---
SCORE_THRESHOLD = 100 # Standard-Schwelle
SCORE_THRESHOLD_WEAK= 130 # Schwelle für Matches ohne Domain oder Ort
GOLDEN_MATCH_RATIO = 97
GOLDEN_MATCH_SCORE = 300
CORE_IDENTITY_BONUS = 60 # NEU: Bonus für die Übereinstimmung des wichtigsten Tokens
# Tie-Breaker & Interaktiver Modus Konfiguration
TRIGGER_SCORE_MIN = 150 # NEU: Mindestscore für Tie-Breaker / Interaktiv
TIE_SCORE_DIFF = 20
# Prefilter-Konfiguration
PREFILTER_MIN_PARTIAL = 70
PREFILTER_LIMIT = 30
# --- Logging Setup --- # --- Logging Setup ---
# ... (Keine Änderungen hier)
if not os.path.exists(LOG_DIR): if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR, exist_ok=True) os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE) log_path = os.path.join(LOG_DIR, LOG_FILE)
@@ -84,10 +48,9 @@ fh.setFormatter(formatter)
root.addHandler(fh) root.addHandler(fh)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}") logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v4.0 | Build: {now}") logger.info(f"Starting duplicate_checker.py v2.15 | Build: {now}")
# --- SerpAPI Key laden --- # --- SerpAPI Key laden ---
# ... (Keine Änderungen hier)
try: try:
Config.load_api_keys() Config.load_api_keys()
serp_key = Config.API_KEYS.get('serpapi') serp_key = Config.API_KEYS.get('serpapi')
@@ -99,306 +62,360 @@ except Exception as e:
# --- Stop-/City-Tokens --- # --- Stop-/City-Tokens ---
STOP_TOKENS_BASE = { STOP_TOKENS_BASE = {
'gmbh','mbh','ag','kg','ug','ohg','se','co','kgaa','inc','llc','ltd','sarl', 'b.v', 'bv', 'gmbh','mbh','ag','kg','ug','ohg','se','co','kgaa','inc','llc','ltd','sarl',
'holding','gruppe','group','international','solutions','solution','service','services', 'holding','gruppe','group','international','solutions','solution','service','services',
'deutschland','austria','germany','technik','technology','technologies','systems','systeme', 'deutschland','austria','germany','technik','technology','technologies','systems','systeme',
'logistik','logistics','industries','industrie','management','consulting','vertrieb','handel', 'logistik','logistics','industries','industrie','management','consulting','vertrieb','handel',
'international','company','gesellschaft','mbh&co','mbhco','werke','werk' 'international','company','gesellschaft','mbh&co','mbhco','werke','werk','renkhoff','sonnenschutztechnik'
} }
CITY_TOKENS = set() CITY_TOKENS = set() # dynamisch befüllt nach Datennormalisierung
# --- Utilities --- # --- Utilities ---
def _tokenize(s: str): def _tokenize(s: str):
if not s: return [] if not s:
return re.split(r"[^a-z0-9äöüß]+", str(s).lower()) return []
return re.split(r"[^a-z0-9]+", str(s).lower())
def split_tokens(name: str):
"""Tokens für Indexing/Scoring (Basis-Stop + dynamische City-Tokens)."""
if not name:
return []
tokens = [t for t in _tokenize(name) if len(t) >= 3]
stop_union = STOP_TOKENS_BASE | CITY_TOKENS
return [t for t in tokens if t not in stop_union]
def clean_name_for_scoring(norm_name: str): def clean_name_for_scoring(norm_name: str):
if not norm_name: return "", set() """Entfernt Stop- & City-Tokens. Leerer Output => kein sinnvoller Namevergleich."""
tokens = [t for t in _tokenize(norm_name) if len(t) >= 3] toks = split_tokens(norm_name)
stop_union = STOP_TOKENS_BASE | CITY_TOKENS return " ".join(toks), set(toks)
final_tokens = [t for t in tokens if t not in stop_union]
return " ".join(final_tokens), set(final_tokens)
def build_term_weights(crm_df: pd.DataFrame): def assess_serp_trust(company_name: str, url: str) -> str:
logger.info("Starte Berechnung der Wortgewichte (TF-IDF)...") """Vertrauen 'hoch/mittel/niedrig' anhand Token-Vorkommen in Domain."""
token_counts = Counter() if not url:
total_docs = len(crm_df) return 'n/a'
host = simple_normalize_url(url) or ''
for name in crm_df['normalized_name']: host = host.replace('www.', '')
_, tokens = clean_name_for_scoring(name) name_toks = [t for t in split_tokens(normalize_company_name(company_name)) if len(t) >= 3]
for token in set(tokens): if any(t in host for t in name_toks if len(t) >= 4):
token_counts[token] += 1 return 'hoch'
if any(t in host for t in name_toks if len(t) == 3):
term_weights = {} return 'mittel'
for token, count in token_counts.items(): return 'niedrig'
idf = math.log(total_docs / (count + 1))
term_weights[token] = idf
logger.info(f"Wortgewichte für {len(term_weights)} Tokens berechnet.")
return term_weights
# --- Similarity v4.0 ---
def calculate_similarity(mrec: dict, crec: dict, term_weights: dict):
n1_raw = mrec.get('normalized_name', '')
n2_raw = crec.get('normalized_name', '')
if fuzz.ratio(n1_raw, n2_raw) >= GOLDEN_MATCH_RATIO:
return GOLDEN_MATCH_SCORE, {'reason': f'Golden Match (Ratio >= {GOLDEN_MATCH_RATIO}%)', 'name_score': 100}
# --- Similarity ---
def calculate_similarity(mrec: dict, crec: dict, token_freq: Counter):
# Domain (mit Gate)
dom1 = mrec.get('normalized_domain','') dom1 = mrec.get('normalized_domain','')
dom2 = crec.get('normalized_domain','') dom2 = crec.get('normalized_domain','')
domain_match = 1 if (dom1 and dom1 == dom2) else 0 m_domain_use = mrec.get('domain_use_flag', 0)
domain_flag_raw = 1 if (m_domain_use == 1 and dom1 and dom1 == dom2) else 0
# Location flags
city_match = 1 if (mrec.get('CRM Ort') and crec.get('CRM Ort') and mrec.get('CRM Ort') == crec.get('CRM Ort')) else 0 city_match = 1 if (mrec.get('CRM Ort') and crec.get('CRM Ort') and mrec.get('CRM Ort') == crec.get('CRM Ort')) else 0
country_match = 1 if (mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') == crec.get('CRM Land')) else 0 country_match = 1 if (mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') == crec.get('CRM Land')) else 0
clean1, toks1 = clean_name_for_scoring(n1_raw)
clean2, toks2 = clean_name_for_scoring(n2_raw)
name_score = 0 # Name (nur sinnvolle Tokens)
overlapping_tokens = toks1 & toks2 n1 = mrec.get('normalized_name','')
if overlapping_tokens: n2 = crec.get('normalized_name','')
name_score = sum(term_weights.get(token, 0) for token in overlapping_tokens) clean1, toks1 = clean_name_for_scoring(n1)
if toks1: clean2, toks2 = clean_name_for_scoring(n2)
overlap_percentage = len(overlapping_tokens) / len(toks1)
name_score *= (1 + overlap_percentage)
# --- NEU v4.0: Kernidentitäts-Bonus --- # Overlaps
core_identity_bonus = 0 overlap_clean = toks1 & toks2
rarest_token_mrec = choose_rarest_token(n1_raw, term_weights) # city-only overlap check (wenn nach Clean nichts übrig, aber Roh-Overlap evtl. Städte; wir cappen Score)
if rarest_token_mrec and rarest_token_mrec in toks2: raw_overlap = set(_tokenize(n1)) & set(_tokenize(n2))
core_identity_bonus = CORE_IDENTITY_BONUS city_only_overlap = (not overlap_clean) and any(t in CITY_TOKENS for t in raw_overlap)
# Domain-Gate
score_domain = 0
if domain_match:
if name_score > 2.0 or (city_match and country_match):
score_domain = 70
else:
score_domain = 20
score_location = 25 if (city_match and country_match) else 0 # Name-Score
if clean1 and clean2:
ts = fuzz.token_set_ratio(clean1, clean2)
pr = fuzz.partial_ratio(clean1, clean2)
ss = fuzz.token_sort_ratio(clean1, clean2)
name_score = max(ts, pr, ss)
else:
name_score = 0
# Finale Score-Kalibrierung v4.0 if city_only_overlap and name_score > 70:
total = name_score * 10 + score_domain + score_location + core_identity_bonus name_score = 70 # cap
# Rare-token-overlap (IDF-light): benutze seltensten Token aus mrec
rtoks_sorted = sorted(list(toks1), key=lambda t: (token_freq.get(t, 10**9), -len(t)))
rare_token = rtoks_sorted[0] if rtoks_sorted else None
rare_overlap = 1 if (rare_token and rare_token in toks2) else 0
# Domain Gate
domain_gate_ok = (name_score >= MIN_NAME_FOR_DOMAIN) or (city_match and country_match)
domain_used = 1 if (domain_flag_raw and domain_gate_ok) else 0
# Basisscore
total = domain_used*100 + name_score*1.0 + (1 if (city_match and country_match) else 0)*20
# Penalties
penalties = 0 penalties = 0
if mrec.get('CRM Land') and crec.get('CRM Land') and not country_match: if mrec.get('CRM Land') and crec.get('CRM Land') and not country_match:
penalties += 40 penalties += COUNTRY_MISMATCH_PENALTY
if mrec.get('CRM Ort') and crec.get('CRM Ort') and not city_match: if mrec.get('CRM Ort') and crec.get('CRM Ort') and not city_match:
penalties += 30 penalties += CITY_MISMATCH_PENALTY
total -= penalties total -= penalties
comp = {
'name_score': round(name_score,1),
'domain_match': domain_match,
'location_match': int(city_match and country_match),
'core_bonus': core_identity_bonus,
'penalties': penalties,
'overlapping_tokens': list(overlapping_tokens)
}
return max(0, round(total)), comp
# --- Indexe & Hauptfunktion --- # Bonus für starke Name-only Fälle
name_bonus = 1 if (domain_used == 0 and not (city_match and country_match) and name_score >= 85 and rare_overlap==1) else 0
if name_bonus:
total += 20
comp = {
'domain_raw': domain_flag_raw,
'domain_used': domain_used,
'domain_gate_ok': int(domain_gate_ok),
'name': round(name_score,1),
'city_match': city_match,
'country_match': country_match,
'penalties': penalties,
'name_bonus': name_bonus,
'rare_overlap': rare_overlap,
'city_only_overlap': int(city_only_overlap)
}
return round(total), comp
# --- Indexe ---
def build_indexes(crm_df: pd.DataFrame): def build_indexes(crm_df: pd.DataFrame):
records = list(crm_df.to_dict('records')) records = list(crm_df.to_dict('records'))
# Domain-Index
domain_index = {} domain_index = {}
for r in records: for r in records:
d = r.get('normalized_domain') d = r.get('normalized_domain')
if d: domain_index.setdefault(d, []).append(r) if d:
domain_index.setdefault(d, []).append(r)
token_index = {} # Token-Frequenzen (auf gereinigten Tokens)
for idx, r in enumerate(records): token_freq = Counter()
for r in records:
_, toks = clean_name_for_scoring(r.get('normalized_name','')) _, toks = clean_name_for_scoring(r.get('normalized_name',''))
for t in set(toks): for t in set(toks):
token_index.setdefault(t, []).append(idx) token_freq[t] += 1
# Token-Index
return records, domain_index, token_index token_index = {}
for r in records:
_, toks = clean_name_for_scoring(r.get('normalized_name',''))
for t in set(toks):
token_index.setdefault(t, []).append(r)
return records, domain_index, token_freq, token_index
def choose_rarest_token(norm_name: str, term_weights: dict):
def choose_rarest_token(norm_name: str, token_freq: Counter):
_, toks = clean_name_for_scoring(norm_name) _, toks = clean_name_for_scoring(norm_name)
if not toks: return None if not toks:
rarest = max(toks, key=lambda t: term_weights.get(t, 0)) return None
return rarest if term_weights.get(rarest, 0) > 0 else None lst = sorted(list(toks), key=lambda x: (token_freq.get(x, 10**9), -len(x)))
return lst[0] if lst else None
def main(job_id=None, interactive=False): # --- Hauptfunktion ---
logger.info("Starte Duplikats-Check v4.0 (Core Identity Bonus)") def main():
# ... (Code für Initialisierung und Datenladen bleibt identisch) ... logger.info("Starte Duplikats-Check v2.15 (Quality-first++)")
update_status(job_id, "Läuft", "Initialisiere GoogleSheetHandler...")
try: try:
sheet = GoogleSheetHandler() sheet = GoogleSheetHandler()
logger.info("GoogleSheetHandler initialisiert") logger.info("GoogleSheetHandler initialisiert")
except Exception as e: except Exception as e:
logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}") logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
update_status(job_id, "Fehlgeschlagen", f"Init GoogleSheetHandler fehlgeschlagen: {e}")
sys.exit(1) sys.exit(1)
update_status(job_id, "Läuft", "Lade CRM- und Matching-Daten...") # Daten laden
crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME) crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME) match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
total = len(match_df) if match_df is not None else 0 logger.info(f"{0 if crm_df is None else len(crm_df)} CRM-Datensätze | {0 if match_df is None else len(match_df)} Matching-Datensätze")
logger.info(f"{0 if crm_df is None else len(crm_df)} CRM-Datensätze | {total} Matching-Datensätze")
if crm_df is None or crm_df.empty or match_df is None or match_df.empty: if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
logger.critical("Leere Daten in einem der Sheets. Abbruch.") logger.critical("Leere Daten in einem der Sheets. Abbruch.")
update_status(job_id, "Fehlgeschlagen", "Leere Daten in einem der Sheets.")
return return
update_status(job_id, "Läuft", "Normalisiere Daten...") # SerpAPI nur für Matching (B und E leer)
if serp_key:
if 'Gefundene Website' not in match_df.columns:
match_df['Gefundene Website'] = ''
b_empty = match_df['CRM Website'].fillna('').astype(str).str.strip().str.lower().isin(['','k.a.','k.a','n/a','na'])
e_empty = match_df['Gefundene Website'].fillna('').astype(str).str.strip().str.lower().isin(['','k.a.','k.a','n/a','na'])
empty_mask = b_empty & e_empty
empty_count = int(empty_mask.sum())
if empty_count > 0:
logger.info(f"Serp-Fallback für Matching: {empty_count} Firmen ohne URL in B/E")
found_cnt = 0
trust_stats = Counter()
for idx, row in match_df[empty_mask].iterrows():
company = row['CRM Name']
try:
url = serp_website_lookup(company)
if url and 'k.A.' not in url:
if not str(url).startswith(('http://','https://')):
url = 'https://' + str(url).lstrip()
trust = assess_serp_trust(company, url)
match_df.at[idx, 'Gefundene Website'] = url
match_df.at[idx, 'Serp Vertrauen'] = trust
trust_stats[trust] += 1
logger.info(f" ✓ URL gefunden: '{company}' -> {url} (Vertrauen: {trust})")
found_cnt += 1
else:
logger.debug(f" ✗ Keine eindeutige URL: '{company}' -> {url}")
except Exception as e:
logger.warning(f" ! Serp-Fehler für '{company}': {e}")
logger.info(f"Serp-Fallback beendet: {found_cnt}/{empty_count} URLs ergänzt | Trust: {dict(trust_stats)}")
else:
logger.info("Serp-Fallback übersprungen: B oder E bereits befüllt (keine fehlenden Matching-URLs)")
# Normalisierung CRM
crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name) crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name)
crm_df['normalized_domain'] = crm_df['CRM Website'].astype(str).apply(simple_normalize_url) crm_df['normalized_domain'] = crm_df['CRM Website'].astype(str).apply(simple_normalize_url)
crm_df['CRM Ort'] = crm_df['CRM Ort'].astype(str).str.lower().str.strip() crm_df['CRM Ort'] = crm_df['CRM Ort'].astype(str).str.lower().str.strip()
crm_df['CRM Land'] = crm_df['CRM Land'].astype(str).str.lower().str.strip() crm_df['CRM Land'] = crm_df['CRM Land'].astype(str).str.lower().str.strip()
crm_df['block_key'] = crm_df['normalized_name'].apply(lambda x: x.split()[0] if x else None)
crm_df['domain_use_flag'] = 1 # CRM-Domain gilt als vertrauenswürdig
# Normalisierung Matching
match_df['Gefundene Website'] = match_df.get('Gefundene Website', pd.Series(index=match_df.index, dtype=object))
match_df['Serp Vertrauen'] = match_df.get('Serp Vertrauen', pd.Series(index=match_df.index, dtype=object))
match_df['Effektive Website'] = match_df['CRM Website'].fillna('').astype(str).str.strip()
mask_eff = match_df['Effektive Website'] == ''
match_df.loc[mask_eff, 'Effektive Website'] = match_df['Gefundene Website'].fillna('').astype(str).str.strip()
match_df['normalized_name'] = match_df['CRM Name'].astype(str).apply(normalize_company_name) match_df['normalized_name'] = match_df['CRM Name'].astype(str).apply(normalize_company_name)
match_df['normalized_domain'] = match_df['CRM Website'].astype(str).apply(simple_normalize_url) match_df['normalized_domain'] = match_df['Effektive Website'].astype(str).apply(simple_normalize_url)
match_df['CRM Ort'] = match_df['CRM Ort'].astype(str).str.lower().str.strip() match_df['CRM Ort'] = match_df['CRM Ort'].astype(str).str.lower().str.strip()
match_df['CRM Land'] = match_df['CRM Land'].astype(str).str.lower().str.strip() match_df['CRM Land'] = match_df['CRM Land'].astype(str).str.lower().str.strip()
match_df['block_key'] = match_df['normalized_name'].apply(lambda x: x.split()[0] if x else None)
# Domain-Vertrauen/Use-Flag
def _domain_use(row):
if str(row.get('CRM Website','')).strip():
return 1
trust = str(row.get('Serp Vertrauen','')).lower()
return 1 if trust == 'hoch' else 0
match_df['domain_use_flag'] = match_df.apply(_domain_use, axis=1)
# City-Tokens dynamisch bauen (nach Normalisierung von Ort)
def build_city_tokens(crm_df, match_df): def build_city_tokens(crm_df, match_df):
cities = set() cities = set()
for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']], ignore_index=True).dropna().unique(): for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']], ignore_index=True).dropna().unique():
for t in _tokenize(s): for t in _tokenize(s):
if len(t) >= 3: cities.add(t) if len(t) >= 3:
cities.add(t)
return cities return cities
global CITY_TOKENS global CITY_TOKENS
CITY_TOKENS = build_city_tokens(crm_df, match_df) CITY_TOKENS = build_city_tokens(crm_df, match_df)
logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}") logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")
term_weights = build_term_weights(crm_df) # Blocking-Indizes (nachdem CITY_TOKENS gesetzt wurde)
crm_records, domain_index, token_index = build_indexes(crm_df) crm_records, domain_index, token_freq, token_index = build_indexes(crm_df)
logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}") logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")
# Matching
results = [] results = []
metrics = Counter()
total = len(match_df)
logger.info("Starte Matching-Prozess…") logger.info("Starte Matching-Prozess…")
processed = 0
for idx, mrow in match_df.to_dict('index').items(): for idx, mrow in match_df.to_dict('index').items():
processed = idx + 1 processed += 1
progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'" name_disp = mrow.get('CRM Name','')
logger.info(progress_message) # Kandidatenwahl
if processed % 5 == 0 or processed == total: candidates = []
update_status(job_id, "Läuft", progress_message)
candidate_indices = set()
used_block = '' used_block = ''
if mrow.get('normalized_domain') and mrow.get('domain_use_flag') == 1:
# ... (Kandidatensuche bleibt gleich) ... candidates = domain_index.get(mrow['normalized_domain'], [])
if mrow.get('normalized_domain'): used_block = f"domain:{mrow['normalized_domain']}"
candidates_from_domain = domain_index.get(mrow['normalized_domain'], []) if not candidates:
for c in candidates_from_domain: rtok = choose_rarest_token(mrow.get('normalized_name',''), token_freq)
try:
indices = crm_df.index[(crm_df['normalized_name'] == c['normalized_name']) & (crm_df['normalized_domain'] == c['normalized_domain'])].tolist()
if indices:
candidate_indices.add(indices[0])
except Exception:
continue
if candidate_indices: used_block = f"domain:{mrow['normalized_domain']}"
if not candidate_indices:
rtok = choose_rarest_token(mrow.get('normalized_name',''), term_weights)
if rtok: if rtok:
indices_from_token = token_index.get(rtok, []) candidates = token_index.get(rtok, [])
candidate_indices.update(indices_from_token)
used_block = f"token:{rtok}" used_block = f"token:{rtok}"
if not candidates:
if not candidate_indices: # Prefilter über gesamte CRM-Liste (strenger + limitierter; erfordert Rarest-Token-Overlap)
pf = [] pf = []
n1 = mrow.get('normalized_name','') n1 = mrow.get('normalized_name','')
clean1, _ = clean_name_for_scoring(n1) rtok = choose_rarest_token(n1, token_freq)
clean1, toks1 = clean_name_for_scoring(n1)
if clean1: if clean1:
for i, r in enumerate(crm_records): for r in crm_records:
n2 = r.get('normalized_name','') n2 = r.get('normalized_name','')
clean2, _ = clean_name_for_scoring(n2) clean2, toks2 = clean_name_for_scoring(n2)
if not clean2: continue if not clean2:
continue
if rtok and rtok not in toks2:
continue
pr = fuzz.partial_ratio(clean1, clean2) pr = fuzz.partial_ratio(clean1, clean2)
if pr >= PREFILTER_MIN_PARTIAL: if pr >= PREFILTER_MIN_PARTIAL:
pf.append((pr, i)) pf.append((pr, r))
pf.sort(key=lambda x: x[0], reverse=True) pf.sort(key=lambda x: x[0], reverse=True)
candidate_indices.update([i for _, i in pf[:PREFILTER_LIMIT]]) candidates = [r for _, r in pf[:PREFILTER_LIMIT]]
used_block = f"prefilter:{PREFILTER_MIN_PARTIAL}/{len(pf)}" used_block = f"prefilter:{PREFILTER_MIN_PARTIAL}/{len(pf)}"
candidates = [crm_records[i] for i in candidate_indices] logger.info(f"Prüfe {processed}/{total}: '{name_disp}' -> {len(candidates)} Kandidaten (Block={used_block})")
logger.info(f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}' -> {len(candidates)} Kandidaten (Block={used_block})")
if not candidates: if not candidates:
results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'}) results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'})
continue continue
scored = [] scored = []
for cr in candidates: for cr in candidates:
score, comp = calculate_similarity(mrow, cr, term_weights) score, comp = calculate_similarity(mrow, cr, token_freq)
scored.append({'name': cr.get('CRM Name',''), 'score': score, 'comp': comp, 'record': cr}) scored.append((cr.get('CRM Name',''), score, comp))
scored.sort(key=lambda x: x['score'], reverse=True) scored.sort(key=lambda x: x[1], reverse=True)
for cand in scored[:5]:
logger.debug(f" Kandidat: {cand['name']} | Score={cand['score']} | Comp={cand['comp']}")
best_match = scored[0] if scored else None # Log Top5
for cand_name, sc, comp in scored[:5]:
logger.debug(f" Kandidat: {cand_name} | Score={sc} | Comp={comp}")
# --- Intelligenter Tie-Breaker v4.0 --- best_name, best_score, best_comp = scored[0]
if best_match and len(scored) > 1:
best_score = best_match['score']
second_best_score = scored[1]['score']
if best_score >= TRIGGER_SCORE_MIN and (best_score - second_best_score) < TIE_SCORE_DIFF and best_score < GOLDEN_MATCH_SCORE:
logger.info(f" Tie-Breaker-Situation erkannt für '{mrow['CRM Name']}'. Scores: {best_score} vs {second_best_score}")
tie_candidates = [c for c in scored if (best_score - c['score']) < TIE_SCORE_DIFF]
best_match_by_length = min(tie_candidates, key=lambda x: len(x['name']))
if best_match_by_length['name'] != best_match['name']:
logger.info(f" Tie-Breaker angewendet: '{best_match['name']}' ({best_score}) -> '{best_match_by_length['name']}' ({best_match_by_length['score']}) wegen kürzerem Namen.")
best_match = best_match_by_length
# Interaktiver Modus # Akzeptanzlogik (Weak-Threshold + Guard)
if interactive and best_match and len(scored) > 1: weak = (best_comp.get('domain_used') == 0 and not (best_comp.get('city_match') and best_comp.get('country_match')))
best_score = best_match['score'] applied_threshold = SCORE_THRESHOLD_WEAK if weak else SCORE_THRESHOLD
second_best_score = scored[1]['score'] weak_guard_fail = (weak and best_comp.get('rare_overlap') == 0)
if best_score > INTERACTIVE_SCORE_MIN and (best_score - second_best_score) < INTERACTIVE_SCORE_DIFF and best_score < GOLDEN_MATCH_SCORE:
# ... (Interaktive Logik bleibt gleich) ...
print("\n" + "="*50)
# ...
if best_match and best_match['score'] >= SCORE_THRESHOLD:
is_weak = best_match['comp'].get('domain_match', 0) == 0 and not (best_match['comp'].get('location_match', 0))
applied_threshold = SCORE_THRESHOLD_WEAK if is_weak else SCORE_THRESHOLD
if best_match['score'] >= applied_threshold: if not weak_guard_fail and best_score >= applied_threshold:
results.append({'Match': best_match['name'], 'Score': best_match['score'], 'Match_Grund': str(best_match['comp'])}) results.append({'Match': best_name, 'Score': best_score, 'Match_Grund': str(best_comp)})
logger.info(f" --> Match: '{best_match['name']}' ({best_match['score']}) | TH={applied_threshold}{' (weak)' if is_weak else ''}") metrics['matches_total'] += 1
else: if best_comp.get('domain_used') == 1:
results.append({'Match':'', 'Score': best_match['score'], 'Match_Grund': f"Below WEAK threshold | {str(best_match['comp'])}"}) metrics['matches_domain'] += 1
logger.info(f" --> No Match (below weak TH): '{best_match['name']}' ({best_match['score']}) | TH={applied_threshold}") if best_comp.get('city_match') and best_comp.get('country_match'):
elif best_match: metrics['matches_with_loc'] += 1
results.append({'Match':'', 'Score': best_match['score'], 'Match_Grund': f"Below threshold | {str(best_match['comp'])}"}) if best_comp.get('domain_used') == 0 and best_comp.get('name') >= 85 and not (best_comp.get('city_match') and best_comp.get('country_match')):
logger.info(f" --> No Match (below TH): '{best_match['name']}' ({best_match['score']})") metrics['matches_name_only'] += 1
logger.info(f" --> Match: '{best_name}' ({best_score}) {best_comp} | TH={applied_threshold}{' weak' if weak else ''}")
else: else:
results.append({'Match':'', 'Score':0, 'Match_Grund':'No valid candidates or user override'}) reason = 'weak_guard_no_rare' if weak_guard_fail else 'below_threshold'
logger.info(f" --> No Match (no candidates)") results.append({'Match':'', 'Score': best_score, 'Match_Grund': f"{best_comp} | {reason} TH={applied_threshold}"})
logger.info(f" --> Kein Match (Score={best_score}) {best_comp} | {reason} TH={applied_threshold}")
# --- Ergebnisse zurückschreiben (Logik unverändert) --- # Ergebnisse zurückschreiben (SAFE)
logger.info("Matching-Prozess abgeschlossen. Bereite Ergebnisse für den Upload vor...") logger.info("Schreibe Ergebnisse ins Sheet (SAFE in-place, keine Spaltenverluste)…")
# ... (Rest des Codes bleibt identisch) ... res_df = pd.DataFrame(results, index=match_df.index)
update_status(job_id, "Läuft", "Schreibe Ergebnisse zurück ins Sheet...") write_df = match_df.copy()
result_df = pd.DataFrame(results) write_df['Match'] = res_df['Match']
cols_to_drop_from_match = ['Match', 'Score', 'Match_Grund'] write_df['Score'] = res_df['Score']
match_df_clean = match_df.drop(columns=[col for col in cols_to_drop_from_match if col in match_df.columns], errors='ignore') write_df['Match_Grund'] = res_df['Match_Grund']
final_df = pd.concat([match_df_clean.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1)
cols_to_drop = ['normalized_name', 'normalized_domain'] drop_cols = ['normalized_name','normalized_domain','block_key','Effektive Website','domain_use_flag']
final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore') for c in drop_cols:
upload_df = final_df.astype(str).replace({'nan': '', 'None': ''}) if c in write_df.columns:
data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist() write_df.drop(columns=[c], inplace=True)
logger.info(f"Versuche, {len(data_to_write) - 1} Ergebniszeilen in das Sheet '{MATCHING_SHEET_NAME}' zu schreiben...")
ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write) backup_path = os.path.join(LOG_DIR, f"{now}_backup_{MATCHING_SHEET_NAME}.csv")
try:
write_df.to_csv(backup_path, index=False, encoding='utf-8')
logger.info(f"Lokales Backup geschrieben: {backup_path}")
except Exception as e:
logger.warning(f"Backup fehlgeschlagen: {e}")
data = [write_df.columns.tolist()] + write_df.fillna('').values.tolist()
ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data)
if ok: if ok:
logger.info("Ergebnisse erfolgreich in das Google Sheet geschrieben.") logger.info("Ergebnisse erfolgreich geschrieben")
update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
else: else:
logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.") logger.error("Fehler beim Schreiben ins Google Sheet")
update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")
# Summary
serp_counts = Counter((str(x).lower() for x in write_df.get('Serp Vertrauen', [])))
logger.info("===== Summary =====")
logger.info(f"Matches total: {metrics['matches_total']} | mit Domain: {metrics['matches_domain']} | mit Ort: {metrics['matches_with_loc']} | nur Name: {metrics['matches_name_only']}")
logger.info(f"Serp Vertrauen: {dict(serp_counts)}")
logger.info(f"Config: TH={SCORE_THRESHOLD}, TH_WEAK={SCORE_THRESHOLD_WEAK}, MIN_NAME_FOR_DOMAIN={MIN_NAME_FOR_DOMAIN}, Penalties(city={CITY_MISMATCH_PENALTY},country={COUNTRY_MISMATCH_PENALTY}), Prefilter(partial>={PREFILTER_MIN_PARTIAL}, limit={PREFILTER_LIMIT})")
if __name__=='__main__': if __name__=='__main__':
parser = argparse.ArgumentParser(description="Duplicate Checker v4.0") main()
parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
parser.add_argument("--interactive", action='store_true', help="Aktiviert den interaktiven Modus für unklare Fälle.")
args = parser.parse_args()
Config.load_api_keys()
main(job_id=args.job_id, interactive=args.interactive)