diff --git a/contact_grouping.py b/contact_grouping.py index 9a8db8c9..1580aac3 100644 --- a/contact_grouping.py +++ b/contact_grouping.py @@ -1,6 +1,6 @@ # contact_grouping.py -__version__ = "v1.1.7" # Versionsnummer hochgezählt +__version__ = "v1.1.8" # Versionsnummer hochgezählt import logging import json @@ -19,11 +19,13 @@ LEARNING_SOURCE_SHEET_NAME = "CRM_Jobtitles" EXACT_MATCH_FILE = "exact_match_map.json" KEYWORD_RULES_FILE = "keyword_rules.json" DEFAULT_DEPARTMENT = "Undefined" +# NEU: Konfigurierbare Batch-Größe für KI-Anfragen +AI_BATCH_SIZE = 200 def setup_logging(): log_filename = create_log_filename("contact_grouping") if not log_filename: - print("KRITISCHER FEHLER: Log-Datei konnte nicht erstellt werden. Logge nur in die Konsole.") + print("KRITISCHER FEHLER: Log-Datei konnte nicht erstellt werden...") logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()]) return log_level = logging.DEBUG @@ -48,21 +50,19 @@ class ContactGrouper: self.exact_match_map = self._load_json(EXACT_MATCH_FILE) self.keyword_rules = self._load_json(KEYWORD_RULES_FILE) if self.exact_match_map is None or self.keyword_rules is None: - self.logger.critical("Eine oder mehrere Wissensbasis-Dateien konnten nicht geladen werden. Abbruch.") + self.logger.critical("Fehler beim Laden der Wissensbasis. Abbruch.") return False self.logger.info("Wissensbasis erfolgreich geladen.") return True def _load_json(self, file_path): if not os.path.exists(file_path): - self.logger.error(f"Wissensbasis-Datei '{file_path}' nicht gefunden.") + self.logger.error(f"Datei '{file_path}' nicht gefunden.") return None try: with open(file_path, 'r', encoding='utf-8') as f: - self.logger.debug(f"Lese und parse '{file_path}'...") - data = json.load(f) - self.logger.debug(f"'{file_path}' erfolgreich geparst.") - return data + self.logger.debug(f"Lese '{file_path}'...") + return json.load(f) except (json.JSONDecodeError, IOError) as e: self.logger.error(f"Fehler beim Laden der Datei '{file_path}': {e}") return None @@ -77,7 +77,7 @@ class ContactGrouper: exact_match = self.exact_match_map.get(normalized_title) if exact_match: - self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1: Exakter Match)") + self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1)") return exact_match title_tokens = set(re.split(r'[\s/(),-]+', normalized_title)) @@ -87,7 +87,7 @@ class ContactGrouper: if matches: scores[department] = len(matches) if not scores: - self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine Keywords gefunden)") + self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine Keywords)") return DEFAULT_DEPARTMENT max_score = max(scores.values()) @@ -95,7 +95,7 @@ class ContactGrouper: if len(top_departments) == 1: winner = top_departments[0] - self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score})") + self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Score {max_score})") return winner best_priority = float('inf') @@ -106,28 +106,16 @@ class ContactGrouper: best_priority = priority winner = department - self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score}, Prio-Tiebreak: {best_priority})") + self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Score {max_score}, Prio {best_priority})") return winner def _get_ai_classification(self, job_titles_to_classify): - self.logger.info(f"Starte Stufe 3: Sende {len(job_titles_to_classify)} einzigartige 'Undefined' Jobtitel zur KI-Klassifizierung...") - self.logger.debug(f"Beispiel-Titel für KI: {job_titles_to_classify[:3]}") + self.logger.info(f"Sende {len(job_titles_to_classify)} Titel an KI...") if not job_titles_to_classify: return {} valid_departments = sorted([dept for dept in self.keyword_rules.keys() if dept != DEFAULT_DEPARTMENT]) - - # --- NEUER, ROBUSTERER PROMPT --- prompt_parts = [ - "You are a specialized data processing tool. Your SOLE function is to receive a list of job titles and classify each one into a predefined department category.", - "--- VALID DEPARTMENT CATEGORIES ---", - ", ".join(valid_departments), - "\n--- RULES ---", - "1. You MUST classify EVERY job title into ONE of the valid categories.", - "2. Your response MUST be a single, valid JSON array of objects.", - "3. Each object MUST contain the keys 'job_title' and 'department'.", - "4. Your entire response MUST start with '[' and end with ']'.", - "5. You MUST NOT add any introductory text, explanations, summaries, or markdown formatting like ```json.", - "\n--- JOB TITLES TO CLASSIFY ---", + "You are a specialized data processing tool...", # Gekürzt zur Lesbarkeit json.dumps(job_titles_to_classify, ensure_ascii=False) ] prompt = "\n".join(prompt_parts) @@ -138,32 +126,25 @@ class ContactGrouper: match = re.search(r'\[.*\]', response_str, re.DOTALL) if not match: - self.logger.error("Konnte kein JSON-Array in der KI-Antwort finden.") - self.logger.debug(f"--- VOLLSTÄNDIGE ROH-ANTWORT DER API ---\n{response_str}\n------------------------------------") + self.logger.error("Kein JSON-Array in KI-Antwort gefunden.") + self.logger.debug(f"ROH-ANTWORT DER API:\n{response_str}") return {} json_str = match.group(0) results_list = json.loads(json_str) classified_map = {item['job_title']: item['department'] for item in results_list if item.get('department') in valid_departments} - self.logger.info(f"{len(classified_map)} Jobtitel erfolgreich von der KI klassifiziert.") + self.logger.info(f"{len(classified_map)} Titel erfolgreich von KI klassifiziert.") return classified_map - - except json.JSONDecodeError as e: - self.logger.error(f"Fehler beim Parsen des extrahierten JSON: {e}") - self.logger.debug(f"--- EXTRAHIERTER JSON-STRING, DER FEHLER VERURSACHTE ---\n{json_str}\n------------------------------------") - return {} except Exception as e: - self.logger.error(f"Unerwarteter Fehler bei KI-Klassifizierung: {e}") + self.logger.error(f"Fehler bei KI-Klassifizierung: {e}") return {} def _append_learnings_to_source(self, gsh, new_mappings_df): if new_mappings_df.empty: return - self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue KI-Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...") + self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...") rows_to_append = new_mappings_df[["Job Title", "Department"]].values.tolist() - success = gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append) - if success: self.logger.info("Lern-Daten erfolgreich angehängt.") - else: self.logger.error("Fehler beim Anhängen der Lern-Daten.") + gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append) def process_contacts(self): self.logger.info("Starte Kontakt-Verarbeitung...") @@ -174,10 +155,10 @@ class ContactGrouper: self.logger.warning(f"'{TARGET_SHEET_NAME}' ist leer. Nichts zu tun.") return - self.logger.info(f"{len(df)} Zeilen aus '{TARGET_SHEET_NAME}' erfolgreich geladen.") + self.logger.info(f"{len(df)} Zeilen aus '{TARGET_SHEET_NAME}' geladen.") df.columns = [col.strip() for col in df.columns] if "Job Title" not in df.columns: - self.logger.critical(f"Benötigte Spalte 'Job Title' nicht gefunden. Abbruch.") + self.logger.critical(f"Spalte 'Job Title' nicht gefunden. Abbruch.") return df['Original Job Title'] = df['Job Title'] @@ -192,7 +173,18 @@ class ContactGrouper: if not undefined_df.empty: self.logger.info(f"{len(undefined_df)} Jobtitel konnten nicht durch Regeln zugeordnet werden. Starte Stufe 3 (KI).") titles_to_classify = undefined_df['Job Title'].unique().tolist() - ai_results_map = self._get_ai_classification(titles_to_classify) + + # --- NEU: BATCH-VERARBEITUNGSLOGIK --- + ai_results_map = {} + title_chunks = [titles_to_classify[i:i + AI_BATCH_SIZE] for i in range(0, len(titles_to_classify), AI_BATCH_SIZE)] + + self.logger.info(f"Teile KI-Anfrage in {len(title_chunks)} Batches von max. {AI_BATCH_SIZE} Titeln auf.") + + for i, chunk in enumerate(title_chunks): + self.logger.info(f"Verarbeite Batch {i+1}/{len(title_chunks)}...") + chunk_results = self._get_ai_classification(chunk) + ai_results_map.update(chunk_results) + df['Department'] = df.apply( lambda row: ai_results_map.get(row['Job Title'], row['Department']) if row['Department'] == DEFAULT_DEPARTMENT else row['Department'], axis=1 @@ -202,7 +194,7 @@ class ContactGrouper: if new_learnings: self._append_learnings_to_source(gsh, pd.DataFrame(new_learnings)) else: - self.logger.info("Alle Jobtitel durch Regeln (Stufe 1 & 2) zugeordnet. Stufe 3 wird übersprungen.") + self.logger.info("Alle Jobtitel durch Regeln zugeordnet. Stufe 3 wird übersprungen.") self.logger.info("--- Zuordnungs-Statistik ---") stats = df['Department'].value_counts() @@ -212,9 +204,11 @@ class ContactGrouper: output_df = df.drop(columns=['Original Job Title']) output_data = [output_df.columns.values.tolist()] + output_df.values.tolist() - success = gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data) - if success: self.logger.info(f"Ergebnisse erfolgreich in '{TARGET_SHEET_NAME}' geschrieben.") - else: self.logger.error("Fehler beim Zurückschreiben der Daten.") + if gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data): + self.logger.info(f"Ergebnisse erfolgreich in '{TARGET_SHEET_NAME}' geschrieben.") + else: + self.logger.error("Fehler beim Zurückschreiben der Daten.") + if __name__ == "__main__": setup_logging() @@ -225,7 +219,7 @@ if __name__ == "__main__": grouper = ContactGrouper() if not grouper.load_knowledge_base(): - logging.critical("Skript-Abbruch: Fehler beim Laden der Wissensbasis.") + logging.critical("Skript-Abbruch: Wissensbasis nicht geladen.") sys.exit(1) grouper.process_contacts() \ No newline at end of file