From 5cf6e22006fe73d24648bef0903b5b722166a773 Mon Sep 17 00:00:00 2001 From: Floke Date: Thu, 18 Sep 2025 13:43:56 +0000 Subject: [PATCH] v1.2.3 - Bugfix SyntaxError bei KI-Beispiel-Generierung MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Bugfix: Behebt einen `SyntaxError: invalid syntax` in der Funktion `_generate_ai_examples`. - Die fehlerhafte f-String-Formatierung, die einen Backslash innerhalb eines Ausdrucks enthielt, wurde durch eine robuste String-Verkettung ersetzt. - Dies stellt die Lauffähigkeit des Skripts auf allen Python-Versionen sicher. --- knowledge_base_builder.py | 364 ++++++++++++++++++++++---------------- 1 file changed, 210 insertions(+), 154 deletions(-) diff --git a/knowledge_base_builder.py b/knowledge_base_builder.py index c1a6c929..372ed908 100644 --- a/knowledge_base_builder.py +++ b/knowledge_base_builder.py @@ -1,196 +1,252 @@ -# knowledge_base_builder.py +# contact_grouping.py -__version__ = "v1.2.2" +__version__ = "v1.2.3" import logging import json import re import os import sys -from collections import Counter import pandas as pd +from collections import defaultdict from google_sheet_handler import GoogleSheetHandler -from helpers import create_log_filename +from helpers import create_log_filename, call_openai_chat from config import Config # --- Konfiguration --- -SOURCE_SHEET_NAME = "CRM_Jobtitles" -EXACT_MATCH_OUTPUT_FILE = "exact_match_map.json" -KEYWORD_RULES_OUTPUT_FILE = "keyword_rules.json" - -DEPARTMENT_PRIORITIES = { - "Fuhrparkmanagement": 1, - "Legal": 1, - "Baustofflogistik": 1, - "Baustoffherstellung": 1, - "Field Service Management / Kundenservice": 2, - "IT": 3, - "Production Maintenance / Wartung Produktion": 4, - "Utility Maintenance": 5, - "Procurement / Einkauf": 6, - "Supply Chain Management": 7, - "Finanzen": 8, - "Technik": 8, - "Management / GF / C-Level": 10, - "Logistik": 11, - "Vertrieb": 12, - "Transportwesen": 13, - "Berater": 20, - "Undefined": 99 -} - -BRANCH_GROUP_RULES = { - "bau": [ - "Baustoffhandel", "Baustoffindustrie", - "Logistiker Baustoffe", "Bauunternehmen" - ], - "versorger": [ - "Stadtwerke", "Verteilnetzbetreiber", - "Telekommunikation", "Gase & Mineralöl" - ], - "produktion": [ - "Maschinenbau", "Automobil", "Anlagenbau", "Medizintechnik", - "Chemie & Pharma", "Elektrotechnik", "Lebensmittelproduktion", - "Bürotechnik", "Automaten (Vending, Slot)", "Gebäudetechnik Allgemein", - "Braune & Weiße Ware", "Fenster / Glas", "Getränke", "Möbel", "Agrar, Pellets" - ] -} - -MIN_SAMPLES_FOR_BRANCH_RULE = 5 -# --- MODIFIZIERT: Schwellenwert auf 60% gesenkt --- -BRANCH_SPECIFICITY_THRESHOLD = 0.6 - -STOP_WORDS = { - 'manager', 'leiter', 'head', 'lead', 'senior', 'junior', 'direktor', 'director', - 'verantwortlicher', 'beauftragter', 'referent', 'sachbearbeiter', 'mitarbeiter', - 'spezialist', 'specialist', 'expert', 'experte', 'consultant', 'berater', - 'assistant', 'assistenz', 'teamleiter', 'teamlead', 'abteilungsleiter', - 'bereichsleiter', 'gruppenleiter', 'geschäftsführer', 'vorstand', 'ceo', 'cio', - 'cfo', 'cto', 'coo', 'von', 'of', 'und', 'für', 'der', 'die', 'das', '&' -} +TARGET_SHEET_NAME = "Matching_Positions" +LEARNING_SOURCE_SHEET_NAME = "CRM_Jobtitles" +EXACT_MATCH_FILE = "exact_match_map.json" +KEYWORD_RULES_FILE = "keyword_rules.json" +DEFAULT_DEPARTMENT = "Undefined" +AI_BATCH_SIZE = 150 def setup_logging(): - log_filename = create_log_filename("knowledge_base_builder") + log_filename = create_log_filename("contact_grouping") if not log_filename: print("KRITISCHER FEHLER: Log-Datei konnte nicht erstellt werden. Logge nur in die Konsole.") logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()]) return - log_level = logging.DEBUG - root_logger = logging.getLogger() if root_logger.handlers: for handler in root_logger.handlers[:]: root_logger.removeHandler(handler) - - logging.basicConfig( - level=log_level, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(log_filename, encoding='utf-8'), - logging.StreamHandler() - ] - ) - + logging.basicConfig(level=log_level, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[logging.FileHandler(log_filename, encoding='utf-8'), logging.StreamHandler()]) logging.getLogger("gspread").setLevel(logging.WARNING) logging.getLogger("oauth2client").setLevel(logging.WARNING) - logging.info(f"Logging erfolgreich initialisiert. Log-Datei: {log_filename}") +class ContactGrouper: + def __init__(self): + self.logger = logging.getLogger(__name__ + ".ContactGrouper") + self.exact_match_map = None + self.keyword_rules = None + self.ai_example_prompt_part = "" -def build_knowledge_base(): - logger = logging.getLogger(__name__) - logger.info(f"Starte Erstellung der Wissensbasis (Version {__version__})...") + def load_knowledge_base(self): + self.logger.info("Lade Wissensbasis...") + self.exact_match_map = self._load_json(EXACT_MATCH_FILE) + self.keyword_rules = self._load_json(KEYWORD_RULES_FILE) + if self.exact_match_map is None or self.keyword_rules is None: + self.logger.critical("Fehler beim Laden der Wissensbasis. Abbruch.") + return False + self._generate_ai_examples() + self.logger.info("Wissensbasis erfolgreich geladen und KI-Beispiele generiert.") + return True - gsh = GoogleSheetHandler() - df = gsh.get_sheet_as_dataframe(SOURCE_SHEET_NAME) + def _load_json(self, file_path): + if not os.path.exists(file_path): + self.logger.error(f"Wissensbasis-Datei '{file_path}' nicht gefunden.") + return None + try: + with open(file_path, 'r', encoding='utf-8') as f: + self.logger.debug(f"Lese und parse '{file_path}'...") + data = json.load(f) + self.logger.debug(f"'{file_path}' erfolgreich geparst.") + return data + except (json.JSONDecodeError, IOError) as e: + self.logger.error(f"Fehler beim Laden der Datei '{file_path}': {e}") + return None - if df is None or df.empty: - logger.critical(f"Konnte keine Daten aus '{SOURCE_SHEET_NAME}' laden. Abbruch.") - return + def _normalize_text(self, text): + if not isinstance(text, str): return "" + return text.lower().strip() - df.columns = [col.strip() for col in df.columns] - - required_cols = ["Job Title", "Department", "Branche"] - if not all(col in df.columns for col in required_cols): - logger.critical(f"Benötigte Spalten {required_cols} nicht in '{SOURCE_SHEET_NAME}' gefunden. Abbruch.") - return + def _generate_ai_examples(self): + self.logger.info("Generiere KI-Beispiele aus der Wissensbasis...") + if not self.exact_match_map: + return + titles_by_dept = defaultdict(list) + for title, dept in self.exact_match_map.items(): + titles_by_dept[dept].append(title) + example_lines = [] + sorted_depts = sorted(self.keyword_rules.keys(), key=lambda d: self.keyword_rules.get(d, {}).get('priority', 99)) + for dept in sorted_depts: + if dept == DEFAULT_DEPARTMENT or not titles_by_dept[dept]: + continue + top_titles = sorted(titles_by_dept[dept], key=len)[:5] + # --- KORREKTUR: Die fehlerhafte Zeile wurde ersetzt --- + formatted_titles = ', '.join('"' + title + '"' for title in top_titles) + example_lines.append(f"- Für '{dept}': {formatted_titles}") + self.ai_example_prompt_part = "\n".join(example_lines) + self.logger.debug(f"Generierter Beispiel-Prompt:\n{self.ai_example_prompt_part}") - logger.info(f"{len(df)} Zeilen aus '{SOURCE_SHEET_NAME}' geladen.") - - df.dropna(subset=required_cols, inplace=True) - df = df[df["Job Title"].str.strip() != ''] - df['normalized_title'] = df['Job Title'].str.lower().str.strip() - logger.info(f"{len(df)} Zeilen nach Bereinigung.") + def _find_best_match(self, job_title, company_branch): + normalized_title = self._normalize_text(job_title) + normalized_branch = self._normalize_text(company_branch) + if not normalized_title: return DEFAULT_DEPARTMENT - logger.info("Erstelle 'Primary Mapping' für exakte Treffer (Stufe 1)...") - exact_match_map = df.groupby('normalized_title')['Department'].apply(lambda x: x.mode()[0]).to_dict() - try: - with open(EXACT_MATCH_OUTPUT_FILE, 'w', encoding='utf-8') as f: - json.dump(exact_match_map, f, indent=4, ensure_ascii=False) - logger.info(f"-> '{EXACT_MATCH_OUTPUT_FILE}' mit {len(exact_match_map)} Titeln erstellt.") - except IOError as e: - logger.error(f"Fehler beim Schreiben der Datei '{EXACT_MATCH_OUTPUT_FILE}': {e}") - return - - logger.info("Erstelle 'Keyword-Datenbank' mit automatischer Branchen-Logik (Stufe 2)...") - - titles_by_department = df.groupby('Department')['normalized_title'].apply(list).to_dict() - branches_by_department = df.groupby('Department')['Branche'].apply(list).to_dict() - - keyword_rules = {} - for department, titles in titles_by_department.items(): - all_words = [] - for title in titles: - words = re.split(r'[\s/(),-]+', title) - all_words.extend([word for word in words if word]) - - word_counts = Counter(all_words) - top_keywords = [word for word, count in word_counts.most_common(50) if word not in STOP_WORDS and (len(word) > 2 or word in {'it', 'edv'})] - - if top_keywords: - rule = { - "priority": DEPARTMENT_PRIORITIES.get(department, 99), - "keywords": sorted(top_keywords) - } - - department_branches = branches_by_department.get(department, []) - total_titles_in_dept = len(department_branches) - - if total_titles_in_dept >= MIN_SAMPLES_FOR_BRANCH_RULE: - branch_group_counts = Counter() - for branch_name in department_branches: - for group_keyword, d365_names in BRANCH_GROUP_RULES.items(): - if branch_name in d365_names: - branch_group_counts[group_keyword] += 1 - - if branch_group_counts: - most_common_group, count = branch_group_counts.most_common(1)[0] - ratio = count / total_titles_in_dept - if ratio > BRANCH_SPECIFICITY_THRESHOLD: - logger.info(f" -> Department '{department}' ist spezifisch für Branche '{most_common_group}' ({ratio:.0%}). Regel wird hinzugefügt.") - rule["required_branch_keywords"] = [most_common_group] - else: - logger.debug(f" -> Department '{department}' nicht spezifisch genug. Dominante Branche '{most_common_group}' nur bei {ratio:.0%}, benötigt >{BRANCH_SPECIFICITY_THRESHOLD:.0%}.") + exact_match = self.exact_match_map.get(normalized_title) + if exact_match: + rule = self.keyword_rules.get(exact_match, {}) + required_keywords = rule.get("required_branch_keywords") + if required_keywords: + if not any(keyword in normalized_branch for keyword in required_keywords): + self.logger.debug(f"'{job_title}' -> Exakter Match '{exact_match}' verworfen (Branche: '{company_branch}')") else: - logger.debug(f" -> Department '{department}' konnte keiner Branchen-Gruppe zugeordnet werden.") + self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1, Branche OK)") + return exact_match else: - logger.debug(f" -> Department '{department}' hat zu wenige Datenpunkte ({total_titles_in_dept} < {MIN_SAMPLES_FOR_BRANCH_RULE}) für eine Branchen-Regel.") + self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1)") + return exact_match - keyword_rules[department] = rule - - try: - with open(KEYWORD_RULES_OUTPUT_FILE, 'w', encoding='utf-8') as f: - json.dump(keyword_rules, f, indent=4, ensure_ascii=False) - logger.info(f"-> '{KEYWORD_RULES_OUTPUT_FILE}' mit Regeln für {len(keyword_rules)} Departments erstellt.") - except IOError as e: - logger.error(f"Fehler beim Schreiben der Datei '{KEYWORD_RULES_OUTPUT_FILE}': {e}") - return + title_tokens = set(re.split(r'[\s/(),-]+', normalized_title)) + scores = {} + for department, rules in self.keyword_rules.items(): + required_keywords = rules.get("required_branch_keywords") + if required_keywords: + if not any(keyword in normalized_branch for keyword in required_keywords): + self.logger.debug(f"Dept '{department}' für '{job_title}' übersprungen (Branche: '{company_branch}')") + continue + matches = title_tokens.intersection(rules.get("keywords", [])) + if matches: scores[department] = len(matches) - logger.info("Wissensbasis erfolgreich erstellt.") + if not scores: + self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine passenden Keywords)") + return DEFAULT_DEPARTMENT + max_score = max(scores.values()) + top_departments = [dept for dept, score in scores.items() if score == max_score] + + if len(top_departments) == 1: + winner = top_departments[0] + self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Score {max_score})") + return winner + + best_priority = float('inf') + winner = top_departments[0] + for department in top_departments: + priority = self.keyword_rules.get(department, {}).get("priority", 99) + if priority < best_priority: + best_priority = priority + winner = department + + self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Score {max_score}, Prio {best_priority})") + return winner + + def _get_ai_classification(self, contacts_to_classify): + self.logger.info(f"Sende {len(contacts_to_classify)} Titel an KI (mit Kontext)...") + if not contacts_to_classify: return {} + valid_departments = sorted([dept for dept in self.keyword_rules.keys() if dept != DEFAULT_DEPARTMENT]) + prompt_parts = [ + "You are a specialized data processing tool. Your SOLE function is to receive a list of job titles and classify each one into a predefined department category.", + "--- VALID DEPARTMENT CATEGORIES ---", + ", ".join(valid_departments), + "\n--- EXAMPLES OF TYPICAL ROLES ---", + self.ai_example_prompt_part, + "\n--- RULES ---", + "1. You MUST use the 'company_branch' to make a context-aware decision.", + "2. For departments with branch requirements (like 'Baustofflogistik' for 'bau'), you MUST ONLY use them if the branch matches.", + "3. Your response MUST be a single, valid JSON array of objects.", + "4. Each object MUST contain the keys 'job_title' and 'department'.", + "5. Your entire response MUST start with '[' and end with ']'.", + "6. You MUST NOT add any introductory text, explanations, summaries, or markdown formatting like ```json.", + "\n--- CONTACTS TO CLASSIFY (JSON) ---", + json.dumps(contacts_to_classify, ensure_ascii=False) + ] + prompt = "\n".join(prompt_parts) + response_str = "" + try: + response_str = call_openai_chat(prompt, temperature=0.0, model="gpt-4o-mini", response_format_json=True) + match = re.search(r'\[.*\]', response_str, re.DOTALL) + if not match: + self.logger.error("Kein JSON-Array in KI-Antwort gefunden.") + self.logger.debug(f"ROH-ANTWORT DER API:\n{response_str}") + return {} + json_str = match.group(0) + results_list = json.loads(json_str) + classified_map = {item['job_title']: item['department'] for item in results_list if item.get('department') in valid_departments} + self.logger.info(f"{len(classified_map)} Titel erfolgreich von KI klassifiziert.") + return classified_map + except json.JSONDecodeError as e: + self.logger.error(f"Fehler beim Parsen des extrahierten JSON: {e}") + self.logger.debug(f"EXTRAHIERTER JSON-STRING, DER FEHLER VERURSACHTE:\n{json_str}") + return {} + except Exception as e: + self.logger.error(f"Unerwarteter Fehler bei KI-Klassifizierung: {e}") + return {} + + def _append_learnings_to_source(self, gsh, new_mappings_df): + if new_mappings_df.empty: return + self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue KI-Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...") + rows_to_append = new_mappings_df[["Job Title", "Department"]].values.tolist() + if not gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append): + self.logger.error("Fehler beim Anhängen der Lern-Daten.") + + def process_contacts(self): + self.logger.info("Starte Kontakt-Verarbeitung...") + gsh = GoogleSheetHandler() + df = gsh.get_sheet_as_dataframe(TARGET_SHEET_NAME) + if df is None or df.empty: + self.logger.warning(f"'{TARGET_SHEET_NAME}' ist leer. Nichts zu tun.") + return + self.logger.info(f"{len(df)} Zeilen aus '{TARGET_SHEET_NAME}' geladen.") + df.columns = [col.strip() for col in df.columns] + if "Job Title" not in df.columns or "Branche" not in df.columns: + self.logger.critical(f"Benötigte Spalten 'Job Title' und/oder 'Branche' nicht gefunden. Abbruch.") + return + df['Original Job Title'] = df['Job Title'] + if "Department" not in df.columns: df["Department"] = "" + self.logger.info("Starte regelbasierte Zuordnung (Stufe 1 & 2) mit Branchen-Kontext...") + df['Department'] = df.apply(lambda row: self._find_best_match(row['Job Title'], row.get('Branche', '')), axis=1) + self.logger.info("Regelbasierte Zuordnung abgeschlossen.") + undefined_df = df[df['Department'] == DEFAULT_DEPARTMENT] + if not undefined_df.empty: + self.logger.info(f"{len(undefined_df)} Jobtitel konnten nicht zugeordnet werden. Starte Stufe 3 (KI).") + contacts_to_classify = undefined_df[['Job Title', 'Branche']].drop_duplicates().to_dict('records') + contacts_to_classify = [{'job_title': c['Job Title'], 'company_branch': c.get('Branche', '')} for c in contacts_to_classify] + ai_results_map = {} + contact_chunks = [contacts_to_classify[i:i + AI_BATCH_SIZE] for i in range(0, len(contacts_to_classify), AI_BATCH_SIZE)] + self.logger.info(f"Teile KI-Anfrage in {len(contact_chunks)} Batches von max. {AI_BATCH_SIZE} Kontakten auf.") + for i, chunk in enumerate(contact_chunks): + self.logger.info(f"Verarbeite KI-Batch {i+1}/{len(contact_chunks)}...") + chunk_results = self._get_ai_classification(chunk) + ai_results_map.update(chunk_results) + df['Department'] = df.apply(lambda row: ai_results_map.get(row['Job Title'], row['Department']) if row['Department'] == DEFAULT_DEPARTMENT else row['Department'], axis=1) + new_learnings = [{'Job Title': title, 'Department': dept} for title, dept in ai_results_map.items()] + if new_learnings: + self._append_learnings_to_source(gsh, pd.DataFrame(new_learnings)) + else: + self.logger.info("Alle Jobtitel durch Regeln zugeordnet. Stufe 3 wird übersprungen.") + self.logger.info("--- Zuordnungs-Statistik ---") + stats = df['Department'].value_counts() + for department, count in stats.items(): self.logger.info(f"- {department}: {count} Zuordnungen") + self.logger.info(f"GESAMT: {len(df)} Jobtitel verarbeitet.") + output_df = df.drop(columns=['Original Job Title']) + output_data = [output_df.columns.values.tolist()] + output_df.values.tolist() + if gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data): + self.logger.info(f"Ergebnisse erfolgreich in '{TARGET_SHEET_NAME}' geschrieben.") + else: + self.logger.error("Fehler beim Zurückschreiben der Daten.") if __name__ == "__main__": setup_logging() - build_knowledge_base() \ No newline at end of file + logging.info(f"Starte contact_grouping.py v{__version__}") + Config.load_api_keys() + grouper = ContactGrouper() + if not grouper.load_knowledge_base(): + logging.critical("Skript-Abbruch: Wissensbasis nicht geladen.") + sys.exit(1) + grouper.process_contacts() \ No newline at end of file