contact_grouping.py hinzugefügt

This commit is contained in:
2025-09-18 07:28:31 +00:00
parent 248ab1787b
commit 63d2388bf8

160
contact_grouping.py Normal file
View File

@@ -0,0 +1,160 @@
# contact_grouping.py
__version__ = "v1.0.0"
import logging
import json
import re
import os
import pandas as pd
# Importiere die existierenden, robusten Handler und Konfigurationen
from google_sheet_handler import GoogleSheetHandler
# --- Konfiguration ---
# Name des Tabellenblatts, das die zu matchenden Kontakte enthält
TARGET_SHEET_NAME = "Matching_Positions"
# Namen der zu ladenden Wissensbasis-Dateien
EXACT_MATCH_FILE = "exact_match_map.json"
KEYWORD_RULES_FILE = "keyword_rules.json"
# Standard-Department, falls keine Zuordnung möglich ist
DEFAULT_DEPARTMENT = "Undefined"
class ContactGrouper:
"""
Kapselt die Logik zur automatischen Gruppierung von Kontakten
basierend auf ihrem Jobtitel.
"""
def __init__(self):
self.logger = logging.getLogger(__name__ + ".ContactGrouper")
self.exact_match_map = self._load_json(EXACT_MATCH_FILE)
self.keyword_rules = self._load_json(KEYWORD_RULES_FILE)
def _load_json(self, file_path):
"""Lädt eine JSON-Datei und gibt den Inhalt als Dictionary zurück."""
if not os.path.exists(file_path):
self.logger.critical(f"Wissensbasis-Datei '{file_path}' nicht gefunden. Bitte zuerst 'knowledge_base_builder.py' ausführen.")
return None
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.logger.info(f"Lade Wissensbasis aus '{file_path}'...")
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
self.logger.critical(f"Fehler beim Laden oder Parsen der Datei '{file_path}': {e}")
return None
def _normalize_job_title(self, job_title):
"""Bereinigt und normalisiert einen Jobtitel für den Abgleich."""
if not isinstance(job_title, str):
return ""
return job_title.lower().strip()
def _find_best_match(self, job_title):
"""
Führt den mehrstufigen Matching-Algorithmus aus.
Stufe 1: Exakter Match.
Stufe 2: Keyword-basierter Match mit Priorisierung.
"""
normalized_title = self._normalize_job_title(job_title)
if not normalized_title:
return DEFAULT_DEPARTMENT
# --- Stufe 1: Exakter Match ---
exact_match = self.exact_match_map.get(normalized_title)
if exact_match:
self.logger.debug(f"'{job_title}' -> '{exact_match}' (Exakter Match)")
return exact_match
# --- Stufe 2: Keyword-basierter Match ---
# Zerlege den Jobtitel in einzigartige Wörter (Tokens)
title_tokens = set(re.split(r'[\s/(),-]+', normalized_title))
scores = {}
for department, rules in self.keyword_rules.items():
# Zähle, wie viele der Department-Keywords im Jobtitel vorkommen
matches = title_tokens.intersection(rules.get("keywords", []))
if matches:
scores[department] = len(matches)
if not scores:
self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Keine Keywords gefunden)")
return DEFAULT_DEPARTMENT
# Finde die Departments mit der höchsten Trefferanzahl
max_score = max(scores.values())
top_departments = [dept for dept, score in scores.items() if score == max_score]
# Wenn es nur ein Department mit der höchsten Punktzahl gibt, ist es der Gewinner
if len(top_departments) == 1:
winner = top_departments[0]
self.logger.debug(f"'{job_title}' -> '{winner}' (Keyword Match: Score {max_score})")
return winner
# --- Tie-Breaker: Priorität ---
# Wenn mehrere Departments die gleiche Punktzahl haben, gewinnt das mit der höchsten Priorität (niedrigste Prio-Zahl)
best_priority = float('inf')
winner = top_departments[0] # Fallback
for department in top_departments:
priority = self.keyword_rules[department].get("priority", 99)
if priority < best_priority:
best_priority = priority
winner = department
self.logger.debug(f"'{job_title}' -> '{winner}' (Keyword Match: Score {max_score}, Prio-Tiebreak: {best_priority})")
return winner
def process_contacts(self):
"""
Orchestriert den gesamten Prozess: Daten laden, zuordnen und zurückschreiben.
"""
self.logger.info(f"Starte Kontakt-Gruppierung (Version {__version__})...")
if self.exact_match_map is None or self.keyword_rules is None:
self.logger.error("Verarbeitung abgebrochen, da Wissensbasis nicht geladen werden konnte.")
return
# 1. Daten aus Google Sheet laden
gsh = GoogleSheetHandler()
df = gsh.get_sheet_as_dataframe(TARGET_SHEET_NAME)
if df is None:
self.logger.critical("Konnte Daten nicht laden. Verarbeitung abgebrochen.")
return
if df.empty:
self.logger.warning("Tabellenblatt 'Matching_Positions' ist leer. Es gibt nichts zu tun.")
return
df.columns = [col.strip() for col in df.columns]
if "Job Title" not in df.columns:
self.logger.critical("Benötigte Spalte 'Job Title' in 'Matching_Positions' nicht gefunden. Abbruch.")
return
self.logger.info(f"{len(df)} Kontakte aus '{TARGET_SHEET_NAME}' zum Verarbeiten geladen.")
# 2. Zuordnung für jeden Jobtitel durchführen
# Sicherstellen, dass die Department-Spalte existiert
if "Department" not in df.columns:
df["Department"] = ""
df['Department'] = df['Job Title'].apply(self._find_best_match)
self.logger.info("Zuordnung abgeschlossen. Bereite das Schreiben der Ergebnisse vor...")
# 3. Ergebnisse zurück in das Google Sheet schreiben
# Erstelle eine Liste von Listen, inklusive der Header-Zeile
output_data = [df.columns.values.tolist()] + df.values.tolist()
success = gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data)
if success:
self.logger.info(f"Erfolgreich {len(df)} Kontakte verarbeitet und Ergebnisse in '{TARGET_SHEET_NAME}' geschrieben.")
else:
self.logger.error("Ein Fehler ist beim Zurückschreiben der Daten in das Google Sheet aufgetreten.")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
grouper = ContactGrouper()
grouper.process_contacts()