feat: Interne Deduplizierung implementieren und Skript refaktorieren
- Skript zu company_deduplicator.py umbenannt (Git-Historie bleibt erhalten)
- Hauptlogik in externen und internen Modus refaktoriert
- Interaktive Modus-Auswahl für den Benutzer hinzugefügt
- Interne Deduplizierung zum Finden von Duplikaten innerhalb der CRM-Liste implementiert
- Logik zur Gruppierung von Duplikatspaaren zu eindeutigen Clustern hinzugefügt
- Eindeutige Dup_XXXX-IDs den Duplikatsgruppen zugewiesen
- Neue Spalte Duplicate_ID zurück in das Google Sheet geschrieben
This commit is contained in:
@@ -215,9 +215,180 @@ def choose_rarest_token(norm_name: str, token_freq: Counter):
|
||||
lst = sorted(list(toks), key=lambda x: (token_freq.get(x, 10**9), -len(x)))
|
||||
return lst[0] if lst else None
|
||||
|
||||
# --- Hauptfunktion ---
|
||||
def main():
|
||||
logger.info("Starte Duplikats-Check v2.15 (Quality-first++)")
|
||||
def build_city_tokens(df1: pd.DataFrame, df2: pd.DataFrame = None):
    """Collect the set of city tokens found in the 'CRM Ort' columns.

    Args:
        df1: Frame with a 'CRM Ort' column.
        df2: Optional second frame with a 'CRM Ort' column; when given, its
            values are pooled with those of ``df1``.

    Returns:
        A set containing every token of length >= 3 that ``_tokenize``
        yields for the distinct, non-null 'CRM Ort' values.
    """
    frames = [df1] if df2 is None else [df1, df2]
    city_columns = [frame['CRM Ort'] for frame in frames]
    distinct_cities = pd.concat(city_columns, ignore_index=True).dropna().unique()

    # Very short tokens (fewer than three characters) are ignored.
    return {
        token
        for city in distinct_cities
        for token in _tokenize(city)
        if len(token) >= 3
    }
|
||||
|
||||
def _find_internal_duplicate_pairs(crm_records, domain_index, token_freq, token_index):
    """Score each CRM record against its blocking candidates and collect matches.

    Args:
        crm_records: Iterable of record mappings; each must provide
            'unique_id' and 'CRM Name' and may provide 'normalized_domain'
            and 'normalized_name'.
        domain_index: Mapping from normalized domain to a list of records.
        token_freq: Token frequency table passed through to
            ``choose_rarest_token`` and ``calculate_similarity``.
        token_index: Mapping from name token to a list of records.

    Returns:
        A list of dicts with keys 'id1', 'name1', 'id2', 'name2', 'score'
        and 'details', one per pair whose score reaches SCORE_THRESHOLD.
    """
    found_pairs = []
    processed_pairs = set()  # unordered id pairs already scored: (A, B) == (B, A)
    total = len(crm_records)

    for i, record1 in enumerate(crm_records):
        if i % 100 == 0:
            logger.info(f"Verarbeite Datensatz {i}/{total}...")

        # Candidates are keyed by unique_id so a record reached through both
        # the domain and the token index is only compared once.
        candidate_records = {}

        domain = record1.get('normalized_domain')
        if domain:
            for record2 in domain_index.get(domain, []):
                candidate_records[record2['unique_id']] = record2

        rtok = choose_rarest_token(record1.get('normalized_name', ''), token_freq)
        if rtok:
            for record2 in token_index.get(rtok, []):
                candidate_records[record2['unique_id']] = record2

        for record2 in candidate_records.values():
            # Never compare a record with itself.
            if record1['unique_id'] == record2['unique_id']:
                continue

            pair_key = tuple(sorted((record1['unique_id'], record2['unique_id'])))
            if pair_key in processed_pairs:
                continue
            processed_pairs.add(pair_key)

            score, comp = calculate_similarity(record1, record2, token_freq)
            if score < SCORE_THRESHOLD:
                continue

            found_pairs.append({
                'id1': record1['unique_id'], 'name1': record1['CRM Name'],
                'id2': record2['unique_id'], 'name2': record2['CRM Name'],
                'score': score,
                'details': str(comp)
            })
            logger.info(f"  -> Potenzielles Duplikat gefunden: '{record1['CRM Name']}' <-> '{record2['CRM Name']}' (Score: {score})")

    return found_pairs


