Files
Brancheneinstufung2/contact_grouping.py
Floke cd9a2ffc55 contact_grouping.py aktualisiert
- Detailliertere Log-Ausgaben während des gesamten Prozesses für eine bessere Nachvollziehbarkeit.
- Loggt die Anzahl der Zeilen direkt nach dem erfolgreichen Laden aus dem Google Sheet.
- Fügt explizite Start- und End-Meldungen für die regel-basierte Zuordnung (Stufe 1 & 2) hinzu.
- Loggt die genaue Anzahl der 'Undefined'-Fälle, die an die KI zur Klassifizierung gesendet werden.
- Gibt eine klare Meldung aus, wenn keine KI-Anfrage nötig ist, weil alle Fälle durch Regeln gelöst wurden.
- Verbessert die allgemeine Klarheit der Log-Nachrichten, um die Analyse von Laufzeitverhalten und potenziellen Fehlern zu erleichtern.

- Bugfix: Behebt einen kritischen APIError [400] 'Invalid list_value', der beim Zurückschreiben der Daten in das Google Sheet auftrat.
- Die Funktion `_find_best_match` gab in manchen Fällen fälschlicherweise eine Liste anstelle eines einzelnen Strings als Department zurück.
- Die Rückgabewerte der Funktion wurden korrigiert, um sicherzustellen, dass immer ein String übergeben wird.
- Dies stellt die Kompatibilität mit der Google Sheets API wieder her und macht den Schreibvorgang robust.
2025-09-18 07:59:31 +00:00

230 lines
10 KiB
Python

