223 lines
8.5 KiB
Python
223 lines
8.5 KiB
Python
import logging
|
|
import re
|
|
from collections import Counter
|
|
from typing import List, Tuple, Dict, Any, Optional
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import select
|
|
|
|
# External libs (must be in requirements.txt)
|
|
from thefuzz import fuzz
|
|
from ..database import Company
|
|
from ..lib.core_utils import clean_text, normalize_string
|
|
|
|
logger = logging.getLogger(__name__)

# --- Configuration (Ported from Legacy) ---

# Minimum final score for a match that has a domain match or a location match.
SCORE_THRESHOLD = 80
# Stricter minimum applied when a match has neither a domain match nor a
# location match (name similarity is the only evidence).
SCORE_THRESHOLD_WEAK = 95
# NOTE(review): not referenced anywhere in the visible code; presumably a
# leftover from the legacy implementation -- confirm before relying on it.
MIN_NAME_FOR_DOMAIN = 70
# Points subtracted when both sides provide a city / country and they differ.
CITY_MISMATCH_PENALTY = 30
COUNTRY_MISMATCH_PENALTY = 40

# Tokens ignored when comparing company names.
# Entries shorter than three characters ('ag', 'kg', 'ug', 'se', 'co') are
# already dropped by the length filter in split_tokens, and 'mbh&co' cannot be
# produced by a tokenizer that splits on non-alphanumerics; they are kept so
# the set stays identical to the legacy list.
STOP_TOKENS_BASE = {
    # Legal forms
    'gmbh', 'mbh', 'ag', 'kg', 'ug', 'ohg', 'se', 'co', 'kgaa', 'inc', 'llc',
    'ltd', 'sarl', 'mbh&co', 'mbhco', 'gesellschaft', 'company',
    # Corporate structure
    'holding', 'gruppe', 'group', 'international',
    # Generic business vocabulary
    'solutions', 'solution', 'service', 'services', 'technik', 'technology',
    'technologies', 'systems', 'systeme', 'logistik', 'logistics',
    'industries', 'industrie', 'management', 'consulting', 'vertrieb',
    'handel', 'werke', 'werk',
    # Country names
    'deutschland', 'austria', 'germany',
}
|
|
|
|
# ==============================================================================
|
|
# Helpers
|
|
# ==============================================================================
|
|
|
|
def _tokenize(s: str) -> List[str]:
|
|
if not s: return []
|
|
return re.split(r"[^a-z0-9]+", str(s).lower())
|
|
|
|
def split_tokens(name: str) -> List[str]:
    """Return the significant tokens of a company name.

    The name is tokenized with ``_tokenize``; tokens shorter than three
    characters and tokens listed in ``STOP_TOKENS_BASE`` are discarded.
    Order of the remaining tokens is preserved. A falsy *name* yields ``[]``.
    """
    if not name:
        return []

    significant = []
    for token in _tokenize(name):
        # Length filter runs first, so very short tokens never reach the
        # stop-word lookup.
        if len(token) < 3:
            continue
        if token in STOP_TOKENS_BASE:
            continue
        significant.append(token)
    return significant
|
|
|
|
def clean_name_for_scoring(norm_name: str) -> Tuple[str, set]:
    """Reduce a normalized name to its significant tokens for fuzzy scoring.

    Returns:
        A pair ``(text, tokens)``: ``text`` is the significant tokens (as
        produced by ``split_tokens``) joined by single spaces in their
        original order, and ``tokens`` is the same tokens as a set.
    """
    significant = split_tokens(norm_name)
    scoring_text = " ".join(significant)
    unique_tokens = set(significant)
    return scoring_text, unique_tokens