def run_internal_deduplication():
    """Find duplicates inside the CRM sheet and write a Duplicate_ID column back.

    Steps: load the CRM sheet, normalise name/domain/city/country for
    matching, build the blocking indexes, score candidate pairs, merge the
    matching pairs into groups, tag every group member with a ``Dup_XXXX``
    id, write a local CSV backup and finally replace the sheet contents.

    The city and country columns are lower-cased only for matching; their
    original values are restored before anything is written, so the sheet
    keeps the user's spelling.

    Side effects: rebinds the module-level ``CITY_TOKENS``; exits the process
    with status 1 if ``GoogleSheetHandler`` cannot be created.
    """
    global CITY_TOKENS

    logger.info("Modus 'Interne Deduplizierung' gewählt.")
    try:
        sheet = GoogleSheetHandler()
        logger.info("GoogleSheetHandler initialisiert")
    except Exception as e:
        logger.critical(f"Init GoogleSheetHandler fehlgeschlagen: {e}")
        sys.exit(1)

    # Load data
    crm_df = sheet.get_sheet_as_dataframe(CRM_SHEET_NAME)
    if crm_df is None or crm_df.empty:
        logger.critical("CRM-Sheet ist leer. Abbruch.")
        return

    # Fail with a clear message instead of a KeyError deep inside pandas.
    required_columns = ['CRM Name', 'CRM Website', 'CRM Ort', 'CRM Land']
    missing_columns = [col for col in required_columns if col not in crm_df.columns]
    if missing_columns:
        logger.critical(f"Pflichtspalten fehlen im CRM-Sheet: {missing_columns}. Abbruch.")
        return

    # Row identifier used to map matched records back to DataFrame rows.
    crm_df['unique_id'] = crm_df.index
    logger.info(f"{len(crm_df)} CRM-Datensätze geladen.")

    # Keep the untouched location columns: the normalised versions below are
    # for matching only and must not end up in the sheet.
    location_columns = ['CRM Ort', 'CRM Land']
    original_locations = crm_df[location_columns].copy()

    # Normalisation
    crm_df['normalized_name'] = crm_df['CRM Name'].astype(str).apply(normalize_company_name)
    crm_df['normalized_domain'] = crm_df['CRM Website'].astype(str).apply(simple_normalize_url)
    crm_df['CRM Ort'] = crm_df['CRM Ort'].astype(str).str.lower().str.strip()
    crm_df['CRM Land'] = crm_df['CRM Land'].astype(str).str.lower().str.strip()
    crm_df['domain_use_flag'] = 1  # the CRM domain is treated as trustworthy

    # City tokens and blocking indexes
    CITY_TOKENS = build_city_tokens(crm_df)
    logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")

    crm_records, domain_index, token_freq, token_index = build_indexes(crm_df)
    logger.info(f"Blocking: Domains={len(domain_index)} | TokenKeys={len(token_index)}")

    # --- Self comparison ---
    logger.info("Starte internen Abgleich...")
    found_pairs = _find_internal_duplicate_pairs(crm_records, domain_index, token_freq, token_index)

    logger.info("\n===== Interner Abgleich abgeschlossen ====")
    logger.info(f"Insgesamt {len(found_pairs)} potenzielle Duplikatspaare gefunden.")

    if not found_pairs:
        logger.info("Keine weiteren Schritte nötig.")
        return

    groups = group_duplicate_pairs(found_pairs)
    logger.info(f"{len(groups)} eindeutige Duplikatsgruppen gebildet.")

    if not groups:
        logger.info("Keine Duplikate gefunden, die geschrieben werden müssen.")
        return

    # Assign one Dup_XXXX id per group; rows outside any group keep ''.
    crm_df['Duplicate_ID'] = ''
    for dup_counter, group in enumerate(groups, start=1):
        dup_id = f"Dup_{dup_counter:04d}"
        is_member = crm_df['unique_id'].isin(group)
        crm_df.loc[is_member, 'Duplicate_ID'] = dup_id

        member_names = crm_df.loc[is_member, 'CRM Name'].tolist()
        logger.info(f"Gruppe {dup_id}: {member_names}")

    # Undo the matching-only normalisation, then drop the helper columns.
    for col in location_columns:
        crm_df[col] = original_locations[col]
    crm_df.drop(columns=['unique_id', 'normalized_name', 'normalized_domain', 'domain_use_flag'], inplace=True)

    # Write results back
    logger.info("Schreibe Ergebnisse mit Duplikats-IDs ins Sheet...")
    # NOTE(review): `now` and LOG_DIR are not defined in this function;
    # presumably module-level values - confirm.
    backup_path = os.path.join(LOG_DIR, f"{now}_backup_internal_{CRM_SHEET_NAME}.csv")
    try:
        crm_df.to_csv(backup_path, index=False, encoding='utf-8')
        logger.info(f"Lokales Backup geschrieben: {backup_path}")
    except Exception as e:
        # Best effort: a failed backup is reported but does not stop the write.
        logger.warning(f"Backup fehlgeschlagen: {e}")

    data = [crm_df.columns.tolist()] + crm_df.fillna('').values.tolist()
    ok = sheet.clear_and_write_data(CRM_SHEET_NAME, data)
    if ok:
        logger.info("Ergebnisse erfolgreich ins Google Sheet geschrieben.")
    else:
        logger.error("Fehler beim Schreiben der Ergebnisse ins Google Sheet.")
