knowledge_base_builder.py aktualisiert

This commit is contained in:
2025-09-18 13:45:27 +00:00
parent 7f3d6c603a
commit f5a686403d

View File

@@ -1,252 +1,196 @@
# contact_grouping.py # knowledge_base_builder.py
__version__ = "v1.2.3" __version__ = "v1.2.2"
import logging import logging
import json import json
import re import re
import os import os
import sys import sys
from collections import Counter
import pandas as pd import pandas as pd
from collections import defaultdict
from google_sheet_handler import GoogleSheetHandler from google_sheet_handler import GoogleSheetHandler
from helpers import create_log_filename, call_openai_chat from helpers import create_log_filename
from config import Config from config import Config
# --- Konfiguration --- # --- Konfiguration ---
TARGET_SHEET_NAME = "Matching_Positions" SOURCE_SHEET_NAME = "CRM_Jobtitles"
LEARNING_SOURCE_SHEET_NAME = "CRM_Jobtitles" EXACT_MATCH_OUTPUT_FILE = "exact_match_map.json"
EXACT_MATCH_FILE = "exact_match_map.json" KEYWORD_RULES_OUTPUT_FILE = "keyword_rules.json"
KEYWORD_RULES_FILE = "keyword_rules.json"
DEFAULT_DEPARTMENT = "Undefined" DEPARTMENT_PRIORITIES = {
AI_BATCH_SIZE = 150 "Fuhrparkmanagement": 1,
"Legal": 1,
"Baustofflogistik": 1,
"Baustoffherstellung": 1,
"Field Service Management / Kundenservice": 2,
"IT": 3,
"Production Maintenance / Wartung Produktion": 4,
"Utility Maintenance": 5,
"Procurement / Einkauf": 6,
"Supply Chain Management": 7,
"Finanzen": 8,
"Technik": 8,
"Management / GF / C-Level": 10,
"Logistik": 11,
"Vertrieb": 12,
"Transportwesen": 13,
"Berater": 20,
"Undefined": 99
}
# Maps a branch-group keyword to the branch names that belong to that group.
# build_knowledge_base() compares each value of the 'Branche' column against
# these lists (exact membership test) and counts hits per group; the dominant
# group may then be stored as a department's "required_branch_keywords".
# NOTE(review): the consuming loop calls these names "d365_names" -- presumably
# the branch labels as they are spelled in the CRM; confirm against the sheet.
BRANCH_GROUP_RULES = {
"bau": [
"Baustoffhandel", "Baustoffindustrie",
"Logistiker Baustoffe", "Bauunternehmen"
],
"versorger": [
"Stadtwerke", "Verteilnetzbetreiber",
"Telekommunikation", "Gase & Mineralöl"
],
"produktion": [
"Maschinenbau", "Automobil", "Anlagenbau", "Medizintechnik",
"Chemie & Pharma", "Elektrotechnik", "Lebensmittelproduktion",
"Bürotechnik", "Automaten (Vending, Slot)", "Gebäudetechnik Allgemein",
"Braune & Weiße Ware", "Fenster / Glas", "Getränke", "Möbel", "Agrar, Pellets"
]
}
# Minimum number of rows a department must have before a branch rule is even
# considered for it (checked with >=).
MIN_SAMPLES_FOR_BRANCH_RULE = 5
# --- MODIFIED: threshold lowered to 60% ---
# Share of a department's rows that must fall into its most common branch
# group; the comparison is strict (ratio > threshold), so exactly 60% does
# not produce a branch rule.
BRANCH_SPECIFICITY_THRESHOLD = 0.6
# Lower-case tokens that are excluded when the most frequent words of a
# department's job titles are turned into keywords: seniority and role words,
# C-level abbreviations and German/English filler words.
STOP_WORDS = {
'manager', 'leiter', 'head', 'lead', 'senior', 'junior', 'direktor', 'director',
'verantwortlicher', 'beauftragter', 'referent', 'sachbearbeiter', 'mitarbeiter',
'spezialist', 'specialist', 'expert', 'experte', 'consultant', 'berater',
'assistant', 'assistenz', 'teamleiter', 'teamlead', 'abteilungsleiter',
'bereichsleiter', 'gruppenleiter', 'geschäftsführer', 'vorstand', 'ceo', 'cio',
'cfo', 'cto', 'coo', 'von', 'of', 'und', 'für', 'der', 'die', 'das', '&'
}
def setup_logging(): def setup_logging():
log_filename = create_log_filename("contact_grouping") log_filename = create_log_filename("knowledge_base_builder")
if not log_filename: if not log_filename:
print("KRITISCHER FEHLER: Log-Datei konnte nicht erstellt werden. Logge nur in die Konsole.") print("KRITISCHER FEHLER: Log-Datei konnte nicht erstellt werden. Logge nur in die Konsole.")
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()]) logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()])
return return
log_level = logging.DEBUG log_level = logging.DEBUG
root_logger = logging.getLogger() root_logger = logging.getLogger()
if root_logger.handlers: if root_logger.handlers:
for handler in root_logger.handlers[:]: for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler) root_logger.removeHandler(handler)
logging.basicConfig(level=log_level, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[logging.FileHandler(log_filename, encoding='utf-8'), logging.StreamHandler()])
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_filename, encoding='utf-8'),
logging.StreamHandler()
]
)
logging.getLogger("gspread").setLevel(logging.WARNING) logging.getLogger("gspread").setLevel(logging.WARNING)
logging.getLogger("oauth2client").setLevel(logging.WARNING) logging.getLogger("oauth2client").setLevel(logging.WARNING)
logging.info(f"Logging erfolgreich initialisiert. Log-Datei: {log_filename}") logging.info(f"Logging erfolgreich initialisiert. Log-Datei: {log_filename}")
class ContactGrouper:
def __init__(self):
self.logger = logging.getLogger(__name__ + ".ContactGrouper")
self.exact_match_map = None
self.keyword_rules = None
self.ai_example_prompt_part = ""
def load_knowledge_base(self): def build_knowledge_base():
self.logger.info("Lade Wissensbasis...") logger = logging.getLogger(__name__)
self.exact_match_map = self._load_json(EXACT_MATCH_FILE) logger.info(f"Starte Erstellung der Wissensbasis (Version {__version__})...")
self.keyword_rules = self._load_json(KEYWORD_RULES_FILE)
if self.exact_match_map is None or self.keyword_rules is None:
self.logger.critical("Fehler beim Laden der Wissensbasis. Abbruch.")
return False
self._generate_ai_examples()
self.logger.info("Wissensbasis erfolgreich geladen und KI-Beispiele generiert.")
return True
def _load_json(self, file_path): gsh = GoogleSheetHandler()
if not os.path.exists(file_path): df = gsh.get_sheet_as_dataframe(SOURCE_SHEET_NAME)
self.logger.error(f"Wissensbasis-Datei '{file_path}' nicht gefunden.")
return None
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.logger.debug(f"Lese und parse '{file_path}'...")
data = json.load(f)
self.logger.debug(f"'{file_path}' erfolgreich geparst.")
return data
except (json.JSONDecodeError, IOError) as e:
self.logger.error(f"Fehler beim Laden der Datei '{file_path}': {e}")
return None
def _normalize_text(self, text): if df is None or df.empty:
if not isinstance(text, str): return "" logger.critical(f"Konnte keine Daten aus '{SOURCE_SHEET_NAME}' laden. Abbruch.")
return text.lower().strip() return
def _generate_ai_examples(self): df.columns = [col.strip() for col in df.columns]
self.logger.info("Generiere KI-Beispiele aus der Wissensbasis...")
if not self.exact_match_map:
return
titles_by_dept = defaultdict(list)
for title, dept in self.exact_match_map.items():
titles_by_dept[dept].append(title)
example_lines = []
sorted_depts = sorted(self.keyword_rules.keys(), key=lambda d: self.keyword_rules.get(d, {}).get('priority', 99))
for dept in sorted_depts:
if dept == DEFAULT_DEPARTMENT or not titles_by_dept[dept]:
continue
top_titles = sorted(titles_by_dept[dept], key=len)[:5]
# --- KORREKTUR: Die fehlerhafte Zeile wurde ersetzt ---
formatted_titles = ', '.join('"' + title + '"' for title in top_titles)
example_lines.append(f"- Für '{dept}': {formatted_titles}")
self.ai_example_prompt_part = "\n".join(example_lines)
self.logger.debug(f"Generierter Beispiel-Prompt:\n{self.ai_example_prompt_part}")
def _find_best_match(self, job_title, company_branch): required_cols = ["Job Title", "Department", "Branche"]
normalized_title = self._normalize_text(job_title) if not all(col in df.columns for col in required_cols):
normalized_branch = self._normalize_text(company_branch) logger.critical(f"Benötigte Spalten {required_cols} nicht in '{SOURCE_SHEET_NAME}' gefunden. Abbruch.")
if not normalized_title: return DEFAULT_DEPARTMENT return
exact_match = self.exact_match_map.get(normalized_title) logger.info(f"{len(df)} Zeilen aus '{SOURCE_SHEET_NAME}' geladen.")
if exact_match:
rule = self.keyword_rules.get(exact_match, {}) df.dropna(subset=required_cols, inplace=True)
required_keywords = rule.get("required_branch_keywords") df = df[df["Job Title"].str.strip() != '']
if required_keywords: df['normalized_title'] = df['Job Title'].str.lower().str.strip()
if not any(keyword in normalized_branch for keyword in required_keywords): logger.info(f"{len(df)} Zeilen nach Bereinigung.")
self.logger.debug(f"'{job_title}' -> Exakter Match '{exact_match}' verworfen (Branche: '{company_branch}')")
logger.info("Erstelle 'Primary Mapping' für exakte Treffer (Stufe 1)...")
exact_match_map = df.groupby('normalized_title')['Department'].apply(lambda x: x.mode()[0]).to_dict()
try:
with open(EXACT_MATCH_OUTPUT_FILE, 'w', encoding='utf-8') as f:
json.dump(exact_match_map, f, indent=4, ensure_ascii=False)
logger.info(f"-> '{EXACT_MATCH_OUTPUT_FILE}' mit {len(exact_match_map)} Titeln erstellt.")
except IOError as e:
logger.error(f"Fehler beim Schreiben der Datei '{EXACT_MATCH_OUTPUT_FILE}': {e}")
return
logger.info("Erstelle 'Keyword-Datenbank' mit automatischer Branchen-Logik (Stufe 2)...")
titles_by_department = df.groupby('Department')['normalized_title'].apply(list).to_dict()
branches_by_department = df.groupby('Department')['Branche'].apply(list).to_dict()
keyword_rules = {}
for department, titles in titles_by_department.items():
all_words = []
for title in titles:
words = re.split(r'[\s/(),-]+', title)
all_words.extend([word for word in words if word])
word_counts = Counter(all_words)
top_keywords = [word for word, count in word_counts.most_common(50) if word not in STOP_WORDS and (len(word) > 2 or word in {'it', 'edv'})]
if top_keywords:
rule = {
"priority": DEPARTMENT_PRIORITIES.get(department, 99),
"keywords": sorted(top_keywords)
}
department_branches = branches_by_department.get(department, [])
total_titles_in_dept = len(department_branches)
if total_titles_in_dept >= MIN_SAMPLES_FOR_BRANCH_RULE:
branch_group_counts = Counter()
for branch_name in department_branches:
for group_keyword, d365_names in BRANCH_GROUP_RULES.items():
if branch_name in d365_names:
branch_group_counts[group_keyword] += 1
if branch_group_counts:
most_common_group, count = branch_group_counts.most_common(1)[0]
ratio = count / total_titles_in_dept
if ratio > BRANCH_SPECIFICITY_THRESHOLD:
logger.info(f" -> Department '{department}' ist spezifisch für Branche '{most_common_group}' ({ratio:.0%}). Regel wird hinzugefügt.")
rule["required_branch_keywords"] = [most_common_group]
else:
logger.debug(f" -> Department '{department}' nicht spezifisch genug. Dominante Branche '{most_common_group}' nur bei {ratio:.0%}, benötigt >{BRANCH_SPECIFICITY_THRESHOLD:.0%}.")
else: else:
self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1, Branche OK)") logger.debug(f" -> Department '{department}' konnte keiner Branchen-Gruppe zugeordnet werden.")
return exact_match
else: else:
self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1)") logger.debug(f" -> Department '{department}' hat zu wenige Datenpunkte ({total_titles_in_dept} < {MIN_SAMPLES_FOR_BRANCH_RULE}) für eine Branchen-Regel.")
return exact_match
title_tokens = set(re.split(r'[\s/(),-]+', normalized_title)) keyword_rules[department] = rule
scores = {}
for department, rules in self.keyword_rules.items():
required_keywords = rules.get("required_branch_keywords")
if required_keywords:
if not any(keyword in normalized_branch for keyword in required_keywords):
self.logger.debug(f"Dept '{department}' für '{job_title}' übersprungen (Branche: '{company_branch}')")
continue
matches = title_tokens.intersection(rules.get("keywords", []))
if matches: scores[department] = len(matches)
if not scores: try:
self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine passenden Keywords)") with open(KEYWORD_RULES_OUTPUT_FILE, 'w', encoding='utf-8') as f:
return DEFAULT_DEPARTMENT json.dump(keyword_rules, f, indent=4, ensure_ascii=False)
logger.info(f"-> '{KEYWORD_RULES_OUTPUT_FILE}' mit Regeln für {len(keyword_rules)} Departments erstellt.")
except IOError as e:
logger.error(f"Fehler beim Schreiben der Datei '{KEYWORD_RULES_OUTPUT_FILE}': {e}")
return
max_score = max(scores.values()) logger.info("Wissensbasis erfolgreich erstellt.")
top_departments = [dept for dept, score in scores.items() if score == max_score]
if len(top_departments) == 1:
winner = top_departments[0]
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Score {max_score})")
return winner
best_priority = float('inf')
winner = top_departments[0]
for department in top_departments:
priority = self.keyword_rules.get(department, {}).get("priority", 99)
if priority < best_priority:
best_priority = priority
winner = department
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Score {max_score}, Prio {best_priority})")
return winner
def _get_ai_classification(self, contacts_to_classify):
    """Classify job titles via the chat model (stage 3 of the pipeline).

    Builds one prompt containing the valid department names, the example
    block from ``self.ai_example_prompt_part`` and the contacts as JSON,
    sends it through ``call_openai_chat`` and extracts the first-to-last
    bracket span of the reply as a JSON array.

    Args:
        contacts_to_classify: List of dicts with the keys ``'job_title'``
            and ``'company_branch'``; serialized verbatim into the prompt.

    Returns:
        dict mapping job title -> department for every well-formed result
        whose department is a key of ``self.keyword_rules`` other than
        ``DEFAULT_DEPARTMENT``. An empty dict is returned for empty input,
        when no JSON array is found, when the array cannot be parsed, or
        when any other exception occurs (all of which are logged).
    """
    self.logger.info(f"Sende {len(contacts_to_classify)} Titel an KI (mit Kontext)...")
    if not contacts_to_classify:
        return {}

    valid_departments = sorted(dept for dept in self.keyword_rules if dept != DEFAULT_DEPARTMENT)
    prompt_parts = [
        "You are a specialized data processing tool. Your SOLE function is to receive a list of job titles and classify each one into a predefined department category.",
        "--- VALID DEPARTMENT CATEGORIES ---",
        ", ".join(valid_departments),
        "\n--- EXAMPLES OF TYPICAL ROLES ---",
        self.ai_example_prompt_part,
        "\n--- RULES ---",
        "1. You MUST use the 'company_branch' to make a context-aware decision.",
        "2. For departments with branch requirements (like 'Baustofflogistik' for 'bau'), you MUST ONLY use them if the branch matches.",
        "3. Your response MUST be a single, valid JSON array of objects.",
        "4. Each object MUST contain the keys 'job_title' and 'department'.",
        "5. Your entire response MUST start with '[' and end with ']'.",
        "6. You MUST NOT add any introductory text, explanations, summaries, or markdown formatting like ```json.",
        "\n--- CONTACTS TO CLASSIFY (JSON) ---",
        json.dumps(contacts_to_classify, ensure_ascii=False)
    ]
    prompt = "\n".join(prompt_parts)

    # Bound before the try block: the JSONDecodeError handler logs this value
    # and must not fail with a NameError if the error is raised before the
    # array has been extracted.
    json_str = ""
    try:
        response_str = call_openai_chat(prompt, temperature=0.0, model="gpt-4o-mini", response_format_json=True)
        # Greedy match from the first '[' to the last ']' so that any text
        # around the array is ignored.
        match = re.search(r'\[.*\]', response_str, re.DOTALL)
        if not match:
            self.logger.error("Kein JSON-Array in KI-Antwort gefunden.")
            self.logger.debug(f"ROH-ANTWORT DER API:\n{response_str}")
            return {}
        json_str = match.group(0)
        results_list = json.loads(json_str)

        classified_map = {}
        skipped = 0
        for item in results_list:
            # Skip malformed entries individually instead of letting one bad
            # object discard the whole batch through the generic handler.
            if not isinstance(item, dict) or 'job_title' not in item:
                skipped += 1
                continue
            # Results naming an unknown department are dropped silently.
            if item.get('department') in valid_departments:
                classified_map[item['job_title']] = item['department']
        if skipped:
            self.logger.warning(f"{skipped} fehlerhafte Einträge in KI-Antwort übersprungen.")
        self.logger.info(f"{len(classified_map)} Titel erfolgreich von KI klassifiziert.")
        return classified_map
    except json.JSONDecodeError as e:
        self.logger.error(f"Fehler beim Parsen des extrahierten JSON: {e}")
        self.logger.debug(f"EXTRAHIERTER JSON-STRING, DER FEHLER VERURSACHTE:\n{json_str}")
        return {}
    except Exception as e:
        # Deliberate best-effort boundary: a failed batch must not abort the run.
        self.logger.error(f"Unerwarteter Fehler bei KI-Klassifizierung: {e}")
        return {}
