Files
Brancheneinstufung2/_legacy_gsheets_system/company_deduplicator.py
Floke c6a37a3c17 feat(company-explorer): Initial Web UI & Backend with Enrichment Flow
This commit introduces the foundational elements for the new "Company Explorer" web application, marking a significant step away from the legacy Google Sheets / CLI system.

Key changes include:
- Project Structure: A new application directory with separate backend (FastAPI) and frontend (React/Vite) components.
- Data Persistence: Migration from Google Sheets to a local SQLite database using SQLAlchemy.
- Core Utilities: Extraction and cleanup of essential helper functions (LLM wrappers, text utilities) into a shared utilities module.
- Backend Services: Dedicated service modules for AI-powered analysis, plus supporting backend logic.
- Frontend UI: Basic React application with company table, import wizard, and dynamic inspector sidebar.
- Docker Integration: Updated Docker configuration files for multi-stage builds and sideloading.
- Deployment & Access: Integrated into the central Nginx proxy and dashboard, and made accessible through them.

Lessons Learned & Fixed during development:
- Frontend Asset Loading: Addressed issues with Vite's asset path configuration and FastAPI's static file handling.
- TypeScript Configuration: Added the TypeScript configuration files that were missing.
- Database Schema Evolution: Solved schema-related errors by forcing a new database file and correcting a configuration override.
- Logging: Implemented robust file-based logging.

This new foundation provides a powerful and maintainable platform for future B2B robotics lead generation.
2026-01-07 17:55:08 +00:00

673 lines
29 KiB
Python

import os
import sys
import re
import logging
import pandas as pd
from datetime import datetime
from collections import Counter
from thefuzz import fuzz
from helpers import normalize_company_name, simple_normalize_url, serp_website_lookup
from config import Config
from google_sheet_handler import GoogleSheetHandler
# duplicate_checker.py v2.15
# Quality-first ++: Domain-Gate, Location-Penalties, Smart Blocking (IDF-light),
# Serp-Trust, Weak-Threshold, City-Bias-Guard, Prefilter tightened, Metrics
# Build timestamp is injected into logfile name.
# --- Configuration ---
# Names of the two Google Sheets worksheets this script reads and writes.
CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
SCORE_THRESHOLD = 80 # default acceptance threshold
SCORE_THRESHOLD_WEAK= 95 # threshold when neither the domain nor (city & country) match
MIN_NAME_FOR_DOMAIN = 70 # domain score only counts if name >= 70 OR city + country match
# Points subtracted from the total score when both records carry a city /
# country value and the two values differ.
CITY_MISMATCH_PENALTY = 30
COUNTRY_MISMATCH_PENALTY = 40
PREFILTER_MIN_PARTIAL = 70 # (previously 60)
PREFILTER_LIMIT = 30 # (previously 50)
LOG_DIR = "Log"
# Timestamp taken once at import time; it is embedded in the log file name
# and in the names of the local CSV backups.
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v2.15.txt"
# --- Logging setup ---
# The file handler needs its directory to exist before it opens the log file.
if not os.path.exists(LOG_DIR):
    os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE)

# Configure the root logger from scratch: drop whatever handlers are already
# attached so that every message is emitted exactly once per target.
root = logging.getLogger()
root.setLevel(logging.DEBUG)
for h in root.handlers[:]:
    root.removeHandler(h)

formatter = logging.Formatter(fmt="%(asctime)s - %(levelname)-8s - %(message)s")

# Console: INFO and above.
ch = logging.StreamHandler(stream=sys.stdout)
ch.setFormatter(formatter)
ch.setLevel(logging.INFO)
root.addHandler(ch)

# Log file: everything down to DEBUG, appended, UTF-8 encoded.
fh = logging.FileHandler(filename=log_path, mode='a', encoding='utf-8')
fh.setFormatter(formatter)
fh.setLevel(logging.DEBUG)
root.addHandler(fh)

logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v2.15 | Build: {now}")
# --- Load SerpAPI key ---
# `serp_key` stays falsy when the key is missing or loading fails; the
# Serp website fallback in the external comparison is then skipped.
try:
    Config.load_api_keys()
    serp_key = Config.API_KEYS.get('serpapi')
except Exception as e:
    logger.warning(f"Fehler beim Laden API-Keys: {e}")
    serp_key = None
else:
    if not serp_key:
        logger.warning("SerpAPI Key nicht gefunden; Serp-Fallback deaktiviert.")
