# duplicate_checker.py v5.0
# Build timestamp is injected into the logfile name.

# --- FEATURES v5.0 ---
# - NEW: multi-stage decision model for higher precision and "generosity".
#   - Stage 1: "Golden Match" for exact hits.
#   - Stage 2: "core-identity bonus & tie-breaker" to correctly assign group companies.
#   - Stage 3: recalibrated weighted score for all other cases.
# - Smart tie-breaker that only kicks in for genuinely good and close candidates.

import os
import sys
import re
import argparse
import json
import logging
import pandas as pd
import math
from datetime import datetime
from collections import Counter
from thefuzz import fuzz
from helpers import normalize_company_name, simple_normalize_url, serp_website_lookup
from config import Config
from google_sheet_handler import GoogleSheetHandler

STATUS_DIR = "job_status"


def update_status(job_id, status, progress_message):
    """Persist the current job status/progress into job_status/<job_id>.json.

    Best-effort: missing status file starts a fresh dict; any write error is
    logged but never raised, so status reporting cannot kill the job.
    """
    if not job_id:
        return
    status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
    try:
        try:
            with open(status_file, 'r') as f:
                data = json.load(f)
        except FileNotFoundError:
            data = {}
        data.update({"status": status, "progress": progress_message})
        with open(status_file, 'w') as f:
            json.dump(data, f)
    except Exception as e:
        logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")


# --- Configuration v5.0 ---
CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
LOG_DIR = "Log"
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v5.0.txt"

# Scoring configuration
SCORE_THRESHOLD = 110        # default acceptance threshold
SCORE_THRESHOLD_WEAK = 140   # threshold for matches without domain or location support
GOLDEN_MATCH_RATIO = 97      # fuzz.ratio at/above which two names are an exact hit
GOLDEN_MATCH_SCORE = 300     # fixed score assigned to a golden match
CORE_IDENTITY_BONUS = 50     # bonus when the rarest (most identifying) token matches

# Tie-breaker & interactive mode
TRIGGER_SCORE_MIN = 150      # minimum score for tie-breaker / interactive handling
TIE_SCORE_DIFF = 25          # two candidates closer than this are considered tied

# Prefilter (fallback candidate search)
PREFILTER_MIN_PARTIAL = 70
PREFILTER_LIMIT = 30

# --- Logging setup: DEBUG to file, INFO to console, on the root logger ---
os.makedirs(LOG_DIR, exist_ok=True)  # exist_ok makes a prior exists() check redundant
log_path = os.path.join(LOG_DIR, LOG_FILE)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
# Drop handlers left over from previous runs/imports to avoid duplicate output.
for h in list(root.handlers):
    root.removeHandler(h)
