"""Duplicate check between the CRM accounts tab and the matching tab.

Loads both Google-Sheet tabs, normalizes company name / website / city,
generates candidate pairs by blocking on the normalized domain, scores the
pairs with ``recordlinkage``, and writes the best CRM match per row back
into the matching tab.
"""

import re

# --- Configuration ---
CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
SCORE_THRESHOLD = 0.8
WEIGHTS = {
    'domain': 0.5,
    'name': 0.4,
    'city': 0.1,
}

# German legal forms / filler words stripped from company names before matching.
_NAME_STOPWORDS = frozenset(
    ('gmbh', 'ag', 'kg', 'ug', 'ohg', 'holding', 'group', 'international')
)
# Umlaut transliteration table (ß is already folded to 'ss' by casefold(),
# the entry is kept for robustness).
_UMLAUT_MAP = str.maketrans({'ä': 'ae', 'ö': 'oe', 'ü': 'ue', 'ß': 'ss'})


def normalize_company_name(name: str) -> str:
    """Normalize a company name for fuzzy comparison.

    Steps: Unicode-safe lower-casing, umlaut transliteration (ae/oe/ue/ss),
    stripping of non-alphanumeric characters, and removal of legal forms
    and stop words.

    Args:
        name: Raw company name (coerced to ``str``).

    Returns:
        Space-joined normalized tokens; ``''`` for empty/stop-word-only input.
    """
    s = str(name).casefold().translate(_UMLAUT_MAP)
    # Keep only lower-case alphanumerics and whitespace.
    s = re.sub(r'[^a-z0-9\s]', ' ', s)
    return ' '.join(t for t in s.split() if t not in _NAME_STOPWORDS)


def normalize_domain(url: str) -> str:
    """Return the bare host of *url*: lower-cased, protocol and 'www.' removed.

    Args:
        url: Raw website value (coerced to ``str``).

    Returns:
        The host part, e.g. ``'example.com'``; ``''`` for empty input.
    """
    s = str(url).casefold().strip()
    s = re.sub(r'^https?://', '', s)
    host = s.split('/')[0]
    return host.removeprefix('www.')  # requires Python 3.9+


def main():
    """Run the duplicate check and write the results back to the matching tab."""
    # Heavy third-party imports are deferred so the pure normalization helpers
    # above stay importable (and testable) without the Sheets/recordlinkage stack.
    import recordlinkage
    from google_sheet_handler import GoogleSheetHandler

    # Load both tabs.
    sheet_handler = GoogleSheetHandler()
    crm_df = sheet_handler.get_sheet_as_dataframe(CRM_SHEET_NAME)
    match_df = sheet_handler.get_sheet_as_dataframe(MATCHING_SHEET_NAME)
    if crm_df is None or crm_df.empty or match_df is None or match_df.empty:
        print("Fehler: Leere Daten in einem der Tabs. Abbruch.")
        return

    # Derived columns used for blocking and comparing.
    for df in (crm_df, match_df):
        df['norm_name'] = df['CRM Name'].fillna('').apply(normalize_company_name)
        df['norm_domain'] = df['CRM Website'].fillna('').apply(normalize_domain)
        df['city'] = df['CRM Ort'].fillna('').apply(lambda x: str(x).casefold().strip())

    # Candidate generation: block on the normalized domain. Rows without a
    # website all share the empty-string domain and land in one big block;
    # they are still compared by name/city, but must not receive exact-domain
    # credit — see the feature masking below.
    indexer = recordlinkage.Index()
    indexer.block('norm_domain')
    candidate_pairs = indexer.index(crm_df, match_df)

    # Comparison rules.
    compare = recordlinkage.Compare()
    compare.exact('norm_domain', 'norm_domain', label='domain')
    compare.string('norm_name', 'norm_name', method='jarowinkler', label='name_sim')
    compare.exact('city', 'city', label='city')
    features = compare.compute(candidate_pairs, crm_df, match_df)

    # Bug fix: '' == '' counts as an exact match for recordlinkage, so pairs
    # without a website (or city) used to get full domain/city credit and
    # could cross the threshold on name similarity alone. Mask those to 0.
    crm_idx = features.index.get_level_values(0)
    features.loc[crm_df.loc[crm_idx, 'norm_domain'].eq('').to_numpy(), 'domain'] = 0.0
    features.loc[crm_df.loc[crm_idx, 'city'].eq('').to_numpy(), 'city'] = 0.0

    # Weighted total score in [0, 1].
    features['score'] = (
        WEIGHTS['domain'] * features['domain']
        + WEIGHTS['name'] * features['name_sim']
        + WEIGHTS['city'] * features['city']
    )

    # Best CRM match per matching-tab row, thresholded.
    # NOTE(review): assumes the candidate MultiIndex is unnamed so that
    # reset_index() yields 'level_0'/'level_1' — confirm with the installed
    # recordlinkage version.
    matches = features.reset_index()
    best = (matches.sort_values(['level_1', 'score'], ascending=[True, False])
                   .drop_duplicates('level_1'))
    best = (best[best['score'] >= SCORE_THRESHOLD]
            .rename(columns={'level_0': 'crm_idx', 'level_1': 'match_idx'}))

    # Attach full CRM and matching records to each accepted pair.
    crm_df = crm_df.reset_index()
    match_df = match_df.reset_index()
    merged = (best
              .merge(crm_df, left_on='crm_idx', right_on='index')
              .merge(match_df, left_on='match_idx', right_on='index',
                     suffixes=('_CRM', '_NEW')))

    # Build the output: original matching columns plus the matched CRM record.
    output = match_df[['CRM Name', 'CRM Website', 'CRM Ort', 'CRM Land']].copy()
    for col in ('Matched CRM Name', 'Matched CRM Website',
                'Matched CRM Ort', 'Matched CRM Land'):
        output[col] = ''
    output['Score'] = 0.0

    for _, row in merged.iterrows():
        i = int(row['match_idx'])
        output.at[i, 'Matched CRM Name'] = row['CRM Name_CRM']
        output.at[i, 'Matched CRM Website'] = row['CRM Website_CRM']
        output.at[i, 'Matched CRM Ort'] = row['CRM Ort_CRM']
        output.at[i, 'Matched CRM Land'] = row['CRM Land_CRM']
        output.at[i, 'Score'] = row['score']

    # Write back to the Google Sheet (header row + data rows).
    data = [output.columns.tolist()] + output.values.tolist()
    success = sheet_handler.clear_and_write_data(MATCHING_SHEET_NAME, data)
    if success:
        print(f"Erfolgreich: {len(best)} Matches mit Score ≥ {SCORE_THRESHOLD}")
    else:
        print("Fehler beim Schreiben ins Google Sheet.")


if __name__ == '__main__':
    main()