# --- Stop / city tokens ---
# Tokens that carry no identifying information for a company name and are
# therefore removed before names are indexed or compared.
STOP_TOKENS_BASE = {
    # legal forms and company-type words
    'gmbh', 'mbh', 'ag', 'kg', 'ug', 'ohg', 'se', 'co', 'kgaa',
    'inc', 'llc', 'ltd', 'sarl', 'mbh&co', 'mbhco',
    'company', 'gesellschaft', 'holding', 'gruppe', 'group',
    # countries / reach
    'international', 'deutschland', 'austria', 'germany',
    # generic business vocabulary
    'solutions', 'solution', 'service', 'services',
    'technik', 'technology', 'technologies', 'systems', 'systeme',
    'logistik', 'logistics', 'industries', 'industrie',
    'management', 'consulting', 'vertrieb', 'handel',
    'werke', 'werk',
    # further entries carried over unchanged from the original list
    'renkhoff', 'sonnenschutztechnik',
}
# Filled at runtime from the city columns once the data has been normalized.
CITY_TOKENS = set()
# --- Utilities ---
def _tokenize(s: str):
if not s:
return []
return re.split(r"[^a-z0-9]+", str(s).lower())
def split_tokens(name: str):
    """Return the tokens of *name* that are used for indexing and scoring.

    Tokens shorter than three characters are discarded, as are tokens found
    in the base stop list or in the dynamically collected city tokens.
    Falsy input yields an empty list.
    """
    if not name:
        return []
    excluded = STOP_TOKENS_BASE | CITY_TOKENS
    return [tok for tok in _tokenize(name) if len(tok) >= 3 and tok not in excluded]
def clean_name_for_scoring(norm_name: str):
    """Strip stop and city tokens from a normalized name.

    Returns:
        tuple[str, set]: the remaining tokens joined by single spaces, and
        the same tokens as a set.  An empty string means there is nothing
        meaningful left to compare.
    """
    kept = split_tokens(norm_name)
    cleaned = " ".join(kept)
    return cleaned, set(kept)
def assess_serp_trust(company_name: str, url: str) -> str:
    """Rate how well a found URL fits a company name.

    The rating is based on whether scoring tokens of the normalized company
    name occur as substrings of the normalized URL (with ``www.`` removed).

    Returns:
        str: ``'n/a'`` for a falsy URL, ``'hoch'`` if a token of four or
        more characters occurs in the host, ``'mittel'`` if only
        three-character tokens occur, otherwise ``'niedrig'``.
    """
    if not url:
        return 'n/a'
    host = (simple_normalize_url(url) or '').replace('www.', '')
    tokens = split_tokens(normalize_company_name(company_name))
    hits = [tok for tok in tokens if len(tok) >= 3 and tok in host]
    if any(len(tok) >= 4 for tok in hits):
        return 'hoch'
    # Any remaining hit is exactly three characters long.
    if hits:
        return 'mittel'
    return 'niedrig'