def _append_learnings_to_source(self, gsh, new_mappings_df):
    """Append AI-derived title/department pairs to the learning sheet.

    Args:
        gsh: Sheet handler; only its ``append_rows(sheet_name, rows)``
            method is used, and a falsy return value is treated as failure.
        new_mappings_df: DataFrame with at least the columns 'Job Title'
            and 'Department'. Nothing happens when it is empty.

    NOTE(review): only these two columns are appended. If the learning
    sheet has further columns (for example 'Branche'), they stay unset for
    learned rows -- confirm this is intended.
    """
    if new_mappings_df.empty:
        return

    self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue KI-Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...")
    learned_pairs = new_mappings_df[["Job Title", "Department"]]
    appended = gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, learned_pairs.values.tolist())
    if not appended:
        self.logger.error("Fehler beim Anhängen der Lern-Daten.")
def process_contacts(self):
    """Assign a department to every row of the target sheet and write it back.

    Steps:
        1. Load ``TARGET_SHEET_NAME`` as a DataFrame; the columns
           'Job Title' and 'Branche' are required (header names are
           stripped of surrounding whitespace first).
        2. Rule-based assignment per row via ``self._find_best_match``.
        3. Rows still at ``DEFAULT_DEPARTMENT`` are sent to the AI in
           batches of ``AI_BATCH_SIZE``; accepted results replace the
           default and are appended to the learning sheet.
        4. Log the count per department and overwrite the target sheet
           with the header row plus all data rows.

    Returns:
        None. Missing data, missing columns and write failures are logged
        and end the method without raising.

    NOTE: AI results are keyed by job title only. Contacts are de-duplicated
    on (title, branch), so if the same title occurs with several branches,
    the result of the last one processed applies to all of them.
    """
    self.logger.info("Starte Kontakt-Verarbeitung...")
    gsh = GoogleSheetHandler()
    df = gsh.get_sheet_as_dataframe(TARGET_SHEET_NAME)
    if df is None or df.empty:
        self.logger.warning(f"'{TARGET_SHEET_NAME}' ist leer. Nichts zu tun.")
        return
    self.logger.info(f"{len(df)} Zeilen aus '{TARGET_SHEET_NAME}' geladen.")

    df.columns = [col.strip() for col in df.columns]
    if "Job Title" not in df.columns or "Branche" not in df.columns:
        self.logger.critical("Benötigte Spalten 'Job Title' und/oder 'Branche' nicht gefunden. Abbruch.")
        return

    # No scratch copy of 'Job Title' is kept: it was never read, and a real
    # sheet column with the same name would have been overwritten and then
    # removed from the data written back.
    if "Department" not in df.columns:
        df["Department"] = ""

    self.logger.info("Starte regelbasierte Zuordnung (Stufe 1 & 2) mit Branchen-Kontext...")
    df['Department'] = df.apply(
        lambda row: self._find_best_match(row['Job Title'], row.get('Branche', '')),
        axis=1,
    )
    self.logger.info("Regelbasierte Zuordnung abgeschlossen.")

    undefined_df = df[df['Department'] == DEFAULT_DEPARTMENT]
    if not undefined_df.empty:
        self.logger.info(f"{len(undefined_df)} Jobtitel konnten nicht zugeordnet werden. Starte Stufe 3 (KI).")
        unique_contacts = undefined_df[['Job Title', 'Branche']].drop_duplicates().to_dict('records')
        contacts_to_classify = [
            {'job_title': c['Job Title'], 'company_branch': c.get('Branche', '')}
            for c in unique_contacts
        ]

        ai_results_map = {}
        contact_chunks = [
            contacts_to_classify[i:i + AI_BATCH_SIZE]
            for i in range(0, len(contacts_to_classify), AI_BATCH_SIZE)
        ]
        self.logger.info(f"Teile KI-Anfrage in {len(contact_chunks)} Batches von max. {AI_BATCH_SIZE} Kontakten auf.")
        for i, chunk in enumerate(contact_chunks):
            self.logger.info(f"Verarbeite KI-Batch {i+1}/{len(contact_chunks)}...")
            ai_results_map.update(self._get_ai_classification(chunk))

        # Only rows the rules left undefined may be overwritten by AI results.
        df['Department'] = df.apply(
            lambda row: ai_results_map.get(row['Job Title'], row['Department'])
            if row['Department'] == DEFAULT_DEPARTMENT else row['Department'],
            axis=1,
        )

        new_learnings = [{'Job Title': title, 'Department': dept} for title, dept in ai_results_map.items()]
        if new_learnings:
            self._append_learnings_to_source(gsh, pd.DataFrame(new_learnings))
    else:
        self.logger.info("Alle Jobtitel durch Regeln zugeordnet. Stufe 3 wird übersprungen.")

    self.logger.info("--- Zuordnungs-Statistik ---")
    stats = df['Department'].value_counts()
    for department, count in stats.items():
        self.logger.info(f"- {department}: {count} Zuordnungen")
    self.logger.info(f"GESAMT: {len(df)} Jobtitel verarbeitet.")

    # Header row first, then all data rows, as expected by clear_and_write_data.
    output_data = [df.columns.values.tolist()] + df.values.tolist()
    if gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data):
        self.logger.info(f"Ergebnisse erfolgreich in '{TARGET_SHEET_NAME}' geschrieben.")
    else:
        self.logger.error("Fehler beim Zurückschreiben der Daten.")
if __name__ == "__main__": if __name__ == "__main__":
setup_logging() setup_logging()
logging.info(f"Starte contact_grouping.py v{__version__}") build_knowledge_base()
Config.load_api_keys()
grouper = ContactGrouper()
if not grouper.load_knowledge_base():
logging.critical("Skript-Abbruch: Wissensbasis nicht geladen.")
sys.exit(1)
grouper.process_contacts()