- Bugfix: Behebt das Problem, bei dem die OpenAI API eine textuelle Antwort anstelle eines JSON-Arrays zurückgab. - Der Prompt für die KI-Klassifizierung (Stufe 3) wurde grundlegend überarbeitet, um Missverständnisse durch die KI zu verhindern. - Die KI wird nun als "Datenverarbeitungstool" instruiert und erhält einen unmissverständlichen Befehl mit strikten Ausgabe-Formatierungsregeln. - Dies erhöht die Zuverlässigkeit der KI-gestützten Klassifizierung erheblich und stellt sicher, dass die Antwort maschinell verarbeitbar ist.
231 lines
11 KiB
Python
231 lines
11 KiB
Python
# contact_grouping.py
|
|
|
|
__version__ = "v1.1.7" # Versionsnummer hochgezählt
|
|
|
|
import logging
|
|
import json
|
|
import re
|
|
import os
|
|
import sys
|
|
import pandas as pd
|
|
|
|
from google_sheet_handler import GoogleSheetHandler
|
|
from helpers import create_log_filename, call_openai_chat
|
|
from config import LOG_DIR, Config
|
|
|
|
# --- Konfiguration ---
|
|
TARGET_SHEET_NAME = "Matching_Positions"
|
|
LEARNING_SOURCE_SHEET_NAME = "CRM_Jobtitles"
|
|
EXACT_MATCH_FILE = "exact_match_map.json"
|
|
KEYWORD_RULES_FILE = "keyword_rules.json"
|
|
DEFAULT_DEPARTMENT = "Undefined"
|
|
|
|
def setup_logging():
|
|
log_filename = create_log_filename("contact_grouping")
|
|
if not log_filename:
|
|
print("KRITISCHER FEHLER: Log-Datei konnte nicht erstellt werden. Logge nur in die Konsole.")
|
|
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()])
|
|
return
|
|
log_level = logging.DEBUG
|
|
root_logger = logging.getLogger()
|
|
if root_logger.handlers:
|
|
for handler in root_logger.handlers[:]:
|
|
root_logger.removeHandler(handler)
|
|
logging.basicConfig(level=log_level, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[logging.FileHandler(log_filename, encoding='utf-8'), logging.StreamHandler()])
|
|
logging.getLogger("gspread").setLevel(logging.WARNING)
|
|
logging.getLogger("oauth2client").setLevel(logging.WARNING)
|
|
logging.info(f"Logging erfolgreich initialisiert. Log-Datei: {log_filename}")
|
|
|
|
|
|
class ContactGrouper:
|
|
def __init__(self):
|
|
self.logger = logging.getLogger(__name__ + ".ContactGrouper")
|
|
self.exact_match_map = None
|
|
self.keyword_rules = None
|
|
|
|
def load_knowledge_base(self):
|
|
self.logger.info("Lade Wissensbasis...")
|
|
self.exact_match_map = self._load_json(EXACT_MATCH_FILE)
|
|
self.keyword_rules = self._load_json(KEYWORD_RULES_FILE)
|
|
if self.exact_match_map is None or self.keyword_rules is None:
|
|
self.logger.critical("Eine oder mehrere Wissensbasis-Dateien konnten nicht geladen werden. Abbruch.")
|
|
return False
|
|
self.logger.info("Wissensbasis erfolgreich geladen.")
|
|
return True
|
|
|
|
def _load_json(self, file_path):
|
|
if not os.path.exists(file_path):
|
|
self.logger.error(f"Wissensbasis-Datei '{file_path}' nicht gefunden.")
|
|
return None
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
self.logger.debug(f"Lese und parse '{file_path}'...")
|
|
data = json.load(f)
|
|
self.logger.debug(f"'{file_path}' erfolgreich geparst.")
|
|
return data
|
|
except (json.JSONDecodeError, IOError) as e:
|
|
self.logger.error(f"Fehler beim Laden der Datei '{file_path}': {e}")
|
|
return None
|
|
|
|
def _normalize_job_title(self, job_title):
|
|
if not isinstance(job_title, str): return ""
|
|
return job_title.lower().strip()
|
|
|
|
def _find_best_match(self, job_title):
|
|
normalized_title = self._normalize_job_title(job_title)
|
|
if not normalized_title: return DEFAULT_DEPARTMENT
|
|
|
|
exact_match = self.exact_match_map.get(normalized_title)
|
|
if exact_match:
|
|
self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1: Exakter Match)")
|
|
return exact_match
|
|
|
|
title_tokens = set(re.split(r'[\s/(),-]+', normalized_title))
|
|
scores = {}
|
|
for department, rules in self.keyword_rules.items():
|
|
matches = title_tokens.intersection(rules.get("keywords", []))
|
|
if matches: scores[department] = len(matches)
|
|
|
|
if not scores:
|
|
self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine Keywords gefunden)")
|
|
return DEFAULT_DEPARTMENT
|
|
|
|
max_score = max(scores.values())
|
|
top_departments = [dept for dept, score in scores.items() if score == max_score]
|
|
|
|
if len(top_departments) == 1:
|
|
winner = top_departments[0]
|
|
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score})")
|
|
return winner
|
|
|
|
best_priority = float('inf')
|
|
winner = top_departments[0]
|
|
for department in top_departments:
|
|
priority = self.keyword_rules[department].get("priority", 99)
|
|
if priority < best_priority:
|
|
best_priority = priority
|
|
winner = department
|
|
|
|
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score}, Prio-Tiebreak: {best_priority})")
|
|
return winner
|
|
|
|
def _get_ai_classification(self, job_titles_to_classify):
|
|
self.logger.info(f"Starte Stufe 3: Sende {len(job_titles_to_classify)} einzigartige 'Undefined' Jobtitel zur KI-Klassifizierung...")
|
|
self.logger.debug(f"Beispiel-Titel für KI: {job_titles_to_classify[:3]}")
|
|
if not job_titles_to_classify: return {}
|
|
|
|
valid_departments = sorted([dept for dept in self.keyword_rules.keys() if dept != DEFAULT_DEPARTMENT])
|
|
|
|
# --- NEUER, ROBUSTERER PROMPT ---
|
|
prompt_parts = [
|
|
"You are a specialized data processing tool. Your SOLE function is to receive a list of job titles and classify each one into a predefined department category.",
|
|
"--- VALID DEPARTMENT CATEGORIES ---",
|
|
", ".join(valid_departments),
|
|
"\n--- RULES ---",
|
|
"1. You MUST classify EVERY job title into ONE of the valid categories.",
|
|
"2. Your response MUST be a single, valid JSON array of objects.",
|
|
"3. Each object MUST contain the keys 'job_title' and 'department'.",
|
|
"4. Your entire response MUST start with '[' and end with ']'.",
|
|
"5. You MUST NOT add any introductory text, explanations, summaries, or markdown formatting like ```json.",
|
|
"\n--- JOB TITLES TO CLASSIFY ---",
|
|
json.dumps(job_titles_to_classify, ensure_ascii=False)
|
|
]
|
|
prompt = "\n".join(prompt_parts)
|
|
|
|
response_str = ""
|
|
try:
|
|
response_str = call_openai_chat(prompt, temperature=0.0, model="gpt-4o-mini", response_format_json=True)
|
|
match = re.search(r'\[.*\]', response_str, re.DOTALL)
|
|
|
|
if not match:
|
|
self.logger.error("Konnte kein JSON-Array in der KI-Antwort finden.")
|
|
self.logger.debug(f"--- VOLLSTÄNDIGE ROH-ANTWORT DER API ---\n{response_str}\n------------------------------------")
|
|
return {}
|
|
|
|
json_str = match.group(0)
|
|
results_list = json.loads(json_str)
|
|
classified_map = {item['job_title']: item['department'] for item in results_list if item.get('department') in valid_departments}
|
|
|
|
self.logger.info(f"{len(classified_map)} Jobtitel erfolgreich von der KI klassifiziert.")
|
|
return classified_map
|
|
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"Fehler beim Parsen des extrahierten JSON: {e}")
|
|
self.logger.debug(f"--- EXTRAHIERTER JSON-STRING, DER FEHLER VERURSACHTE ---\n{json_str}\n------------------------------------")
|
|
return {}
|
|
except Exception as e:
|
|
self.logger.error(f"Unerwarteter Fehler bei KI-Klassifizierung: {e}")
|
|
return {}
|
|
|
|
def _append_learnings_to_source(self, gsh, new_mappings_df):
|
|
if new_mappings_df.empty: return
|
|
self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue KI-Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...")
|
|
rows_to_append = new_mappings_df[["Job Title", "Department"]].values.tolist()
|
|
success = gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append)
|
|
if success: self.logger.info("Lern-Daten erfolgreich angehängt.")
|
|
else: self.logger.error("Fehler beim Anhängen der Lern-Daten.")
|
|
|
|
def process_contacts(self):
|
|
self.logger.info("Starte Kontakt-Verarbeitung...")
|
|
gsh = GoogleSheetHandler()
|
|
df = gsh.get_sheet_as_dataframe(TARGET_SHEET_NAME)
|
|
|
|
if df is None or df.empty:
|
|
self.logger.warning(f"'{TARGET_SHEET_NAME}' ist leer. Nichts zu tun.")
|
|
return
|
|
|
|
self.logger.info(f"{len(df)} Zeilen aus '{TARGET_SHEET_NAME}' erfolgreich geladen.")
|
|
df.columns = [col.strip() for col in df.columns]
|
|
if "Job Title" not in df.columns:
|
|
self.logger.critical(f"Benötigte Spalte 'Job Title' nicht gefunden. Abbruch.")
|
|
return
|
|
|
|
df['Original Job Title'] = df['Job Title']
|
|
if "Department" not in df.columns: df["Department"] = ""
|
|
|
|
self.logger.info("Starte regelbasierte Zuordnung (Stufe 1 & 2)...")
|
|
df['Department'] = df['Job Title'].apply(self._find_best_match)
|
|
self.logger.info("Regelbasierte Zuordnung abgeschlossen.")
|
|
|
|
undefined_df = df[df['Department'] == DEFAULT_DEPARTMENT]
|
|
|
|
if not undefined_df.empty:
|
|
self.logger.info(f"{len(undefined_df)} Jobtitel konnten nicht durch Regeln zugeordnet werden. Starte Stufe 3 (KI).")
|
|
titles_to_classify = undefined_df['Job Title'].unique().tolist()
|
|
ai_results_map = self._get_ai_classification(titles_to_classify)
|
|
df['Department'] = df.apply(
|
|
lambda row: ai_results_map.get(row['Job Title'], row['Department']) if row['Department'] == DEFAULT_DEPARTMENT else row['Department'],
|
|
axis=1
|
|
)
|
|
|
|
new_learnings = [{'Job Title': title, 'Department': dept} for title, dept in ai_results_map.items()]
|
|
if new_learnings:
|
|
self._append_learnings_to_source(gsh, pd.DataFrame(new_learnings))
|
|
else:
|
|
self.logger.info("Alle Jobtitel durch Regeln (Stufe 1 & 2) zugeordnet. Stufe 3 wird übersprungen.")
|
|
|
|
self.logger.info("--- Zuordnungs-Statistik ---")
|
|
stats = df['Department'].value_counts()
|
|
for department, count in stats.items(): self.logger.info(f"- {department}: {count} Zuordnungen")
|
|
self.logger.info(f"GESAMT: {len(df)} Jobtitel verarbeitet.")
|
|
|
|
output_df = df.drop(columns=['Original Job Title'])
|
|
output_data = [output_df.columns.values.tolist()] + output_df.values.tolist()
|
|
|
|
success = gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data)
|
|
if success: self.logger.info(f"Ergebnisse erfolgreich in '{TARGET_SHEET_NAME}' geschrieben.")
|
|
else: self.logger.error("Fehler beim Zurückschreiben der Daten.")
|
|
|
|
if __name__ == "__main__":
|
|
setup_logging()
|
|
logging.info(f"Starte contact_grouping.py v{__version__}")
|
|
|
|
Config.load_api_keys()
|
|
|
|
grouper = ContactGrouper()
|
|
|
|
if not grouper.load_knowledge_base():
|
|
logging.critical("Skript-Abbruch: Fehler beim Laden der Wissensbasis.")
|
|
sys.exit(1)
|
|
|
|
grouper.process_contacts() |