# Source file: Brancheneinstufung2/duplicate_checker.py
# (repository viewer metadata: 451 lines, 20 KiB, Python)

import os
import sys
import re
import argparse
import json
import logging
import pandas as pd
from datetime import datetime
from collections import Counter
from thefuzz import fuzz
from helpers import normalize_company_name, simple_normalize_url, serp_website_lookup
from config import Config
from google_sheet_handler import GoogleSheetHandler
STATUS_DIR = "job_status"


def update_status(job_id, status, progress_message):
    """Write the current job state to ``<STATUS_DIR>/<job_id>.json``.

    Keys already present in the status file are preserved; only ``"status"``
    and ``"progress"`` are overwritten. The function is best-effort: any
    failure is logged and never propagated to the caller.

    Args:
        job_id: Identifier used as the file name. Falsy values disable
            status reporting entirely.
        status: Value stored under the ``"status"`` key.
        progress_message: Value stored under the ``"progress"`` key.
    """
    if not job_id:
        return
    status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
    try:
        # The directory may not exist yet on the first run.
        os.makedirs(STATUS_DIR, exist_ok=True)
        # Read the previous content so that other keys survive the update.
        try:
            with open(status_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            data = {}
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError: a damaged
            # file must not block all further status updates.
            logging.warning(f"Statusdatei für Job {job_id} ist beschädigt und wird neu angelegt.")
            data = {}
        if not isinstance(data, dict):
            data = {}
        data.update({"status": status, "progress": progress_message})
        with open(status_file, 'w', encoding='utf-8') as f:
            json.dump(data, f)
    except Exception as e:
        logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")
# duplicate_checker.py v2.15
# Quality-first ++: Domain-Gate, Location-Penalties, Smart Blocking (IDF-light),
# Serp-Trust, Weak-Threshold, City-Bias-Guard, Prefilter tightened, Metrics
# Build timestamp is injected into logfile name.
# --- Configuration ---
# Names of the two worksheets read by main(): the CRM reference data and the
# accounts that are checked against it.
CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
SCORE_THRESHOLD = 80 # default threshold
SCORE_THRESHOLD_WEAK= 95 # threshold when neither the domain nor (city & country) match
MIN_NAME_FOR_DOMAIN = 70 # domain score only counts if name >= 70 OR city + country match
# Points subtracted from the total score when both records carry a value
# for city / country and the values differ.
CITY_MISMATCH_PENALTY = 30
COUNTRY_MISMATCH_PENALTY = 40
PREFILTER_MIN_PARTIAL = 70 # (previously 60)
PREFILTER_LIMIT = 30 # (previously 50)
LOG_DIR = "Log"
# Timestamp taken once at import time; it becomes part of the log file name.
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v2.15.txt"
# --- Logging setup ---
# exist_ok=True makes the call idempotent, so no separate existence check
# (which would only add a race window) is needed.
os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE)

root = logging.getLogger()
root.setLevel(logging.DEBUG)
# Remove handlers that are already attached to the root logger so that every
# message is emitted exactly once per handler configured below.
for h in list(root.handlers):
    root.removeHandler(h)

formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s")

# Console: INFO and above.
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
root.addHandler(ch)