|
|
|
|
# ==============================================================================
|
|
# Core Deduplication Logic
|
|
# ==============================================================================
|
|
|
|
class Deduplicator:
    """In-memory fuzzy matcher that finds existing companies duplicating a candidate.

    On construction, the id, name, website, city, country and crm_id of every
    ``Company`` row are loaded and indexed by normalized website, by normalized
    name and by name token. ``find_duplicates`` then narrows the search to a
    small pool via these indexes ("blocking") before running fuzzy scoring.
    """

    def __init__(self, db: "Session"):
        """Store the session and eagerly load all reference data from it.

        Args:
            db: SQLAlchemy session used to query ``Company`` rows.
        """
        self.db = db
        self.reference_data = []     # all reference records, in load order
        self.domain_index = {}       # normalized website -> [records]
        self.name_index = {}         # normalized name -> [records]
        self.token_freq = Counter()  # token -> number of records containing it
        self.token_index = {}        # token -> [records]
        self._load_reference_data()

    def _load_reference_data(self):
        """Load the minimal company dataset into RAM and build the indexes.

        Optimized for 10k-50k records.
        """
        logger.info("Loading reference data for deduplication...")
        companies = self.db.query(
            Company.id,
            Company.name,
            Company.website,
            Company.city,
            Company.country,
            Company.crm_id,
        ).all()

        for c in companies:
            norm_name = normalize_string(c.name)
            # NOTE(review): the whole website string is normalized; no host is
            # extracted, so differing URL spellings of one domain may not match.
            norm_domain = normalize_string(c.website)

            record = {
                'id': c.id,
                'crm_id': c.crm_id,
                'name': c.name,
                'normalized_name': norm_name,
                'normalized_domain': norm_domain,
                'city': normalize_string(c.city),
                'country': normalize_string(c.country),
            }
            self.reference_data.append(record)

            # "k.a." is excluded from the domain index; presumably it is a
            # "not specified" placeholder -- TODO confirm.
            if norm_domain and norm_domain != "k.a.":
                self.domain_index.setdefault(norm_domain, []).append(record)

            # Exact-name index: lets find_duplicates resolve names that consist
            # only of stop words without scanning every record.
            if norm_name:
                self.name_index.setdefault(norm_name, []).append(record)

            # Token frequency and token -> records index (used for blocking).
            _, toks = clean_name_for_scoring(norm_name)
            for t in toks:
                self.token_freq[t] += 1
                self.token_index.setdefault(t, []).append(record)

        logger.info("Loaded %d records for deduplication.", len(self.reference_data))

    def _choose_rarest_token(self, norm_name: str) -> Optional[str]:
        """Pick the blocking token for *norm_name*.

        Preference order: lowest frequency in the reference data (tokens not
        present there rank last), then longest token, then alphabetical order.
        The last criterion makes the choice independent of set iteration
        order, so it is reproducible across processes.

        Returns:
            The chosen token, or ``None`` if the name has no significant tokens.
        """
        _, toks = clean_name_for_scoring(norm_name)
        if not toks:
            return None
        return min(toks, key=lambda t: (self.token_freq.get(t, 10**9), -len(t), t))

    def find_duplicates(self, candidate: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Check a single candidate against the loaded index.

        Args:
            candidate: Mapping from which the keys ``'name'``, ``'website'``,
                ``'city'`` and ``'country'`` are read (missing keys default
                to ``''``).

        Returns:
            Matches whose score reaches the applicable threshold, sorted by
            score (highest first). Each match is a dict with the keys
            ``'company_id'``, ``'crm_id'``, ``'name'``, ``'score'`` and
            ``'details'``.
        """
        # Prepare candidate
        c_norm_name = normalize_string(candidate.get('name', ''))
        c_norm_domain = normalize_string(candidate.get('website', ''))
        c_city = normalize_string(candidate.get('city', ''))
        c_country = normalize_string(candidate.get('country', ''))

        candidates_to_check = {}  # record id -> record (de-duplicates the pool)

        # 1. Domain match (fastest)
        if c_norm_domain and c_norm_domain != "k.a.":
            for r in self.domain_index.get(c_norm_domain, ()):
                candidates_to_check[r['id']] = r

        # 2. Rarest-token match (blocking)
        rtok = self._choose_rarest_token(c_norm_name)
        if rtok:
            for r in self.token_index.get(rtok, ()):
                candidates_to_check[r['id']] = r

        # Fallback: neither index produced anything. An exact name match can
        # still exist if the name yields no indexable tokens (e.g. all tokens
        # are stop words). Only names longer than 3 characters are considered.
        if not candidates_to_check and len(c_norm_name) > 3:
            for r in self.name_index.get(c_norm_name, ()):
                candidates_to_check[r['id']] = r

        if not candidates_to_check:
            return []

        # 3. Scoring
        cand = {'n': c_norm_name, 'd': c_norm_domain, 'c': c_city, 'ct': c_country}
        matches = []
        for db_rec in candidates_to_check.values():
            score, details = self._calculate_similarity(cand=cand, ref=db_rec)

            # A match is "weak" if there is no domain match AND no location
            # match; weak matches must clear the stricter threshold.
            is_weak = details['domain_match'] == 0 and not details['loc_match']
            threshold = SCORE_THRESHOLD_WEAK if is_weak else SCORE_THRESHOLD

            if score >= threshold:
                matches.append({
                    'company_id': db_rec['id'],
                    'crm_id': db_rec['crm_id'],
                    'name': db_rec['name'],
                    'score': score,
                    'details': details,
                })

        matches.sort(key=lambda m: m['score'], reverse=True)
        return matches

    def _calculate_similarity(self, cand, ref):
        """Score how likely *cand* and *ref* describe the same company.

        Args:
            cand: Dict with normalized candidate values under the keys ``'n'``
                (name), ``'d'`` (domain), ``'c'`` (city) and ``'ct'`` (country).
            ref: Reference record with the keys ``'normalized_name'``,
                ``'normalized_domain'``, ``'city'`` and ``'country'``.

        Returns:
            ``(score, details)``. ``score`` is clamped to 0..100. ``details``
            contains ``'name_score'``, ``'domain_match'`` (0/1), ``'loc_match'``
            (0/1, city AND country equal) and ``'penalties'``; identical
            non-empty names additionally set ``'exact': True``.
        """
        n1, n2 = cand['n'], ref['normalized_name']

        # Domain and location flags are computed up front so that every return
        # path reports them with the same meaning.
        d1, d2 = cand['d'], ref['normalized_domain']
        domain_match = 1 if (d1 and d2 and d1 != "k.a." and d1 == d2) else 0

        city_match = 1 if (cand['c'] and ref['city'] and cand['c'] == ref['city']) else 0
        country_match = 1 if (cand['ct'] and ref['country'] and cand['ct'] == ref['country']) else 0
        loc_match = 1 if (city_match and country_match) else 0

        # Exact-name shortcut.
        # NOTE(review): identical names score 100 regardless of city/country
        # mismatches (no penalties are applied here) -- kept as in the original.
        if n1 and n1 == n2:
            return 100, {
                'exact': True,
                'domain_match': domain_match,
                'loc_match': loc_match,
                'name_score': 100,
                'penalties': 0,
            }

        # Fuzzy name score on the stop-word-free names
        clean1, _ = clean_name_for_scoring(n1)
        clean2, _ = clean_name_for_scoring(n2)

        if clean1 and clean2:
            name_score = max(
                fuzz.token_set_ratio(clean1, clean2),
                fuzz.partial_ratio(clean1, clean2),
                fuzz.token_sort_ratio(clean1, clean2),
            )
        else:
            # Cleaning removed everything on at least one side: fall back to a
            # plain ratio on the normalized names.
            name_score = fuzz.ratio(n1, n2) if (n1 and n2) else 0

        # Penalties apply only when both sides provide a value and they differ.
        penalties = 0
        if cand['ct'] and ref['country'] and not country_match:
            penalties += COUNTRY_MISMATCH_PENALTY
        if cand['c'] and ref['city'] and not city_match:
            penalties += CITY_MISMATCH_PENALTY

        # Final calculation: a domain match sets the base to 100, otherwise the
        # base is the name score.
        total = 100 if domain_match else name_score
        if loc_match:
            total += 10  # bonus for location match
        total -= penalties

        # Clamp to 0..100
        total = min(100, max(0, total))

        return total, {
            'name_score': name_score,
            'domain_match': domain_match,
            'loc_match': loc_match,
            'penalties': penalties,
        }
|