duplicate_checker.py aktualisiert

This commit is contained in:
2025-08-18 08:39:27 +00:00
parent ae78da320a
commit 9e74e6fa5f

View File

@@ -1,6 +1,8 @@
import os import os
import sys import sys
import re import re
import argparse
import json
import logging import logging
import pandas as pd import pandas as pd
from datetime import datetime from datetime import datetime
@@ -10,6 +12,19 @@ from helpers import normalize_company_name, simple_normalize_url, serp_website_l
from config import Config from config import Config
from google_sheet_handler import GoogleSheetHandler from google_sheet_handler import GoogleSheetHandler
STATUS_DIR = "job_status"
def update_status(job_id, status, progress_message):
"""Schreibt den aktuellen Status in eine JSON-Datei."""
if not job_id: return
status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
try:
with open(status_file, 'w') as f:
json.dump({"status": status, "progress": progress_message}, f)
except Exception as e:
# Logge den Fehler, aber lasse das Hauptskript nicht abstürzen
logging.error(f"Konnte Statusdatei für Job {job_id} nicht schreiben: {e}")
# duplicate_checker.py v2.15 # duplicate_checker.py v2.15
# Quality-first ++: Domain-Gate, Location-Penalties, Smart Blocking (IDF-light), # Quality-first ++: Domain-Gate, Location-Penalties, Smart Blocking (IDF-light),
# Serp-Trust, Weak-Threshold, City-Bias-Guard, Prefilter tightened, Metrics # Serp-Trust, Weak-Threshold, City-Bias-Guard, Prefilter tightened, Metrics
@@ -209,25 +224,30 @@ def choose_rarest_token(norm_name: str, token_freq: Counter):
return lst[0] if lst else None return lst[0] if lst else None
# --- Hauptfunktion --- # --- Hauptfunktion ---
def main(): def main(job_id=None):
logger.info("Starte Duplikats-Check v2.15 (Quality-first++)") logger.info("Starte Duplikats-Check v2.15 (Quality-first++)")
update_status(job_id, "Läuft", "Initialisiere GoogleSheetHandler...")
try: try:
sheet = GoogleSheetHandler() sheet = GoogleSheetHandler()
logger.info("GoogleSheetHandler initialisiert") logger.info("GoogleSheetHandler initialisiert")
except Exception as e: except Exception as e:
logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}") logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
update_status(job_id, "Fehlgeschlagen", f"Init GoogleSheetHandler fehlgeschlagen: {e}")
sys.exit(1) sys.exit(1)
# Daten laden # Daten laden
update_status(job_id, "Läuft", "Lade CRM- und Matching-Daten...")
crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME) crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME) match_df = sheet.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
logger.info(f"{0 if crm_df is None else len(crm_df)} CRM-Datensätze | {0 if match_df is None else len(match_df)} Matching-Datensätze") logger.info(f"{0 if crm_df is None else len(crm_df)} CRM-Datensätze | {0 if match_df is None else len(match_df)} Matching-Datensätze")
if crm_df is None or crm_df.empty or match_df is None or match_df.empty: if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
logger.critical("Leere Daten in einem der Sheets. Abbruch.") logger.critical("Leere Daten in einem der Sheets. Abbruch.")
update_status(job_id, "Fehlgeschlagen", "Leere Daten in einem der Sheets.")
return return
# SerpAPI nur für Matching (B und E leer) # SerpAPI nur für Matching (B und E leer)
if serp_key: # Annahme: serp_key wird global geladen, z.B. durch Config.load_api_keys()
if Config.API_KEYS.get('serpapi'):
if 'Gefundene Website' not in match_df.columns: if 'Gefundene Website' not in match_df.columns:
match_df['Gefundene Website'] = '' match_df['Gefundene Website'] = ''
b_empty = match_df['CRM Website'].fillna('').astype(str).str.strip().str.lower().isin(['','k.a.','k.a','n/a','na']) b_empty = match_df['CRM Website'].fillna('').astype(str).str.strip().str.lower().isin(['','k.a.','k.a','n/a','na'])
@@ -235,6 +255,7 @@ def main():
empty_mask = b_empty & e_empty empty_mask = b_empty & e_empty
empty_count = int(empty_mask.sum()) empty_count = int(empty_mask.sum())
if empty_count > 0: if empty_count > 0:
update_status(job_id, "Läuft", f"Suche Websites für {empty_count} Firmen via SerpAPI...")
logger.info(f"Serp-Fallback für Matching: {empty_count} Firmen ohne URL in B/E") logger.info(f"Serp-Fallback für Matching: {empty_count} Firmen ohne URL in B/E")
found_cnt = 0 found_cnt = 0
trust_stats = Counter() trust_stats = Counter()
@@ -260,14 +281,16 @@ def main():
logger.info("Serp-Fallback übersprungen: B oder E bereits befüllt (keine fehlenden Matching-URLs)") logger.info("Serp-Fallback übersprungen: B oder E bereits befüllt (keine fehlenden Matching-URLs)")
# Normalisierung CRM # Normalisierung CRM
update_status(job_id, "Läuft", "Normalisiere CRM-Daten...")
crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name) crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name)
crm_df['normalized_domain'] = crm_df['CRM Website'].astype(str).apply(simple_normalize_url) crm_df['normalized_domain'] = crm_df['CRM Website'].astype(str).apply(simple_normalize_url)
crm_df['CRM Ort'] = crm_df['CRM Ort'].astype(str).str.lower().str.strip() crm_df['CRM Ort'] = crm_df['CRM Ort'].astype(str).str.lower().str.strip()
crm_df['CRM Land'] = crm_df['CRM Land'].astype(str).str.lower().str.strip() crm_df['CRM Land'] = crm_df['CRM Land'].astype(str).str.lower().str.strip()
crm_df['block_key'] = crm_df['normalized_name'].apply(lambda x: x.split()[0] if x else None) crm_df['block_key'] = crm_df['normalized_name'].apply(lambda x: x.split()[0] if x else None)
crm_df['domain_use_flag'] = 1 # CRM-Domain gilt als vertrauenswürdig crm_df['domain_use_flag'] = 1
# Normalisierung Matching # Normalisierung Matching
update_status(job_id, "Läuft", "Normalisiere Matching-Daten...")
match_df['Gefundene Website'] = match_df.get('Gefundene Website', pd.Series(index=match_df.index, dtype=object)) match_df['Gefundene Website'] = match_df.get('Gefundene Website', pd.Series(index=match_df.index, dtype=object))
match_df['Serp Vertrauen'] = match_df.get('Serp Vertrauen', pd.Series(index=match_df.index, dtype=object)) match_df['Serp Vertrauen'] = match_df.get('Serp Vertrauen', pd.Series(index=match_df.index, dtype=object))
match_df['Effektive Website'] = match_df['CRM Website'].fillna('').astype(str).str.strip() match_df['Effektive Website'] = match_df['CRM Website'].fillna('').astype(str).str.strip()
@@ -280,27 +303,25 @@ def main():
match_df['CRM Land'] = match_df['CRM Land'].astype(str).str.lower().str.strip() match_df['CRM Land'] = match_df['CRM Land'].astype(str).str.lower().str.strip()
match_df['block_key'] = match_df['normalized_name'].apply(lambda x: x.split()[0] if x else None) match_df['block_key'] = match_df['normalized_name'].apply(lambda x: x.split()[0] if x else None)
# Domain-Vertrauen/Use-Flag
def _domain_use(row): def _domain_use(row):
if str(row.get('CRM Website','')).strip(): if str(row.get('CRM Website','')).strip(): return 1
return 1
trust = str(row.get('Serp Vertrauen','')).lower() trust = str(row.get('Serp Vertrauen','')).lower()
return 1 if trust == 'hoch' else 0 return 1 if trust == 'hoch' else 0
match_df['domain_use_flag'] = match_df.apply(_domain_use, axis=1) match_df['domain_use_flag'] = match_df.apply(_domain_use, axis=1)
# City-Tokens dynamisch bauen (nach Normalisierung von Ort) # City-Tokens dynamisch bauen
def build_city_tokens(crm_df, match_df): def build_city_tokens(crm_df, match_df):
cities = set() cities = set()
for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']], ignore_index=True).dropna().unique(): for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']], ignore_index=True).dropna().unique():
for t in _tokenize(s): for t in _tokenize(s):
if len(t) >= 3: if len(t) >= 3: cities.add(t)
cities.add(t)
return cities return cities
global CITY_TOKENS global CITY_TOKENS
CITY_TOKENS = build_city_tokens(crm_df, match_df) CITY_TOKENS = build_city_tokens(crm_df, match_df)
logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}") logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")
# Blocking-Indizes (nachdem CITY_TOKENS gesetzt wurde) # Blocking-Indizes
update_status(job_id, "Läuft", "Erstelle Blocking-Indizes...")
crm_records, domain_index, token_freq, token_index = build_indexes(crm_df) crm_records, domain_index, token_freq, token_index = build_indexes(crm_df)
logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}") logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")
@@ -309,12 +330,14 @@ def main():
metrics = Counter() metrics = Counter()
total = len(match_df) total = len(match_df)
logger.info("Starte Matching-Prozess…") logger.info("Starte Matching-Prozess…")
processed = 0
for idx, mrow in match_df.to_dict('index').items(): for idx, mrow in match_df.to_dict('index').items():
processed += 1 processed = idx + 1 # Annahme, dass der Index 0-basiert ist
name_disp = mrow.get('CRM Name','') progress_message = f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}'"
# Kandidatenwahl logger.info(progress_message)
if processed % 5 == 0: # Status alle 5 Zeilen aktualisieren
update_status(job_id, "Läuft", progress_message)
candidates = [] candidates = []
used_block = '' used_block = ''
if mrow.get('normalized_domain') and mrow.get('domain_use_flag') == 1: if mrow.get('normalized_domain') and mrow.get('domain_use_flag') == 1:
@@ -326,7 +349,6 @@ def main():
candidates = token_index.get(rtok, []) candidates = token_index.get(rtok, [])
used_block = f"token:{rtok}" used_block = f"token:{rtok}"
if not candidates: if not candidates:
# Prefilter über gesamte CRM-Liste (strenger + limitierter; erfordert Rarest-Token-Overlap)
pf = [] pf = []
n1 = mrow.get('normalized_name','') n1 = mrow.get('normalized_name','')
rtok = choose_rarest_token(n1, token_freq) rtok = choose_rarest_token(n1, token_freq)
@@ -335,10 +357,8 @@ def main():
for r in crm_records: for r in crm_records:
n2 = r.get('normalized_name','') n2 = r.get('normalized_name','')
clean2, toks2 = clean_name_for_scoring(n2) clean2, toks2 = clean_name_for_scoring(n2)
if not clean2: if not clean2: continue
continue if rtok and rtok not in toks2: continue
if rtok and rtok not in toks2:
continue
pr = fuzz.partial_ratio(clean1, clean2) pr = fuzz.partial_ratio(clean1, clean2)
if pr >= PREFILTER_MIN_PARTIAL: if pr >= PREFILTER_MIN_PARTIAL:
pf.append((pr, r)) pf.append((pr, r))
@@ -346,7 +366,7 @@ def main():
candidates = [r for _, r in pf[:PREFILTER_LIMIT]] candidates = [r for _, r in pf[:PREFILTER_LIMIT]]
used_block = f"prefilter:{PREFILTER_MIN_PARTIAL}/{len(pf)}" used_block = f"prefilter:{PREFILTER_MIN_PARTIAL}/{len(pf)}"
logger.info(f"Prüfe {processed}/{total}: '{name_disp}' -> {len(candidates)} Kandidaten (Block={used_block})") logger.info(f"Prüfe {processed}/{total}: '{mrow.get('CRM Name','')}' -> {len(candidates)} Kandidaten (Block={used_block})")
if not candidates: if not candidates:
results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'}) results.append({'Match':'', 'Score':0, 'Match_Grund':'keine Kandidaten'})
continue continue
@@ -357,13 +377,11 @@ def main():
scored.append((cr.get('CRM Name',''), score, comp)) scored.append((cr.get('CRM Name',''), score, comp))
scored.sort(key=lambda x: x[1], reverse=True) scored.sort(key=lambda x: x[1], reverse=True)
# Log Top5
for cand_name, sc, comp in scored[:5]: for cand_name, sc, comp in scored[:5]:
logger.debug(f" Kandidat: {cand_name} | Score={sc} | Comp={comp}") logger.debug(f" Kandidat: {cand_name} | Score={sc} | Comp={comp}")
best_name, best_score, best_comp = scored[0] best_name, best_score, best_comp = scored[0]
# Akzeptanzlogik (Weak-Threshold + Guard)
weak = (best_comp.get('domain_used') == 0 and not (best_comp.get('city_match') and best_comp.get('country_match'))) weak = (best_comp.get('domain_used') == 0 and not (best_comp.get('city_match') and best_comp.get('country_match')))
applied_threshold = SCORE_THRESHOLD_WEAK if weak else SCORE_THRESHOLD applied_threshold = SCORE_THRESHOLD_WEAK if weak else SCORE_THRESHOLD
weak_guard_fail = (weak and best_comp.get('rare_overlap') == 0) weak_guard_fail = (weak and best_comp.get('rare_overlap') == 0)
@@ -371,10 +389,8 @@ def main():
if not weak_guard_fail and best_score >= applied_threshold: if not weak_guard_fail and best_score >= applied_threshold:
results.append({'Match': best_name, 'Score': best_score, 'Match_Grund': str(best_comp)}) results.append({'Match': best_name, 'Score': best_score, 'Match_Grund': str(best_comp)})
metrics['matches_total'] += 1 metrics['matches_total'] += 1
if best_comp.get('domain_used') == 1: if best_comp.get('domain_used') == 1: metrics['matches_domain'] += 1
metrics['matches_domain'] += 1 if best_comp.get('city_match') and best_comp.get('country_match'): metrics['matches_with_loc'] += 1
if best_comp.get('city_match') and best_comp.get('country_match'):
metrics['matches_with_loc'] += 1
if best_comp.get('domain_used') == 0 and best_comp.get('name') >= 85 and not (best_comp.get('city_match') and best_comp.get('country_match')): if best_comp.get('domain_used') == 0 and best_comp.get('name') >= 85 and not (best_comp.get('city_match') and best_comp.get('country_match')):
metrics['matches_name_only'] += 1 metrics['matches_name_only'] += 1
logger.info(f" --> Match: '{best_name}' ({best_score}) {best_comp} | TH={applied_threshold}{' weak' if weak else ''}") logger.info(f" --> Match: '{best_name}' ({best_score}) {best_comp} | TH={applied_threshold}{' weak' if weak else ''}")
@@ -383,8 +399,8 @@ def main():
results.append({'Match':'', 'Score': best_score, 'Match_Grund': f"{best_comp} | {reason} TH={applied_threshold}"}) results.append({'Match':'', 'Score': best_score, 'Match_Grund': f"{best_comp} | {reason} TH={applied_threshold}"})
logger.info(f" --> Kein Match (Score={best_score}) {best_comp} | {reason} TH={applied_threshold}") logger.info(f" --> Kein Match (Score={best_score}) {best_comp} | {reason} TH={applied_threshold}")
# Ergebnisse zurückschreiben (SAFE) # Ergebnisse zurückschreiben
logger.info("Schreibe Ergebnisse ins Sheet (SAFE in-place, keine Spaltenverluste)…") update_status(job_id, "Läuft", "Schreibe Ergebnisse zurück ins Sheet...")
res_df = pd.DataFrame(results, index=match_df.index) res_df = pd.DataFrame(results, index=match_df.index)
write_df = match_df.copy() write_df = match_df.copy()
write_df['Match'] = res_df['Match'] write_df['Match'] = res_df['Match']
@@ -396,7 +412,8 @@ def main():
if c in write_df.columns: if c in write_df.columns:
write_df.drop(columns=[c], inplace=True) write_df.drop(columns=[c], inplace=True)
backup_path = os.path.join(LOG_DIR, f"{now}_backup_{MATCHING_SHEET_NAME}.csv") now = datetime.now().strftime("%Y-%m-%d_%H-%M")
backup_path = os.path.join(Config.LOG_DIR, f"{now}_backup_{MATCHING_SHEET_NAME}.csv")
try: try:
write_df.to_csv(backup_path, index=False, encoding='utf-8') write_df.to_csv(backup_path, index=False, encoding='utf-8')
logger.info(f"Lokales Backup geschrieben: {backup_path}") logger.info(f"Lokales Backup geschrieben: {backup_path}")
@@ -407,8 +424,10 @@ def main():
ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data) ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data)
if ok: if ok:
logger.info("Ergebnisse erfolgreich geschrieben") logger.info("Ergebnisse erfolgreich geschrieben")
update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
else: else:
logger.error("Fehler beim Schreiben ins Google Sheet") logger.error("Fehler beim Schreiben ins Google Sheet")
update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")
# Summary # Summary
serp_counts = Counter((str(x).lower() for x in write_df.get('Serp Vertrauen', []))) serp_counts = Counter((str(x).lower() for x in write_df.get('Serp Vertrauen', [])))
@@ -418,4 +437,11 @@ def main():
logger.info(f"Config: TH={SCORE_THRESHOLD}, TH_WEAK={SCORE_THRESHOLD_WEAK}, MIN_NAME_FOR_DOMAIN={MIN_NAME_FOR_DOMAIN}, Penalties(city={CITY_MISMATCH_PENALTY},country={COUNTRY_MISMATCH_PENALTY}), Prefilter(partial>={PREFILTER_MIN_PARTIAL}, limit={PREFILTER_LIMIT})") logger.info(f"Config: TH={SCORE_THRESHOLD}, TH_WEAK={SCORE_THRESHOLD_WEAK}, MIN_NAME_FOR_DOMAIN={MIN_NAME_FOR_DOMAIN}, Penalties(city={CITY_MISMATCH_PENALTY},country={COUNTRY_MISMATCH_PENALTY}), Prefilter(partial>={PREFILTER_MIN_PARTIAL}, limit={PREFILTER_LIMIT})")
if __name__=='__main__': if __name__=='__main__':
main() # Hinzufügen des Argument-Parsers für die Job-ID
parser = argparse.ArgumentParser()
parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
# Fügen Sie hier weitere Argumente hinzu, die Ihr Skript benötigt
args = parser.parse_args()
# Rufen Sie die main-Funktion mit der Job-ID auf
main(job_id=args.job_id)