Brancheneinstufung2/duplicate_checker.py
Floke 538a0f2885 duplicate_checker.py updated
- Reweighted the scoring formula and multipliers so that distinctive name tokens count more ("generosity boost").
- Fine-tuned the thresholds again to balance correct hits against false positives.
- Kept the domain-gate logic and made sure it is applied correctly.
- Golden rule and interactive mode unchanged.
2025-09-05 08:10:28 +00:00


# duplicate_checker.py v3.2
# Build timestamp is injected into the logfile name.
# --- CHANGES v3.2 ---
# - Reweighted the scoring formula and multipliers so that distinctive name tokens count more ("generosity boost").
# - Fine-tuned the thresholds again to balance correct hits against false positives.
# - Kept the domain-gate logic and made sure it is applied correctly.
# - Golden rule and interactive mode unchanged.
import os
import sys
import re
import argparse
import json
import logging
import pandas as pd
import math
from datetime import datetime
from collections import Counter
from thefuzz import fuzz
from helpers import normalize_company_name, simple_normalize_url, serp_website_lookup
from config import Config
from google_sheet_handler import GoogleSheetHandler
STATUS_DIR = "job_status"
def update_status(job_id, status, progress_message):
if not job_id: return
status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
try:
try:
with open(status_file, 'r') as f:
data = json.load(f)
except FileNotFoundError:
data = {}
data.update({"status": status, "progress": progress_message})
with open(status_file, 'w') as f:
json.dump(data, f)
except Exception as e:
logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")
# --- Configuration ---
CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
LOG_DIR = "Log"
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v3.2.txt"
# --- NEW: Adjusted scoring configuration v3.2 ---
SCORE_THRESHOLD = 90 # default threshold, slightly raised
SCORE_THRESHOLD_WEAK = 120 # adjusted threshold for matches without a domain or location match
GOLDEN_MATCH_RATIO = 95
GOLDEN_MATCH_SCORE = 300
MIN_NAME_SCORE_FOR_DOMAIN = 2.5 # minimum name score for a domain match to count in full
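# How the thresholds interact (summary of the decision logic in main()): the best candidate
# must reach SCORE_THRESHOLD; if it has neither a domain match nor a city+country match it is
# treated as "weak" and must additionally reach SCORE_THRESHOLD_WEAK. A golden match
# (raw name ratio >= GOLDEN_MATCH_RATIO) is scored at GOLDEN_MATCH_SCORE and clears both.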
# Interactive mode configuration
INTERACTIVE_SCORE_MIN = 90
INTERACTIVE_SCORE_DIFF = 20
# Prefilter configuration
PREFILTER_MIN_PARTIAL = 70
PREFILTER_LIMIT = 30
# --- Logging Setup ---
# ... (no changes here)
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR, exist_ok=True)
log_path = os.path.join(LOG_DIR, LOG_FILE)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
for h in list(root.handlers):
root.removeHandler(h)
formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s")
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
root.addHandler(ch)
fh = logging.FileHandler(log_path, mode='a', encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
root.addHandler(fh)
logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v3.2 | Build: {now}")
# --- Load SerpAPI key ---
# ... (no changes here)
try:
Config.load_api_keys()
serp_key = Config.API_KEYS.get('serpapi')
if not serp_key:
logger.warning("SerpAPI Key nicht gefunden; Serp-Fallback deaktiviert.")
except Exception as e:
logger.warning(f"Fehler beim Laden API-Keys: {e}")
serp_key = None
# --- Stop-/City-Tokens ---
STOP_TOKENS_BASE = {
'gmbh','mbh','ag','kg','ug','ohg','se','co','kgaa','inc','llc','ltd','sarl', 'b.v', 'bv',
'holding','gruppe','group','international','solutions','solution','service','services',
'deutschland','austria','germany','technik','technology','technologies','systems','systeme',
'logistik','logistics','industries','industrie','management','consulting','vertrieb','handel',
'company','gesellschaft','mbh&co','mbhco','werke','werk'
}
CITY_TOKENS = set()
# --- Utilities ---
def _tokenize(s: str):
if not s: return []
return re.split(r"[^a-z0-9äöüß]+", str(s).lower())
def clean_name_for_scoring(norm_name: str):
if not norm_name: return "", set()
tokens = [t for t in _tokenize(norm_name) if len(t) >= 2] # allow 2-letter tokens as well
stop_union = STOP_TOKENS_BASE | CITY_TOKENS
final_tokens = [t for t in tokens if t not in stop_union]
return " ".join(final_tokens), set(final_tokens)
# --- TF-IDF logic ---
def build_term_weights(crm_df: pd.DataFrame):
logger.info("Starte Berechnung der Wortgewichte (TF-IDF)...")
token_counts = Counter()
total_docs = len(crm_df)
for name in crm_df['normalized_name']:
_, tokens = clean_name_for_scoring(name)
for token in set(tokens):
token_counts[token] += 1
term_weights = {}
for token, count in token_counts.items():
idf = math.log(total_docs / (count + 1))
term_weights[token] = idf
logger.info(f"Wortgewichte für {len(term_weights)} Tokens berechnet.")
return term_weights
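# Illustrative weighting (hypothetical corpus of 1000 CRM names): a token occurring in
# 2 names gets idf = ln(1000/3) ≈ 5.8, while a generic token occurring in 300 names gets
# ln(1000/301) ≈ 1.2, so rare, distinctive tokens dominate the weighted overlap below.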
# --- Similarity v3.2 ---
def calculate_similarity(mrec: dict, crec: dict, term_weights: dict):
n1_raw = mrec.get('normalized_name', '')
n2_raw = crec.get('normalized_name', '')
if fuzz.ratio(n1_raw, n2_raw) >= GOLDEN_MATCH_RATIO:
return GOLDEN_MATCH_SCORE, {'reason': f'Golden Match (Name Ratio >= {GOLDEN_MATCH_RATIO}%)', 'name_score': 100}
dom1 = mrec.get('normalized_domain','')
dom2 = crec.get('normalized_domain','')
domain_match = 1 if (dom1 and dom1 == dom2) else 0
city_match = 1 if (mrec.get('CRM Ort') and crec.get('CRM Ort') and mrec.get('CRM Ort') == crec.get('CRM Ort')) else 0
country_match = 1 if (mrec.get('CRM Land') and crec.get('CRM Land') and mrec.get('CRM Land') == crec.get('CRM Land')) else 0
clean1, toks1 = clean_name_for_scoring(n1_raw)
clean2, toks2 = clean_name_for_scoring(n2_raw)
# --- CHANGE v3.2: weighted token-set score ---
# Rewards overlap, but also penalizes missing important words
name_score = 0
overlapping_tokens = toks1 & toks2
if overlapping_tokens:
sum_overlap = sum(term_weights.get(token, 0) for token in overlapping_tokens)
sum_toks1 = sum(term_weights.get(token, 0) for token in toks1)
sum_toks2 = sum(term_weights.get(token, 0) for token in toks2)
if (sum_toks1 + sum_toks2) > 0:
# Dice coefficient based on the term weights
name_score = (2 * sum_overlap) / (sum_toks1 + sum_toks2) * 100
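# Worked example with hypothetical weights: toks1 = {"acme", "pumpen"}, toks2 = {"acme"},
# weights acme = 6.0, pumpen = 2.0 -> sum_overlap = 6.0, sums = 8.0 and 6.0,
# name_score = 2 * 6.0 / 14.0 * 100 ≈ 85.7.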
# Domain-Gate
score_domain = 0
# The name score for the domain gate is now computed directly from the ratio, not from the weighted score
if domain_match:
if fuzz.token_set_ratio(clean1, clean2) > 60 or (city_match and country_match):
score_domain = 60 # strong bonus
else:
score_domain = 15 # weak bonus
score_location = 25 if (city_match and country_match) else 0
# --- CHANGE v3.2: final score calibration ---
total = name_score * 1.2 + score_domain + score_location
penalties = 0
if mrec.get('CRM Land') and crec.get('CRM Land') and not country_match:
penalties += 40
if mrec.get('CRM Ort') and crec.get('CRM Ort') and not city_match:
penalties += 30
total -= penalties
comp = {
'name_score': round(name_score,1),
'domain_match': domain_match,
'city_match': city_match,
'country_match': country_match,
'penalties': penalties,
'overlapping_tokens': list(overlapping_tokens)
}
return max(0, round(total)), comp
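# Illustrative totals based on the example above (name_score ≈ 85.7): with a strong domain
# bonus and a city+country match the total is 85.7 * 1.2 + 60 + 25 ≈ 188; with no domain and
# no location agreement it is only ≈ 103, above SCORE_THRESHOLD (90) but below
# SCORE_THRESHOLD_WEAK (120), so main() rejects it as a weak match.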
# --- Indexes & main function ---
# (The following functions keep their structure but now call the adjusted helpers)
def build_indexes(crm_df: pd.DataFrame):
records = list(crm_df.to_dict('records'))
domain_index = {}
for r in records:
d = r.get('normalized_domain')
if d: domain_index.setdefault(d, []).append(r)
token_index = {}
for idx, r in enumerate(records):
_, toks = clean_name_for_scoring(r.get('normalized_name',''))
for t in set(toks):
token_index.setdefault(t, []).append(idx)
return records, domain_index, token_index
def choose_rarest_token(norm_name: str, term_weights: dict):
_, toks = clean_name_for_scoring(norm_name)
if not toks: return None
rarest = max(toks, key=lambda t: term_weights.get(t, 0))
return rarest if term_weights.get(rarest, 0) > 0 else None
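# Illustrative blocking: for a cleaned name "acme pumpen" the highest-IDF token (e.g. "acme")
# selects the candidate block token_index["acme"] in main(); if every token has weight 0 the
# function returns None and main() falls back to the partial-ratio prefilter.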
def main(job_id=None, interactive=False):
logger.info("Starte Duplikats-Check v3.2 (Final Recalibration)")
# ...
# (Initialization and data-loading code is identical to v3.1)
# ...
update_status(job_id, "Läuft", "Initialisiere GoogleSheetHandler...")
try:
sheet = GoogleSheetHandler()
logger.info("GoogleSheetHandler initialisiert")
except Exception as e:
logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
update_status(job_id, "Fehlgeschlagen", f"Init GoogleSheetHandler fehlgeschlagen: {e}")
sys.exit(1)
update_status(job_id, "Läuft", "Lade CRM- und Matching-Daten...")
crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
total = len(match_df) if match_df is not None else 0
logger.info(f"{0 if crm_df is None else len(crm_df)} CRM-Datensätze | {total} Matching-Datensätze")
if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
logger.critical("Leere Daten in einem der Sheets. Abbruch.")
update_status(job_id, "Fehlgeschlagen", "Leere Daten in einem der Sheets.")
return
update_status(job_id, "Läuft", "Normalisiere Daten...")
crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name)
crm_df['normalized_domain'] = crm_df['CRM Website'].astype(str).apply(simple_normalize_url)
# astype(str) turns missing cells into the literal string 'nan'; blank these out so two empty cities/countries do not count as a match
crm_df['CRM Ort'] = crm_df['CRM Ort'].astype(str).str.lower().str.strip().replace('nan', '')
crm_df['CRM Land'] = crm_df['CRM Land'].astype(str).str.lower().str.strip().replace('nan', '')
match_df['normalized_name'] = match_df['CRM Name'].astype(str).apply(normalize_company_name)
match_df['normalized_domain'] = match_df['CRM Website'].astype(str).apply(simple_normalize_url)
match_df['CRM Ort'] = match_df['CRM Ort'].astype(str).str.lower().str.strip().replace('nan', '')
match_df['CRM Land'] = match_df['CRM Land'].astype(str).str.lower().str.strip().replace('nan', '')
def build_city_tokens(crm_df, match_df):
cities = set()
for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']], ignore_index=True).dropna().unique():
for t in _tokenize(s):
if len(t) >= 3: cities.add(t)
return cities
global CITY_TOKENS
CITY_TOKENS = build_city_tokens(crm_df, match_df)
logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")
term_weights = build_term_weights(crm_df)
crm_records, domain_index, token_index = build_indexes(crm_df)
logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")
results = []
logger.info("Starte Matching-Prozess…")
for idx, mrow in match_df.to_dict('index').items():
processed = idx + 1
progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'"
logger.info(progress_message)
if processed % 5 == 0 or processed == total:
update_status(job_id, "Läuft", progress_message)
candidate_indices = set()
used_block = ''
# ... (candidate search unchanged) ...
if mrow.get('normalized_domain'):
candidates_from_domain = domain_index.get(mrow['normalized_domain'], [])
for c in candidates_from_domain:
try:
indices = crm_df.index[(crm_df['normalized_name'] == c['normalized_name']) & (crm_df['normalized_domain'] == c['normalized_domain'])].tolist()
if indices:
candidate_indices.add(indices[0])
except Exception:
continue
if candidate_indices: used_block = f"domain:{mrow['normalized_domain']}"
if not candidate_indices:
rtok = choose_rarest_token(mrow.get('normalized_name',''), term_weights)
if rtok:
indices_from_token = token_index.get(rtok, [])
candidate_indices.update(indices_from_token)
used_block = f"token:{rtok}"
if not candidate_indices:
pf = []
n1 = mrow.get('normalized_name','')
clean1, _ = clean_name_for_scoring(n1)
if clean1:
for i, r in enumerate(crm_records):
n2 = r.get('normalized_name','')
clean2, _ = clean_name_for_scoring(n2)
if not clean2: continue
pr = fuzz.partial_ratio(clean1, clean2)
if pr >= PREFILTER_MIN_PARTIAL:
pf.append((pr, i))
pf.sort(key=lambda x: x[0], reverse=True)
candidate_indices.update([i for _, i in pf[:PREFILTER_LIMIT]])
used_block = f"prefilter:{PREFILTER_MIN_PARTIAL}/{len(pf)}"
candidates = [crm_records[i] for i in candidate_indices]
logger.info(f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}' -> {len(candidates)} Kandidaten (Block={used_block})")
if not candidates:
results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'})
continue
scored = []
for cr in candidates:
score, comp = calculate_similarity(mrow, cr, term_weights)
scored.append({'name': cr.get('CRM Name',''), 'score': score, 'comp': comp, 'record': cr})
scored.sort(key=lambda x: x['score'], reverse=True)
for cand in scored[:5]:
logger.debug(f" Kandidat: {cand['name']} | Score={cand['score']} | Comp={cand['comp']}")
best_match = scored[0] if scored else None
# Interactive mode
if interactive and best_match and len(scored) > 1:
best_score = best_match['score']
second_best_score = scored[1]['score']
if best_score > INTERACTIVE_SCORE_MIN and (best_score - second_best_score) < INTERACTIVE_SCORE_DIFF and best_score < GOLDEN_MATCH_SCORE:
# ... (interactive logic unchanged) ...
print("\n" + "="*50)
print(f"AMBIGUOUS MATCH for '{mrow['CRM Name']}'")
print(f"Top candidates have very similar scores.")
print(f" - Match: '{mrow['CRM Name']}' | {mrow['normalized_domain']} | {mrow['CRM Ort']}, {mrow['CRM Land']}")
print("-"*50)
for i, item in enumerate(scored[:5]):
cr = item['record']
print(f"[{i+1}] Candidate: '{cr['CRM Name']}' | {cr['normalized_domain']} | {cr['CRM Ort']}, {cr['CRM Land']}")
print(f" Score: {item['score']} | Details: {item['comp']}")
print("[0] No match")
choice = -1
while choice < 0 or choice > len(scored[:5]):
try:
choice = int(input(f"Please select the best match (1-{len(scored[:5])}) or 0 for no match: "))
except ValueError:
choice = -1
if choice > 0:
best_match = scored[choice-1]
logger.info(f"User selected candidate {choice}: '{best_match['name']}'")
elif choice == 0:
best_match = None
logger.info("User selected no match.")
print("="*50 + "\n")
if best_match and best_match['score'] >= SCORE_THRESHOLD:
is_weak = best_match['comp'].get('domain_match', 0) == 0 and not (best_match['comp'].get('city_match', 0) and best_match['comp'].get('country_match', 0))
applied_threshold = SCORE_THRESHOLD_WEAK if is_weak else SCORE_THRESHOLD
if best_match['score'] >= applied_threshold:
results.append({'Match': best_match['name'], 'Score': best_match['score'], 'Match_Grund': str(best_match['comp'])})
logger.info(f" --> Match: '{best_match['name']}' ({best_match['score']}) | TH={applied_threshold}{' (weak)' if is_weak else ''}")
else:
results.append({'Match':'', 'Score': best_match['score'], 'Match_Grund': f"Below WEAK threshold | {str(best_match['comp'])}"})
logger.info(f" --> No Match (below weak TH): '{best_match['name']}' ({best_match['score']}) | TH={applied_threshold}")
elif best_match:
results.append({'Match':'', 'Score': best_match['score'], 'Match_Grund': f"Below threshold | {str(best_match['comp'])}"})
logger.info(f" --> No Match (below TH): '{best_match['name']}' ({best_match['score']})")
else:
results.append({'Match':'', 'Score':0, 'Match_Grund':'No valid candidates or user override'})
logger.info(f" --> No Match (no candidates)")
# --- Write results back (logic unchanged) ---
logger.info("Matching-Prozess abgeschlossen. Bereite Ergebnisse für den Upload vor...")
# ... (rest of the code is identical) ...
update_status(job_id, "Läuft", "Schreibe Ergebnisse zurück ins Sheet...")
result_df = pd.DataFrame(results)
cols_to_drop_from_match = ['Match', 'Score', 'Match_Grund']
match_df_clean = match_df.drop(columns=[col for col in cols_to_drop_from_match if col in match_df.columns], errors='ignore')
final_df = pd.concat([match_df_clean.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1)
cols_to_drop = ['normalized_name', 'normalized_domain']
final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore')
upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})
data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()
logger.info(f"Versuche, {len(data_to_write) - 1} Ergebniszeilen in das Sheet '{MATCHING_SHEET_NAME}' zu schreiben...")
ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
if ok:
logger.info("Ergebnisse erfolgreich in das Google Sheet geschrieben.")
update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
else:
logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")
if __name__=='__main__':
parser = argparse.ArgumentParser(description="Duplicate Checker v3.2")
parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
parser.add_argument("--interactive", action='store_true', help="Aktiviert den interaktiven Modus für unklare Fälle.")
args = parser.parse_args()
Config.load_api_keys()
main(job_id=args.job_id, interactive=args.interactive)
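# Example invocation (assuming Google Sheets credentials and the config.py API keys are set up;
# the job id is arbitrary and only determines the status file name):
#   python duplicate_checker.py --job-id run-2025-09-05 --interactive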