# Log file: everything from DEBUG upwards, appended as UTF-8.
fh = logging.FileHandler(log_path, mode='a', encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
root.addHandler(fh)

logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v2.15 | Build: {now}")
# --- Load SerpAPI key ---
# serp_key stays None whenever the key is missing or loading fails; the
# Serp fallback is an optional feature, so failures only produce a warning.
serp_key = None
try:
    Config.load_api_keys()
    serp_key = Config.API_KEYS.get('serpapi')
    if not serp_key:
        logger.warning("SerpAPI Key nicht gefunden; Serp-Fallback deaktiviert.")
except Exception as e:
    logger.warning(f"Fehler beim Laden API-Keys: {e}")
    serp_key = None
# --- Stop / city tokens ---
# Legal-form markers that say nothing about the identity of a company.
_LEGAL_FORM_TOKENS = {
    'gmbh', 'mbh', 'ag', 'kg', 'ug', 'ohg', 'se', 'co', 'kgaa',
    'inc', 'llc', 'ltd', 'sarl',
    'company', 'gesellschaft', 'mbh&co', 'mbhco',
}
# Generic business / branch words that are ignored for name comparison.
_GENERIC_NAME_TOKENS = {
    'holding', 'gruppe', 'group', 'international',
    'solutions', 'solution', 'service', 'services',
    'deutschland', 'austria', 'germany',
    'technik', 'technology', 'technologies', 'systems', 'systeme',
    'logistik', 'logistics', 'industries', 'industrie',
    'management', 'consulting', 'vertrieb', 'handel',
    'werke', 'werk', 'renkhoff', 'sonnenschutztechnik',
}
STOP_TOKENS_BASE = _LEGAL_FORM_TOKENS | _GENERIC_NAME_TOKENS
CITY_TOKENS = set()  # filled dynamically in main() once the data is normalised
# --- Utilities ---
def _tokenize(s: str):
if not s:
return []
return re.split(r"[^a-z0-9]+", str(s).lower())
def split_tokens(name: str):
    """Return the tokens of *name* that are used for indexing and scoring.

    Tokens shorter than three characters are dropped, as are the base stop
    tokens and the dynamically collected city tokens.
    """
    if not name:
        return []
    ignored = STOP_TOKENS_BASE | CITY_TOKENS
    kept = []
    for token in _tokenize(name):
        if len(token) < 3 or token in ignored:
            continue
        kept.append(token)
    return kept
def clean_name_for_scoring(norm_name: str):
    """Strip stop and city tokens from *norm_name*.

    Returns:
        A pair ``(cleaned, token_set)``: the remaining tokens joined by a
        single space and the same tokens as a set. An empty ``cleaned``
        string means there is nothing meaningful left to compare.
    """
    remaining = split_tokens(norm_name)
    cleaned = " ".join(remaining)
    return cleaned, set(remaining)
def assess_serp_trust(company_name: str, url: str) -> str:
    """Rate how well a looked-up URL fits the company name.

    The rating is based on whether significant name tokens occur in the
    normalised host:

    * ``'hoch'``    (high)   - a token of four or more characters occurs
    * ``'mittel'``  (medium) - only a three-character token occurs
    * ``'niedrig'`` (low)    - no token occurs
    * ``'n/a'``              - no URL was given
    """
    if not url:
        return 'n/a'
    host = (simple_normalize_url(url) or '').lower()
    # Strip only a leading "www."; a blanket replace would also damage hosts
    # that merely contain that character sequence somewhere else.
    if host.startswith('www.'):
        host = host[4:]
    # split_tokens() already guarantees a minimum token length of three.
    name_toks = split_tokens(normalize_company_name(company_name))
    if any(len(t) >= 4 and t in host for t in name_toks):
        return 'hoch'
    # Reaching this point means no longer token matched, so any hit here
    # comes from a three-character token.
    if any(t in host for t in name_toks):
        return 'mittel'
    return 'niedrig'
# --- Similarity ---
def calculate_similarity(mrec: dict, crec: dict, token_freq: Counter):
    """Score how likely the matching record *mrec* duplicates the CRM record *crec*.

    Score composition:

    * +100 for an identical domain, but only if the matching record allows
      domain use and the domain gate passes (name score
      >= ``MIN_NAME_FOR_DOMAIN`` or city and country both match)
    * + the fuzzy name score (0-100) computed on stop-/city-token-free names
    * +20 if city and country both match
    * - ``COUNTRY_MISMATCH_PENALTY`` / ``CITY_MISMATCH_PENALTY`` if both
      records carry a value and the values differ
    * +20 bonus for strong name-only matches (no domain, no location match,
      name score >= 85 and the rarest token of *mrec* occurs in *crec*)

    Args:
        mrec: Record from the matching sheet (normalised columns expected).
        crec: Record from the CRM sheet (normalised columns expected).
        token_freq: Token frequencies over the CRM names; used to find the
            rarest token of *mrec*.

    Returns:
        ``(total, comp)`` - the rounded total score and a dict with the
        individual components for logging.
    """
    # Domain (gated further below)
    dom1 = mrec.get('normalized_domain', '')
    dom2 = crec.get('normalized_domain', '')
    m_domain_use = mrec.get('domain_use_flag', 0)
    domain_flag_raw = 1 if (m_domain_use == 1 and dom1 and dom1 == dom2) else 0

    # Location flags: a match requires a non-empty value on both sides.
    m_city, c_city = mrec.get('CRM Ort'), crec.get('CRM Ort')
    m_country, c_country = mrec.get('CRM Land'), crec.get('CRM Land')
    city_match = 1 if (m_city and c_city and m_city == c_city) else 0
    country_match = 1 if (m_country and c_country and m_country == c_country) else 0
    location_match = bool(city_match and country_match)

    # Name (meaningful tokens only)
    n1 = mrec.get('normalized_name', '')
    n2 = crec.get('normalized_name', '')
    clean1, toks1 = clean_name_for_scoring(n1)
    clean2, toks2 = clean_name_for_scoring(n2)

    # If the cleaned names share nothing but the raw names share a city
    # token, the similarity stems from the city alone and gets capped.
    overlap_clean = toks1 & toks2
    raw_overlap = set(_tokenize(n1)) & set(_tokenize(n2))
    city_only_overlap = (not overlap_clean) and any(t in CITY_TOKENS for t in raw_overlap)

    # Name score: best of three fuzzy ratios
    if clean1 and clean2:
        name_score = max(
            fuzz.token_set_ratio(clean1, clean2),
            fuzz.partial_ratio(clean1, clean2),
            fuzz.token_sort_ratio(clean1, clean2),
        )
    else:
        name_score = 0
    if city_only_overlap and name_score > 70:
        name_score = 70  # cap

    # Rare-token overlap (IDF-light): rarest token of mrec, longer tokens
    # preferred on equal frequency. The token itself is the final tie-break
    # so the choice does not depend on set iteration order.
    if toks1:
        rare_token = min(toks1, key=lambda t: (token_freq.get(t, 10**9), -len(t), t))
    else:
        rare_token = None
    rare_overlap = 1 if (rare_token and rare_token in toks2) else 0

    # Domain gate
    domain_gate_ok = (name_score >= MIN_NAME_FOR_DOMAIN) or (city_match and country_match)
    domain_used = 1 if (domain_flag_raw and domain_gate_ok) else 0

    # Base score
    total = domain_used * 100 + name_score * 1.0 + (20 if location_match else 0)

    # Penalties
    penalties = 0
    if m_country and c_country and not country_match:
        penalties += COUNTRY_MISMATCH_PENALTY
    if m_city and c_city and not city_match:
        penalties += CITY_MISMATCH_PENALTY
    total -= penalties

    # Bonus for strong name-only cases
    name_bonus = 1 if (domain_used == 0 and not location_match and name_score >= 85 and rare_overlap == 1) else 0
    if name_bonus:
        total += 20

    comp = {
        'domain_raw': domain_flag_raw,
        'domain_used': domain_used,
        'domain_gate_ok': int(domain_gate_ok),
        'name': round(name_score, 1),
        'city_match': city_match,
        'country_match': country_match,
        'penalties': penalties,
        'name_bonus': name_bonus,
        'rare_overlap': rare_overlap,
        'city_only_overlap': int(city_only_overlap)
    }
    return round(total), comp
# --- Indexes ---
def build_indexes(crm_df: pd.DataFrame):
    """Build the lookup structures used to find match candidates.

    Args:
        crm_df: CRM data with the columns ``normalized_domain`` and
            ``normalized_name`` already computed.

    Returns:
        A 4-tuple ``(records, domain_index, token_freq, token_index)``:

        * ``records`` - the rows of *crm_df* as a list of dicts
        * ``domain_index`` - normalised domain -> list of records
        * ``token_freq`` - Counter: cleaned name token -> number of records
          containing it
        * ``token_index`` - cleaned name token -> list of records
    """
    records = list(crm_df.to_dict('records'))
    domain_index = {}
    token_freq = Counter()
    token_index = {}
    for rec in records:
        domain = rec.get('normalized_domain')
        if domain:
            domain_index.setdefault(domain, []).append(rec)
        # The token set is computed once per record and feeds both the
        # frequency table and the inverted index.
        _, name_tokens = clean_name_for_scoring(rec.get('normalized_name', ''))
        for tok in name_tokens:
            token_freq[tok] += 1
            token_index.setdefault(tok, []).append(rec)
    return records, domain_index, token_freq, token_index
def choose_rarest_token(norm_name: str, token_freq: Counter):
    """Return the rarest meaningful token of *norm_name*, or ``None``.

    Rarity is taken from *token_freq*; tokens unknown to it rank last. On
    equal frequency the longer token wins, and remaining ties are broken
    lexically so the result is deterministic across runs.
    """
    _, toks = clean_name_for_scoring(norm_name)
    if not toks:
        return None
    return min(toks, key=lambda tok: (token_freq.get(tok, 10**9), -len(tok), tok))
# --- Main routine ---
# Cell values (compared lowercase) that mean "no website available".
_BLANK_URL_VALUES = ['', 'k.a.', 'k.a', 'n/a', 'na']


def _text_column(series):
    """Return *series* as strings, mapping missing values to '' instead of 'nan'/'None'."""
    return series.fillna('').astype(str)


def _blank_url_mask(series):
    """Return a boolean mask marking cells that hold no usable URL."""
    return _text_column(series).str.strip().str.lower().isin(_BLANK_URL_VALUES)


def _first_word(text):
    """Return the first whitespace-separated word of *text*, or None if there is none."""
    words = text.split() if text else []
    return words[0] if words else None


def _collect_city_tokens(crm_df, match_df):
    """Collect all city-name tokens (length >= 3) from both frames' 'CRM Ort' columns."""
    cities = pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']], ignore_index=True).dropna().unique()
    return {tok for city in cities for tok in _tokenize(city) if len(tok) >= 3}


def _serp_fill_missing_websites(match_df, job_id):
    """Look up a website via Serp for every matching row lacking one; updates *match_df* in place."""
    if 'Gefundene Website' not in match_df.columns:
        match_df['Gefundene Website'] = ''
    empty_mask = _blank_url_mask(match_df['CRM Website']) & _blank_url_mask(match_df['Gefundene Website'])
    empty_count = int(empty_mask.sum())
    if empty_count == 0:
        logger.info("Serp-Fallback übersprungen: B oder E bereits befüllt (keine fehlenden Matching-URLs)")
        return

    update_status(job_id, "Läuft", f"Suche Websites für {empty_count} Firmen via SerpAPI...")
    logger.info(f"Serp-Fallback für Matching: {empty_count} Firmen ohne URL in B/E")
    found_cnt = 0
    trust_stats = Counter()
    for idx, row in match_df[empty_mask].iterrows():
        company = row['CRM Name']
        try:
            url = serp_website_lookup(company)
            if url and 'k.A.' not in url:
                if not str(url).startswith(('http://', 'https://')):
                    url = 'https://' + str(url).lstrip()
                trust = assess_serp_trust(company, url)
                match_df.at[idx, 'Gefundene Website'] = url
                match_df.at[idx, 'Serp Vertrauen'] = trust
                trust_stats[trust] += 1
                logger.info(f" ✓ URL gefunden: '{company}' -> {url} (Vertrauen: {trust})")
                found_cnt += 1
            else:
                logger.debug(f" ✗ Keine eindeutige URL: '{company}' -> {url}")
        except Exception as e:
            # A failed lookup for one company must not abort the whole run.
            logger.warning(f" ! Serp-Fehler für '{company}': {e}")
    logger.info(f"Serp-Fallback beendet: {found_cnt}/{empty_count} URLs ergänzt | Trust: {dict(trust_stats)}")


def _find_candidates(mrow, crm_records, domain_index, token_freq, token_index):
    """Return ``(candidates, used_block)`` for one matching row.

    Blocking order: identical domain, then rarest name token, then a fuzzy
    prefilter over all CRM records.
    """
    candidates = []
    used_block = ''
    domain = mrow.get('normalized_domain')
    if domain and mrow.get('domain_use_flag') == 1:
        candidates = domain_index.get(domain, [])
        used_block = f"domain:{domain}"
    if candidates:
        return candidates, used_block

    name = mrow.get('normalized_name', '')
    rtok = choose_rarest_token(name, token_freq)
    if rtok:
        candidates = token_index.get(rtok, [])
        used_block = f"token:{rtok}"
    if candidates:
        return candidates, used_block

    # NOTE(review): this prefilter appears unable to ever add candidates.
    # It is only reached when the token index returned nothing for rtok,
    # i.e. no CRM record contains rtok, yet every record lacking rtok is
    # skipped below; without rtok the cleaned name is empty and the loop is
    # not entered at all. Behaviour is kept as is - confirm whether the
    # rtok filter is intended here.
    pf = []
    clean1, _ = clean_name_for_scoring(name)
    if clean1:
        for r in crm_records:
            clean2, toks2 = clean_name_for_scoring(r.get('normalized_name', ''))
            if not clean2:
                continue
            if rtok and rtok not in toks2:
                continue
            pr = fuzz.partial_ratio(clean1, clean2)
            if pr >= PREFILTER_MIN_PARTIAL:
                pf.append((pr, r))
    pf.sort(key=lambda x: x[0], reverse=True)
    candidates = [r for _, r in pf[:PREFILTER_LIMIT]]
    used_block = f"prefilter:{PREFILTER_MIN_PARTIAL}/{len(pf)}"
    return candidates, used_block


def main(job_id=None):
    """Run the duplicate check of the matching sheet against the CRM sheet.

    Steps: load both sheets, optionally look up missing websites via Serp,
    normalise names/domains/locations, build the blocking indexes, score
    every matching row against its candidates and write ``Match``,
    ``Score`` and ``Match_Grund`` back to the matching sheet (after saving a
    local CSV backup).

    Args:
        job_id: Optional job identifier; when given, progress is reported
            through ``update_status``.

    Exits the process with status 1 if the sheet handler cannot be created;
    returns early if one of the sheets is empty.
    """
    global CITY_TOKENS

    logger.info("Starte Duplikats-Check v2.15 (Quality-first++)")
    update_status(job_id, "Läuft", "Initialisiere GoogleSheetHandler...")
    try:
        sheet = GoogleSheetHandler()
        logger.info("GoogleSheetHandler initialisiert")
    except Exception as e:
        logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        update_status(job_id, "Fehlgeschlagen", f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        sys.exit(1)

    # Load data
    update_status(job_id, "Läuft", "Lade CRM- und Matching-Daten...")
    crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
    match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    logger.info(f"{0 if crm_df is None else len(crm_df)} CRM-Datensätze | {0 if match_df is None else len(match_df)} Matching-Datensätze")
    if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
        logger.critical("Leere Daten in einem der Sheets. Abbruch.")
        update_status(job_id, "Fehlgeschlagen", "Leere Daten in einem der Sheets.")
        return

    # Serp lookup only for matching rows without any website. It runs before
    # the city tokens are collected, exactly as in the original flow.
    api_keys = getattr(Config, 'API_KEYS', None) or {}
    if api_keys.get('serpapi'):
        _serp_fill_missing_websites(match_df, job_id)

    # Normalise CRM data (missing cells become '' rather than 'nan', so two
    # empty cities/countries are never treated as equal values).
    update_status(job_id, "Läuft", "Normalisiere CRM-Daten...")
    crm_df['normalized_name'] = _text_column(crm_df['CRM Name']).apply(normalize_company_name)
    crm_df['normalized_domain'] = _text_column(crm_df['CRM Website']).apply(simple_normalize_url)
    crm_df['CRM Ort'] = _text_column(crm_df['CRM Ort']).str.lower().str.strip()
    crm_df['CRM Land'] = _text_column(crm_df['CRM Land']).str.lower().str.strip()
    crm_df['block_key'] = crm_df['normalized_name'].apply(_first_word)
    crm_df['domain_use_flag'] = 1

    # Normalise matching data
    update_status(job_id, "Läuft", "Normalisiere Matching-Daten...")
    match_df['Gefundene Website'] = match_df.get('Gefundene Website', pd.Series(index=match_df.index, dtype=object))
    match_df['Serp Vertrauen'] = match_df.get('Serp Vertrauen', pd.Series(index=match_df.index, dtype=object))
    # Effective website: the row's own website, otherwise the one found via
    # Serp. Placeholders such as "k.A." count as "no website", consistent
    # with the selection of rows for the Serp lookup.
    has_own_site = ~_blank_url_mask(match_df['CRM Website'])
    has_found_site = ~_blank_url_mask(match_df['Gefundene Website'])
    own_site = _text_column(match_df['CRM Website']).str.strip()
    found_site = _text_column(match_df['Gefundene Website']).str.strip().where(has_found_site, '')
    match_df['Effektive Website'] = own_site.where(has_own_site, found_site)
    match_df['normalized_name'] = _text_column(match_df['CRM Name']).apply(normalize_company_name)
    match_df['normalized_domain'] = match_df['Effektive Website'].astype(str).apply(simple_normalize_url)
    match_df['CRM Ort'] = _text_column(match_df['CRM Ort']).str.lower().str.strip()
    match_df['CRM Land'] = _text_column(match_df['CRM Land']).str.lower().str.strip()
    match_df['block_key'] = match_df['normalized_name'].apply(_first_word)
    # The domain may be used if the row has its own website, or if the
    # Serp result was rated 'hoch'.
    high_trust = _text_column(match_df['Serp Vertrauen']).str.lower() == 'hoch'
    match_df['domain_use_flag'] = (has_own_site | high_trust).astype(int)

    # Build the city tokens dynamically
    CITY_TOKENS = _collect_city_tokens(crm_df, match_df)
    logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")

    # Blocking indexes
    update_status(job_id, "Läuft", "Erstelle Blocking-Indizes...")
    crm_records, domain_index, token_freq, token_index = build_indexes(crm_df)
    logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")

    # Matching
    results = []
    metrics = Counter()
    total = len(match_df)
    logger.info("Starte Matching-Prozess…")
    # Iterating positionally keeps the progress counter correct for any
    # index labels and also works with a non-unique index.
    for processed, mrow in enumerate(match_df.to_dict('records'), start=1):
        progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'"
        logger.info(progress_message)
        if processed % 5 == 0:  # update the status file every 5 rows
            update_status(job_id, "Läuft", progress_message)

        candidates, used_block = _find_candidates(mrow, crm_records, domain_index, token_freq, token_index)
        logger.info(f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}' -> {len(candidates)} Kandidaten (Block={used_block})")
        if not candidates:
            results.append({'Match': '', 'Score': 0, 'Match_Grund': 'keine Kandidaten'})
            continue

        scored = []
        for cr in candidates:
            score, comp = calculate_similarity(mrow, cr, token_freq)
            scored.append((cr.get('CRM Name', ''), score, comp))
        scored.sort(key=lambda x: x[1], reverse=True)
        for cand_name, sc, comp in scored[:5]:
            logger.debug(f" Kandidat: {cand_name} | Score={sc} | Comp={comp}")

        best_name, best_score, best_comp = scored[0]
        has_location = bool(best_comp.get('city_match') and best_comp.get('country_match'))
        # "Weak" evidence: neither domain nor location supports the match,
        # so the stricter threshold and the rare-token guard apply.
        weak = (best_comp.get('domain_used') == 0 and not has_location)
        applied_threshold = SCORE_THRESHOLD_WEAK if weak else SCORE_THRESHOLD
        weak_guard_fail = (weak and best_comp.get('rare_overlap') == 0)
        if not weak_guard_fail and best_score >= applied_threshold:
            results.append({'Match': best_name, 'Score': best_score, 'Match_Grund': str(best_comp)})
            metrics['matches_total'] += 1
            if best_comp.get('domain_used') == 1:
                metrics['matches_domain'] += 1
            if has_location:
                metrics['matches_with_loc'] += 1
            if best_comp.get('domain_used') == 0 and best_comp.get('name') >= 85 and not has_location:
                metrics['matches_name_only'] += 1
            logger.info(f" --> Match: '{best_name}' ({best_score}) {best_comp} | TH={applied_threshold}{' weak' if weak else ''}")
        else:
            reason = 'weak_guard_no_rare' if weak_guard_fail else 'below_threshold'
            results.append({'Match': '', 'Score': best_score, 'Match_Grund': f"{best_comp} | {reason} TH={applied_threshold}"})
            logger.info(f" --> Kein Match (Score={best_score}) {best_comp} | {reason} TH={applied_threshold}")

    # Write results back
    update_status(job_id, "Läuft", "Schreibe Ergebnisse zurück ins Sheet...")
    write_df = match_df.copy()
    # results holds exactly one entry per row in row order, so positional
    # assignment is safe regardless of the index.
    for col in ('Match', 'Score', 'Match_Grund'):
        write_df[col] = [res[col] for res in results]
    helper_cols = ['normalized_name', 'normalized_domain', 'block_key', 'Effektive Website', 'domain_use_flag']
    write_df = write_df.drop(columns=helper_cols, errors='ignore')

    # Local CSV backup (best effort) before the sheet is overwritten
    try:
        stamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
        backup_dir = getattr(Config, 'LOG_DIR', None) or LOG_DIR
        backup_path = os.path.join(backup_dir, f"{stamp}_backup_{MATCHING_SHEET_NAME}.csv")
        write_df.to_csv(backup_path, index=False, encoding='utf-8')
        logger.info(f"Lokales Backup geschrieben: {backup_path}")
    except Exception as e:
        logger.warning(f"Backup fehlgeschlagen: {e}")

    data = [write_df.columns.tolist()] + write_df.fillna('').values.tolist()
    ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data)
    if ok:
        logger.info("Ergebnisse erfolgreich geschrieben")
        update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
    else:
        logger.error("Fehler beim Schreiben ins Google Sheet")
        update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")

    # Summary
    serp_counts = Counter((str(x).lower() for x in write_df.get('Serp Vertrauen', [])))
    logger.info("===== Summary =====")
    logger.info(f"Matches total: {metrics['matches_total']} | mit Domain: {metrics['matches_domain']} | mit Ort: {metrics['matches_with_loc']} | nur Name: {metrics['matches_name_only']}")
    logger.info(f"Serp Vertrauen: {dict(serp_counts)}")
    logger.info(f"Config: TH={SCORE_THRESHOLD}, TH_WEAK={SCORE_THRESHOLD_WEAK}, MIN_NAME_FOR_DOMAIN={MIN_NAME_FOR_DOMAIN}, Penalties(city={CITY_MISMATCH_PENALTY},country={COUNTRY_MISMATCH_PENALTY}), Prefilter(partial>={PREFILTER_MIN_PARTIAL}, limit={PREFILTER_LIMIT})")
if __name__ == "__main__":
    # Command-line entry point: the optional --job-id enables status reporting.
    cli = argparse.ArgumentParser()
    cli.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
    cli_args = cli.parse_args()
    main(job_id=cli_args.job_id)