duplicate_checker.py aktualisiert

--- FEATURES v4.0 ---
- NEU: "Kernidentitäts-Bonus": Ein hoher Bonus wird vergeben, wenn das seltenste (wichtigste) Token übereinstimmt.
  Dies fördert das "großzügige Matchen" auf Basis der Kernmarke (z.B. "ANDRITZ AG" vs. "ANDRITZ HYDRO").
- NEU: Intelligenter "Shortest Name Tie-Breaker": Wird nur noch bei sehr hohen und sehr ähnlichen Scores angewendet.
- Finale Kalibrierung der Score-Berechnung und Schwellenwerte für optimale Balance.
- Golden-Rule für exakte Matches und Interaktiver Modus beibehalten.
This commit is contained in:
2025-09-05 09:39:56 +00:00
parent 7a273bf25a
commit f5af3023f8

View File

@@ -1,11 +1,12 @@
# duplicate_checker.py v3.3
# duplicate_checker.py v4.0
# Build timestamp is injected into logfile name.
# --- ÄNDERUNGEN v3.3 ---
# - NEU: "Shortest Name Tie-Breaker": Bei sehr ähnlichen Scores wird der Kandidat mit dem kürzeren Namen bevorzugt,
# um das Prinzip der "wirtschaftlichen Einheit" (z.B. Holding) besser abzubilden.
# - Scoring-Formel und Schwellenwerte erneut feinjustiert für finale Balance.
# - Golden-Rule und Interaktiver Modus beibehalten.
# --- FEATURES v4.0 ---
# - NEU: "Kernidentitäts-Bonus": Ein hoher Bonus wird vergeben, wenn das seltenste (wichtigste) Token übereinstimmt.
# Dies fördert das "großzügige Matchen" auf Basis der Kernmarke (z.B. "ANDRITZ AG" vs. "ANDRITZ HYDRO").
# - NEU: Intelligenter "Shortest Name Tie-Breaker": Wird nur noch bei sehr hohen und sehr ähnlichen Scores angewendet.
# - Finale Kalibrierung der Score-Berechnung und Schwellenwerte für optimale Balance.
# - Golden-Rule für exakte Matches und Interaktiver Modus beibehalten.
import os
import sys
@@ -25,7 +26,6 @@ from google_sheet_handler import GoogleSheetHandler
STATUS_DIR = "job_status"
def update_status(job_id, status, progress_message):
# ... (Keine Änderungen hier)
if not job_id: return
status_file = os.path.join(STATUS_DIR, f"{job_id}.json")
try:
@@ -47,21 +47,18 @@ CRM_SHEET_NAME = "CRM_Accounts"
MATCHING_SHEET_NAME = "Matching_Accounts"
LOG_DIR = "Log"
now = datetime.now().strftime('%Y-%m-%d_%H-%M')
LOG_FILE = f"{now}_duplicate_check_v3.3.txt"
LOG_FILE = f"{now}_duplicate_check_v4.0.txt"
# --- NEU: Angepasste Scoring-Konfiguration v3.3 ---
SCORE_THRESHOLD = 95 # Standard-Schwelle
SCORE_THRESHOLD_WEAK= 125 # Schwelle für Matches ohne Domain oder Ort
GOLDEN_MATCH_RATIO = 97 # Leicht großzügiger
# --- Scoring-Konfiguration v4.0 ---
SCORE_THRESHOLD = 100 # Standard-Schwelle
SCORE_THRESHOLD_WEAK= 130 # Schwelle für Matches ohne Domain oder Ort
GOLDEN_MATCH_RATIO = 97
GOLDEN_MATCH_SCORE = 300
MIN_NAME_SCORE_FOR_DOMAIN = 3.0
CORE_IDENTITY_BONUS = 60 # NEU: Bonus für die Übereinstimmung des wichtigsten Tokens
# Tie-Breaker Konfiguration
TIE_SCORE_DIFF = 15 # Max Score-Unterschied für Tie-Breaking
# Interaktiver Modus Konfiguration
INTERACTIVE_SCORE_MIN = 95
INTERACTIVE_SCORE_DIFF = 20
# Tie-Breaker & Interaktiver Modus Konfiguration
TRIGGER_SCORE_MIN = 150 # NEU: Mindestscore für Tie-Breaker / Interaktiv
TIE_SCORE_DIFF = 20
# Prefilter-Konfiguration
PREFILTER_MIN_PARTIAL = 70
@@ -87,7 +84,7 @@ fh.setFormatter(formatter)
root.addHandler(fh)
logger = logging.getLogger(__name__)
logger.info(f"Logging to console and file: {log_path}")
logger.info(f"Starting duplicate_checker.py v3.3 | Build: {now}")
logger.info(f"Starting duplicate_checker.py v4.0 | Build: {now}")
# --- SerpAPI Key laden ---
# ... (Keine Änderungen hier)
@@ -100,7 +97,6 @@ except Exception as e:
logger.warning(f"Fehler beim Laden API-Keys: {e}")
serp_key = None
# --- Stop-/City-Tokens ---
STOP_TOKENS_BASE = {
'gmbh','mbh','ag','kg','ug','ohg','se','co','kgaa','inc','llc','ltd','sarl', 'b.v', 'bv',
@@ -123,7 +119,6 @@ def clean_name_for_scoring(norm_name: str):
final_tokens = [t for t in tokens if t not in stop_union]
return " ".join(final_tokens), set(final_tokens)
# --- TF-IDF Logik ---
def build_term_weights(crm_df: pd.DataFrame):
logger.info("Starte Berechnung der Wortgewichte (TF-IDF)...")
token_counts = Counter()
@@ -142,7 +137,7 @@ def build_term_weights(crm_df: pd.DataFrame):
logger.info(f"Wortgewichte für {len(term_weights)} Tokens berechnet.")
return term_weights
# --- Similarity v3.3 ---
# --- Similarity v4.0 ---
def calculate_similarity(mrec: dict, crec: dict, term_weights: dict):
n1_raw = mrec.get('normalized_name', '')
@@ -168,17 +163,24 @@ def calculate_similarity(mrec: dict, crec: dict, term_weights: dict):
overlap_percentage = len(overlapping_tokens) / len(toks1)
name_score *= (1 + overlap_percentage)
# --- NEU v4.0: Kernidentitäts-Bonus ---
core_identity_bonus = 0
rarest_token_mrec = choose_rarest_token(n1_raw, term_weights)
if rarest_token_mrec and rarest_token_mrec in toks2:
core_identity_bonus = CORE_IDENTITY_BONUS
# Domain-Gate
score_domain = 0
if domain_match:
if name_score >= MIN_NAME_SCORE_FOR_DOMAIN:
score_domain = 75
if name_score > 2.0 or (city_match and country_match):
score_domain = 70
else:
score_domain = 20
score_location = 25 if (city_match and country_match) else 0
# --- ÄNDERUNG v3.3: Angepasste Gewichtung ---
total = name_score * 10 + score_domain + score_location
# Finale Score-Kalibrierung v4.0
total = name_score * 10 + score_domain + score_location + core_identity_bonus
penalties = 0
if mrec.get('CRM Land') and crec.get('CRM Land') and not country_match:
@@ -190,8 +192,8 @@ def calculate_similarity(mrec: dict, crec: dict, term_weights: dict):
comp = {
'name_score': round(name_score,1),
'domain_match': domain_match,
'city_match': city_match,
'country_match': country_match,
'location_match': int(city_match and country_match),
'core_bonus': core_identity_bonus,
'penalties': penalties,
'overlapping_tokens': list(overlapping_tokens)
}
@@ -221,7 +223,7 @@ def choose_rarest_token(norm_name: str, term_weights: dict):
return rarest if term_weights.get(rarest, 0) > 0 else None
def main(job_id=None, interactive=False):
logger.info("Starte Duplikats-Check v3.3 (Tie-Breaker Final Calibration)")
logger.info("Starte Duplikats-Check v4.0 (Core Identity Bonus)")
# ... (Code für Initialisierung und Datenladen bleibt identisch) ...
update_status(job_id, "Läuft", "Initialisiere GoogleSheetHandler...")
try:
@@ -332,16 +334,13 @@ def main(job_id=None, interactive=False):
best_match = scored[0] if scored else None
# --- NEU: "Shortest Name Tie-Breaker" Logik ---
# --- Intelligenter Tie-Breaker v4.0 ---
if best_match and len(scored) > 1:
best_score = best_match['score']
second_best_score = scored[1]['score']
# Wenn Scores sehr nah beieinander liegen UND es kein Golden Match ist
if best_score < GOLDEN_MATCH_SCORE and (best_score - second_best_score) < TIE_SCORE_DIFF:
if best_score >= TRIGGER_SCORE_MIN and (best_score - second_best_score) < TIE_SCORE_DIFF and best_score < GOLDEN_MATCH_SCORE:
logger.info(f" Tie-Breaker-Situation erkannt für '{mrow['CRM Name']}'. Scores: {best_score} vs {second_best_score}")
# Finde alle Kandidaten im "Tie-Bereich"
tie_candidates = [c for c in scored if (best_score - c['score']) < TIE_SCORE_DIFF]
# Wähle den Kandidaten mit dem kürzesten Namen
best_match_by_length = min(tie_candidates, key=lambda x: len(x['name']))
if best_match_by_length['name'] != best_match['name']:
logger.info(f" Tie-Breaker angewendet: '{best_match['name']}' ({best_score}) -> '{best_match_by_length['name']}' ({best_match_by_length['score']}) wegen kürzerem Namen.")
@@ -353,34 +352,11 @@ def main(job_id=None, interactive=False):
second_best_score = scored[1]['score']
if best_score > INTERACTIVE_SCORE_MIN and (best_score - second_best_score) < INTERACTIVE_SCORE_DIFF and best_score < GOLDEN_MATCH_SCORE:
# ... (Interaktive Logik bleibt gleich) ...
print("\n" + "="*50)
print(f"AMBIGUOUS MATCH for '{mrow['CRM Name']}'")
print(f"Top candidates have very similar scores.")
print(f" - Match: '{mrow['CRM Name']}' | {mrow['normalized_domain']} | {mrow['CRM Ort']}, {mrow['CRM Land']}")
print("-"*50)
for i, item in enumerate(scored[:5]):
cr = item['record']
print(f"[{i+1}] Candidate: '{cr['CRM Name']}' | {cr['normalized_domain']} | {cr['CRM Ort']}, {cr['CRM Land']}")
print(f" Score: {item['score']} | Details: {item['comp']}")
print("[0] No match")
print("\n" + "="*50)
# ...
choice = -1
while choice < 0 or choice > len(scored[:5]):
try:
choice = int(input(f"Please select the best match (1-{len(scored[:5])}) or 0 for no match: "))
except ValueError:
choice = -1
if choice > 0:
best_match = scored[choice-1]
logger.info(f"User selected candidate {choice}: '{best_match['name']}'")
elif choice == 0:
best_match = None
logger.info("User selected no match.")
print("="*50 + "\n")
if best_match and best_match['score'] >= SCORE_THRESHOLD:
is_weak = best_match['comp'].get('domain_match', 0) == 0 and not (best_match['comp'].get('city_match', 0) and best_match['comp'].get('country_match', 0))
is_weak = best_match['comp'].get('domain_match', 0) == 0 and not (best_match['comp'].get('location_match', 0))
applied_threshold = SCORE_THRESHOLD_WEAK if is_weak else SCORE_THRESHOLD
if best_match['score'] >= applied_threshold:
@@ -400,24 +376,16 @@ def main(job_id=None, interactive=False):
logger.info("Matching-Prozess abgeschlossen. Bereite Ergebnisse für den Upload vor...")
# ... (Rest des Codes bleibt identisch) ...
update_status(job_id, "Läuft", "Schreibe Ergebnisse zurück ins Sheet...")
result_df = pd.DataFrame(results)
cols_to_drop_from_match = ['Match', 'Score', 'Match_Grund']
match_df_clean = match_df.drop(columns=[col for col in cols_to_drop_from_match if col in match_df.columns], errors='ignore')
final_df = pd.concat([match_df_clean.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1)
cols_to_drop = ['normalized_name', 'normalized_domain']
final_df = final_df.drop(columns=[col for col in cols_to_drop if col in final_df.columns], errors='ignore')
upload_df = final_df.astype(str).replace({'nan': '', 'None': ''})
data_to_write = [upload_df.columns.tolist()] + upload_df.values.tolist()
logger.info(f"Versuche, {len(data_to_write) - 1} Ergebniszeilen in das Sheet '{MATCHING_SHEET_NAME}' zu schreiben...")
ok = sheet.clear_and_write_data(MATCHING_SHEET_NAME, data_to_write)
if ok:
logger.info("Ergebnisse erfolgreich in das Google Sheet geschrieben.")
update_status(job_id, "Abgeschlossen", f"{total} Accounts erfolgreich geprüft.")
@@ -426,7 +394,7 @@ def main(job_id=None, interactive=False):
update_status(job_id, "Fehlgeschlagen", "Fehler beim Schreiben ins Google Sheet.")
if __name__=='__main__':
parser = argparse.ArgumentParser(description="Duplicate Checker v3.3")
parser = argparse.ArgumentParser(description="Duplicate Checker v4.0")
parser.add_argument("--job-id", type=str, help="Eindeutige ID für den Job-Status.")
parser.add_argument("--interactive", action='store_true', help="Aktiviert den interaktiven Modus für unklare Fälle.")
args = parser.parse_args()