|
||||
|
||||
|
||||
def group_duplicate_pairs(pairs: list) -> list:
    """Merge pairwise duplicate matches into disjoint groups.

    Two ids end up in the same group when they are linked directly or
    through a chain of pairs (A-B and B-C put A, B and C together).

    Args:
        pairs: List of mappings, each with hashable 'id1' and 'id2' entries.
            Other keys are ignored.

    Returns:
        A list of sets of ids. Groups appear in the order they were first
        created; when two existing groups are joined, the merged group keeps
        the position of the group that contained 'id1'. A pair whose two ids
        are identical is skipped, because one record on its own is not a
        duplicate group.
    """
    groups = {}     # slot number -> member ids; dict order is creation order
    slot_of = {}    # member id -> slot number of the group that holds it
    next_slot = 0

    for pair in pairs:
        id1, id2 = pair['id1'], pair['id2']
        if id1 == id2:
            continue

        slot1 = slot_of.get(id1)
        slot2 = slot_of.get(id2)

        if slot1 is None and slot2 is None:
            # Neither id is known yet: open a new group.
            groups[next_slot] = {id1, id2}
            slot_of[id1] = slot_of[id2] = next_slot
            next_slot += 1
        elif slot2 is None:
            groups[slot1].add(id2)
            slot_of[id2] = slot1
        elif slot1 is None:
            groups[slot2].add(id1)
            slot_of[id1] = slot2
        elif slot1 != slot2:
            # The pair bridges two groups: fold id2's group into id1's.
            absorbed = groups.pop(slot2)
            groups[slot1].update(absorbed)
            for member in absorbed:
                slot_of[member] = slot1

    return list(groups.values())
|
||||
|
||||
|
||||
def run_external_comparison():
|
||||
"""Führt den Vergleich zwischen CRM_Accounts und Matching_Accounts durch."""
|
||||
logger.info("Modus 'Externer Vergleich' gewählt.")
|
||||
try:
|
||||
sheet = GoogleSheetHandler()
|
||||
logger.info("GoogleSheetHandler initialisiert")
|
||||
@@ -296,13 +467,6 @@ def main():
|
||||
match_df['domain_use_flag'] = match_df.apply(_domain_use, axis=1)
|
||||
|
||||
# City-Tokens dynamisch bauen (nach Normalisierung von Ort)
|
||||
def build_city_tokens(crm_df, match_df):
|
||||
cities = set()
|
||||
for s in pd.concat([crm_df['CRM Ort'], match_df['CRM Ort']], ignore_index=True).dropna().unique():
|
||||
for t in _tokenize(s):
|
||||
if len(t) >= 3:
|
||||
cities.add(t)
|
||||
return cities
|
||||
global CITY_TOKENS
|
||||
CITY_TOKENS = build_city_tokens(crm_df, match_df)
|
||||
logger.info(f"City tokens gesammelt: {len(CITY_TOKENS)}")
|
||||
@@ -446,5 +610,25 @@ def main():
|
||||
logger.info(f"Serp Vertrauen: {dict(serp_counts)}")
|
||||
logger.info(f"Config: TH={SCORE_THRESHOLD}, TH_WEAK={SCORE_THRESHOLD_WEAK}, MIN_NAME_FOR_DOMAIN={MIN_NAME_FOR_DOMAIN}, Penalties(city={CITY_MISMATCH_PENALTY},country={COUNTRY_MISMATCH_PENALTY}), Prefilter(partial>={PREFILTER_MIN_PARTIAL}, limit={PREFILTER_LIMIT})")
|
||||
|
||||
|
||||
# --- Hauptfunktion ---
|
||||
def main():
    """Ask the user for the run mode and start the selected workflow.

    Entering '1' runs the external comparison, '2' runs the internal
    deduplication; surrounding whitespace is ignored. Any other input
    prints a hint and asks again. If no input can be read (stdin is
    closed), the error is logged and the process exits with status 1.
    """
    logger.info("Starte Duplikats-Check v3.0")

    modes = {
        '1': run_external_comparison,
        '2': run_internal_deduplication,
    }

    while True:
        print("\nBitte wählen Sie den gewünschten Modus:")
        print("1: Externer Vergleich (gleicht CRM_Accounts mit Matching_Accounts ab)")
        print("2: Interne Deduplizierung (findet Duplikate innerhalb von CRM_Accounts)")
        try:
            # Strip so that "1 " or " 2" are accepted as valid choices.
            choice = input("Ihre Wahl (1 oder 2): ").strip()
        except EOFError:
            # No interactive stdin (e.g. cron job or pipe): stop instead of
            # dying with a traceback.
            logger.critical("Keine Eingabe möglich (stdin geschlossen). Abbruch.")
            sys.exit(1)

        run_mode = modes.get(choice)
        if run_mode is None:
            print("Ungültige Eingabe. Bitte geben Sie 1 oder 2 ein.")
            continue

        run_mode()
        break
|
||||
|
||||
# Script entry point: run the interactive mode selection exactly once, and
# only when the file is executed directly (not when it is imported).
if __name__ == '__main__':
    main()
|
||||
Reference in New Issue
Block a user