v1.1.0 - Intelligente Lernfunktion & Verbessertes Logging

- Erweitertes Logging mit DEBUG-Level und Logfile-Erstellung.
- Zusammenfassende Statistik der Department-Zuweisungen am Ende des Laufs.
- NEU: Stufe 3 - KI-Klassifizierung für unklare Fälle ('Undefined').
- Jobtitel, die nicht durch Stufe 1 oder 2 zugeordnet werden können, werden an die OpenAI API zur Klassifizierung gesendet.
- NEU: Lern-Mechanismus - Die von der KI ermittelten Zuordnungen werden automatisch in das 'CRM_Jobtitles'-Sheet zurückgeschrieben.
- Das System verbessert sich dadurch selbst für zukünftige Durchläufe.
This commit is contained in:
2025-09-18 07:37:16 +00:00
parent 8c4df42e29
commit 5f43ebde38

View File

@@ -1,6 +1,6 @@
# contact_grouping.py
__version__ = "v1.0.0"
__version__ = "v1.1.0"
import logging
import json
@@ -10,21 +10,44 @@ import pandas as pd
# Importiere die existierenden, robusten Handler und Konfigurationen
from google_sheet_handler import GoogleSheetHandler
# NEU: Import von Hilfsfunktionen für Logging und API-Aufrufe
from helpers import create_log_filename, call_openai_chat
from config import LOG_DIR
# --- Konfiguration ---
# Name des Tabellenblatts, das die zu matchenden Kontakte enthält
TARGET_SHEET_NAME = "Matching_Positions"
# Name des Tabellenblatts, das als "Single Source of Truth" für das Lernen dient
LEARNING_SOURCE_SHEET_NAME = "CRM_Jobtitles"
# Namen der zu ladenden Wissensbasis-Dateien
EXACT_MATCH_FILE = "exact_match_map.json"
KEYWORD_RULES_FILE = "keyword_rules.json"
# Standard-Department, falls keine Zuordnung möglich ist
DEFAULT_DEPARTMENT = "Undefined"
def setup_logging():
"""Konfiguriert das Logging, um sowohl in der Konsole als auch in einer Datei zu loggen."""
log_filename = create_log_filename("contact_grouping")
log_level = logging.DEBUG # NEU: Auf DEBUG geändert für detailliertere Ausgaben
# Root-Logger konfigurieren
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_filename, encoding='utf-8'),
logging.StreamHandler()
]
)
logging.getLogger("gspread").setLevel(logging.WARNING)
logging.getLogger("oauth2client").setLevel(logging.WARNING)
logging.info(f"Logging initialisiert. Log-Datei: {log_filename}")
class ContactGrouper:
"""
Kapselt die Logik zur automatischen Gruppierung von Kontakten
basierend auf ihrem Jobtitel.
basierend auf ihrem Jobtitel. Inklusive Lernfunktion via KI.
"""
def __init__(self):
self.logger = logging.getLogger(__name__ + ".ContactGrouper")
@@ -63,88 +86,174 @@ class ContactGrouper:
# --- Stufe 1: Exakter Match ---
exact_match = self.exact_match_map.get(normalized_title)
if exact_match:
self.logger.debug(f"'{job_title}' -> '{exact_match}' (Exakter Match)")
self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1: Exakter Match)")
return exact_match
# --- Stufe 2: Keyword-basierter Match ---
# Zerlege den Jobtitel in einzigartige Wörter (Tokens)
title_tokens = set(re.split(r'[\s/(),-]+', normalized_title))
scores = {}
for department, rules in self.keyword_rules.items():
# Zähle, wie viele der Department-Keywords im Jobtitel vorkommen
matches = title_tokens.intersection(rules.get("keywords", []))
if matches:
scores[department] = len(matches)
if not scores:
self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Keine Keywords gefunden)")
self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine Keywords gefunden)")
return DEFAULT_DEPARTMENT
# Finde die Departments mit der höchsten Trefferanzahl
max_score = max(scores.values())
top_departments = [dept for dept, score in scores.items() if score == max_score]
# Wenn es nur ein Department mit der höchsten Punktzahl gibt, ist es der Gewinner
if len(top_departments) == 1:
winner = top_departments[0]
self.logger.debug(f"'{job_title}' -> '{winner}' (Keyword Match: Score {max_score})")
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score})")
return winner
# --- Tie-Breaker: Priorität ---
# Wenn mehrere Departments die gleiche Punktzahl haben, gewinnt das mit der höchsten Priorität (niedrigste Prio-Zahl)
best_priority = float('inf')
winner = top_departments[0] # Fallback
winner = top_departments[0]
for department in top_departments:
priority = self.keyword_rules[department].get("priority", 99)
if priority < best_priority:
best_priority = priority
winner = department
self.logger.debug(f"'{job_title}' -> '{winner}' (Keyword Match: Score {max_score}, Prio-Tiebreak: {best_priority})")
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score}, Prio-Tiebreak: {best_priority})")
return winner
def _get_ai_classification(self, job_titles_to_classify):
"""
Sendet eine Liste von Jobtiteln an die OpenAI API zur Klassifizierung.
"""
self.logger.info(f"Starte Stufe 3: Sende {len(job_titles_to_classify)} 'Undefined' Jobtitel zur KI-Klassifizierung...")
if not job_titles_to_classify:
return {}
valid_departments = sorted([dept for dept in self.keyword_rules.keys() if dept != DEFAULT_DEPARTMENT])
prompt_parts = [
"Du bist ein HR-Experte, der Jobtitel präzise vordefinierten Abteilungen zuordnet.",
"Analysiere die folgende Liste von Jobtiteln.",
"Ordne JEDEN Jobtitel EINER der folgenden gültigen Abteilungen zu:",
", ".join(valid_departments),
"\nGib deine Antwort als valides JSON-Array von Objekten zurück, wobei jedes Objekt die Schlüssel 'job_title' und 'department' hat.",
"Beispiel: [{\"job_title\": \"Head of Fleet Management\", \"department\": \"Fuhrparkmanagement\"}]",
"\n--- Zu klassifizierende Jobtitel ---",
json.dumps(job_titles_to_classify, ensure_ascii=False)
]
prompt = "\n".join(prompt_parts)
try:
# Wir nutzen die call_openai_chat Funktion aus helpers.py
response_str = call_openai_chat(prompt, temperature=0.0, model="gpt-4o-mini", response_format_json=True)
# Robuste JSON-Extraktion
json_start = response_str.find('[')
json_end = response_str.rfind(']')
if json_start == -1 or json_end == -1:
raise json.JSONDecodeError("Kein JSON-Array in der Antwort gefunden.", response_str, 0)
json_str = response_str[json_start : json_end + 1]
results_list = json.loads(json_str)
# Konvertiere die Liste in ein Dictionary für einfaches Nachschlagen
classified_map = {item['job_title']: item['department'] for item in results_list if item.get('department') in valid_departments}
self.logger.info(f"{len(classified_map)} Jobtitel erfolgreich von der KI klassifiziert.")
return classified_map
except Exception as e:
self.logger.error(f"Fehler bei der KI-Klassifizierung: {e}")
return {}
def _append_learnings_to_source(self, gsh, new_mappings_df):
"""
Hängt die neu gelernten Mappings an das 'CRM_Jobtitles'-Sheet an.
"""
if new_mappings_df.empty:
return
self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue KI-Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...")
# Stelle sicher, dass das DataFrame die Spalten "Job Title" und "Department" hat
if "Job Title" not in new_mappings_df.columns or "Department" not in new_mappings_df.columns:
self.logger.error("Fehler im Lern-Mechanismus: DataFrame hat nicht die erwarteten Spalten.")
return
# Konvertiere das DataFrame in eine Liste von Listen für den Upload
rows_to_append = new_mappings_df[["Job Title", "Department"]].values.tolist()
success = gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append)
if success:
self.logger.info("Lern-Daten erfolgreich an die Wissensbasis angehängt.")
else:
self.logger.error("Fehler beim Anhängen der Lern-Daten an die Wissensbasis.")
def process_contacts(self):
"""
Orchestriert den gesamten Prozess: Daten laden, zuordnen und zurückschreiben.
Orchestriert den gesamten Prozess: Daten laden, zuordnen, KI anreichern, lernen und zurückschreiben.
"""
self.logger.info(f"Starte Kontakt-Gruppierung (Version {__version__})...")
if self.exact_match_map is None or self.keyword_rules is None:
self.logger.error("Verarbeitung abgebrochen, da Wissensbasis nicht geladen werden konnte.")
return
# 1. Daten aus Google Sheet laden
gsh = GoogleSheetHandler()
df = gsh.get_sheet_as_dataframe(TARGET_SHEET_NAME)
if df is None:
self.logger.critical("Konnte Daten nicht laden. Verarbeitung abgebrochen.")
return
if df.empty:
self.logger.warning("Tabellenblatt 'Matching_Positions' ist leer. Es gibt nichts zu tun.")
if df is None or df.empty:
self.logger.warning(f"'{TARGET_SHEET_NAME}' ist leer oder konnte nicht geladen werden. Nichts zu tun.")
return
df.columns = [col.strip() for col in df.columns]
if "Job Title" not in df.columns:
self.logger.critical("Benötigte Spalte 'Job Title' in 'Matching_Positions' nicht gefunden. Abbruch.")
self.logger.critical(f"Benötigte Spalte 'Job Title' in '{TARGET_SHEET_NAME}' nicht gefunden. Abbruch.")
return
# Original Jobtitel für späteres Lernen speichern
df['Original Job Title'] = df['Job Title']
self.logger.info(f"{len(df)} Kontakte aus '{TARGET_SHEET_NAME}' zum Verarbeiten geladen.")
# 2. Zuordnung für jeden Jobtitel durchführen
# Sicherstellen, dass die Department-Spalte existiert
if "Department" not in df.columns:
df["Department"] = ""
# Stufe 1 & 2: Zuordnung durchführen
if "Department" not in df.columns: df["Department"] = ""
df['Department'] = df['Job Title'].apply(self._find_best_match)
# Stufe 3: KI-Klassifizierung für 'Undefined' Fälle
undefined_df = df[df['Department'] == DEFAULT_DEPARTMENT]
if not undefined_df.empty:
titles_to_classify = undefined_df['Job Title'].unique().tolist()
ai_results_map = self._get_ai_classification(titles_to_classify)
# Wende die KI-Ergebnisse an
df['Department'] = df.apply(
lambda row: ai_results_map.get(row['Job Title'], row['Department']) if row['Department'] == DEFAULT_DEPARTMENT else row['Department'],
axis=1
)
# Lern-Mechanismus: Neue Erkenntnisse für die Zukunft speichern
# Wir erstellen ein neues DataFrame mit den Originaltiteln und den KI-Departments
new_learnings = []
for title, dept in ai_results_map.items():
new_learnings.append({'Job Title': title, 'Department': dept})
if new_learnings:
new_learnings_df = pd.DataFrame(new_learnings)
self._append_learnings_to_source(gsh, new_learnings_df)
self.logger.info("Zuordnung abgeschlossen. Bereite das Schreiben der Ergebnisse vor...")
# 3. Ergebnisse zurück in das Google Sheet schreiben
# Erstelle eine Liste von Listen, inklusive der Header-Zeile
output_data = [df.columns.values.tolist()] + df.values.tolist()
# --- NEU: Zusammenfassende Statistik ---
self.logger.info("--- Zuordnungs-Statistik ---")
stats = df['Department'].value_counts()
for department, count in stats.items():
self.logger.info(f"- {department}: {count} Zuordnungen")
self.logger.info(f"GESAMT: {len(df)} Jobtitel verarbeitet.")
self.logger.info("--------------------------")
# Ergebnisse zurück in das Google Sheet schreiben (nur die Originalspalten)
output_df = df.drop(columns=['Original Job Title'])
output_data = [output_df.columns.values.tolist()] + output_df.values.tolist()
success = gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data)
@@ -155,6 +264,7 @@ class ContactGrouper:
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# NEU: Logging wird zentral am Anfang konfiguriert
setup_logging()
grouper = ContactGrouper()
grouper.process_contacts()