v1.1.0 - Intelligente Lernfunktion & Verbessertes Logging

- Erweitertes Logging mit DEBUG-Level und Logfile-Erstellung.
- Zusammenfassende Statistik der Department-Zuweisungen am Ende des Laufs.
- NEU: Stufe 3 - KI-Klassifizierung für unklare Fälle ('Undefined').
- Jobtitel, die nicht durch Stufe 1 oder 2 zugeordnet werden können, werden an die OpenAI API zur Klassifizierung gesendet.
- NEU: Lern-Mechanismus - Die von der KI ermittelten Zuordnungen werden automatisch in das 'CRM_Jobtitles'-Sheet zurückgeschrieben.
- Das System verbessert sich dadurch selbst für zukünftige Durchläufe.
This commit is contained in:
2025-09-18 07:37:16 +00:00
parent 67a486e207
commit f343599d59

View File

@@ -1,6 +1,6 @@
# contact_grouping.py # contact_grouping.py
__version__ = "v1.0.0" __version__ = "v1.1.0"
import logging import logging
import json import json
@@ -10,21 +10,44 @@ import pandas as pd
# Importiere die existierenden, robusten Handler und Konfigurationen # Importiere die existierenden, robusten Handler und Konfigurationen
from google_sheet_handler import GoogleSheetHandler from google_sheet_handler import GoogleSheetHandler
# NEU: Import von Hilfsfunktionen für Logging und API-Aufrufe
from helpers import create_log_filename, call_openai_chat
from config import LOG_DIR
# --- Konfiguration --- # --- Konfiguration ---
# Name des Tabellenblatts, das die zu matchenden Kontakte enthält # Name des Tabellenblatts, das die zu matchenden Kontakte enthält
TARGET_SHEET_NAME = "Matching_Positions" TARGET_SHEET_NAME = "Matching_Positions"
# Name des Tabellenblatts, das als "Single Source of Truth" für das Lernen dient
LEARNING_SOURCE_SHEET_NAME = "CRM_Jobtitles"
# Namen der zu ladenden Wissensbasis-Dateien # Namen der zu ladenden Wissensbasis-Dateien
EXACT_MATCH_FILE = "exact_match_map.json" EXACT_MATCH_FILE = "exact_match_map.json"
KEYWORD_RULES_FILE = "keyword_rules.json" KEYWORD_RULES_FILE = "keyword_rules.json"
# Standard-Department, falls keine Zuordnung möglich ist # Standard-Department, falls keine Zuordnung möglich ist
DEFAULT_DEPARTMENT = "Undefined" DEFAULT_DEPARTMENT = "Undefined"
def setup_logging():
"""Konfiguriert das Logging, um sowohl in der Konsole als auch in einer Datei zu loggen."""
log_filename = create_log_filename("contact_grouping")
log_level = logging.DEBUG # NEU: Auf DEBUG geändert für detailliertere Ausgaben
# Root-Logger konfigurieren
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_filename, encoding='utf-8'),
logging.StreamHandler()
]
)
logging.getLogger("gspread").setLevel(logging.WARNING)
logging.getLogger("oauth2client").setLevel(logging.WARNING)
logging.info(f"Logging initialisiert. Log-Datei: {log_filename}")
class ContactGrouper: class ContactGrouper:
""" """
Kapselt die Logik zur automatischen Gruppierung von Kontakten Kapselt die Logik zur automatischen Gruppierung von Kontakten
basierend auf ihrem Jobtitel. basierend auf ihrem Jobtitel. Inklusive Lernfunktion via KI.
""" """
def __init__(self): def __init__(self):
self.logger = logging.getLogger(__name__ + ".ContactGrouper") self.logger = logging.getLogger(__name__ + ".ContactGrouper")
@@ -63,88 +86,174 @@ class ContactGrouper:
# --- Stufe 1: Exakter Match --- # --- Stufe 1: Exakter Match ---
exact_match = self.exact_match_map.get(normalized_title) exact_match = self.exact_match_map.get(normalized_title)
if exact_match: if exact_match:
self.logger.debug(f"'{job_title}' -> '{exact_match}' (Exakter Match)") self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1: Exakter Match)")
return exact_match return exact_match
# --- Stufe 2: Keyword-basierter Match --- # --- Stufe 2: Keyword-basierter Match ---
# Zerlege den Jobtitel in einzigartige Wörter (Tokens)
title_tokens = set(re.split(r'[\s/(),-]+', normalized_title)) title_tokens = set(re.split(r'[\s/(),-]+', normalized_title))
scores = {} scores = {}
for department, rules in self.keyword_rules.items(): for department, rules in self.keyword_rules.items():
# Zähle, wie viele der Department-Keywords im Jobtitel vorkommen
matches = title_tokens.intersection(rules.get("keywords", [])) matches = title_tokens.intersection(rules.get("keywords", []))
if matches: if matches:
scores[department] = len(matches) scores[department] = len(matches)
if not scores: if not scores:
self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Keine Keywords gefunden)") self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine Keywords gefunden)")
return DEFAULT_DEPARTMENT return DEFAULT_DEPARTMENT
# Finde die Departments mit der höchsten Trefferanzahl
max_score = max(scores.values()) max_score = max(scores.values())
top_departments = [dept for dept, score in scores.items() if score == max_score] top_departments = [dept for dept, score in scores.items() if score == max_score]
# Wenn es nur ein Department mit der höchsten Punktzahl gibt, ist es der Gewinner
if len(top_departments) == 1: if len(top_departments) == 1:
winner = top_departments[0] winner = top_departments[0]
self.logger.debug(f"'{job_title}' -> '{winner}' (Keyword Match: Score {max_score})") self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score})")
return winner return winner
# --- Tie-Breaker: Priorität ---
# Wenn mehrere Departments die gleiche Punktzahl haben, gewinnt das mit der höchsten Priorität (niedrigste Prio-Zahl)
best_priority = float('inf') best_priority = float('inf')
winner = top_departments[0] # Fallback winner = top_departments[0]
for department in top_departments: for department in top_departments:
priority = self.keyword_rules[department].get("priority", 99) priority = self.keyword_rules[department].get("priority", 99)
if priority < best_priority: if priority < best_priority:
best_priority = priority best_priority = priority
winner = department winner = department
self.logger.debug(f"'{job_title}' -> '{winner}' (Keyword Match: Score {max_score}, Prio-Tiebreak: {best_priority})") self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score}, Prio-Tiebreak: {best_priority})")
return winner return winner
def _get_ai_classification(self, job_titles_to_classify):
"""
Sendet eine Liste von Jobtiteln an die OpenAI API zur Klassifizierung.
"""
self.logger.info(f"Starte Stufe 3: Sende {len(job_titles_to_classify)} 'Undefined' Jobtitel zur KI-Klassifizierung...")
if not job_titles_to_classify:
return {}
valid_departments = sorted([dept for dept in self.keyword_rules.keys() if dept != DEFAULT_DEPARTMENT])
prompt_parts = [
"Du bist ein HR-Experte, der Jobtitel präzise vordefinierten Abteilungen zuordnet.",
"Analysiere die folgende Liste von Jobtiteln.",
"Ordne JEDEN Jobtitel EINER der folgenden gültigen Abteilungen zu:",
", ".join(valid_departments),
"\nGib deine Antwort als valides JSON-Array von Objekten zurück, wobei jedes Objekt die Schlüssel 'job_title' und 'department' hat.",
"Beispiel: [{\"job_title\": \"Head of Fleet Management\", \"department\": \"Fuhrparkmanagement\"}]",
"\n--- Zu klassifizierende Jobtitel ---",
json.dumps(job_titles_to_classify, ensure_ascii=False)
]
prompt = "\n".join(prompt_parts)
try:
# Wir nutzen die call_openai_chat Funktion aus helpers.py
response_str = call_openai_chat(prompt, temperature=0.0, model="gpt-4o-mini", response_format_json=True)
# Robuste JSON-Extraktion
json_start = response_str.find('[')
json_end = response_str.rfind(']')
if json_start == -1 or json_end == -1:
raise json.JSONDecodeError("Kein JSON-Array in der Antwort gefunden.", response_str, 0)
json_str = response_str[json_start : json_end + 1]
results_list = json.loads(json_str)
# Konvertiere die Liste in ein Dictionary für einfaches Nachschlagen
classified_map = {item['job_title']: item['department'] for item in results_list if item.get('department') in valid_departments}
self.logger.info(f"{len(classified_map)} Jobtitel erfolgreich von der KI klassifiziert.")
return classified_map
except Exception as e:
self.logger.error(f"Fehler bei der KI-Klassifizierung: {e}")
return {}
def _append_learnings_to_source(self, gsh, new_mappings_df):
"""
Hängt die neu gelernten Mappings an das 'CRM_Jobtitles'-Sheet an.
"""
if new_mappings_df.empty:
return
self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue KI-Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...")
# Stelle sicher, dass das DataFrame die Spalten "Job Title" und "Department" hat
if "Job Title" not in new_mappings_df.columns or "Department" not in new_mappings_df.columns:
self.logger.error("Fehler im Lern-Mechanismus: DataFrame hat nicht die erwarteten Spalten.")
return
# Konvertiere das DataFrame in eine Liste von Listen für den Upload
rows_to_append = new_mappings_df[["Job Title", "Department"]].values.tolist()
success = gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append)
if success:
self.logger.info("Lern-Daten erfolgreich an die Wissensbasis angehängt.")
else:
self.logger.error("Fehler beim Anhängen der Lern-Daten an die Wissensbasis.")
def process_contacts(self): def process_contacts(self):
""" """
Orchestriert den gesamten Prozess: Daten laden, zuordnen und zurückschreiben. Orchestriert den gesamten Prozess: Daten laden, zuordnen, KI anreichern, lernen und zurückschreiben.
""" """
self.logger.info(f"Starte Kontakt-Gruppierung (Version {__version__})...") self.logger.info(f"Starte Kontakt-Gruppierung (Version {__version__})...")
if self.exact_match_map is None or self.keyword_rules is None: if self.exact_match_map is None or self.keyword_rules is None:
self.logger.error("Verarbeitung abgebrochen, da Wissensbasis nicht geladen werden konnte.") self.logger.error("Verarbeitung abgebrochen, da Wissensbasis nicht geladen werden konnte.")
return return
# 1. Daten aus Google Sheet laden
gsh = GoogleSheetHandler() gsh = GoogleSheetHandler()
df = gsh.get_sheet_as_dataframe(TARGET_SHEET_NAME) df = gsh.get_sheet_as_dataframe(TARGET_SHEET_NAME)
if df is None: if df is None or df.empty:
self.logger.critical("Konnte Daten nicht laden. Verarbeitung abgebrochen.") self.logger.warning(f"'{TARGET_SHEET_NAME}' ist leer oder konnte nicht geladen werden. Nichts zu tun.")
return
if df.empty:
self.logger.warning("Tabellenblatt 'Matching_Positions' ist leer. Es gibt nichts zu tun.")
return return
df.columns = [col.strip() for col in df.columns] df.columns = [col.strip() for col in df.columns]
if "Job Title" not in df.columns: if "Job Title" not in df.columns:
self.logger.critical("Benötigte Spalte 'Job Title' in 'Matching_Positions' nicht gefunden. Abbruch.") self.logger.critical(f"Benötigte Spalte 'Job Title' in '{TARGET_SHEET_NAME}' nicht gefunden. Abbruch.")
return return
# Original Jobtitel für späteres Lernen speichern
df['Original Job Title'] = df['Job Title']
self.logger.info(f"{len(df)} Kontakte aus '{TARGET_SHEET_NAME}' zum Verarbeiten geladen.") self.logger.info(f"{len(df)} Kontakte aus '{TARGET_SHEET_NAME}' zum Verarbeiten geladen.")
# 2. Zuordnung für jeden Jobtitel durchführen # Stufe 1 & 2: Zuordnung durchführen
# Sicherstellen, dass die Department-Spalte existiert if "Department" not in df.columns: df["Department"] = ""
if "Department" not in df.columns:
df["Department"] = ""
df['Department'] = df['Job Title'].apply(self._find_best_match) df['Department'] = df['Job Title'].apply(self._find_best_match)
# Stufe 3: KI-Klassifizierung für 'Undefined' Fälle
undefined_df = df[df['Department'] == DEFAULT_DEPARTMENT]
if not undefined_df.empty:
titles_to_classify = undefined_df['Job Title'].unique().tolist()
ai_results_map = self._get_ai_classification(titles_to_classify)
# Wende die KI-Ergebnisse an
df['Department'] = df.apply(
lambda row: ai_results_map.get(row['Job Title'], row['Department']) if row['Department'] == DEFAULT_DEPARTMENT else row['Department'],
axis=1
)
# Lern-Mechanismus: Neue Erkenntnisse für die Zukunft speichern
# Wir erstellen ein neues DataFrame mit den Originaltiteln und den KI-Departments
new_learnings = []
for title, dept in ai_results_map.items():
new_learnings.append({'Job Title': title, 'Department': dept})
if new_learnings:
new_learnings_df = pd.DataFrame(new_learnings)
self._append_learnings_to_source(gsh, new_learnings_df)
self.logger.info("Zuordnung abgeschlossen. Bereite das Schreiben der Ergebnisse vor...") self.logger.info("Zuordnung abgeschlossen. Bereite das Schreiben der Ergebnisse vor...")
# 3. Ergebnisse zurück in das Google Sheet schreiben # --- NEU: Zusammenfassende Statistik ---
# Erstelle eine Liste von Listen, inklusive der Header-Zeile self.logger.info("--- Zuordnungs-Statistik ---")
output_data = [df.columns.values.tolist()] + df.values.tolist() stats = df['Department'].value_counts()
for department, count in stats.items():
self.logger.info(f"- {department}: {count} Zuordnungen")
self.logger.info(f"GESAMT: {len(df)} Jobtitel verarbeitet.")
self.logger.info("--------------------------")
# Ergebnisse zurück in das Google Sheet schreiben (nur die Originalspalten)
output_df = df.drop(columns=['Original Job Title'])
output_data = [output_df.columns.values.tolist()] + output_df.values.tolist()
success = gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data) success = gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data)
@@ -155,6 +264,7 @@ class ContactGrouper:
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # NEU: Logging wird zentral am Anfang konfiguriert
setup_logging()
grouper = ContactGrouper() grouper = ContactGrouper()
grouper.process_contacts() grouper.process_contacts()