formatter = logging.Formatter("%(asctime)s - %(levelname)-8s - %(message)s")
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
root.addHandler(ch)
fh = logging.FileHandler(log_path, mode='a', encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
root.addHandler(fh)
logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v5.0 | Build: {now}")

# --- API keys (guarded: a failure only disables the Serp fallback) ---
try:
    Config.load_api_keys()
    serp_key = Config.API_KEYS.get('serpapi')
    if not serp_key:
        logger.warning("SerpAPI Key nicht gefunden; Serp-Fallback deaktiviert.")
except Exception as e:
    logger.warning(f"Fehler beim Laden API-Keys: {e}")
    serp_key = None

# --- Stop-/city tokens: generic legal-form/industry words carry no identity ---
STOP_TOKENS_BASE = {
    'gmbh', 'mbh', 'ag', 'kg', 'ug', 'ohg', 'se', 'co', 'kgaa', 'inc', 'llc', 'ltd', 'sarl', 'b.v', 'bv',
    'holding', 'gruppe', 'group', 'international', 'solutions', 'solution', 'service', 'services',
    'deutschland', 'austria', 'germany', 'technik', 'technology', 'technologies', 'systems', 'systeme',
    'logistik', 'logistics', 'industries', 'industrie', 'management', 'consulting', 'vertrieb', 'handel',
    'company', 'gesellschaft', 'mbh&co', 'mbhco', 'werke', 'werk'
}
# Filled in main() from the city columns of both sheets.
CITY_TOKENS = set()


# --- Utilities ---
def _tokenize(s: str):
    """Lower-case *s* and split on anything that is not [a-z0-9äöüß]."""
    if not s:
        return []
    return re.split(r"[^a-z0-9äöüß]+", str(s).lower())


def clean_name_for_scoring(norm_name: str):
    """Reduce a normalized company name to its identity-carrying tokens.

    Drops tokens shorter than 3 chars and anything in the stop/city token
    sets. Returns (joined_string, token_set).
    """
    if not norm_name:
        return "", set()
    tokens = [t for t in _tokenize(norm_name) if len(t) >= 3]
    stop_union = STOP_TOKENS_BASE | CITY_TOKENS
    final_tokens = [t for t in tokens if t not in stop_union]
    return " ".join(final_tokens), set(final_tokens)


def build_term_weights(crm_df: pd.DataFrame):
    """Compute IDF-style weights over the cleaned CRM name tokens.

    Rare tokens (high log(N / (df+1))) identify a company more strongly than
    common ones and therefore contribute more to the name score.
    """
    logger.info("Starte Berechnung der Wortgewichte (TF-IDF)...")
    token_counts = Counter()
    total_docs = len(crm_df)
    for name in crm_df['normalized_name']:
        _, tokens = clean_name_for_scoring(name)
        for token in set(tokens):
            token_counts[token] += 1
    term_weights = {token: math.log(total_docs / (count + 1)) for token, count in token_counts.items()}
    logger.info(f"Wortgewichte für {len(term_weights)} Tokens berechnet.")
    return term_weights


# --- Similarity v5.0 ---
def calculate_similarity(mrec: dict, crec: dict, term_weights: dict, rarest_token_mrec: str):
    """Score one matching record against one CRM record.

    Returns (score, components) where components documents how the score was
    assembled (or {'reason': ...} for a golden match).
    """
    n1_raw = mrec.get('normalized_name', '')
    n2_raw = crec.get('normalized_name', '')
    # Stage 1: golden match on near-identical names.
    # FIX: require both names to be non-empty — fuzz.ratio("", "") is 100,
    # which previously turned two empty names into a 300-point golden match.
    if n1_raw and n2_raw and fuzz.ratio(n1_raw, n2_raw) >= GOLDEN_MATCH_RATIO:
        return GOLDEN_MATCH_SCORE, {'reason': f'Golden Match (Ratio >= {GOLDEN_MATCH_RATIO}%)'}

    dom1, dom2 = mrec.get('normalized_domain', ''), crec.get('normalized_domain', '')
    domain_match = 1 if (dom1 and dom1 == dom2) else 0

    city_match = 1 if mrec.get('CRM Ort') and crec.get('CRM Ort') == mrec.get('CRM Ort') else 0
    country_match = 1 if mrec.get('CRM Land') and crec.get('CRM Land') == mrec.get('CRM Land') else 0

    _, toks1 = clean_name_for_scoring(n1_raw)
    _, toks2 = clean_name_for_scoring(n2_raw)

    # Weighted token overlap, boosted by the fraction of the query's tokens covered.
    name_score = 0
    overlapping_tokens = toks1 & toks2
    if overlapping_tokens:
        name_score = sum(term_weights.get(t, 0) for t in overlapping_tokens)
        if toks1:
            name_score *= (1 + len(overlapping_tokens) / len(toks1))

    # Stage 2: bonus when the most identifying token of the query reappears.
    core_identity_bonus = CORE_IDENTITY_BONUS if rarest_token_mrec and rarest_token_mrec in toks2 else 0

    # Domain agreement counts a lot, but only when corroborated by name or location.
    score_domain = 0
    if domain_match:
        score_domain = 75 if name_score > 3.0 or (city_match and country_match) else 20

    score_location = 25 if (city_match and country_match) else 0

    total = name_score * 10 + score_domain + score_location + core_identity_bonus

    # Penalize explicit disagreement (both sides filled in, values differ).
    penalties = 0
    if mrec.get('CRM Land') and crec.get('CRM Land') and not country_match:
        penalties += 40
    if mrec.get('CRM Ort') and crec.get('CRM Ort') and not city_match:
        penalties += 30
    total -= penalties

    comp = {
        'name_score': round(name_score, 1), 'domain': domain_match, 'location': int(city_match and country_match),
        'core_bonus': core_identity_bonus, 'penalties': penalties, 'tokens': list(overlapping_tokens)
    }
    return max(0, round(total)), comp


# --- Indexes & main ---
def build_indexes(crm_df: pd.DataFrame):
    """Build blocking indexes over the CRM data.

    Returns (records, domain_index, token_index) where both indexes map to
    POSITIONAL indices into *records*. (Previously domain_index stored record
    dicts that main() re-resolved via an O(n) DataFrame scan into index
    LABELS, which silently mixed two index spaces — broken for any
    non-default DataFrame index and only ever kept the first hit.)
    """
    records = list(crm_df.to_dict('records'))
    domain_index = {}
    token_index = {}
    for pos, rec in enumerate(records):
        dom = rec.get('normalized_domain')
        if dom:
            domain_index.setdefault(dom, []).append(pos)
        _, toks = clean_name_for_scoring(rec.get('normalized_name', ''))
        for tok in set(toks):
            token_index.setdefault(tok, []).append(pos)
    return records, domain_index, token_index


def choose_rarest_token(norm_name: str, term_weights: dict):
    """Return the highest-weighted (rarest) cleaned token of *norm_name*, or None."""
    _, toks = clean_name_for_scoring(norm_name)
    if not toks:
        return None
    return max(toks, key=lambda t: term_weights.get(t, 0))


def main(job_id=None, interactive=False):
    """Run the full duplicate check: load, normalize, match, write back.

    job_id      — optional id for status-file progress reporting.
    interactive — reserved: manual resolution of tie situations (stage 4).
    """
    logger.info("Starte Duplikats-Check v5.0 (Hybrid Model & Core Identity)")
    update_status(job_id, "Läuft", "Initialisiere GoogleSheetHandler...")
    try:
        sheet = GoogleSheetHandler()
    except Exception as e:
        logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        update_status(job_id, "Fehlgeschlagen", f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        sys.exit(1)

    update_status(job_id, "Läuft", "Lade CRM- und Matching-Daten...")
    crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
    match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    total = len(match_df) if match_df is not None else 0
    if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
        logger.critical("Leere Daten in einem der Sheets. Abbruch.")
        update_status(job_id, "Fehlgeschlagen", "Leere Daten in einem der Sheets.")
        return
    logger.info(f"{len(crm_df)} CRM-Datensätze | {total} Matching-Datensätze")

    # FIX: guarantee 0-based sequential indices so positional blocking indexes
    # and label-based access can never diverge.
    crm_df = crm_df.reset_index(drop=True)
    match_df = match_df.reset_index(drop=True)

    update_status(job_id, "Läuft", "Normalisiere Daten...")
    for df in [crm_df, match_df]:
        df['normalized_name'] = df['CRM Name'].astype(str).apply(normalize_company_name)
        df['normalized_domain'] = df['CRM Website'].astype(str).apply(simple_normalize_url)
        df['CRM Ort'] = df['CRM Ort'].astype(str).str.lower().str.strip()
        df['CRM Land'] = df['CRM Land'].astype(str).str.lower().str.strip()

    # City tokens must be known before term weights are computed, since
    # clean_name_for_scoring() filters against them.
    global CITY_TOKENS
    CITY_TOKENS = {t for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']]).dropna().unique()
                   for t in _tokenize(s) if len(t) >= 3}
    logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")

    term_weights = build_term_weights(crm_df)
    crm_records, domain_index, token_index = build_indexes(crm_df)
    logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")

    results = []
    logger.info("Starte Matching-Prozess…")

    # FIX: enumerate(start=1) instead of deriving the progress counter from the
    # DataFrame index label (idx + 1 assumed a 0-based sequential index).
    for processed, mrow in enumerate(match_df.to_dict('records'), start=1):
        progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'"
        logger.info(progress_message)
        if processed % 10 == 0 or processed == total:
            update_status(job_id, "Läuft", progress_message)

        # Candidate search: domain block first, then rarest-token block,
        # then a fuzzy prefilter over the whole CRM as last resort.
        candidate_indices = set()
        if mrow.get('normalized_domain'):
            candidate_indices.update(domain_index.get(mrow['normalized_domain'], []))

        rarest_token_mrec = choose_rarest_token(mrow.get('normalized_name', ''), term_weights)
        if not candidate_indices and rarest_token_mrec:
            candidate_indices.update(token_index.get(rarest_token_mrec, []))

        if not candidate_indices:
            # Hoisted out of the comprehension: the query's cleaned name is
            # loop-invariant (was recomputed once per CRM record before).
            clean_query = clean_name_for_scoring(mrow.get('normalized_name', ''))[0]
            pf = sorted(
                [(fuzz.partial_ratio(clean_query, clean_name_for_scoring(r.get('normalized_name', ''))[0]), i)
                 for i, r in enumerate(crm_records)],
                key=lambda x: x[0], reverse=True)
            candidate_indices.update([i for score, i in pf if score >= PREFILTER_MIN_PARTIAL][:PREFILTER_LIMIT])

        candidates = [crm_records[i] for i in candidate_indices]
        if not candidates:
            results.append({'Match': '', 'Score': 0, 'Match_Grund': 'keine Kandidaten'})
            continue

        scored = sorted(
            [{'score': s, 'comp': c, 'record': r}
             for r in candidates
             for s, c in [calculate_similarity(mrow, r, term_weights, rarest_token_mrec)]],
            key=lambda x: x['score'], reverse=True)

        for cand in scored[:5]:
            logger.debug(f" Kandidat: {cand['record']['CRM Name']} | Score={cand['score']} | Comp={cand['comp']}")
        best_match = scored[0] if scored else None

        # --- Stage model v5.0 ---
        if best_match:
            # Stage 1 (golden match, score 300) is handled inside calculate_similarity.

            # Stage 2: smart tie-breaker for corporate-group situations.
            best_score = best_match['score']
            if (len(scored) > 1 and best_score >= TRIGGER_SCORE_MIN
                    and (best_score - scored[1]['score']) < TIE_SCORE_DIFF
                    and best_score < GOLDEN_MATCH_SCORE):
                if interactive:  # Stage 4: manual clarification (not yet implemented)
                    pass
                else:  # Stage 2 automatic: prefer the shortest tied name (parent company)
                    logger.info(f" Tie-Breaker-Situation erkannt. Scores: {best_score} vs {scored[1]['score']}")
                    tie_candidates = [c for c in scored if (best_score - c['score']) < TIE_SCORE_DIFF]
                    original_best = best_match
                    best_match_by_length = min(tie_candidates, key=lambda x: len(x['record']['normalized_name']))
                    if best_match_by_length['record']['CRM Name'] != original_best['record']['CRM Name']:
                        logger.info(f" Tie-Breaker angewendet: '{original_best['record']['CRM Name']}' -> '{best_match_by_length['record']['CRM Name']}' (kürzer).")
                        best_match = best_match_by_length

        # Final decision: matches without domain AND location support must
        # clear the stricter "weak" threshold.
        if best_match and best_match['score'] >= SCORE_THRESHOLD:
            is_weak = best_match['comp'].get('domain', 0) == 0 and not best_match['comp'].get('location', 0)
            applied_threshold = SCORE_THRESHOLD_WEAK if is_weak else SCORE_THRESHOLD
            if best_match['score'] >= applied_threshold:
                results.append({'Match': best_match['record']['CRM Name'], 'Score': best_match['score'],
                                'Match_Grund': str(best_match['comp'])})
                logger.info(f" --> Match: '{best_match['record']['CRM Name']}' ({best_match['score']})")
            else:
                results.append({'Match': '', 'Score': best_match['score'],
                                'Match_Grund': f"Below WEAK TH | {str(best_match['comp'])}"})
                logger.info(f" --> No Match (below weak TH): '{best_match['record']['CRM Name']}' ({best_match['score']})")
        elif best_match:
            results.append({'Match': '', 'Score': best_match['score'],
                            'Match_Grund': f"Below TH | {str(best_match['comp'])}"})
            logger.info(f" --> No Match (below TH): '{best_match['record']['CRM Name']}' ({best_match['score']})")
        else:
            results.append({'Match': '', 'Score': 0, 'Match_Grund': 'No valid candidates'})
            logger.info(f" --> No Match (no candidates)")

    # --- Write results back ---
    logger.info("Matching-Prozess abgeschlossen. Schreibe Ergebnisse...")
    update_status(job_id, "Läuft", "Schreibe Ergebnisse zurück ins Sheet...")
    result_df = pd.DataFrame(results)
    final_df = pd.concat([match_df.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1)
    cols_to_drop = ['normalized_name', 'normalized_domain']
    final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore')
    upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})
    data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()

    ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
    if ok:
        logger.info("Ergebnisse erfolgreich in das Google Sheet geschrieben.")
        update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
    else:
        logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
        update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Duplicate Checker v5.0")
    parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
    parser.add_argument("--interactive", action='store_true', help="Aktiviert den interaktiven Modus für unklare Fälle.")
    args = parser.parse_args()

    # FIX: Config.load_api_keys() already ran (exception-guarded) at import
    # time; the former second, unguarded call here could crash the job before
    # any status file was written.
    main(job_id=args.job_id, interactive=args.interactive)