# contact_grouping.py __version__ = "v1.1.1" # Versionsnummer hochgezählt import logging import json import re import os import pandas as pd # Importiere die existierenden, robusten Handler und Konfigurationen from google_sheet_handler import GoogleSheetHandler from helpers import create_log_filename, call_openai_chat from config import LOG_DIR, Config # NEU: Config-Klasse importiert # --- Konfiguration --- # Name des Tabellenblatts, das die zu matchenden Kontakte enthält TARGET_SHEET_NAME = "Matching_Positions" # Name des Tabellenblatts, das als "Single Source of Truth" für das Lernen dient LEARNING_SOURCE_SHEET_NAME = "CRM_Jobtitles" # Namen der zu ladenden Wissensbasis-Dateien EXACT_MATCH_FILE = "exact_match_map.json" KEYWORD_RULES_FILE = "keyword_rules.json" # Standard-Department, falls keine Zuordnung möglich ist DEFAULT_DEPARTMENT = "Undefined" def setup_logging(): """Konfiguriert das Logging, um sowohl in der Konsole als auch in einer Datei zu loggen.""" log_filename = create_log_filename("contact_grouping") log_level = logging.DEBUG # Root-Logger konfigurieren logging.basicConfig( level=log_level, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_filename, encoding='utf-8'), logging.StreamHandler() ] ) logging.getLogger("gspread").setLevel(logging.WARNING) logging.getLogger("oauth2client").setLevel(logging.WARNING) logging.info(f"Logging initialisiert. Log-Datei: {log_filename}") class ContactGrouper: """ Kapselt die Logik zur automatischen Gruppierung von Kontakten basierend auf ihrem Jobtitel. Inklusive Lernfunktion via KI. """ def __init__(self): self.logger = logging.getLogger(__name__ + ".ContactGrouper") self.exact_match_map = self._load_json(EXACT_MATCH_FILE) self.keyword_rules = self._load_json(KEYWORD_RULES_FILE) def _load_json(self, file_path): """Lädt eine JSON-Datei und gibt den Inhalt als Dictionary zurück.""" if not os.path.exists(file_path): self.logger.critical(f"Wissensbasis-Datei '{file_path}' nicht gefunden. Bitte zuerst 'knowledge_base_builder.py' ausführen.") return None try: with open(file_path, 'r', encoding='utf-8') as f: self.logger.info(f"Lade Wissensbasis aus '{file_path}'...") return json.load(f) except (json.JSONDecodeError, IOError) as e: self.logger.critical(f"Fehler beim Laden oder Parsen der Datei '{file_path}': {e}") return None def _normalize_job_title(self, job_title): """Bereinigt und normalisiert einen Jobtitel für den Abgleich.""" if not isinstance(job_title, str): return "" return job_title.lower().strip() def _find_best_match(self, job_title): """ Führt den mehrstufigen Matching-Algorithmus aus. Stufe 1: Exakter Match. Stufe 2: Keyword-basierter Match mit Priorisierung. """ normalized_title = self._normalize_job_title(job_title) if not normalized_title: return DEFAULT_DEPARTMENT # --- Stufe 1: Exakter Match --- exact_match = self.exact_match_map.get(normalized_title) if exact_match: self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1: Exakter Match)") return exact_match # --- Stufe 2: Keyword-basierter Match --- title_tokens = set(re.split(r'[\s/(),-]+', normalized_title)) scores = {} for department, rules in self.keyword_rules.items(): matches = title_tokens.intersection(rules.get("keywords", [])) if matches: scores[department] = len(matches) if not scores: self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine Keywords gefunden)") return DEFAULT_DEPARTMENT max_score = max(scores.values()) top_departments = [dept for dept, score in scores.items() if score == max_score] if len(top_departments) == 1: winner = top_departments[0] self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score})") return winner best_priority = float('inf') winner = top_departments[0] for department in top_departments: priority = self.keyword_rules[department].get("priority", 99) if priority < best_priority: best_priority = priority winner = department self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score}, Prio-Tiebreak: {best_priority})") return winner def _get_ai_classification(self, job_titles_to_classify): """ Sendet eine Liste von Jobtiteln an die OpenAI API zur Klassifizierung. """ self.logger.info(f"Starte Stufe 3: Sende {len(job_titles_to_classify)} 'Undefined' Jobtitel zur KI-Klassifizierung...") if not job_titles_to_classify: return {} valid_departments = sorted([dept for dept in self.keyword_rules.keys() if dept != DEFAULT_DEPARTMENT]) prompt_parts = [ "Du bist ein HR-Experte, der Jobtitel präzise vordefinierten Abteilungen zuordnet.", "Analysiere die folgende Liste von Jobtiteln.", "Ordne JEDEN Jobtitel EINER der folgenden gültigen Abteilungen zu:", ", ".join(valid_departments), "\nGib deine Antwort als valides JSON-Array von Objekten zurück, wobei jedes Objekt die Schlüssel 'job_title' und 'department' hat.", "Beispiel: [{\"job_title\": \"Head of Fleet Management\", \"department\": \"Fuhrparkmanagement\"}]", "\n--- Zu klassifizierende Jobtitel ---", json.dumps(job_titles_to_classify, ensure_ascii=False) ] prompt = "\n".join(prompt_parts) try: # Wir nutzen die call_openai_chat Funktion aus helpers.py response_str = call_openai_chat(prompt, temperature=0.0, model="gpt-4o-mini", response_format_json=True) # Robuste JSON-Extraktion json_start = response_str.find('[') json_end = response_str.rfind(']') if json_start == -1 or json_end == -1: raise json.JSONDecodeError("Kein JSON-Array in der Antwort gefunden.", response_str, 0) json_str = response_str[json_start : json_end + 1] results_list = json.loads(json_str) # Konvertiere die Liste in ein Dictionary für einfaches Nachschlagen classified_map = {item['job_title']: item['department'] for item in results_list if item.get('department') in valid_departments} self.logger.info(f"{len(classified_map)} Jobtitel erfolgreich von der KI klassifiziert.") return classified_map except Exception as e: self.logger.error(f"Fehler bei der KI-Klassifizierung: {e}") return {} def _append_learnings_to_source(self, gsh, new_mappings_df): """ Hängt die neu gelernten Mappings an das 'CRM_Jobtitles'-Sheet an. """ if new_mappings_df.empty: return self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue KI-Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...") # Stelle sicher, dass das DataFrame die Spalten "Job Title" und "Department" hat if "Job Title" not in new_mappings_df.columns or "Department" not in new_mappings_df.columns: self.logger.error("Fehler im Lern-Mechanismus: DataFrame hat nicht die erwarteten Spalten.") return # Konvertiere das DataFrame in eine Liste von Listen für den Upload rows_to_append = new_mappings_df[["Job Title", "Department"]].values.tolist() success = gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append) if success: self.logger.info("Lern-Daten erfolgreich an die Wissensbasis angehängt.") else: self.logger.error("Fehler beim Anhängen der Lern-Daten an die Wissensbasis.") def process_contacts(self): """ Orchestriert den gesamten Prozess: Daten laden, zuordnen, KI anreichern, lernen und zurückschreiben. """ self.logger.info(f"Starte Kontakt-Gruppierung (Version {__version__})...") if self.exact_match_map is None or self.keyword_rules is None: self.logger.error("Verarbeitung abgebrochen, da Wissensbasis nicht geladen werden konnte.") return gsh = GoogleSheetHandler() df = gsh.get_sheet_as_dataframe(TARGET_SHEET_NAME) if df is None or df.empty: self.logger.warning(f"'{TARGET_SHEET_NAME}' ist leer oder konnte nicht geladen werden. Nichts zu tun.") return df.columns = [col.strip() for col in df.columns] if "Job Title" not in df.columns: self.logger.critical(f"Benötigte Spalte 'Job Title' in '{TARGET_SHEET_NAME}' nicht gefunden. Abbruch.") return # Original Jobtitel für späteres Lernen speichern df['Original Job Title'] = df['Job Title'] self.logger.info(f"{len(df)} Kontakte aus '{TARGET_SHEET_NAME}' zum Verarbeiten geladen.") # Stufe 1 & 2: Zuordnung durchführen if "Department" not in df.columns: df["Department"] = "" df['Department'] = df['Job Title'].apply(self._find_best_match) # Stufe 3: KI-Klassifizierung für 'Undefined' Fälle undefined_df = df[df['Department'] == DEFAULT_DEPARTMENT] if not undefined_df.empty: titles_to_classify = undefined_df['Job Title'].unique().tolist() ai_results_map = self._get_ai_classification(titles_to_classify) # Wende die KI-Ergebnisse an df['Department'] = df.apply( lambda row: ai_results_map.get(row['Job Title'], row['Department']) if row['Department'] == DEFAULT_DEPARTMENT else row['Department'], axis=1 ) # Lern-Mechanismus: Neue Erkenntnisse für die Zukunft speichern # Wir erstellen ein neues DataFrame mit den Originaltiteln und den KI-Departments new_learnings = [] for title, dept in ai_results_map.items(): new_learnings.append({'Job Title': title, 'Department': dept}) if new_learnings: new_learnings_df = pd.DataFrame(new_learnings) self._append_learnings_to_source(gsh, new_learnings_df) self.logger.info("Zuordnung abgeschlossen. Bereite das Schreiben der Ergebnisse vor...") # --- Zusammenfassende Statistik --- self.logger.info("--- Zuordnungs-Statistik ---") stats = df['Department'].value_counts() for department, count in stats.items(): self.logger.info(f"- {department}: {count} Zuordnungen") self.logger.info(f"GESAMT: {len(df)} Jobtitel verarbeitet.") self.logger.info("--------------------------") # Ergebnisse zurück in das Google Sheet schreiben (nur die Originalspalten) output_df = df.drop(columns=['Original Job Title']) output_data = [output_df.columns.values.tolist()] + output_df.values.tolist() success = gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data) if success: self.logger.info(f"Erfolgreich {len(df)} Kontakte verarbeitet und Ergebnisse in '{TARGET_SHEET_NAME}' geschrieben.") else: self.logger.error("Ein Fehler ist beim Zurückschreiben der Daten in das Google Sheet aufgetreten.") if __name__ == "__main__": setup_logging() Config.load_api_keys() # NEU: API-Schlüssel werden vor der Ausführung geladen grouper = ContactGrouper() grouper.process_contacts()