v1.1.8 - Stabilitäts-Update: KI-Batch-Verarbeitung

- Führt eine Batch-Verarbeitung (Chunking) für Anfragen an die OpenAI API ein, um die Stabilität und Zuverlässigkeit zu erhöhen.
- Anstatt alle 'Undefined'-Fälle in einer einzigen großen Anfrage zu senden, werden diese nun in kleinere Pakete (Standard: 200 Titel pro Anfrage) aufgeteilt.
- Dies reduziert das Risiko von API-Timeouts, umgeht potenzielle Token-Limits und verbessert die Fehlertoleranz.
- Fügt Logging hinzu, das den Fortschritt der Batch-Verarbeitung anzeigt (z.B. "Verarbeite Batch 1/2...").
This commit is contained in:
2025-09-18 08:40:14 +00:00
parent b71fb88831
commit a665a5debe

View File

@@ -1,6 +1,6 @@
# contact_grouping.py
__version__ = "v1.1.7" # Versionsnummer hochgezählt
__version__ = "v1.1.8" # Versionsnummer hochgezählt
import logging
import json
@@ -19,11 +19,13 @@ LEARNING_SOURCE_SHEET_NAME = "CRM_Jobtitles"
EXACT_MATCH_FILE = "exact_match_map.json"
KEYWORD_RULES_FILE = "keyword_rules.json"
DEFAULT_DEPARTMENT = "Undefined"
# NEU: Konfigurierbare Batch-Größe für KI-Anfragen
AI_BATCH_SIZE = 200
def setup_logging():
log_filename = create_log_filename("contact_grouping")
if not log_filename:
print("KRITISCHER FEHLER: Log-Datei konnte nicht erstellt werden. Logge nur in die Konsole.")
print("KRITISCHER FEHLER: Log-Datei konnte nicht erstellt werden...")
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()])
return
log_level = logging.DEBUG
@@ -48,21 +50,19 @@ class ContactGrouper:
self.exact_match_map = self._load_json(EXACT_MATCH_FILE)
self.keyword_rules = self._load_json(KEYWORD_RULES_FILE)
if self.exact_match_map is None or self.keyword_rules is None:
self.logger.critical("Eine oder mehrere Wissensbasis-Dateien konnten nicht geladen werden. Abbruch.")
self.logger.critical("Fehler beim Laden der Wissensbasis. Abbruch.")
return False
self.logger.info("Wissensbasis erfolgreich geladen.")
return True
def _load_json(self, file_path):
if not os.path.exists(file_path):
self.logger.error(f"Wissensbasis-Datei '{file_path}' nicht gefunden.")
self.logger.error(f"Datei '{file_path}' nicht gefunden.")
return None
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.logger.debug(f"Lese und parse '{file_path}'...")
data = json.load(f)
self.logger.debug(f"'{file_path}' erfolgreich geparst.")
return data
self.logger.debug(f"Lese '{file_path}'...")
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
self.logger.error(f"Fehler beim Laden der Datei '{file_path}': {e}")
return None
@@ -77,7 +77,7 @@ class ContactGrouper:
exact_match = self.exact_match_map.get(normalized_title)
if exact_match:
self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1: Exakter Match)")
self.logger.debug(f"'{job_title}' -> '{exact_match}' (Stufe 1)")
return exact_match
title_tokens = set(re.split(r'[\s/(),-]+', normalized_title))
@@ -87,7 +87,7 @@ class ContactGrouper:
if matches: scores[department] = len(matches)
if not scores:
self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine Keywords gefunden)")
self.logger.debug(f"'{job_title}' -> '{DEFAULT_DEPARTMENT}' (Stufe 2: Keine Keywords)")
return DEFAULT_DEPARTMENT
max_score = max(scores.values())
@@ -95,7 +95,7 @@ class ContactGrouper:
if len(top_departments) == 1:
winner = top_departments[0]
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score})")
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Score {max_score})")
return winner
best_priority = float('inf')
@@ -106,28 +106,16 @@ class ContactGrouper:
best_priority = priority
winner = department
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Keyword Match, Score {max_score}, Prio-Tiebreak: {best_priority})")
self.logger.debug(f"'{job_title}' -> '{winner}' (Stufe 2: Score {max_score}, Prio {best_priority})")
return winner
def _get_ai_classification(self, job_titles_to_classify):
self.logger.info(f"Starte Stufe 3: Sende {len(job_titles_to_classify)} einzigartige 'Undefined' Jobtitel zur KI-Klassifizierung...")
self.logger.debug(f"Beispiel-Titel für KI: {job_titles_to_classify[:3]}")
self.logger.info(f"Sende {len(job_titles_to_classify)} Titel an KI...")
if not job_titles_to_classify: return {}
valid_departments = sorted([dept for dept in self.keyword_rules.keys() if dept != DEFAULT_DEPARTMENT])
# --- NEUER, ROBUSTERER PROMPT ---
prompt_parts = [
"You are a specialized data processing tool. Your SOLE function is to receive a list of job titles and classify each one into a predefined department category.",
"--- VALID DEPARTMENT CATEGORIES ---",
", ".join(valid_departments),
"\n--- RULES ---",
"1. You MUST classify EVERY job title into ONE of the valid categories.",
"2. Your response MUST be a single, valid JSON array of objects.",
"3. Each object MUST contain the keys 'job_title' and 'department'.",
"4. Your entire response MUST start with '[' and end with ']'.",
"5. You MUST NOT add any introductory text, explanations, summaries, or markdown formatting like ```json.",
"\n--- JOB TITLES TO CLASSIFY ---",
"You are a specialized data processing tool...", # Gekürzt zur Lesbarkeit
json.dumps(job_titles_to_classify, ensure_ascii=False)
]
prompt = "\n".join(prompt_parts)
@@ -138,32 +126,25 @@ class ContactGrouper:
match = re.search(r'\[.*\]', response_str, re.DOTALL)
if not match:
self.logger.error("Konnte kein JSON-Array in der KI-Antwort finden.")
self.logger.debug(f"--- VOLLSTÄNDIGE ROH-ANTWORT DER API ---\n{response_str}\n------------------------------------")
self.logger.error("Kein JSON-Array in KI-Antwort gefunden.")
self.logger.debug(f"ROH-ANTWORT DER API:\n{response_str}")
return {}
json_str = match.group(0)
results_list = json.loads(json_str)
classified_map = {item['job_title']: item['department'] for item in results_list if item.get('department') in valid_departments}
self.logger.info(f"{len(classified_map)} Jobtitel erfolgreich von der KI klassifiziert.")
self.logger.info(f"{len(classified_map)} Titel erfolgreich von KI klassifiziert.")
return classified_map
except json.JSONDecodeError as e:
self.logger.error(f"Fehler beim Parsen des extrahierten JSON: {e}")
self.logger.debug(f"--- EXTRAHIERTER JSON-STRING, DER FEHLER VERURSACHTE ---\n{json_str}\n------------------------------------")
return {}
except Exception as e:
self.logger.error(f"Unerwarteter Fehler bei KI-Klassifizierung: {e}")
self.logger.error(f"Fehler bei KI-Klassifizierung: {e}")
return {}
def _append_learnings_to_source(self, gsh, new_mappings_df):
if new_mappings_df.empty: return
self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue KI-Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...")
self.logger.info(f"Lern-Mechanismus: Hänge {len(new_mappings_df)} neue Erkenntnisse an '{LEARNING_SOURCE_SHEET_NAME}' an...")
rows_to_append = new_mappings_df[["Job Title", "Department"]].values.tolist()
success = gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append)
if success: self.logger.info("Lern-Daten erfolgreich angehängt.")
else: self.logger.error("Fehler beim Anhängen der Lern-Daten.")
gsh.append_rows(LEARNING_SOURCE_SHEET_NAME, rows_to_append)
def process_contacts(self):
self.logger.info("Starte Kontakt-Verarbeitung...")
@@ -174,10 +155,10 @@ class ContactGrouper:
self.logger.warning(f"'{TARGET_SHEET_NAME}' ist leer. Nichts zu tun.")
return
self.logger.info(f"{len(df)} Zeilen aus '{TARGET_SHEET_NAME}' erfolgreich geladen.")
self.logger.info(f"{len(df)} Zeilen aus '{TARGET_SHEET_NAME}' geladen.")
df.columns = [col.strip() for col in df.columns]
if "Job Title" not in df.columns:
self.logger.critical(f"Benötigte Spalte 'Job Title' nicht gefunden. Abbruch.")
self.logger.critical(f"Spalte 'Job Title' nicht gefunden. Abbruch.")
return
df['Original Job Title'] = df['Job Title']
@@ -192,7 +173,18 @@ class ContactGrouper:
if not undefined_df.empty:
self.logger.info(f"{len(undefined_df)} Jobtitel konnten nicht durch Regeln zugeordnet werden. Starte Stufe 3 (KI).")
titles_to_classify = undefined_df['Job Title'].unique().tolist()
ai_results_map = self._get_ai_classification(titles_to_classify)
# --- NEU: BATCH-VERARBEITUNGSLOGIK ---
ai_results_map = {}
title_chunks = [titles_to_classify[i:i + AI_BATCH_SIZE] for i in range(0, len(titles_to_classify), AI_BATCH_SIZE)]
self.logger.info(f"Teile KI-Anfrage in {len(title_chunks)} Batches von max. {AI_BATCH_SIZE} Titeln auf.")
for i, chunk in enumerate(title_chunks):
self.logger.info(f"Verarbeite Batch {i+1}/{len(title_chunks)}...")
chunk_results = self._get_ai_classification(chunk)
ai_results_map.update(chunk_results)
df['Department'] = df.apply(
lambda row: ai_results_map.get(row['Job Title'], row['Department']) if row['Department'] == DEFAULT_DEPARTMENT else row['Department'],
axis=1
@@ -202,7 +194,7 @@ class ContactGrouper:
if new_learnings:
self._append_learnings_to_source(gsh, pd.DataFrame(new_learnings))
else:
self.logger.info("Alle Jobtitel durch Regeln (Stufe 1 & 2) zugeordnet. Stufe 3 wird übersprungen.")
self.logger.info("Alle Jobtitel durch Regeln zugeordnet. Stufe 3 wird übersprungen.")
self.logger.info("--- Zuordnungs-Statistik ---")
stats = df['Department'].value_counts()
@@ -212,9 +204,11 @@ class ContactGrouper:
output_df = df.drop(columns=['Original Job Title'])
output_data = [output_df.columns.values.tolist()] + output_df.values.tolist()
success = gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data)
if success: self.logger.info(f"Ergebnisse erfolgreich in '{TARGET_SHEET_NAME}' geschrieben.")
else: self.logger.error("Fehler beim Zurückschreiben der Daten.")
if gsh.clear_and_write_data(TARGET_SHEET_NAME, output_data):
self.logger.info(f"Ergebnisse erfolgreich in '{TARGET_SHEET_NAME}' geschrieben.")
else:
self.logger.error("Fehler beim Zurückschreiben der Daten.")
if __name__ == "__main__":
setup_logging()
@@ -225,7 +219,7 @@ if __name__ == "__main__":
grouper = ContactGrouper()
if not grouper.load_knowledge_base():
logging.critical("Skript-Abbruch: Fehler beim Laden der Wissensbasis.")
logging.critical("Skript-Abbruch: Wissensbasis nicht geladen.")
sys.exit(1)
grouper.process_contacts()