contact_grouping.py hinzugefügt
This commit is contained in:
160
contact_grouping.py
Normal file
160
contact_grouping.py
Normal file
@@ -0,0 +1,160 @@
|
||||
# contact_grouping.py
|
||||
|
||||
__version__ = "v1.0.0"
|
||||
|
||||
import logging
|
||||
import json
|
||||
import re
|
||||
import os
|
||||
import pandas as pd
|
||||
|
||||
# Importiere die existierenden, robusten Handler und Konfigurationen
|
||||
from google_sheet_handler import GoogleSheetHandler
|
||||
|
||||
# --- Konfiguration ---
|
||||
# Name des Tabellenblatts, das die zu matchenden Kontakte enthält
|
||||
TARGET_SHEET_NAME = "Matching_Positions"
|
||||
# Namen der zu ladenden Wissensbasis-Dateien
|
||||
EXACT_MATCH_FILE = "exact_match_map.json"
|
||||
KEYWORD_RULES_FILE = "keyword_rules.json"
|
||||
# Standard-Department, falls keine Zuordnung möglich ist
|
||||
DEFAULT_DEPARTMENT = "Undefined"
|
||||
|
||||
|
||||
class ContactGrouper:
|
||||
"""
|
||||
Kapselt die Logik zur automatischen Gruppierung von Kontakten
|
||||
basierend auf ihrem Jobtitel.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger(__name__ + ".ContactGrouper")
|
||||
self.exact_match_map = self._load_json(EXACT_MATCH_FILE)
|
||||
self.keyword_rules = self._load_json(KEYWORD_RULES_FILE)
|
||||
|
||||
def _load_json(self, file_path):
|
||||
"""Lädt eine JSON-Datei und gibt den Inhalt als Dictionary zurück."""
|
||||
if not os.path.exists(file_path):
|
||||
self.logger.critical(f"Wissensbasis-Datei '{file_path}' nicht gefunden. Bitte zuerst 'knowledge_base_builder.py' ausführen.")
|
||||
return None
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
self.logger.info(f"Lade Wissensbasis aus '{file_path}'...")
|
||||
return json.load(f)
|
||||
except (json.JSONDecodeError, IOError) as e:
|
||||
self.logger.critical(f"Fehler beim Laden oder Parsen der Datei '{file_path}': {e}")
|
||||
return None
|
||||
|
||||
def _normalize_job_title(self, job_title):
|
||||
"""Bereinigt und normalisiert einen Jobtitel für den Abgleich."""
|
||||
if not isinstance(job_title, str):
|
||||
return ""
|
||||
return job_title.lower().strip()
|
||||
|
||||
def _find_best_match(self, job_title):
|
||||
"""
|
||||
Führt den mehrstufigen Matching-Algorithmus aus.
|
||||
Stufe 1: Exakter Match.
|
||||
Stufe 2: Keyword-basierter Match mit Priorisierung.
|
||||
"""
|
||||
normalized_title = self._normalize_job_title(job_title)
|
||||
if not normalized_title:
|
||||
return DEFAULT_DEPARTMENT
|
||||
|
||||
# --- Stufe 1: Exakter Match ---
|
||||
exact_match = self.exact_match_map.get(normalized_title)
|
||||
if exact_match:
|
||||
self.logger.debug(f"'{job_title}' -> '{exact_match}' (Exakter Match)")
|
||||
return exact_match
|
||||
|
||||
# --- Stufe 2: Keyword-basierter Match ---
|
||||
# Zerlege den Jobtitel in einzigartige Wörter (Tokens)
|
||||
title_tokens = set(re.split(r'[\s/(),-]+', normalized_title))
|
||||
|
||||
scores = {}
|
||||
for department, rules in self.keyword_rules.items():
|
||||
# Zähle, wie viele der Department-Keywords im Jobtitel vorkommen
|
||||
matches = title_tokens.intersection(rules.get("keywords", []))
|
||||
if matches:
|
||||
scores[department] = len(matches)
|
||||
|
||||
if not scores:
|
||||
self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Keine Keywords gefunden)")
|
||||
return DEFAULT_DEPARTMENT
|
||||
|
||||
# Finde die Departments mit der höchsten Trefferanzahl
|
||||
max_score = max(scores.values())
|
||||
top_departments = [dept for dept, score in scores.items() if score == max_score]
|
||||
|
||||
# Wenn es nur ein Department mit der höchsten Punktzahl gibt, ist es der Gewinner
|
||||
if len(top_departments) == 1:
|
||||
winner = top_departments[0]
|
||||
self.logger.debug(f"'{job_title}' -> '{winner}' (Keyword Match: Score {max_score})")
|
||||
return winner
|
||||
|
||||
# --- Tie-Breaker: Priorität ---
|
||||
# Wenn mehrere Departments die gleiche Punktzahl haben, gewinnt das mit der höchsten Priorität (niedrigste Prio-Zahl)
|
||||
best_priority = float('inf')
|
||||
winner = top_departments[0] # Fallback
|
||||
|
||||
for department in top_departments:
|
||||
priority = self.keyword_rules[department].get("priority", 99)
|
||||
if priority < best_priority:
|
||||
best_priority = priority
|
||||
winner = department
|
||||
|
||||
self.logger.debug(f"'{job_title}' -> '{winner}' (Keyword Match: Score {max_score}, Prio-Tiebreak: {best_priority})")
|
||||
return winner
|
||||
|
||||
|
||||
def process_contacts(self):
|
||||
"""
|
||||
Orchestriert den gesamten Prozess: Daten laden, zuordnen und zurückschreiben.
|
||||
"""
|
||||
self.logger.info(f"Starte Kontakt-Gruppierung (Version {__version__})...")
|
||||
if self.exact_match_map is None or self.keyword_rules is None:
|
||||
self.logger.error("Verarbeitung abgebrochen, da Wissensbasis nicht geladen werden konnte.")
|
||||
return
|
||||
|
||||
# 1. Daten aus Google Sheet laden
|
||||
gsh = GoogleSheetHandler()
|
||||
df = gsh.get_sheet_as_dataframe(TARGET_SHEET_NAME)
|
||||
|
||||
if df is None:
|
||||
self.logger.critical("Konnte Daten nicht laden. Verarbeitung abgebrochen.")
|
||||
return
|
||||
if df.empty:
|
||||
self.logger.warning("Tabellenblatt 'Matching_Positions' ist leer. Es gibt nichts zu tun.")
|
||||
return
|
||||
|
||||
df.columns = [col.strip() for col in df.columns]
|
||||
if "Job Title" not in df.columns:
|
||||
self.logger.critical("Benötigte Spalte 'Job Title' in 'Matching_Positions' nicht gefunden. Abbruch.")
|
||||
return
|
||||
|
||||
self.logger.info(f"{len(df)} Kontakte aus '{TARGET_SHEET_NAME}' zum Verarbeiten geladen.")
|
||||
|
||||
# 2. Zuordnung für jeden Jobtitel durchführen
|
||||
# Sicherstellen, dass die Department-Spalte existiert
|
||||
if "Department" not in df.columns:
|
||||
df["Department"] = ""
|
||||
|
||||
df['Department'] = df['Job Title'].apply(self._find_best_match)
|
||||
|
||||
self.logger.info("Zuordnung abgeschlossen. Bereite das Schreiben der Ergebnisse vor...")
|
||||
|
||||
# 3. Ergebnisse zurück in das Google Sheet schreiben
|
||||
# Erstelle eine Liste von Listen, inklusive der Header-Zeile
|
||||
output_data = [df.columns.values.tolist()] + df.values.tolist()
|
||||
|
||||
success = gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data)
|
||||
|
||||
if success:
|
||||
self.logger.info(f"Erfolgreich {len(df)} Kontakte verarbeitet und Ergebnisse in '{TARGET_SHEET_NAME}' geschrieben.")
|
||||
else:
|
||||
self.logger.error("Ein Fehler ist beim Zurückschreiben der Daten in das Google Sheet aufgetreten.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
grouper = ContactGrouper()
|
||||
grouper.process_contacts()
|
||||
Reference in New Issue
Block a user