# --- Similarity ---
def calculate_similarity(mrec: dict, crec: dict, token_freq: Counter):
    """Score how likely *mrec* and *crec* describe the same company.

    Args:
        mrec: the record being matched. Reads ``normalized_name``,
            ``normalized_domain``, ``domain_use_flag``, ``CRM Ort``,
            ``CRM Land`` and optionally ``normalized_parent_name``.
        crec: the candidate record (same keys, ``domain_use_flag`` unused).
        token_freq: frequency of cleaned name tokens, used to pick the
            rarest token of *mrec*.

    Returns:
        tuple[int, dict]: ``(score, components)``.

        * Identical non-empty normalized names short-circuit to a score of
          300 with ``{'name': 100, 'exact_match': 1}``; if the two records
          are also parent and child, ``'is_parent_child': 1`` is added so
          callers that skip known parent/child pairs can recognise them.
        * A parent/child pair with different names returns 500 together
          with the full component dict and ``is_parent_child`` set to 1.
        * Otherwise the rounded total of domain, name and location points
          minus mismatch penalties is returned.
    """
    n1 = mrec.get('normalized_name', '')
    n2 = crec.get('normalized_name', '')

    # Parent/child relation: one record's name equals the other's parent.
    # Evaluated up front so the exact-name shortcut below can carry the flag.
    p1 = mrec.get('normalized_parent_name', '')
    p2 = crec.get('normalized_parent_name', '')
    is_parent_child = bool((n1 and p2 and n1 == p2) or (n2 and p1 and n2 == p1))

    # Direct reward for an exact name match.
    if n1 and n1 == n2:
        exact_comp = {'name': 100, 'exact_match': 1}
        if is_parent_child:
            exact_comp['is_parent_child'] = 1
        return 300, exact_comp

    # Domain (subject to the gate further down).
    dom1 = mrec.get('normalized_domain', '')
    dom2 = crec.get('normalized_domain', '')
    m_domain_use = mrec.get('domain_use_flag', 0)
    domain_flag_raw = 1 if (m_domain_use == 1 and dom1 and dom1 == dom2) else 0

    # Location flags: both sides must be present and equal.
    city_match = 1 if (mrec.get('CRM Ort') and crec.get('CRM Ort') and mrec.get('CRM Ort') == crec.get('CRM Ort')) else 0
    country_match = 1 if (mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') == crec.get('CRM Land')) else 0
    location_match = bool(city_match and country_match)

    # Name comparison only on meaningful tokens.
    clean1, toks1 = clean_name_for_scoring(n1)
    clean2, toks2 = clean_name_for_scoring(n2)
    overlap_clean = toks1 & toks2

    # If the cleaned names share nothing but the raw names share a city
    # token, the name score is capped below.
    raw_overlap = set(_tokenize(n1)) & set(_tokenize(n2))
    city_only_overlap = (not overlap_clean) and any(t in CITY_TOKENS for t in raw_overlap)

    if clean1 and clean2:
        name_score = max(
            fuzz.token_set_ratio(clean1, clean2),
            fuzz.partial_ratio(clean1, clean2),
            fuzz.token_sort_ratio(clean1, clean2),
        )
    else:
        name_score = 0
    if city_only_overlap and name_score > 70:
        name_score = 70  # cap

    # Rare-token overlap (IDF-light): rarest token of mrec, longer tokens
    # preferred on equal frequency; unknown tokens count as very frequent.
    rare_token = min(toks1, key=lambda t: (token_freq.get(t, 10**9), -len(t))) if toks1 else None
    rare_overlap = 1 if (rare_token and rare_token in toks2) else 0

    # Domain gate: a domain hit only counts with a decent name score or a
    # full location match.
    domain_gate_ok = (name_score >= MIN_NAME_FOR_DOMAIN) or (city_match and country_match)
    domain_used = 1 if (domain_flag_raw and domain_gate_ok) else 0

    # Base score.
    total = domain_used * 100 + name_score * 1.0 + (1 if location_match else 0) * 20

    # Penalties apply only when both sides carry a value and they differ.
    penalties = 0
    if mrec.get('CRM Land') and crec.get('CRM Land') and not country_match:
        penalties += COUNTRY_MISMATCH_PENALTY
    if mrec.get('CRM Ort') and crec.get('CRM Ort') and not city_match:
        penalties += CITY_MISMATCH_PENALTY
    total -= penalties

    # Bonus for strong name-only cases.
    name_bonus = 1 if (domain_used == 0 and not location_match and name_score >= 85 and rare_overlap == 1) else 0
    if name_bonus:
        total += 20

    comp = {
        'domain_raw': domain_flag_raw,
        'domain_used': domain_used,
        'domain_gate_ok': int(domain_gate_ok),
        'name': round(name_score, 1),
        'city_match': city_match,
        'country_match': country_match,
        'penalties': penalties,
        'name_bonus': name_bonus,
        'rare_overlap': rare_overlap,
        'city_only_overlap': int(city_only_overlap),
        'is_parent_child': 0  # default
    }

    if is_parent_child:
        comp['is_parent_child'] = 1
        # Very high score so the relation is easy to spot; the flag lets
        # callers ignore the pair.
        return 500, comp

    return round(total), comp
# --- Indexe ---
def build_indexes(crm_df: pd.DataFrame):
    """Build the blocking indexes over the CRM records.

    Returns:
        tuple: ``(records, domain_index, token_freq, token_index)`` where

        * ``records`` is the frame as a list of row dicts,
        * ``domain_index`` maps a non-empty ``normalized_domain`` to the
          records carrying it,
        * ``token_freq`` counts in how many records each cleaned name token
          occurs,
        * ``token_index`` maps each cleaned name token to its records.
    """
    records = list(crm_df.to_dict('records'))
    domain_index = {}
    token_index = {}
    token_freq = Counter()

    for rec in records:
        dom = rec.get('normalized_domain')
        if dom:
            domain_index.setdefault(dom, []).append(rec)

        # Cleaned tokens come back as a set, so each token counts once
        # per record.
        _, name_tokens = clean_name_for_scoring(rec.get('normalized_name', ''))
        for tok in name_tokens:
            token_freq[tok] += 1
            token_index.setdefault(tok, []).append(rec)

    return records, domain_index, token_freq, token_index
def choose_rarest_token(norm_name: str, token_freq: Counter):
    """Return the least frequent cleaned token of *norm_name*, or ``None``.

    Tokens missing from *token_freq* are treated as extremely frequent;
    on equal frequency the longer token is preferred.
    """
    _, candidates = clean_name_for_scoring(norm_name)
    if not candidates:
        return None
    return min(candidates, key=lambda tok: (token_freq.get(tok, 10**9), -len(tok)))
def build_city_tokens(df1: pd.DataFrame, df2: pd.DataFrame = None):
    """Collect the set of city tokens from the ``'CRM Ort'`` columns.

    Every distinct non-null city value of *df1* (and *df2*, if given) is
    tokenized; tokens of at least three characters are kept.
    """
    frames = [df1] if df2 is None else [df1, df2]
    city_values = pd.concat(
        [frame['CRM Ort'] for frame in frames], ignore_index=True
    ).dropna().unique()
    return {
        tok
        for city in city_values
        for tok in _tokenize(city)
        if len(tok) >= 3
    }
def run_internal_deduplication():
    """Find likely duplicates inside the CRM_Accounts sheet and tag them.

    Steps:
        1. Load the CRM sheet and add normalized helper columns.
        2. For every record collect candidates that share its domain or its
           rarest name token, and score each unordered pair once.
        3. Skip pairs flagged as a known parent/child relation; keep pairs
           scoring at least ``SCORE_THRESHOLD``.
        4. Merge the pairs into groups, write a ``Duplicate_ID`` and a
           ``Duplicate_Hint`` column, save a local CSV backup and write the
           sheet back.

    Side effects: rebinds the module-level ``CITY_TOKENS``, writes a CSV
    into ``LOG_DIR`` and overwrites the CRM sheet.  Exits the process with
    status 1 if the sheet handler cannot be created.
    """
    global CITY_TOKENS

    logger.info("Modus 'Interne Deduplizierung' gewählt.")
    try:
        sheet = GoogleSheetHandler()
        logger.info("GoogleSheetHandler initialisiert")
    except Exception as e:
        logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        sys.exit(1)

    # Load data
    crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
    if crm_df is None or crm_df.empty:
        logger.critical("CRM-Sheet ist leer. Abbruch.")
        return

    # Unique id per row so pairs and groups can refer back to rows.
    crm_df['unique_id'] = crm_df.index
    logger.info(f"{len(crm_df)} CRM-Datensätze geladen.")

    # Normalization
    crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name)
    crm_df['normalized_domain'] = crm_df['CRM Website'].astype(str).apply(simple_normalize_url)
    crm_df['CRM Ort'] = crm_df['CRM Ort'].astype(str).str.lower().str.strip()
    crm_df['CRM Land'] = crm_df['CRM Land'].astype(str).str.lower().str.strip()

    # Missing values must be blanked BEFORE the string conversion;
    # astype(str) would otherwise turn NaN into the literal text 'nan',
    # which then counts as "has a parent account".
    had_parent_column = 'Parent Account' in crm_df.columns
    if had_parent_column:
        parent_raw = crm_df['Parent Account']
    else:
        parent_raw = pd.Series('', index=crm_df.index, dtype=object)
    crm_df['Parent Account'] = parent_raw.fillna('').astype(str).str.strip()
    crm_df['normalized_parent_name'] = crm_df['Parent Account'].apply(normalize_company_name)
    crm_df['domain_use_flag'] = 1  # CRM domains are considered trustworthy

    # City tokens and blocking indexes (indexes depend on CITY_TOKENS).
    CITY_TOKENS = build_city_tokens(crm_df)
    logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")
    crm_records, domain_index, token_freq, token_index = build_indexes(crm_df)
    logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")

    # --- Self comparison ---
    found_pairs = []
    processed_pairs = set()  # avoids scoring both (A, B) and (B, A)
    total = len(crm_records)
    logger.info("Starte internen Abgleich...")
    for i, record1 in enumerate(crm_records):
        if i % 100 == 0:
            logger.info(f"Verarbeite Datensatz {i}/{total}...")

        candidate_records = {}
        # Candidates sharing the domain
        domain = record1.get('normalized_domain')
        if domain:
            for record2 in domain_index.get(domain, []):
                candidate_records[record2['unique_id']] = record2
        # Candidates sharing the rarest name token
        rtok = choose_rarest_token(record1.get('normalized_name', ''), token_freq)
        if rtok:
            for record2 in token_index.get(rtok, []):
                candidate_records[record2['unique_id']] = record2
        if not candidate_records:
            continue

        for record2 in candidate_records.values():
            # Never compare a record with itself.
            if record1['unique_id'] == record2['unique_id']:
                continue
            pair_key = tuple(sorted((record1['unique_id'], record2['unique_id'])))
            if pair_key in processed_pairs:
                continue
            processed_pairs.add(pair_key)

            score, comp = calculate_similarity(record1, record2, token_freq)

            # Known parent/child relations are not duplicates.
            if comp.get('is_parent_child') == 1:
                logger.debug(f" -> Ignoriere bekannte Parent-Child-Beziehung: '{record1['CRM Name']}' <-> '{record2['CRM Name']}'")
                continue

            if score >= SCORE_THRESHOLD:
                duplicate_hint = ''
                # Neither account has a parent: the relation may be missing.
                if not record1.get('Parent Account') and not record2.get('Parent Account'):
                    duplicate_hint = 'Potenziell fehlende Parent-Account-Beziehung'
                found_pairs.append({
                    'id1': record1['unique_id'], 'name1': record1['CRM Name'],
                    'id2': record2['unique_id'], 'name2': record2['CRM Name'],
                    'score': score,
                    'details': str(comp),
                    'hint': duplicate_hint
                })
                logger.info(f" -> Potenzielles Duplikat gefunden: '{record1['CRM Name']}' <-> '{record2['CRM Name']}' (Score: {score}, Hint: {duplicate_hint})")

    logger.info("\n===== Interner Abgleich abgeschlossen ====")
    logger.info(f"Insgesamt {len(found_pairs)} potenzielle Duplikatspaare gefunden.")
    if not found_pairs:
        logger.info("Keine weiteren Schritte nötig.")
        return

    groups = group_duplicate_pairs(found_pairs)
    logger.info(f"{len(groups)} eindeutige Duplikatsgruppen gebildet.")
    if not groups:
        logger.info("Keine Duplikate gefunden, die geschrieben werden müssen.")
        return

    # Assign ids and hints per group.
    crm_df['Duplicate_ID'] = ''
    crm_df['Duplicate_Hint'] = ''
    for dup_counter, group in enumerate(groups, start=1):
        dup_id = f"Dup_{dup_counter:04d}"
        in_group = crm_df['unique_id'].isin(group)
        crm_df.loc[in_group, 'Duplicate_ID'] = dup_id

        # Only non-empty hints of pairs touching this group.  The explicit
        # parentheses matter: `a or b and c` would parse as `a or (b and c)`
        # and let empty hints through.
        group_hints = {
            p['hint'] for p in found_pairs
            if p['hint'] and (p['id1'] in group or p['id2'] in group)
        }
        if group_hints:
            crm_df.loc[in_group, 'Duplicate_Hint'] = "; ".join(sorted(group_hints))

        member_names = crm_df.loc[in_group, 'CRM Name'].tolist()
        logger.info(f"Gruppe {dup_id}: {member_names}")

    # Remove helper columns before writing.  A 'Parent Account' column that
    # was only synthesized above is dropped too, so the sheet does not gain
    # a column it never had.
    helper_columns = ['unique_id', 'normalized_name', 'normalized_domain', 'domain_use_flag', 'normalized_parent_name']
    if not had_parent_column:
        helper_columns.append('Parent Account')
    crm_df.drop(columns=helper_columns, inplace=True)

    # Write results back (local backup first).
    logger.info("Schreibe Ergebnisse mit Duplikats-IDs ins Sheet...")
    backup_path = os.path.join(LOG_DIR, f"{now}_backup_internal_{CRM_SHEET_NAME}.csv")
    try:
        crm_df.to_csv(backup_path, index=False, encoding='utf-8')
        logger.info(f"Lokales Backup geschrieben: {backup_path}")
    except Exception as e:
        logger.warning(f"Backup fehlgeschlagen: {e}")

    data = [crm_df.columns.tolist()] + crm_df.fillna('').values.tolist()
    ok = sheet.clear_and_write_data(CRM_SHEET_NAME, data)
    if ok:
        logger.info("Ergebnisse erfolgreich ins Google Sheet geschrieben.")
    else:
        logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