# contact_grouping.py
__version__ = "v1.1.3" # Versionsnummer hochgezählt
import logging
import json
import re
import os
import sys
import pandas as pd
from google_sheet_handler import GoogleSheetHandler
from helpers import create_log_filename, call_openai_chat
from config import LOG_DIR, Config
# --- Konfiguration ---
TARGET_SHEET_NAME = "Matching_Positions"
LEARNING_SOURCE_SHEET_NAME = "CRM_Jobtitles"
EXACT_MATCH_FILE = "exact_match_map.json"
KEYWORD_RULES_FILE = "keyword_rules.json"
DEFAULT_DEPARTMENT = "Undefined"
def setup_logging():
log_filename = create_log_filename("contact_grouping")
log_level = logging.DEBUG
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_filename, encoding='utf-8'),
logging.StreamHandler()
]
)
logging.getLogger("gspread").setLevel(logging.WARNING)
logging.getLogger("oauth2client").setLevel(logging.WARNING)
logging.info(f"Logging initialisiert. Log-Datei: {log_filename}")
class ContactGrouper:
def __init__(self):
self.logger = logging.getLogger(__name__ + ".ContactGrouper")
self.exact_match_map = None
self.keyword_rules = None
def load_knowledge_base(self):
self.logger.info("Lade Wissensbasis...")
self.exact_match_map = self._load_json(EXACT_MATCH_FILE)
self.keyword_rules = self._load_json(KEYWORD_RULES_FILE)
if self.exact_match_map is None or self.keyword_rules is None:
self.logger.critical("Eine oder mehrere Wissensbasis-Dateien konnten nicht geladen werden. Abbruch.")
return False
self.logger.info("Wissensbasis erfolgreich geladen.")
return True
def _load_json(self, file_path):
if not os.path.exists(file_path):
self.logger.error(f"Wissensbasis-Datei '{file_path}' nicht gefunden. Bitte 'knowledge_base_builder.py' ausführen.")
return None
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.logger.debug(f"Lese und parse '{file_path}'...")
data = json.load(f)
self.logger.debug(f"'{file_path}' erfolgreich geparst.")
return data
except (json.JSONDecodeError, IOError) as e:
self.logger.error(f"Fehler beim Laden oder Parsen der Datei '{file_path}': {e}")
return None
def _normalize_job_title(self, job_title):
if not isinstance(job_title, str): return ""
return job_title.lower().strip()
def _find_best_match(self, job_title):
normalized_title = self._normalize_job_title(job_title)
if not normalized_title: return DEFAULT_DEPARTMENT
exact_match = self.exact_match_map.get(normalized_title)
if exact_match:
self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1: Exakter Match)")
return exact_match
title_tokens = set(re.split(r'[\s/(),-]+', normalized_title))
scores = {}
for department, rules in self.keyword_rules.items():
matches = title_tokens.intersection(rules.get("keywords", []))
if matches: scores[department] = len(matches)
if not scores:
self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine Keywords gefunden)")
return DEFAULT_DEPARTMENT
max_score = max(scores.values())
top_departments = [dept for dept, score in scores.items() if score == max_score]
if len(top_departments) == 1:
# KORREKTUR: Hier wurde vorher die ganze Liste zurückgegeben. Jetzt wird das erste Element extrahiert.
winner = top_departments[0]
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score})")
return winner
best_priority = float('inf')
# KORREKTUR: Fallback-Gewinner ist jetzt ebenfalls ein String, kein Liste.
winner = top_departments[0]
for department in top_departments:
priority = self.keyword_rules[department].get("priority", 99)
if priority < best_priority:
best_priority = priority
winner = department
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score}, Prio-Tiebreak: {best_priority})")
return winner
def _get_ai_classification(self, job_titles_to_classify):
self.logger.info(f"Starte Stufe 3: Sende {len(job_titles_to_classify)} 'Undefined' Jobtitel zur KI-Klassifizierung...")
if not job_titles_to_classify: return {}
valid_departments = sorted([dept for dept in self.keyword_rules.keys() if dept != DEFAULT_DEPARTMENT])
prompt_parts = [
"Du bist ein HR-Experte, der Jobtitel präzise vordefinierten Abteilungen zuordnet.",
"Analysiere die folgende Liste von Jobtiteln.",
"Ordne JEDEN Jobtitel EINER der folgenden gültigen Abteilungen zu:",
", ".join(valid_departments),
"\nGib deine Antwort als valides JSON-Array von Objekten zurück, wobei jedes Objekt die Schlüssel 'job_title' und 'department' hat.",
"Stelle sicher, dass die Antwort ausschließlich das JSON-Array enthält, ohne einleitenden Text oder Markdown-Formatierung.",
"Beispiel: [{\"job_title\": \"Head of Fleet Management\", \"department\": \"Fuhrparkmanagement\"}]",
"\n--- Zu klassifizierende Jobtitel ---",
json.dumps(job_titles_to_classify, ensure_ascii=False)
]
prompt = "\n".join(prompt_parts)
response_str = "" # Initialisieren für den Fehlerfall
try:
response_str = call_openai_chat(prompt, temperature=0.0, model="gpt-4o-mini", response_format_json=True)
# --- NEU: Robuste Regex-basierte JSON-Extraktion ---
match = re.search(r'\[.*\]', response_str, re.DOTALL)
if not match:
# NEU: Verbessertes Logging, um die Roh-Antwort zu sehen
self.logger.error("Konnte kein JSON-Array in der KI-Antwort finden.")
self.logger.debug(f"--- VOLLSTÄNDIGE ROH-ANTWORT DER API ---\n{response_str}\n------------------------------------")
return {}
json_str = match.group(0)
results_list = json.loads(json_str)
classified_map = {item['job_title']: item['department'] for item in results_list if item.get('department') in valid_departments}
self.logger.info(f"{len(classified_map)} Jobtitel erfolgreich von der KI klassifiziert.")
return classified_map
except json.JSONDecodeError as e:
self.logger.error(f"Fehler beim Parsen des extrahierten JSON aus der KI-Antwort: {e}")
self.logger.debug(f"--- EXTRAHIERTER JSON-STRING, DER ZUM FEHLER FÜHRTE ---\n{json_str}\n------------------------------------")
return {}
except Exception as e:
self.logger.error(f"Ein unerwarteter Fehler ist bei der KI-Klassifizierung aufgetreten: {e}")
return {}
def _append_learnings_to_source(self, gsh, new_mappings_df):
if new_mappings_df.empty: return
self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue KI-Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...")
rows_to_append = new_mappings_df[["Job Title", "Department"]].values.tolist()
success = gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append)
if success: self.logger.info("Lern-Daten erfolgreich angehängt.")
else: self.logger.error("Fehler beim Anhängen der Lern-Daten.")
def process_contacts(self):
self.logger.info("Starte Kontakt-Verarbeitung...")
gsh = GoogleSheetHandler()
df = gsh.get_sheet_as_dataframe(TARGET_SHEET_NAME)
if df is None or df.empty:
self.logger.warning(f"'{TARGET_SHEET_NAME}' ist leer. Nichts zu tun.")
return
df.columns = [col.strip() for col in df.columns]
if "Job Title" not in df.columns:
self.logger.critical(f"Benötigte Spalte 'Job Title' nicht gefunden. Abbruch.")
return
df['Original Job Title'] = df['Job Title']
self.logger.info(f"{len(df)} Kontakte aus '{TARGET_SHEET_NAME}' geladen.")
if "Department" not in df.columns: df["Department"] = ""
df['Department'] = df['Job Title'].apply(self._find_best_match)
undefined_df = df[df['Department'] == DEFAULT_DEPARTMENT]
if not undefined_df.empty:
titles_to_classify = undefined_df['Job Title'].unique().tolist()
ai_results_map = self._get_ai_classification(titles_to_classify)
df['Department'] = df.apply(
lambda row: ai_results_map.get(row['Job Title'], row['Department']) if row['Department'] == DEFAULT_DEPARTMENT else row['Department'],
axis=1
)
new_learnings = [{'Job Title': title, 'Department': dept} for title, dept in ai_results_map.items()]
if new_learnings:
self._append_learnings_to_source(gsh, pd.DataFrame(new_learnings))
self.logger.info("--- Zuordnungs-Statistik ---")
stats = df['Department'].value_counts()
for department, count in stats.items(): self.logger.info(f"- {department}: {count} Zuordnungen")
self.logger.info(f"GESAMT: {len(df)} Jobtitel verarbeitet.")
output_df = df.drop(columns=['Original Job Title'])
output_data = [output_df.columns.values.tolist()] + output_df.values.tolist()
success = gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data)
if success: self.logger.info(f"Ergebnisse erfolgreich in '{TARGET_SHEET_NAME}' geschrieben.")
else: self.logger.error("Fehler beim Zurückschreiben der Daten.")
if __name__ == "__main__":
setup_logging()
logging.info(f"Starte contact_grouping.py v{__version__}")
Config.load_api_keys()
grouper = ContactGrouper()
if not grouper.load_knowledge_base():
logging.critical("Skript-Abbruch aufgrund von Fehlern beim Laden der Wissensbasis.")
sys.exit(1)
grouper.process_contacts()