feat: Interne Deduplizierung implementieren und Skript refaktorieren

- Skript zu company_deduplicator.py umbenannt mit Erhalt der Git-Historie
- Hauptlogik in externen und internen Modus refaktorisiert
- Interaktive Modus-Auswahl für den Benutzer hinzugefügt
- Interne Deduplizierung zum Finden von Duplikaten innerhalb der CRM-Liste implementiert
- Logik zur Gruppierung von Duplikatspaaren zu eindeutigen Clustern hinzugefügt
- Eindeutige Dup_XXXX IDs den Duplikatsgruppen zugewiesen
- Neue Spalte Duplicate_ID zurück in das Google Sheet geschrieben
This commit is contained in:
2025-11-09 08:09:45 +00:00
parent 44f83ac661
commit fe96789f6b

View File

@@ -215,9 +215,180 @@ def choose_rarest_token(norm_name: str, token_freq: Counter):
lst = sorted(list(toks), key=lambda x: (token_freq.get(x, 10**9), -len(x)))
return lst[0] if lst else None
# --- Hauptfunktion ---
def main():
logger.info("Starte Duplikats-Check v2.15 (Quality-first++)")
def build_city_tokens(df1: pd.DataFrame, df2: pd.DataFrame = None):
"""Baut dynamisch ein Set von City-Tokens aus den Orts-Spalten."""
dfs = [df1]
if df2 is not None:
dfs.append(df2)
cities = set()
for s in pd.concat([df['CRM Ort'] for df in dfs], ignore_index=True).dropna().unique():
for t in _tokenize(s):
if len(t) >= 3:
cities.add(t)
return cities
def run_internal_deduplication():
"""Führt die interne Deduplizierung auf dem CRM_Accounts-Sheet durch."""
logger.info("Modus 'Interne Deduplizierung' gewählt.")
try:
sheet = GoogleSheetHandler()
logger.info("GoogleSheetHandler initialisiert")
except Exception as e:
logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
sys.exit(1)
# Daten laden
crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
if crm_df is None or crm_df.empty:
logger.critical("CRM-Sheet ist leer. Abbruch.")
return
# Eindeutige ID hinzufügen, um Zeilen zu identifizieren
crm_df['unique_id'] = crm_df.index
logger.info(f"{len(crm_df)} CRM-Datensätze geladen.")
# Normalisierung
crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name)
crm_df['normalized_domain'] = crm_df['CRM Website'].astype(str).apply(simple_normalize_url)
crm_df['CRM Ort'] = crm_df['CRM Ort'].astype(str).str.lower().str.strip()
crm_df['CRM Land'] = crm_df['CRM Land'].astype(str).str.lower().str.strip()
crm_df['domain_use_flag'] = 1 # CRM-Domain gilt als vertrauenswürdig
# City-Tokens und Blocking-Indizes
global CITY_TOKENS
CITY_TOKENS = build_city_tokens(crm_df)
logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")
crm_records, domain_index, token_freq, token_index = build_indexes(crm_df)
logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")
# --- Selbst-Vergleich ---
found_pairs = []
processed_pairs = set() # Verhindert (A,B) und (B,A)
total = len(crm_records)
logger.info("Starte internen Abgleich...")
for i, record1 in enumerate(crm_records):
if i % 100 == 0:
logger.info(f"Verarbeite Datensatz {i}/{total}...")
candidate_records = {}
# Kandidaten via Domain finden
domain = record1.get('normalized_domain')
if domain:
for record2 in domain_index.get(domain, []):
candidate_records[record2['unique_id']] = record2
# Kandidaten via seltenstem Token finden
rtok = choose_rarest_token(record1.get('normalized_name',''), token_freq)
if rtok:
for record2 in token_index.get(rtok, []):
candidate_records[record2['unique_id']] = record2
if not candidate_records:
continue
for record2 in candidate_records.values():
# Vergleiche nicht mit sich selbst
if record1['unique_id'] == record2['unique_id']:
continue
# Verhindere doppelte Vergleiche (A,B) vs (B,A)
pair_key = tuple(sorted((record1['unique_id'], record2['unique_id'])))
if pair_key in processed_pairs:
continue
processed_pairs.add(pair_key)
score, comp = calculate_similarity(record1, record2, token_freq)
# Akzeptanzlogik (hier könnte man den Threshold anpassen)
if score >= SCORE_THRESHOLD:
pair_info = {
'id1': record1['unique_id'], 'name1': record1['CRM Name'],
'id2': record2['unique_id'], 'name2': record2['CRM Name'],
'score': score,
'details': str(comp)
}
found_pairs.append(pair_info)
logger.info(f" -> Potenzielles Duplikat gefunden: '{record1['CRM Name']}' <-> '{record2['CRM Name']}' (Score: {score})")
logger.info("\n===== Interner Abgleich abgeschlossen ====")
logger.info(f"Insgesamt {len(found_pairs)} potenzielle Duplikatspaare gefunden.")
if not found_pairs:
logger.info("Keine weiteren Schritte nötig.")
return
groups = group_duplicate_pairs(found_pairs)
logger.info(f"{len(groups)} eindeutige Duplikatsgruppen gebildet.")
if not groups:
logger.info("Keine Duplikate gefunden, die geschrieben werden müssen.")
return
# Schritt 4: IDs zuweisen und in Tabelle schreiben
crm_df['Duplicate_ID'] = ''
dup_counter = 1
for group in groups:
dup_id = f"Dup_{dup_counter:04d}"
dup_counter += 1
# IDs der Gruppe im DataFrame aktualisieren
crm_df.loc[crm_df['unique_id'].isin(group), 'Duplicate_ID'] = dup_id
# Namen der Gruppenmitglieder für Log-Ausgabe sammeln
member_names = crm_df[crm_df['unique_id'].isin(group)]['CRM Name'].tolist()
logger.info(f"Gruppe {dup_id}: {member_names}")
# Bereinigen der Hilfsspalten vor dem Schreiben
crm_df.drop(columns=['unique_id', 'normalized_name', 'normalized_domain', 'domain_use_flag'], inplace=True)
# Ergebnisse zurückschreiben
logger.info("Schreibe Ergebnisse mit Duplikats-IDs ins Sheet...")
backup_path = os.path.join(LOG_DIR, f"{now}_backup_internal_{CRM_SHEET_NAME}.csv")
try:
crm_df.to_csv(backup_path, index=False, encoding='utf-8')
logger.info(f"Lokales Backup geschrieben: {backup_path}")
except Exception as e:
logger.warning(f"Backup fehlgeschlagen: {e}")
data = [crm_df.columns.tolist()] + crm_df.fillna('').values.tolist()
ok = sheet.clear_and_write_data(CRM_SHEET_NAME, data)
if ok:
logger.info("Ergebnisse erfolgreich ins Google Sheet geschrieben.")
else:
logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
def group_duplicate_pairs(pairs: list) -> list:
"""Fasst eine Liste von Duplikatspaaren zu Gruppen zusammen."""
groups = []
for pair in pairs:
id1, id2 = pair['id1'], pair['id2']
group1_found = None
group2_found = None
for group in groups:
if id1 in group:
group1_found = group
if id2 in group:
group2_found = group
if group1_found and group2_found:
if group1_found is not group2_found: # Zwei unterschiedliche Gruppen verschmelzen
group1_found.update(group2_found)
groups.remove(group2_found)
elif group1_found: # Zu Gruppe 1 hinzufügen
group1_found.add(id2)
elif group2_found: # Zu Gruppe 2 hinzufügen
group2_found.add(id1)
else: # Neue Gruppe erstellen
groups.append({id1, id2})
return [set(g) for g in groups]
def run_external_comparison():
"""Führt den Vergleich zwischen CRM_Accounts und Matching_Accounts durch."""
logger.info("Modus 'Externer Vergleich' gewählt.")
try:
sheet = GoogleSheetHandler()
logger.info("GoogleSheetHandler initialisiert")
@@ -296,13 +467,6 @@ def main():
match_df['domain_use_flag'] = match_df.apply(_domain_use, axis=1)
# City-Tokens dynamisch bauen (nach Normalisierung von Ort)
def build_city_tokens(crm_df, match_df):
cities = set()
for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']], ignore_index=True).dropna().unique():
for t in _tokenize(s):
if len(t) >= 3:
cities.add(t)
return cities
global CITY_TOKENS
CITY_TOKENS = build_city_tokens(crm_df, match_df)
logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")
@@ -446,5 +610,25 @@ def main():
logger.info(f"Serp Vertrauen: {dict(serp_counts)}")
logger.info(f"Config: TH={SCORE_THRESHOLD}, TH_WEAK={SCORE_THRESHOLD_WEAK}, MIN_NAME_FOR_DOMAIN={MIN_NAME_FOR_DOMAIN}, Penalties(city={CITY_MISMATCH_PENALTY},country={COUNTRY_MISMATCH_PENALTY}), Prefilter(partial>={PREFILTER_MIN_PARTIAL}, limit={PREFILTER_LIMIT})")
# --- Hauptfunktion ---
def main():
logger.info("Starte Duplikats-Check v3.0")
while True:
print("\nBitte wählen Sie den gewünschten Modus:")
print("1: Externer Vergleich (gleicht CRM_Accounts mit Matching_Accounts ab)")
print("2: Interne Deduplizierung (findet Duplikate innerhalb von CRM_Accounts)")
choice = input("Ihre Wahl (1 oder 2): ")
if choice == '1':
run_external_comparison()
break
elif choice == '2':
run_internal_deduplication()
break
else:
print("Ungültige Eingabe. Bitte geben Sie 1 oder 2 ein.")
if __name__=='__main__':
main()
main()