diff --git a/contact_grouping.py b/contact_grouping.py index 1580aac3..8cebc4b0 100644 --- a/contact_grouping.py +++ b/contact_grouping.py @@ -1,6 +1,6 @@ # contact_grouping.py -__version__ = "v1.1.8" # Versionsnummer hochgezählt +__version__ = "v1.2.2" import logging import json @@ -8,10 +8,11 @@ import re import os import sys import pandas as pd +from collections import defaultdict from google_sheet_handler import GoogleSheetHandler from helpers import create_log_filename, call_openai_chat -from config import LOG_DIR, Config +from config import Config # --- Konfiguration --- TARGET_SHEET_NAME = "Matching_Positions" @@ -19,13 +20,12 @@ LEARNING_SOURCE_SHEET_NAME = "CRM_Jobtitles" EXACT_MATCH_FILE = "exact_match_map.json" KEYWORD_RULES_FILE = "keyword_rules.json" DEFAULT_DEPARTMENT = "Undefined" -# NEU: Konfigurierbare Batch-Größe für KI-Anfragen -AI_BATCH_SIZE = 200 +AI_BATCH_SIZE = 150 def setup_logging(): log_filename = create_log_filename("contact_grouping") if not log_filename: - print("KRITISCHER FEHLER: Log-Datei konnte nicht erstellt werden...") + print("KRITISCHER FEHLER: Log-Datei konnte nicht erstellt werden. Logge nur in die Konsole.") logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()]) return log_level = logging.DEBUG @@ -38,12 +38,12 @@ def setup_logging(): logging.getLogger("oauth2client").setLevel(logging.WARNING) logging.info(f"Logging erfolgreich initialisiert. Log-Datei: {log_filename}") - class ContactGrouper: def __init__(self): self.logger = logging.getLogger(__name__ + ".ContactGrouper") self.exact_match_map = None self.keyword_rules = None + self.ai_example_prompt_part = "" def load_knowledge_base(self): self.logger.info("Lade Wissensbasis...") @@ -52,42 +52,77 @@ class ContactGrouper: if self.exact_match_map is None or self.keyword_rules is None: self.logger.critical("Fehler beim Laden der Wissensbasis. 
Abbruch.") return False - self.logger.info("Wissensbasis erfolgreich geladen.") + self._generate_ai_examples() + self.logger.info("Wissensbasis erfolgreich geladen und KI-Beispiele generiert.") return True def _load_json(self, file_path): if not os.path.exists(file_path): - self.logger.error(f"Datei '{file_path}' nicht gefunden.") + self.logger.error(f"Wissensbasis-Datei '{file_path}' nicht gefunden.") return None try: with open(file_path, 'r', encoding='utf-8') as f: - self.logger.debug(f"Lese '{file_path}'...") - return json.load(f) + self.logger.debug(f"Lese und parse '{file_path}'...") + data = json.load(f) + self.logger.debug(f"'{file_path}' erfolgreich geparst.") + return data except (json.JSONDecodeError, IOError) as e: self.logger.error(f"Fehler beim Laden der Datei '{file_path}': {e}") return None - def _normalize_job_title(self, job_title): - if not isinstance(job_title, str): return "" - return job_title.lower().strip() + def _normalize_text(self, text): + if not isinstance(text, str): return "" + return text.lower().strip() + + def _generate_ai_examples(self): + self.logger.info("Generiere KI-Beispiele aus der Wissensbasis...") + if not self.exact_match_map: + return + titles_by_dept = defaultdict(list) + for title, dept in self.exact_match_map.items(): + titles_by_dept[dept].append(title) + example_lines = [] + sorted_depts = sorted(self.keyword_rules.keys(), key=lambda d: self.keyword_rules.get(d, {}).get('priority', 99)) + for dept in sorted_depts: + if dept == DEFAULT_DEPARTMENT or not titles_by_dept[dept]: + continue + top_titles = sorted(titles_by_dept[dept], key=len)[:5] + example_lines.append(f"- Für '{dept}': {', '.join(f'\\"{title}\\"' for title in top_titles)}") + self.ai_example_prompt_part = "\n".join(example_lines) + self.logger.debug(f"Generierter Beispiel-Prompt:\n{self.ai_example_prompt_part}") - def _find_best_match(self, job_title): - normalized_title = self._normalize_job_title(job_title) + def _find_best_match(self, job_title, 
company_branch): + normalized_title = self._normalize_text(job_title) + normalized_branch = self._normalize_text(company_branch) if not normalized_title: return DEFAULT_DEPARTMENT exact_match = self.exact_match_map.get(normalized_title) if exact_match: - self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1)") - return exact_match + rule = self.keyword_rules.get(exact_match, {}) + required_keywords = rule.get("required_branch_keywords") + if required_keywords: + if not any(keyword in normalized_branch for keyword in required_keywords): + self.logger.debug(f"'{job_title}' -> Exakter Match '{exact_match}' verworfen (Branche: '{company_branch}')") + else: + self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1, Branche OK)") + return exact_match + else: + self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1)") + return exact_match title_tokens = set(re.split(r'[\s/(),-]+', normalized_title)) scores = {} for department, rules in self.keyword_rules.items(): + required_keywords = rules.get("required_branch_keywords") + if required_keywords: + if not any(keyword in normalized_branch for keyword in required_keywords): + self.logger.debug(f"Dept '{department}' für '{job_title}' übersprungen (Branche: '{company_branch}')") + continue matches = title_tokens.intersection(rules.get("keywords", [])) if matches: scores[department] = len(matches) if not scores: - self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine Keywords)") + self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine passenden Keywords)") return DEFAULT_DEPARTMENT max_score = max(scores.values()) @@ -101,7 +136,7 @@ class ContactGrouper: best_priority = float('inf') winner = top_departments[0] for department in top_departments: - priority = self.keyword_rules[department].get("priority", 99) + priority = self.keyword_rules.get(department, {}).get("priority", 99) if priority < best_priority: best_priority = priority winner = department @@ -109,117 
+144,107 @@ class ContactGrouper: self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Score {max_score}, Prio {best_priority})") return winner - def _get_ai_classification(self, job_titles_to_classify): - self.logger.info(f"Sende {len(job_titles_to_classify)} Titel an KI...") - if not job_titles_to_classify: return {} - + def _get_ai_classification(self, contacts_to_classify): + self.logger.info(f"Sende {len(contacts_to_classify)} Titel an KI (mit Kontext)...") + if not contacts_to_classify: return {} valid_departments = sorted([dept for dept in self.keyword_rules.keys() if dept != DEFAULT_DEPARTMENT]) prompt_parts = [ - "You are a specialized data processing tool...", # Gekürzt zur Lesbarkeit - json.dumps(job_titles_to_classify, ensure_ascii=False) + "You are a specialized data processing tool. Your SOLE function is to receive a list of job titles and classify each one into a predefined department category.", + "--- VALID DEPARTMENT CATEGORIES ---", + ", ".join(valid_departments), + "\n--- EXAMPLES OF TYPICAL ROLES ---", + self.ai_example_prompt_part, + "\n--- RULES ---", + "1. You MUST use the 'company_branch' to make a context-aware decision.", + "2. For departments with branch requirements (like 'Baustofflogistik' for 'bau'), you MUST ONLY use them if the branch matches.", + "3. Your response MUST be a single, valid JSON array of objects.", + "4. Each object MUST contain the keys 'job_title' and 'department'.", + "5. Your entire response MUST start with '[' and end with ']'.", + "6. 
You MUST NOT add any introductory text, explanations, summaries, or markdown formatting like ```json.", + "\n--- CONTACTS TO CLASSIFY (JSON) ---", + json.dumps(contacts_to_classify, ensure_ascii=False) ] prompt = "\n".join(prompt_parts) - response_str = "" try: response_str = call_openai_chat(prompt, temperature=0.0, model="gpt-4o-mini", response_format_json=True) match = re.search(r'\[.*\]', response_str, re.DOTALL) - if not match: self.logger.error("Kein JSON-Array in KI-Antwort gefunden.") self.logger.debug(f"ROH-ANTWORT DER API:\n{response_str}") return {} - json_str = match.group(0) results_list = json.loads(json_str) classified_map = {item['job_title']: item['department'] for item in results_list if item.get('department') in valid_departments} - self.logger.info(f"{len(classified_map)} Titel erfolgreich von KI klassifiziert.") return classified_map + except json.JSONDecodeError as e: + self.logger.error(f"Fehler beim Parsen des extrahierten JSON: {e}") + self.logger.debug(f"EXTRAHIERTER JSON-STRING, DER FEHLER VERURSACHTE:\n{json_str}") + return {} except Exception as e: - self.logger.error(f"Fehler bei KI-Klassifizierung: {e}") + self.logger.error(f"Unerwarteter Fehler bei KI-Klassifizierung: {e}") return {} def _append_learnings_to_source(self, gsh, new_mappings_df): if new_mappings_df.empty: return - self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...") + self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue KI-Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...") rows_to_append = new_mappings_df[["Job Title", "Department"]].values.tolist() - gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append) + if not gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append): + self.logger.error("Fehler beim Anhängen der Lern-Daten.") def process_contacts(self): self.logger.info("Starte Kontakt-Verarbeitung...") gsh = GoogleSheetHandler() df = 
gsh.get_sheet_as_dataframe(TARGET_SHEET_NAME) - if df is None or df.empty: self.logger.warning(f"'{TARGET_SHEET_NAME}' ist leer. Nichts zu tun.") return - self.logger.info(f"{len(df)} Zeilen aus '{TARGET_SHEET_NAME}' geladen.") df.columns = [col.strip() for col in df.columns] - if "Job Title" not in df.columns: - self.logger.critical(f"Spalte 'Job Title' nicht gefunden. Abbruch.") + if "Job Title" not in df.columns or "Branche" not in df.columns: + self.logger.critical(f"Benötigte Spalten 'Job Title' und/oder 'Branche' nicht gefunden. Abbruch.") return - df['Original Job Title'] = df['Job Title'] if "Department" not in df.columns: df["Department"] = "" - - self.logger.info("Starte regelbasierte Zuordnung (Stufe 1 & 2)...") - df['Department'] = df['Job Title'].apply(self._find_best_match) + self.logger.info("Starte regelbasierte Zuordnung (Stufe 1 & 2) mit Branchen-Kontext...") + df['Department'] = df.apply(lambda row: self._find_best_match(row['Job Title'], row.get('Branche', '')), axis=1) self.logger.info("Regelbasierte Zuordnung abgeschlossen.") - undefined_df = df[df['Department'] == DEFAULT_DEPARTMENT] - if not undefined_df.empty: - self.logger.info(f"{len(undefined_df)} Jobtitel konnten nicht durch Regeln zugeordnet werden. Starte Stufe 3 (KI).") - titles_to_classify = undefined_df['Job Title'].unique().tolist() - - # --- NEU: BATCH-VERARBEITUNGSLOGIK --- + self.logger.info(f"{len(undefined_df)} Jobtitel konnten nicht zugeordnet werden. Starte Stufe 3 (KI).") + contacts_to_classify = undefined_df[['Job Title', 'Branche']].drop_duplicates().to_dict('records') + contacts_to_classify = [{'job_title': c['Job Title'], 'company_branch': c.get('Branche', '')} for c in contacts_to_classify] ai_results_map = {} - title_chunks = [titles_to_classify[i:i + AI_BATCH_SIZE] for i in range(0, len(titles_to_classify), AI_BATCH_SIZE)] - - self.logger.info(f"Teile KI-Anfrage in {len(title_chunks)} Batches von max. 
{AI_BATCH_SIZE} Titeln auf.") - - for i, chunk in enumerate(title_chunks): - self.logger.info(f"Verarbeite Batch {i+1}/{len(title_chunks)}...") + contact_chunks = [contacts_to_classify[i:i + AI_BATCH_SIZE] for i in range(0, len(contacts_to_classify), AI_BATCH_SIZE)] + self.logger.info(f"Teile KI-Anfrage in {len(contact_chunks)} Batches von max. {AI_BATCH_SIZE} Kontakten auf.") + for i, chunk in enumerate(contact_chunks): + self.logger.info(f"Verarbeite KI-Batch {i+1}/{len(contact_chunks)}...") chunk_results = self._get_ai_classification(chunk) ai_results_map.update(chunk_results) - - df['Department'] = df.apply( - lambda row: ai_results_map.get(row['Job Title'], row['Department']) if row['Department'] == DEFAULT_DEPARTMENT else row['Department'], - axis=1 - ) - + df['Department'] = df.apply(lambda row: ai_results_map.get(row['Job Title'], row['Department']) if row['Department'] == DEFAULT_DEPARTMENT else row['Department'], axis=1) new_learnings = [{'Job Title': title, 'Department': dept} for title, dept in ai_results_map.items()] if new_learnings: self._append_learnings_to_source(gsh, pd.DataFrame(new_learnings)) else: self.logger.info("Alle Jobtitel durch Regeln zugeordnet. 
if __name__ == "__main__":
    # Script entry point: wire up logging, load secrets, then run the pipeline.
    setup_logging()
    logging.info(f"Starte contact_grouping.py v{__version__}")
    Config.load_api_keys()
    grouper = ContactGrouper()
    # Without the knowledge base the rule stages cannot run — bail out hard.
    if not grouper.load_knowledge_base():
        logging.critical("Skript-Abbruch: Wissensbasis nicht geladen.")
        sys.exit(1)
    grouper.process_contacts()