def group_duplicate_pairs(pairs: list) -> list:
    """Merge duplicate pairs into connected groups of ids.

    Args:
        pairs: dicts with at least the keys ``'id1'`` and ``'id2'``.

    Returns:
        list[set]: one set of ids per group.  Groups appear in the order in
        which they were first created; when a pair links two existing
        groups, the group containing ``id2`` is folded into the group
        containing ``id1`` and disappears from the list.
    """
    groups = []
    # id -> the group set currently holding it; replaces a scan over all
    # groups for every pair.
    group_of = {}

    for pair in pairs:
        id1, id2 = pair['id1'], pair['id2']
        group1 = group_of.get(id1)
        group2 = group_of.get(id2)

        if group1 is not None and group2 is not None:
            if group1 is not group2:
                # Two different groups: fold group 2 into group 1.
                group1.update(group2)
                for member in group2:
                    group_of[member] = group1
                # Remove by identity, not by set equality.
                for pos, existing in enumerate(groups):
                    if existing is group2:
                        del groups[pos]
                        break
        elif group1 is not None:
            group1.add(id2)
            group_of[id2] = group1
        elif group2 is not None:
            group2.add(id1)
            group_of[id1] = group2
        else:
            fresh = {id1, id2}
            groups.append(fresh)
            group_of[id1] = fresh
            group_of[id2] = fresh

    # Hand out copies so callers cannot mutate internal state.
    return [set(g) for g in groups]
