v1.2.3 - Bugfix SyntaxError bei KI-Beispiel-Generierung

- Bugfix: Behebt einen `SyntaxError: invalid syntax` in der Funktion `_generate_ai_examples`.
- Die fehlerhafte f-String-Formatierung, die einen Backslash innerhalb eines Ausdrucks enthielt, wurde durch eine robuste String-Verkettung ersetzt.
- Dies stellt die Lauffähigkeit des Skripts auch auf Python-Versionen vor 3.12 sicher, in denen Backslashes innerhalb von f-String-Ausdrücken nicht erlaubt sind.
This commit is contained in:
2025-09-18 13:43:56 +00:00
parent a0c7d26e9f
commit 7f3d6c603a

View File

@@ -1,196 +1,252 @@
# knowledge_base_builder.py # contact_grouping.py
__version__ = "v1.2.2" __version__ = "v1.2.3"
import logging import logging
import json import json
import re import re
import os import os
import sys import sys
from collections import Counter
import pandas as pd import pandas as pd
from collections import defaultdict
from google_sheet_handler import GoogleSheetHandler from google_sheet_handler import GoogleSheetHandler
from helpers import create_log_filename from helpers import create_log_filename, call_openai_chat
from config import Config from config import Config
# --- Konfiguration --- # --- Konfiguration ---
SOURCE_SHEET_NAME = "CRM_Jobtitles" TARGET_SHEET_NAME = "Matching_Positions"
EXACT_MATCH_OUTPUT_FILE = "exact_match_map.json" LEARNING_SOURCE_SHEET_NAME = "CRM_Jobtitles"
KEYWORD_RULES_OUTPUT_FILE = "keyword_rules.json" EXACT_MATCH_FILE = "exact_match_map.json"
KEYWORD_RULES_FILE = "keyword_rules.json"
DEPARTMENT_PRIORITIES = { DEFAULT_DEPARTMENT = "Undefined"
"Fuhrparkmanagement": 1, AI_BATCH_SIZE = 150
"Legal": 1,
"Baustofflogistik": 1,
"Baustoffherstellung": 1,
"Field Service Management / Kundenservice": 2,
"IT": 3,
"Production Maintenance / Wartung Produktion": 4,
"Utility Maintenance": 5,
"Procurement / Einkauf": 6,
"Supply Chain Management": 7,
"Finanzen": 8,
"Technik": 8,
"Management / GF / C-Level": 10,
"Logistik": 11,
"Vertrieb": 12,
"Transportwesen": 13,
"Berater": 20,
"Undefined": 99
}
BRANCH_GROUP_RULES = {
"bau": [
"Baustoffhandel", "Baustoffindustrie",
"Logistiker Baustoffe", "Bauunternehmen"
],
"versorger": [
"Stadtwerke", "Verteilnetzbetreiber",
"Telekommunikation", "Gase & Mineralöl"
],
"produktion": [
"Maschinenbau", "Automobil", "Anlagenbau", "Medizintechnik",
"Chemie & Pharma", "Elektrotechnik", "Lebensmittelproduktion",
"Bürotechnik", "Automaten (Vending, Slot)", "Gebäudetechnik Allgemein",
"Braune & Weiße Ware", "Fenster / Glas", "Getränke", "Möbel", "Agrar, Pellets"
]
}
MIN_SAMPLES_FOR_BRANCH_RULE = 5
# --- MODIFIZIERT: Schwellenwert auf 60% gesenkt ---
BRANCH_SPECIFICITY_THRESHOLD = 0.6
STOP_WORDS = {
'manager', 'leiter', 'head', 'lead', 'senior', 'junior', 'direktor', 'director',
'verantwortlicher', 'beauftragter', 'referent', 'sachbearbeiter', 'mitarbeiter',
'spezialist', 'specialist', 'expert', 'experte', 'consultant', 'berater',
'assistant', 'assistenz', 'teamleiter', 'teamlead', 'abteilungsleiter',
'bereichsleiter', 'gruppenleiter', 'geschäftsführer', 'vorstand', 'ceo', 'cio',
'cfo', 'cto', 'coo', 'von', 'of', 'und', 'für', 'der', 'die', 'das', '&'
}
def setup_logging(): def setup_logging():
log_filename = create_log_filename("knowledge_base_builder") log_filename = create_log_filename("contact_grouping")
if not log_filename: if not log_filename:
print("KRITISCHER FEHLER: Log-Datei konnte nicht erstellt werden. Logge nur in die Konsole.") print("KRITISCHER FEHLER: Log-Datei konnte nicht erstellt werden. Logge nur in die Konsole.")
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()]) logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()])
return return
log_level = logging.DEBUG log_level = logging.DEBUG
root_logger = logging.getLogger() root_logger = logging.getLogger()
if root_logger.handlers: if root_logger.handlers:
for handler in root_logger.handlers[:]: for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler) root_logger.removeHandler(handler)
logging.basicConfig(level=log_level, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[logging.FileHandler(log_filename, encoding='utf-8'), logging.StreamHandler()])
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_filename, encoding='utf-8'),
logging.StreamHandler()
]
)
logging.getLogger("gspread").setLevel(logging.WARNING) logging.getLogger("gspread").setLevel(logging.WARNING)
logging.getLogger("oauth2client").setLevel(logging.WARNING) logging.getLogger("oauth2client").setLevel(logging.WARNING)
logging.info(f"Logging erfolgreich initialisiert. Log-Datei: {log_filename}") logging.info(f"Logging erfolgreich initialisiert. Log-Datei: {log_filename}")
class ContactGrouper:
def __init__(self):
self.logger = logging.getLogger(__name__ + ".ContactGrouper")
self.exact_match_map = None
self.keyword_rules = None
self.ai_example_prompt_part = ""
def build_knowledge_base(): def load_knowledge_base(self):
logger = logging.getLogger(__name__) self.logger.info("Lade Wissensbasis...")
logger.info(f"Starte Erstellung der Wissensbasis (Version {__version__})...") self.exact_match_map = self._load_json(EXACT_MATCH_FILE)
self.keyword_rules = self._load_json(KEYWORD_RULES_FILE)
if self.exact_match_map is None or self.keyword_rules is None:
self.logger.critical("Fehler beim Laden der Wissensbasis. Abbruch.")
return False
self._generate_ai_examples()
self.logger.info("Wissensbasis erfolgreich geladen und KI-Beispiele generiert.")
return True
gsh = GoogleSheetHandler() def _load_json(self, file_path):
df = gsh.get_sheet_as_dataframe(SOURCE_SHEET_NAME) if not os.path.exists(file_path):
self.logger.error(f"Wissensbasis-Datei '{file_path}' nicht gefunden.")
return None
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.logger.debug(f"Lese und parse '{file_path}'...")
data = json.load(f)
self.logger.debug(f"'{file_path}' erfolgreich geparst.")
return data
except (json.JSONDecodeError, IOError) as e:
self.logger.error(f"Fehler beim Laden der Datei '{file_path}': {e}")
return None
if df is None or df.empty: def _normalize_text(self, text):
logger.critical(f"Konnte keine Daten aus '{SOURCE_SHEET_NAME}' laden. Abbruch.") if not isinstance(text, str): return ""
return return text.lower().strip()
df.columns = [col.strip() for col in df.columns] def _generate_ai_examples(self):
self.logger.info("Generiere KI-Beispiele aus der Wissensbasis...")
required_cols = ["Job Title", "Department", "Branche"] if not self.exact_match_map:
if not all(col in df.columns for col in required_cols): return
logger.critical(f"Benötigte Spalten {required_cols} nicht in '{SOURCE_SHEET_NAME}' gefunden. Abbruch.") titles_by_dept = defaultdict(list)
return for title, dept in self.exact_match_map.items():
titles_by_dept[dept].append(title)
example_lines = []
sorted_depts = sorted(self.keyword_rules.keys(), key=lambda d: self.keyword_rules.get(d, {}).get('priority', 99))
for dept in sorted_depts:
if dept == DEFAULT_DEPARTMENT or not titles_by_dept[dept]:
continue
top_titles = sorted(titles_by_dept[dept], key=len)[:5]
# --- KORREKTUR: Die fehlerhafte Zeile wurde ersetzt ---
formatted_titles = ', '.join('"' + title + '"' for title in top_titles)
example_lines.append(f"- Für '{dept}': {formatted_titles}")
self.ai_example_prompt_part = "\n".join(example_lines)
self.logger.debug(f"Generierter Beispiel-Prompt:\n{self.ai_example_prompt_part}")
logger.info(f"{len(df)} Zeilen aus '{SOURCE_SHEET_NAME}' geladen.") def _find_best_match(self, job_title, company_branch):
normalized_title = self._normalize_text(job_title)
df.dropna(subset=required_cols, inplace=True) normalized_branch = self._normalize_text(company_branch)
df = df[df["Job Title"].str.strip() != ''] if not normalized_title: return DEFAULT_DEPARTMENT
df['normalized_title'] = df['Job Title'].str.lower().str.strip()
logger.info(f"{len(df)} Zeilen nach Bereinigung.")
logger.info("Erstelle 'Primary Mapping' für exakte Treffer (Stufe 1)...") exact_match = self.exact_match_map.get(normalized_title)
exact_match_map = df.groupby('normalized_title')['Department'].apply(lambda x: x.mode()[0]).to_dict() if exact_match:
try: rule = self.keyword_rules.get(exact_match, {})
with open(EXACT_MATCH_OUTPUT_FILE, 'w', encoding='utf-8') as f: required_keywords = rule.get("required_branch_keywords")
json.dump(exact_match_map, f, indent=4, ensure_ascii=False) if required_keywords:
logger.info(f"-> '{EXACT_MATCH_OUTPUT_FILE}' mit {len(exact_match_map)} Titeln erstellt.") if not any(keyword in normalized_branch for keyword in required_keywords):
except IOError as e: self.logger.debug(f"'{job_title}' -> Exakter Match '{exact_match}' verworfen (Branche: '{company_branch}')")
logger.error(f"Fehler beim Schreiben der Datei '{EXACT_MATCH_OUTPUT_FILE}': {e}")
return
logger.info("Erstelle 'Keyword-Datenbank' mit automatischer Branchen-Logik (Stufe 2)...")
titles_by_department = df.groupby('Department')['normalized_title'].apply(list).to_dict()
branches_by_department = df.groupby('Department')['Branche'].apply(list).to_dict()
keyword_rules = {}
for department, titles in titles_by_department.items():
all_words = []
for title in titles:
words = re.split(r'[\s/(),-]+', title)
all_words.extend([word for word in words if word])
word_counts = Counter(all_words)
top_keywords = [word for word, count in word_counts.most_common(50) if word not in STOP_WORDS and (len(word) > 2 or word in {'it', 'edv'})]
if top_keywords:
rule = {
"priority": DEPARTMENT_PRIORITIES.get(department, 99),
"keywords": sorted(top_keywords)
}
department_branches = branches_by_department.get(department, [])
total_titles_in_dept = len(department_branches)
if total_titles_in_dept >= MIN_SAMPLES_FOR_BRANCH_RULE:
branch_group_counts = Counter()
for branch_name in department_branches:
for group_keyword, d365_names in BRANCH_GROUP_RULES.items():
if branch_name in d365_names:
branch_group_counts[group_keyword] += 1
if branch_group_counts:
most_common_group, count = branch_group_counts.most_common(1)[0]
ratio = count / total_titles_in_dept
if ratio > BRANCH_SPECIFICITY_THRESHOLD:
logger.info(f" -> Department '{department}' ist spezifisch für Branche '{most_common_group}' ({ratio:.0%}). Regel wird hinzugefügt.")
rule["required_branch_keywords"] = [most_common_group]
else:
logger.debug(f" -> Department '{department}' nicht spezifisch genug. Dominante Branche '{most_common_group}' nur bei {ratio:.0%}, benötigt >{BRANCH_SPECIFICITY_THRESHOLD:.0%}.")
else: else:
logger.debug(f" -> Department '{department}' konnte keiner Branchen-Gruppe zugeordnet werden.") self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1, Branche OK)")
return exact_match
else: else:
logger.debug(f" -> Department '{department}' hat zu wenige Datenpunkte ({total_titles_in_dept} < {MIN_SAMPLES_FOR_BRANCH_RULE}) für eine Branchen-Regel.") self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1)")
return exact_match
keyword_rules[department] = rule title_tokens = set(re.split(r'[\s/(),-]+', normalized_title))
scores = {}
try: for department, rules in self.keyword_rules.items():
with open(KEYWORD_RULES_OUTPUT_FILE, 'w', encoding='utf-8') as f: required_keywords = rules.get("required_branch_keywords")
json.dump(keyword_rules, f, indent=4, ensure_ascii=False) if required_keywords:
logger.info(f"-> '{KEYWORD_RULES_OUTPUT_FILE}' mit Regeln für {len(keyword_rules)} Departments erstellt.") if not any(keyword in normalized_branch for keyword in required_keywords):
except IOError as e: self.logger.debug(f"Dept '{department}' für '{job_title}' übersprungen (Branche: '{company_branch}')")
logger.error(f"Fehler beim Schreiben der Datei '{KEYWORD_RULES_OUTPUT_FILE}': {e}") continue
return matches = title_tokens.intersection(rules.get("keywords", []))
if matches: scores[department] = len(matches)
logger.info("Wissensbasis erfolgreich erstellt.") if not scores:
self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine passenden Keywords)")
return DEFAULT_DEPARTMENT
max_score = max(scores.values())
top_departments = [dept for dept, score in scores.items() if score == max_score]
if len(top_departments) == 1:
winner = top_departments[0]
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Score {max_score})")
return winner
best_priority = float('inf')
winner = top_departments[0]
for department in top_departments:
priority = self.keyword_rules.get(department, {}).get("priority", 99)
if priority < best_priority:
best_priority = priority
winner = department
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Score {max_score}, Prio {best_priority})")
return winner
def _get_ai_classification(self, contacts_to_classify):
self.logger.info(f"Sende {len(contacts_to_classify)} Titel an KI (mit Kontext)...")
if not contacts_to_classify: return {}
valid_departments = sorted([dept for dept in self.keyword_rules.keys() if dept != DEFAULT_DEPARTMENT])
prompt_parts = [
"You are a specialized data processing tool. Your SOLE function is to receive a list of job titles and classify each one into a predefined department category.",
"--- VALID DEPARTMENT CATEGORIES ---",
", ".join(valid_departments),
"\n--- EXAMPLES OF TYPICAL ROLES ---",
self.ai_example_prompt_part,
"\n--- RULES ---",
"1. You MUST use the 'company_branch' to make a context-aware decision.",
"2. For departments with branch requirements (like 'Baustofflogistik' for 'bau'), you MUST ONLY use them if the branch matches.",
"3. Your response MUST be a single, valid JSON array of objects.",
"4. Each object MUST contain the keys 'job_title' and 'department'.",
"5. Your entire response MUST start with '[' and end with ']'.",
"6. You MUST NOT add any introductory text, explanations, summaries, or markdown formatting like ```json.",
"\n--- CONTACTS TO CLASSIFY (JSON) ---",
json.dumps(contacts_to_classify, ensure_ascii=False)
]
prompt = "\n".join(prompt_parts)
response_str = ""
try:
response_str = call_openai_chat(prompt, temperature=0.0, model="gpt-4o-mini", response_format_json=True)
match = re.search(r'\[.*\]', response_str, re.DOTALL)
if not match:
self.logger.error("Kein JSON-Array in KI-Antwort gefunden.")
self.logger.debug(f"ROH-ANTWORT DER API:\n{response_str}")
return {}
json_str = match.group(0)
results_list = json.loads(json_str)
classified_map = {item['job_title']: item['department'] for item in results_list if item.get('department') in valid_departments}
self.logger.info(f"{len(classified_map)} Titel erfolgreich von KI klassifiziert.")
return classified_map
except json.JSONDecodeError as e:
self.logger.error(f"Fehler beim Parsen des extrahierten JSON: {e}")
self.logger.debug(f"EXTRAHIERTER JSON-STRING, DER FEHLER VERURSACHTE:\n{json_str}")
return {}
except Exception as e:
self.logger.error(f"Unerwarteter Fehler bei KI-Klassifizierung: {e}")
return {}
def _append_learnings_to_source(self, gsh, new_mappings_df):
if new_mappings_df.empty: return
self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue KI-Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...")
rows_to_append = new_mappings_df[["Job Title", "Department"]].values.tolist()
if not gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append):
self.logger.error("Fehler beim Anhängen der Lern-Daten.")
def process_contacts(self):
    """Assign a department to every row of the target sheet and write it back.

    Steps:
        1. Load ``TARGET_SHEET_NAME`` as a DataFrame; stop if it is empty or
           lacks the columns "Job Title" / "Branche".
        2. Apply the rule-based matcher (``_find_best_match``) to each row.
        3. Send the distinct (title, branch) pairs that are still
           ``DEFAULT_DEPARTMENT`` to the AI in batches of ``AI_BATCH_SIZE``,
           merge the answers, and append them to the learning sheet.
        4. Log per-department counts and overwrite the target sheet with the
           result (header row followed by all data rows).

    Returns:
        None. Outcomes are reported through the logger only.
    """
    self.logger.info("Starte Kontakt-Verarbeitung...")
    gsh = GoogleSheetHandler()
    df = gsh.get_sheet_as_dataframe(TARGET_SHEET_NAME)
    if df is None or df.empty:
        self.logger.warning(f"'{TARGET_SHEET_NAME}' ist leer. Nichts zu tun.")
        return
    self.logger.info(f"{len(df)} Zeilen aus '{TARGET_SHEET_NAME}' geladen.")

    # Header cells may carry stray whitespace; normalize before lookups.
    df.columns = [col.strip() for col in df.columns]
    if "Job Title" not in df.columns or "Branche" not in df.columns:
        self.logger.critical("Benötigte Spalten 'Job Title' und/oder 'Branche' nicht gefunden. Abbruch.")
        return

    # No temporary 'Original Job Title' column is created any more: it was
    # never read, and dropping it before the write-back also removed a
    # same-named column that was already present in the sheet.
    self.logger.info("Starte regelbasierte Zuordnung (Stufe 1 & 2) mit Branchen-Kontext...")
    df['Department'] = df.apply(
        lambda row: self._find_best_match(row['Job Title'], row.get('Branche', '')),
        axis=1,
    )
    self.logger.info("Regelbasierte Zuordnung abgeschlossen.")

    undefined_df = df[df['Department'] == DEFAULT_DEPARTMENT]
    if not undefined_df.empty:
        self.logger.info(f"{len(undefined_df)} Jobtitel konnten nicht zugeordnet werden. Starte Stufe 3 (KI).")
        unique_pairs = undefined_df[['Job Title', 'Branche']].drop_duplicates().to_dict('records')
        contacts_to_classify = [
            {'job_title': rec['Job Title'], 'company_branch': rec.get('Branche', '')}
            for rec in unique_pairs
        ]

        contact_chunks = [
            contacts_to_classify[start:start + AI_BATCH_SIZE]
            for start in range(0, len(contacts_to_classify), AI_BATCH_SIZE)
        ]
        self.logger.info(f"Teile KI-Anfrage in {len(contact_chunks)} Batches von max. {AI_BATCH_SIZE} Kontakten auf.")

        # NOTE(review): results are keyed by job title only, so the same
        # title occurring with two different branches shares one AI result
        # (the later batch wins) - confirm this is intended.
        ai_results_map = {}
        for batch_no, chunk in enumerate(contact_chunks, start=1):
            self.logger.info(f"Verarbeite KI-Batch {batch_no}/{len(contact_chunks)}...")
            ai_results_map.update(self._get_ai_classification(chunk))

        def _merge_ai_result(row):
            # Only rows the rule stages left undefined may be overwritten.
            if row['Department'] != DEFAULT_DEPARTMENT:
                return row['Department']
            return ai_results_map.get(row['Job Title'], row['Department'])

        df['Department'] = df.apply(_merge_ai_result, axis=1)

        new_learnings = [
            {'Job Title': title, 'Department': dept}
            for title, dept in ai_results_map.items()
        ]
        if new_learnings:
            self._append_learnings_to_source(gsh, pd.DataFrame(new_learnings))
    else:
        self.logger.info("Alle Jobtitel durch Regeln zugeordnet. Stufe 3 wird übersprungen.")

    self.logger.info("--- Zuordnungs-Statistik ---")
    for department, count in df['Department'].value_counts().items():
        self.logger.info(f"- {department}: {count} Zuordnungen")
    self.logger.info(f"GESAMT: {len(df)} Jobtitel verarbeitet.")

    # The sheet is rewritten as a whole: header row first, then all rows.
    output_data = [df.columns.values.tolist()] + df.values.tolist()
    if gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data):
        self.logger.info(f"Ergebnisse erfolgreich in '{TARGET_SHEET_NAME}' geschrieben.")
    else:
        self.logger.error("Fehler beim Zurückschreiben der Daten.")
if __name__ == "__main__": if __name__ == "__main__":
setup_logging() setup_logging()
build_knowledge_base() logging.info(f"Starte contact_grouping.py v{__version__}")
Config.load_api_keys()
grouper = ContactGrouper()
if not grouper.load_knowledge_base():
logging.critical("Skript-Abbruch: Wissensbasis nicht geladen.")
sys.exit(1)
grouper.process_contacts()