def run_external_comparison():
    """Match every row of Matching_Accounts against CRM_Accounts.

    Steps:
        1. Load both sheets.
        2. If a SerpAPI key is available, look up a website for matching
           rows that have neither a CRM website nor a found website.
        3. Normalize both frames, build city tokens and blocking indexes.
        4. Per matching row collect candidates (exact name, domain, rarest
           token, prefilter), score them and accept the best one if it
           passes the applicable threshold and the weak-match guard.
        5. Write ``Match``, ``Score`` and ``Match_Grund`` back to the
           matching sheet (after a local CSV backup) and log a summary.

    Side effects: rebinds the module-level ``CITY_TOKENS``, writes a CSV
    into ``LOG_DIR`` and overwrites the matching sheet.  Exits the process
    with status 1 if the sheet handler cannot be created.
    """
    global CITY_TOKENS

    # Cell values that mean "no website" (compared stripped, lower-cased).
    blank_values = ['', 'k.a.', 'k.a', 'n/a', 'na']

    def _is_blank(series):
        """Boolean mask of cells holding no usable website."""
        return series.fillna('').astype(str).str.strip().str.lower().isin(blank_values)

    logger.info("Modus 'Externer Vergleich' gewählt.")
    try:
        sheet = GoogleSheetHandler()
        logger.info("GoogleSheetHandler initialisiert")
    except Exception as e:
        logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        sys.exit(1)

    # Load data
    crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
    match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    logger.info(f"{0 if crm_df is None else len(crm_df)} CRM-Datensätze | {0 if match_df is None else len(match_df)} Matching-Datensätze")
    if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
        logger.critical("Leere Daten in einem der Sheets. Abbruch.")
        return

    # SerpAPI lookup only for matching rows where both website columns are blank.
    if serp_key:
        if 'Gefundene Website' not in match_df.columns:
            match_df['Gefundene Website'] = ''
        # Make sure the trust column exists before single cells are written.
        if 'Serp Vertrauen' not in match_df.columns:
            match_df['Serp Vertrauen'] = pd.Series(index=match_df.index, dtype=object)
        empty_mask = _is_blank(match_df['CRM Website']) & _is_blank(match_df['Gefundene Website'])
        empty_count = int(empty_mask.sum())
        if empty_count > 0:
            logger.info(f"Serp-Fallback für Matching: {empty_count} Firmen ohne URL in B/E")
            found_cnt = 0
            trust_stats = Counter()
            for idx, row in match_df[empty_mask].iterrows():
                company = row['CRM Name']
                try:
                    url = serp_website_lookup(company)
                    if url and 'k.A.' not in url:
                        if not str(url).startswith(('http://', 'https://')):
                            url = 'https://' + str(url).lstrip()
                        trust = assess_serp_trust(company, url)
                        match_df.at[idx, 'Gefundene Website'] = url
                        match_df.at[idx, 'Serp Vertrauen'] = trust
                        trust_stats[trust] += 1
                        logger.info(f" ✓ URL gefunden: '{company}' -> {url} (Vertrauen: {trust})")
                        found_cnt += 1
                    else:
                        logger.debug(f" ✗ Keine eindeutige URL: '{company}' -> {url}")
                except Exception as e:
                    # Best effort: one failed lookup must not stop the run.
                    logger.warning(f" ! Serp-Fehler für '{company}': {e}")
            logger.info(f"Serp-Fallback beendet: {found_cnt}/{empty_count} URLs ergänzt | Trust: {dict(trust_stats)}")
        else:
            logger.info("Serp-Fallback übersprungen: B oder E bereits befüllt (keine fehlenden Matching-URLs)")

    # Normalization: CRM
    crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name)
    crm_df['normalized_domain'] = crm_df['CRM Website'].astype(str).apply(simple_normalize_url)
    crm_df['CRM Ort'] = crm_df['CRM Ort'].astype(str).str.lower().str.strip()
    crm_df['CRM Land'] = crm_df['CRM Land'].astype(str).str.lower().str.strip()
    # Blank missing values BEFORE converting to str; astype(str) would turn
    # NaN into the literal text 'nan'.
    parent_raw = crm_df.get('Parent Account', pd.Series(index=crm_df.index, dtype=object))
    crm_df['Parent Account'] = parent_raw.fillna('').astype(str).str.strip()
    crm_df['normalized_parent_name'] = crm_df['Parent Account'].apply(normalize_company_name)
    crm_df['block_key'] = crm_df['normalized_name'].apply(lambda x: x.split()[0] if x else None)
    crm_df['domain_use_flag'] = 1  # CRM domains are considered trustworthy

    # Normalization: matching
    match_df['Gefundene Website'] = match_df.get('Gefundene Website', pd.Series(index=match_df.index, dtype=object))
    match_df['Serp Vertrauen'] = match_df.get('Serp Vertrauen', pd.Series(index=match_df.index, dtype=object))

    # Effective website: the CRM website, or the found website where the
    # CRM value is blank.  Placeholders such as 'k.A.' count as blank here
    # as well, so a URL found by the Serp lookup above is actually used.
    crm_site = match_df['CRM Website'].fillna('').astype(str).str.strip()
    crm_site_blank = crm_site.str.lower().isin(blank_values)
    match_df['Effektive Website'] = crm_site
    match_df.loc[crm_site_blank, 'Effektive Website'] = match_df['Gefundene Website'].fillna('').astype(str).str.strip()

    match_df['normalized_name'] = match_df['CRM Name'].astype(str).apply(normalize_company_name)
    match_df['normalized_domain'] = match_df['Effektive Website'].astype(str).apply(simple_normalize_url)
    match_df['CRM Ort'] = match_df['CRM Ort'].astype(str).str.lower().str.strip()
    match_df['CRM Land'] = match_df['CRM Land'].astype(str).str.lower().str.strip()
    match_df['block_key'] = match_df['normalized_name'].apply(lambda x: x.split()[0] if x else None)

    # Domain trust flag: a real CRM website is trusted; a found website
    # only when its Serp trust is 'hoch'.
    trust_high = match_df['Serp Vertrauen'].fillna('').astype(str).str.lower() == 'hoch'
    match_df['domain_use_flag'] = ((~crm_site_blank) | trust_high).astype(int)

    # City tokens (after city normalization), then blocking indexes, which
    # depend on CITY_TOKENS.
    CITY_TOKENS = build_city_tokens(crm_df, match_df)
    logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")
    crm_records, domain_index, token_freq, token_index = build_indexes(crm_df)
    logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")

    # Cleaned CRM names do not change during matching; compute them once
    # instead of once per matching row inside the prefilter.
    crm_cleaned = [
        (rec,) + clean_name_for_scoring(rec.get('normalized_name', ''))
        for rec in crm_records
    ]

    # Matching
    results = []
    metrics = Counter()
    total = len(match_df)
    logger.info("Starte Matching-Prozess…")
    processed = 0
    for idx, mrow in match_df.to_dict('index').items():
        processed += 1
        name_disp = mrow.get('CRM Name', '')

        # Candidates are keyed by CRM name, so CRM rows sharing a name
        # collapse into a single candidate (the one added last).
        candidate_records = {}
        used_blocks = []

        # 1. Exact normalized-name match
        mrec_norm_name = mrow.get('normalized_name')
        if mrec_norm_name:
            exact_matches = crm_df[crm_df['normalized_name'] == mrec_norm_name]
            if not exact_matches.empty:
                for _, record in exact_matches.to_dict('index').items():
                    candidate_records[record['CRM Name']] = record
                used_blocks.append('exact_name')

        # 2. Domain match (only for trusted domains)
        if mrow.get('normalized_domain') and mrow.get('domain_use_flag') == 1:
            domain_cands = domain_index.get(mrow['normalized_domain'], [])
            if domain_cands:
                for record in domain_cands:
                    candidate_records[record['CRM Name']] = record
                used_blocks.append('domain')

        # 3. Rarest-token match
        rtok = choose_rarest_token(mrow.get('normalized_name', ''), token_freq)
        if rtok:
            token_cands = token_index.get(rtok, [])
            if token_cands:
                for record in token_cands:
                    candidate_records[record['CRM Name']] = record
                used_blocks.append('token')

        # 4. Prefilter as fallback when few candidates were found.
        # NOTE(review): every CRM record containing `rtok` appears to be a
        # candidate already after step 3, so this step may never add
        # anything - confirm before relying on it.
        if len(candidate_records) < PREFILTER_LIMIT:
            clean1, _ = clean_name_for_scoring(mrow.get('normalized_name', ''))
            if clean1:
                pf = []
                for rec, clean2, toks2 in crm_cleaned:
                    if rec['CRM Name'] in candidate_records:
                        continue  # already a candidate
                    if not clean2 or (rtok and rtok not in toks2):
                        continue
                    pr = fuzz.partial_ratio(clean1, clean2)
                    if pr >= PREFILTER_MIN_PARTIAL:
                        pf.append((pr, rec))
                pf.sort(key=lambda x: x[0], reverse=True)
                for _, record in pf[:PREFILTER_LIMIT]:
                    candidate_records[record['CRM Name']] = record
                if pf:
                    used_blocks.append('prefilter')

        candidates = list(candidate_records.values())
        logger.info(f"Prüfe {processed}/{total}: '{name_disp}' -> {len(candidates)} Kandidaten (Blocks={','.join(used_blocks)})")
        if not candidates:
            results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'})
            continue

        scored = []
        for cr in candidates:
            score, comp = calculate_similarity(mrow, cr, token_freq)
            scored.append((cr.get('CRM Name', ''), score, comp))
        scored.sort(key=lambda x: x[1], reverse=True)

        # Log the top five
        for cand_name, sc, comp in scored[:5]:
            logger.debug(f" Kandidat: {cand_name} | Score={sc} | Comp={comp}")

        best_name, best_score, best_comp = scored[0]
        has_location = best_comp.get('city_match') and best_comp.get('country_match')

        # Acceptance: a match backed by neither domain nor location is
        # "weak"; it needs the higher threshold and a rare-token overlap.
        weak = (best_comp.get('domain_used') == 0 and not has_location)
        applied_threshold = SCORE_THRESHOLD_WEAK if weak else SCORE_THRESHOLD
        weak_guard_fail = (weak and best_comp.get('rare_overlap') == 0)

        if not weak_guard_fail and best_score >= applied_threshold:
            results.append({'Match': best_name, 'Score': best_score, 'Match_Grund': str(best_comp)})
            metrics['matches_total'] += 1
            if best_comp.get('domain_used') == 1:
                metrics['matches_domain'] += 1
            if has_location:
                metrics['matches_with_loc'] += 1
            if best_comp.get('domain_used') == 0 and best_comp.get('name') >= 85 and not has_location:
                metrics['matches_name_only'] += 1
            logger.info(f" --> Match: '{best_name}' ({best_score}) {best_comp} | TH={applied_threshold}{' weak' if weak else ''}")
        else:
            reason = 'weak_guard_no_rare' if weak_guard_fail else 'below_threshold'
            results.append({'Match':'', 'Score': best_score, 'Match_Grund': f"{best_comp} | {reason} TH={applied_threshold}"})
            logger.info(f" --> Kein Match (Score={best_score}) {best_comp} | {reason} TH={applied_threshold}")

    # Write results back (all original columns are kept).
    logger.info("Schreibe Ergebnisse ins Sheet (SAFE in-place, keine Spaltenverluste)…")
    res_df = pd.DataFrame(results, index=match_df.index)
    write_df = match_df.copy()
    write_df['Match'] = res_df['Match']
    write_df['Score'] = res_df['Score']
    write_df['Match_Grund'] = res_df['Match_Grund']

    # Helper columns are not written back.
    drop_cols = ['normalized_name','normalized_domain','block_key','Effektive Website','domain_use_flag', 'normalized_parent_name']
    present = [c for c in drop_cols if c in write_df.columns]
    if present:
        write_df.drop(columns=present, inplace=True)

    backup_path = os.path.join(LOG_DIR, f"{now}_backup_{MATCHING_SHEET_NAME}.csv")
    try:
        write_df.to_csv(backup_path, index=False, encoding='utf-8')
        logger.info(f"Lokales Backup geschrieben: {backup_path}")
    except Exception as e:
        logger.warning(f"Backup fehlgeschlagen: {e}")

    data = [write_df.columns.tolist()] + write_df.fillna('').values.tolist()
    ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data)
    if ok:
        logger.info("Ergebnisse erfolgreich geschrieben")
    else:
        logger.error("Fehler beim Schreiben ins Google Sheet")

    # Summary
    serp_counts = Counter((str(x).lower() for x in write_df.get('Serp Vertrauen', [])))
    logger.info("===== Summary =====")
    logger.info(f"Matches total: {metrics['matches_total']} | mit Domain: {metrics['matches_domain']} | mit Ort: {metrics['matches_with_loc']} | nur Name: {metrics['matches_name_only']}")
    logger.info(f"Serp Vertrauen: {dict(serp_counts)}")
    logger.info(f"Config: TH={SCORE_THRESHOLD}, TH_WEAK={SCORE_THRESHOLD_WEAK}, MIN_NAME_FOR_DOMAIN={MIN_NAME_FOR_DOMAIN}, Penalties(city={CITY_MISMATCH_PENALTY},country={COUNTRY_MISMATCH_PENALTY}), Prefilter(partial>={PREFILTER_MIN_PARTIAL}, limit={PREFILTER_LIMIT})")
# --- Hauptfunktion ---
def main():
    """Ask on the console which mode to run and start it.

    Keeps prompting until the user enters ``1`` (external comparison) or
    ``2`` (internal deduplication), runs that mode once and returns.
    """
    logger.info("Starte Duplikats-Check v3.0")
    modes = {
        '1': run_external_comparison,
        '2': run_internal_deduplication,
    }
    while True:
        print("\nBitte wählen Sie den gewünschten Modus:")
        print("1: Externer Vergleich (gleicht CRM_Accounts mit Matching_Accounts ab)")
        print("2: Interne Deduplizierung (findet Duplikate innerhalb von CRM_Accounts)")
        choice = input("Ihre Wahl (1 oder 2): ")
        selected = modes.get(choice)
        if selected is None:
            print("Ungültige Eingabe. Bitte geben Sie 1 oder 2 ein.")
            continue
        selected()
        return


if __name__=='__main__':
    main()