Files
Brancheneinstufung2/helpers.py
2025-07-01 19:17:46 +00:00

1487 lines
87 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# --- START OF FILE helpers.py (Part 1/10) ---
#!/usr/bin/env python3
"""
helpers.py
Sammlung von globalen, wiederverwendbaren Hilfsfunktionen für das Projekt
"Automatisierte Unternehmensbewertung". Enthält Decorators, Text-Normalisierung,
API-Wrapper und andere Dienstprogramme.
"""
ALLOWED_TARGET_BRANCHES = []
# ==============================================================================
# 1. IMPORTS
# ==============================================================================
# Standardbibliotheken
import os
import time
import re
import csv
import json
import random
import logging
import traceback
import unicodedata
from datetime import datetime
from urllib.parse import urlparse, unquote
from difflib import SequenceMatcher
# Externe Bibliotheken
import gspread
import wikipedia
import requests
from bs4 import BeautifulSoup
import pandas as pd
import openai
from openai.error import AuthenticationError, OpenAIError, RateLimitError, APIError, Timeout, InvalidRequestError, ServiceUnavailableError
from config import (Config, BRANCH_MAPPING_FILE, URL_CHECK_MARKER, USER_AGENTS, LOG_DIR)
# Optionale Bibliotheken
try:
import tiktoken
except ImportError:
tiktoken = None
logging.warning("tiktoken nicht gefunden. Token-Zaehlung wird geschaetzt.")
try:
import gender_guesser.detector as gender
# Initialisieren Sie den Detector einmal global
gender_detector = gender.Detector()
logging.info("gender_guesser.Detector initialisiert.")
except ImportError:
gender = None
gender_detector = None
logging.warning("gender_guesser Bibliothek nicht gefunden. Geschlechtserkennung deaktiviert.")
except Exception as e:
gender = None
gender_detector = None
logging.warning(f"Fehler bei Initialisierung von gender_guesser: {e}. Geschlechtserkennung deaktiviert.")
# Import der Config-Klasse und Konstanten
from config import Config, BRANCH_MAPPING_FILE, URL_CHECK_MARKER, USER_AGENTS
# ==============================================================================
# 2. RETRY DECORATOR
# ==============================================================================
decorator_logger = logging.getLogger(__name__ + ".Retry")
def retry_on_failure(func):
"""
Decorator, der eine Funktion bei bestimmten Fehlern mehrmals wiederholt.
Implementiert exponentiellen Backoff mit Jitter.
"""
def wrapper(*args, **kwargs):
func_name = func.__name__
self_arg = args[0] if args and hasattr(args[0], func_name) and isinstance(args[0], object) else None
effective_func_name = f"{self_arg.__class__.__name__}.{func_name}" if self_arg else func_name
max_retries_config = getattr(Config, 'MAX_RETRIES', 3)
base_delay = getattr(Config, 'RETRY_DELAY', 5)
if max_retries_config <= 0:
try:
return func(*args, **kwargs)
except Exception as e:
decorator_logger.error(f"FEHLER bei '{effective_func_name}' (keine Retries konfiguriert). {type(e).__name__} - {str(e)[:150]}...")
if not isinstance(e, (requests.exceptions.RequestException, gspread.exceptions.APIError, OpenAIError, wikipedia.exceptions.WikipediaException)):
decorator_logger.exception("Details zum Fehler:")
raise e
for attempt in range(max_retries_config):
try:
if attempt > 0:
decorator_logger.warning(f"Wiederhole Versuch {attempt + 1}/{max_retries_config} fuer '{effective_func_name}'...")
return func(*args, **kwargs)
except (gspread.exceptions.SpreadsheetNotFound, AuthenticationError, ValueError) as e:
decorator_logger.critical(f"❌ ENDGUELTIGER FEHLER bei '{effective_func_name}': Permanentes Problem erkannt. {type(e).__name__} - {str(e)[:150]}...")
decorator_logger.exception("Details:")
raise e
except requests.exceptions.HTTPError as e:
if hasattr(e, 'response') and e.response is not None:
status_code = e.response.status_code
non_retryable_status_codes = [404, 400, 401, 403]
if status_code in non_retryable_status_codes:
decorator_logger.critical(f"❌ ENDGUELTIGER FEHLER bei '{effective_func_name}': HTTP Fehler {status_code} erhalten ({e.response.reason}). Nicht wiederholbar. {str(e)[:100]}...")
decorator_logger.exception("Details:")
raise e
except (requests.exceptions.RequestException, gspread.exceptions.APIError, OpenAIError, wikipedia.exceptions.WikipediaException) as e:
error_msg = str(e)
error_type = type(e).__name__
if attempt < max_retries_config - 1:
wait_time = base_delay * (2 ** attempt) + random.uniform(0, 1)
if isinstance(e, RateLimitError):
decorator_logger.warning(f"🚦 RATE LIMIT ({error_type}) bei '{effective_func_name}' (Versuch {attempt+1}/{max_retries_config}). {error_msg[:150]}... Warte {wait_time:.2f}s...")
elif isinstance(e, Timeout) and isinstance(e, OpenAIError):
decorator_logger.warning(f"⏰ OPENAI TIMEOUT ({error_type}) bei '{effective_func_name}' (Versuch {attempt+1}/{max_retries_config}). {error_msg[:150]}... Warte {wait_time:.2f}s...")
elif isinstance(e, gspread.exceptions.APIError) and hasattr(e, 'response') and e.response is not None and e.response.status_code == 429:
decorator_logger.warning(f"🚦 GSPREAD RATE LIMIT ({error_type}) bei '{effective_func_name}' (Versuch {attempt+1}/{max_retries_config}). {error_msg[:150]}... Warte {wait_time:.2f}s...")
elif isinstance(e, requests.exceptions.Timeout):
decorator_logger.warning(f"⏰ REQUESTS TIMEOUT ({error_type}) bei '{effective_func_name}' (Versuch {attempt+1}/{max_retries_config}). {error_msg[:150]}... Warte {wait_time:.2f}s...")
elif isinstance(e, requests.exceptions.RequestException):
decorator_logger.warning(f"🌐 NETZWERKFEHLER ({error_type}) bei '{effective_func_name}' (Versuch {attempt+1}/{max_retries_config}). {error_msg[:150]}... Warte {wait_time:.2f}s...")
elif isinstance(e, OpenAIError):
decorator_logger.warning(f"🤖 OPENAI FEHLER ({error_type}) bei '{effective_func_name}' (Versuch {attempt+1}/{max_retries_config}). {error_msg[:150]}... Warte {wait_time:.2f}s...")
elif isinstance(e, wikipedia.exceptions.WikipediaException):
decorator_logger.warning(f"📚 WIKIPEDIA FEHLER ({error_type}) bei '{effective_func_name}' (Versuch {attempt+1}/{max_retries_config}). {error_msg[:150]}... Warte {wait_time:.2f}s...")
else:
decorator_logger.warning(f"♻️ WIEDERHOLBARER FEHLER ({error_type}) bei '{effective_func_name}' (Versuch {attempt+1}/{max_retries_config}). {error_msg[:150]}... Warte {wait_time:.2f}s...")
time.sleep(wait_time)
else:
decorator_logger.error(f"❌ ENDGUELTIGER FEHLER bei '{effective_func_name}' nach {max_retries_config} Versuchen.")
raise e
except Exception as e:
decorator_logger.critical(f"💥 UNERWARTETER FEHLER ({type(e).__name__}) bei '{effective_func_name}'. KEIN RETRY VERSUCHT.")
decorator_logger.exception("Details zum unerwarteten Fehler:")
raise e
raise RuntimeError(f"Retry decorator logic error: Loop completed unexpectedly for {effective_func_name}. This should not happen.")
return wrapper
# ==============================================================================
# 3. LOGGING & TOKEN COUNT HELPERS
# ==============================================================================
def token_count(text, model=None):
"""Zaehlt Tokens via tiktoken oder schaetzt ueber Leerzeichen."""
logger = logging.getLogger(__name__)
if not text or not isinstance(text, str): return 0
current_model = model if model else getattr(Config, 'TOKEN_MODEL', 'gpt-3.5-turbo')
if tiktoken:
try:
if not hasattr(token_count, 'enc_cache'): token_count.enc_cache = {}
if current_model not in token_count.enc_cache: token_count.enc_cache[current_model] = tiktoken.encoding_for_model(current_model)
enc = token_count.enc_cache[current_model]
return len(enc.encode(text))
except Exception as e:
logger.debug(f"Fehler beim Token-Counting mit tiktoken fuer Modell '{current_model}': {e} - Fallback zur Schaetzung.")
return len(str(text).split())
else:
return len(str(text).split())
def create_log_filename(mode):
"""Erstellt einen zeitgestempelten Logdateinamen im LOG_DIR."""
logger = logging.getLogger(__name__)
log_dir_path = LOG_DIR
if not os.path.exists(log_dir_path):
try:
os.makedirs(log_dir_path, exist_ok=True)
logger.info(f"Log-Verzeichnis '{log_dir_path}' erstellt.")
except Exception as e:
logger.error(f"FEHLER: Konnte Log-Verzeichnis '{log_dir_path}' nicht erstellen: {e}")
log_dir_path = "."
logger.warning(f"Versuche, Logdatei im aktuellen Verzeichnis '{log_dir_path}' zu erstellen.")
try:
now = datetime.now().strftime("%Y-%m-%d_%H-%M") # YYYY-MM-DD für bessere Sortierung
ver_short = getattr(Config, 'VERSION', 'unknown').replace(".", "")
filename = f"{now}_{ver_short}_Modus-{mode}.txt" # .log als Dateiendung
return os.path.join(log_dir_path, filename)
except Exception as e_fallback:
logger.error(f"FEHLER: Konnte Logdateinamen auch im Fallback-Verzeichnis '{log_dir_path}' nicht erstellen: {e_fallback}")
return None
# ==============================================================================
# 4. TEXT, STRING & URL UTILITIES
# ==============================================================================
def simple_normalize_url(url):
"""Normalisiert URL zu domain.tld oder k.A. (ohne www, ohne Pfad)."""
logger = logging.getLogger(__name__)
if not url or not isinstance(url, str): return "k.A."
url = url.strip()
if not url or url.lower() == 'k.a.': return "k.A."
if not url.lower().startswith(("http://", "https://")):
url = "https://" + url
try:
parsed = urlparse(url)
domain_part = parsed.netloc
if not domain_part:
logger.debug(f"URL '{url[:100]}...' konnte nicht sinnvoll geparst werden (leerer netloc).")
return "k.A."
domain_part = domain_part.split(":", 1)[0]
if '@' in domain_part:
domain_part = domain_part.split('@', 1)[1]
try:
domain_part = domain_part.encode('ascii').decode('idna')
except UnicodeDecodeError:
pass
domain_part = domain_part.lower()
if domain_part.startswith("www."):
domain_part = domain_part[4:]
if domain_part and '.' in domain_part:
parts = domain_part.split('.')
if len(parts) > 1 and parts[-1].isalpha() and len(parts[-1]) >= 2:
return domain_part
else:
logger.debug(f"URL '{url[:100]}...' normalisiert zu '{domain_part}', aber TLD-Pruefung schlug fehl.")
return "k.A."
else:
logger.debug(f"URL '{url[:100]}...' normalisiert zu '{domain_part}', enthaelt keinen Punkt oder ist leer.")
return "k.A."
except Exception as e:
logger.error(f"Fehler bei URL-Normalisierung fuer '{url[:100]}...': {e}")
return "k.A. (Fehler Normalisierung)"
def normalize_string(s):
"""Normalisiert Umlaute und Sonderzeichen nach einer definierten Liste."""
logger = logging.getLogger(__name__)
if not s or not isinstance(s, str): return ""
replacements = { 'Ä': 'Ae', 'Ö': 'Oe', 'Ü': 'Ue', 'ß': 'ss', 'ä': 'ae', 'ö': 'oe', 'ü': 'ue',
'À': 'A', 'Á': 'A', 'Â': 'A', 'Ã': 'A', 'Å': 'A', 'Æ': 'AE', 'à': 'a', 'á': 'a', 'â': 'a', 'ã': 'a', 'å': 'a', 'æ': 'ae',
'Ç': 'C', 'ç': 'c', 'È': 'E', 'É': 'E', 'Ê': 'E', 'Ë': 'E', 'è': 'e', 'é': 'e', 'ê': 'e', 'ë': 'e',
'Ì': 'I', 'Í': 'I', 'Î': 'I', 'Ï': 'I', 'ì': 'i', 'í': 'i', 'î': 'i', 'ï': 'i', 'Ñ': 'N', 'ñ': 'n',
'Ò': 'O', 'Ó': 'O', 'Ô': 'O', 'Õ': 'O', 'Ø': 'O', 'ò': 'o', 'ó': 'o', 'ô': 'o', 'õ': 'o', 'ø': 'o', 'Œ': 'OE', 'œ': 'oe',
'Š': 'S', 'š': 's', 'Ž': 'Z', 'ž': 'z', 'Ý': 'Y', 'ý': 'y', 'ÿ': 'y', 'Đ': 'D', 'đ': 'd',
'č': 'c', 'Č': 'C', 'ć': 'c', 'Ć': 'C', 'ł': 'l', 'Ł': 'L', 'ğ': 'g', 'Ğ': 'G', 'ş': 's', 'Ş': 'S',
'ă': 'a', 'Ă': 'A', 'ı': 'i', 'İ': 'I', 'ň': 'n', 'Ň': 'N', 'ř': 'r', 'Ř': 'R', 'ő': 'o', 'Ő': 'O', 'ű': 'u', 'Ű': 'U', 'ț': 't', 'Ț': 'T', 'ș': 's', 'Ș': 'S'
}
try:
s = unicodedata.normalize('NFKD', s).encode('ascii', 'ignore').decode('ascii')
except Exception as e:
logger.debug(f"Fehler bei unicodedata Normalisierung fuer '{str(s)[:50]}...': {e}")
pass
for src, target in replacements.items():
s = s.replace(src, target)
return s
def clean_text(text):
"""
Bereinigt Text (Unicode, Referenzen, Whitespace, etc.) von Wikipedia, Websites etc.
Entfernt gaengige unerwuenschte Muster wie [1], [Bearbeiten].
"""
logger = logging.getLogger(__name__)
if text is None: return "k.A."
try:
text = str(text)
if not text.strip(): return "k.A."
text = unicodedata.normalize("NFC", text)
text = re.sub(r'\[\d+\]', '', text)
text = re.sub(r'\[\s*Bearbeiten\s*\|\s*Quelltext bearbeiten\s*\]', '', text, flags=re.IGNORECASE)
text = re.sub(r'\[koordinaten\]', '', text, flags=re.IGNORECASE)
text = re.sub(r'\s+', ' ', text).strip()
return text if text else "k.A."
except Exception as e:
logger.error(f"Fehler bei clean_text fuer Input '{str(text)[:50]}...': {e}")
return "k.A. (Fehler Bereinigung)"
def normalize_company_name(name):
"""
Entfernt gaengige Rechtsformzusaetze etc. fuer Vergleiche.
Nutzt clean_text und normalize_string.
"""
if not name: return ""
name = clean_text(name)
name = normalize_string(name)
forms = [
r'gmbh', r'ges\.?\s*m\.?\s*b\.?\s*h\.?', r'gesellschaft mit beschraenkter haftung',
r'ug', r'u\.g\.', r'unternehmergesellschaft', r'haftungsbeschraenkt',
r'ag', r'a\.g\.', r'aktiengesellschaft',
r'ohg', r'o\.h\.g\.', r'offene handelsgesellschaft',
r'kg', r'k\.g\.', r'kommanditgesellschaft',
r'gmbh\s*&\s*co\.?\s*kg', r'ges\.?\s*m\.?\s*b\.?\s*h\.?\s*&\s*co\.?\s*k\.g\.?',
r'ag\s*&\s*co\.?\s*kg', r'a\.g\.?\s*&\s*co\.?\s*k\.g\.?',
r'e\.k\.?', r'e\.kfm\.?', r'e\.kfr\.?', r'eingetragene[rn]? kauffrau', r'eingetragene[rn]? kaufmann',
r'ltd\.?', r'limited',
r'ltd\s*&\s*co\.?\s*kg',
r's\.?a\.?r\.?l\.?', r'sarl', r'sagl',
r's\.?a\.?', r'societe anonyme', r'sociedad anonima',
r's\.?p\.?a\.?', r'societa per azioni',
r'b\.?v\.?', r'besloten vennootschap',
r'n\.?v\.?', r'naamloze vennootschap',
r'plc\.?', r'public limited company',
r'inc\.?', r'incorporated',
r'corp\.?', r'corporation',
r'llc\.?', r'limited liability company',
r'kgaa', r'kommanditgesellschaft auf aktien',
r'se', r'societas europaea',
r'e\.?g\.?', r'eingetragene genossenschaft', r'genossenschaft', r'genmbh',
r'e\.?v\.?', r'eingetragener verein', r'verein',
r'stiftung', r'ggmbh', r'gemeinnuetzige gmbh', r'gemeinnuetzige[rn]? gmbh', r'gug',
r'partg\.?', r'partnerschaftsgesellschaft', r'partgmbb',
r'og', r'o\.g\.', r'offene gesellschaft',
r'e\.u\.', r'eingetragenes unternehmen',
r'ges\.?n\.?b\.?r\.?', r'gesellschaft nach buergerlichem recht',
r'kollektivgesellschaft', r'einzelfirma',
r'gruppe', r'holding', r'international', r'systeme', r'technik', r'logistik',
r'solutions', r'services', r'management', r'consulting', r'produktion',
r'vertrieb', r'entwicklung', r'maschinenbau', r'anlagenbau',
r'engineering', r'technologie'
]
forms_escaped = [re.escape(form) for form in forms]
pattern = r'\b(?:' + '|'.join(forms_escaped) + r')\b'
normalized = re.sub(pattern, '', name, flags=re.IGNORECASE)
normalized = re.sub(r'[.,;:]', '', normalized)
normalized = re.sub(r'[\-/]', ' ', normalized)
normalized = re.sub(r'\s+', ' ', normalized).strip()
return normalized.lower()
def fuzzy_similarity(str1, str2):
"""Berechnet Aehnlichkeit zwischen 0 und 1 (case-insensitive)."""
if not str1 or not str2: return 0.0
return SequenceMatcher(None, str(str1).lower(), str(str2).lower()).ratio()
def extract_numeric_value(raw_value, is_umsatz=False):
"""
Extrahiert und normalisiert einen numerischen Wert aus einem String.
Gibt 'k.A.' bei Fehlern oder nicht gefundenen Werten zurück.
Umsatz wird in Millionen zurückgegeben, Mitarbeiter als absolute Zahl.
"""
logger = logging.getLogger(__name__ + ".extract_numeric_value")
if raw_value is None or pd.isna(raw_value):
return "k.A."
raw_value_str_original_for_debug = str(raw_value)
text_to_parse = str(raw_value).strip()
if not text_to_parse or text_to_parse.lower() in ['k.a.', 'n/a', '-']:
return "k.A."
text_to_parse = text_to_parse.replace("", "'").replace("", "'")
test_val_for_zero = text_to_parse.replace(',', '.').replace(' ', '')
if test_val_for_zero in ['0', '0.0', '0.00', '0.000']:
logger.debug(f"Input '{raw_value_str_original_for_debug}' direkt als '0' interpretiert.")
return "0"
try:
text_processed = text_to_parse
prefixes_to_remove = [
r'ca\.?\s*', r'circa\s*', r'rund\s*', r'etwa\s*', r'über\s*', r'unter\s*',
r'mehr als\s*', r'weniger als\s*', r'bis zu\s*', r'about\s*', r'over\s*',
r'approx\.?\s*', r'around\s*', r'up to\s*', r'~\s*', r'rd\.?\s*'
]
for prefix_pattern in prefixes_to_remove:
text_processed = re.sub(f'(?i)^{prefix_pattern}', '', text_processed).strip()
currency_patterns = [
r'(?:US|USD)\$\s*', r'US\$\s*', r'EUR\s*€?\s*', r'\s*', r'CHF\s*', r'GBP\s*£?\s*', r'£\s*',
r'JPY\s*¥?\s*', r'¥\s*', r'\s*', r'[Cc][Hh][Ff]\s*'
]
for curr_pattern in currency_patterns:
text_processed = re.sub(curr_pattern, '', text_processed, flags=re.IGNORECASE).strip()
text_processed = re.sub(r'\(.*?\)|\[.*?\]', '', text_processed).strip()
text_processed = re.split(r'\s*(-||bis)\s*', text_processed, 1)[0].strip()
if not text_processed:
logger.debug(f"Text nach erweiterter Vorreinigung leer für '{raw_value_str_original_for_debug}'")
return "k.A."
text_cleaned_for_units = clean_text(text_processed).lower()
num_match = re.search(r'([\d.,\'\s]+)', text_processed)
num_str_candidate = ""
unit_part_str = ""
if num_match:
num_str_candidate = num_match.group(1).strip()
potential_unit_start_index = num_match.end()
unit_part_str = text_processed[potential_unit_start_index:].strip()
else:
logger.debug(f"Kein Zahlen-Match in '{text_processed}' (Original: '{raw_value_str_original_for_debug}')")
return "k.A."
if not num_str_candidate:
logger.debug(f"Zahlenkandidat war leer oder nur Whitespace für '{raw_value_str_original_for_debug}' nach match in '{text_processed}'")
return "k.A."
cleaned_num_str = num_str_candidate.replace("'", "").replace(" ", "")
if not cleaned_num_str:
logger.debug(f"Zahlenkandidat '{num_str_candidate}' wurde zu leerem String nach Entfernung von ' und Leerraum.")
return "k.A."
has_dot = '.' in cleaned_num_str
has_comma = ',' in cleaned_num_str
if has_dot and has_comma:
last_dot_pos = cleaned_num_str.rfind('.')
last_comma_pos = cleaned_num_str.rfind(',')
if last_dot_pos > last_comma_pos:
cleaned_num_str = cleaned_num_str.replace(',', '')
else:
cleaned_num_str = cleaned_num_str.replace('.', '')
cleaned_num_str = cleaned_num_str.replace(',', '.')
elif has_comma:
if is_umsatz:
if re.search(r',\d{1,2}$', cleaned_num_str):
parts = cleaned_num_str.rsplit(',', 1)
integer_part = parts[0].replace(',', '')
cleaned_num_str = f"{integer_part}.{parts[1]}"
else:
cleaned_num_str = cleaned_num_str.replace(',', '')
else:
cleaned_num_str = cleaned_num_str.replace(',', '')
elif has_dot:
if re.search(r'\.\d{1,2}$', cleaned_num_str):
parts = cleaned_num_str.rsplit('.', 1)
integer_part = parts[0].replace('.', '')
cleaned_num_str = f"{integer_part}.{parts[1]}"
else:
cleaned_num_str = cleaned_num_str.replace('.', '')
if not re.fullmatch(r'-?\d+(\.\d+)?', cleaned_num_str):
logger.debug(f"Kein gültiger numerischer String nach Trennzeichenbehandlung: '{cleaned_num_str}' (Num-Kandidat: '{num_str_candidate}', Original: '{raw_value_str_original_for_debug}')")
return "k.A."
num_as_float = float(cleaned_num_str)
scaled_num = num_as_float
string_for_unit_search = unit_part_str.lower() if unit_part_str else text_cleaned_for_units
logger.debug(f"String für Einheitensuche: '{string_for_unit_search}' (num_as_float: {num_as_float})")
if is_umsatz:
multiplikator = 1.0
einheit_gefunden = False
if re.search(r'\b(mrd\.?|milliarden|billion|mia\.?)\b', string_for_unit_search):
multiplikator = 1000.0
einheit_gefunden = True
elif re.search(r'\bcrore\b', string_for_unit_search):
multiplikator = 10.0
einheit_gefunden = True
elif re.search(r'\b(mio\.?|mill\.?|millionen|mn)\b', string_for_unit_search):
multiplikator = 1.0
einheit_gefunden = True
elif re.search(r'\b(tsd\.?|tausend|k\b(?!\w))\b', string_for_unit_search):
multiplikator = 0.001
einheit_gefunden = True
logger.debug(f"Umsatz: num_as_float={num_as_float}, gefundene Einheit? {einheit_gefunden}, Multiplikator={multiplikator}")
scaled_num = num_as_float * multiplikator
else:
multiplikator = 1.0
if re.search(r'\b(mrd\.?|milliarden|billion|mia\.?)\b', string_for_unit_search):
multiplikator = 1000000000.0
elif re.search(r'\b(mio\.?|mill\.?|millionen|mn)\b', string_for_unit_search):
multiplikator = 1000000.0
elif re.search(r'\b(tsd\.?|tausend|k\b(?!\w))\b', string_for_unit_search):
multiplikator = 1000.0
logger.debug(f"Mitarbeiter: num_as_float={num_as_float}, Multiplikator={multiplikator}")
scaled_num = num_as_float * multiplikator
if pd.isna(scaled_num):
return "k.A."
if scaled_num >= 0:
return str(int(round(scaled_num)))
else:
logger.debug(f"Negative Zahl nach Skalierung: {scaled_num} für Input '{raw_value_str_original_for_debug}'")
return "k.A."
except ValueError as e:
logger.debug(f"ValueError bei Konvertierung zu float: '{e}' (cleaned_num_str: '{cleaned_num_str if 'cleaned_num_str' in locals() else 'N/A'}', Original: '{raw_value_str_original_for_debug[:30]}...')")
return "k.A."
except Exception as e_general:
logger.error(f"Unerwarteter Fehler in extract_numeric_value für '{raw_value_str_original_for_debug[:50]}...': {e_general}")
logger.debug(traceback.format_exc())
return "k.A."
def get_numeric_filter_value(value_str, is_umsatz=False):
"""
Extrahiert und normalisiert Zahlenwerte aus Strings für Vergleichslogik.
Gibt 0.0 für Umsatz oder 0 für Mitarbeiter zurück, wenn kein Wert gefunden wird.
"""
logger = logging.getLogger(__name__ + ".get_numeric_filter_value")
if value_str is None or pd.isna(value_str) or str(value_str).strip() == '':
return 0.0 if is_umsatz else 0
raw_value_str_original = str(value_str).strip()
if raw_value_str_original.lower() in ['k.a.', 'n/a', '-', '0', '0.0', '0,0', '0.00', '0,000', '0.000']:
return 0.0 if is_umsatz else 0
try:
processed_value = clean_text(raw_value_str_original)
if processed_value.lower() in ['k.a.', 'n/a', '-']:
return 0.0 if is_umsatz else 0
processed_value = re.sub(r'(?i)^\s*(ca\.?|circa|rund|etwa|ueber|unter|mehr als|weniger als|bis zu)\s+', '', processed_value)
processed_value = re.sub(r'[€$£¥]', '', processed_value).strip()
processed_value = re.split(r'\s*(-||bis)\s*', processed_value, 1)[0].strip()
num_extraction_str = re.sub(r'\(.*?\)', '', processed_value).strip()
num_extraction_str = num_extraction_str.replace("'", "")
num_extraction_str = re.sub(r'(?<=\d)\s+(?=\d)', '', num_extraction_str)
if not num_extraction_str: return 0.0 if is_umsatz else 0
has_dot = '.' in num_extraction_str
has_comma = ',' in num_extraction_str
if has_dot and has_comma:
if num_extraction_str.rfind('.') > num_extraction_str.rfind(','):
num_extraction_str = num_extraction_str.replace(',', '')
else:
num_extraction_str = num_extraction_str.replace('.', '')
num_extraction_str = num_extraction_str.replace(',', '.')
elif has_comma:
if num_extraction_str.count(',') == 1 and re.search(r',\d{1,2}$', num_extraction_str) and not re.search(r',\d{3}(,|\s|\Z)', num_extraction_str):
num_extraction_str = num_extraction_str.replace(',', '.')
else:
num_extraction_str = num_extraction_str.replace(',', '')
elif has_dot:
if num_extraction_str.count('.') == 1 and re.search(r'\.\d{1,2}$', num_extraction_str) and not re.search(r'\.\d{3}(?!\d)', num_extraction_str):
pass
else:
num_extraction_str = num_extraction_str.replace('.', '')
if not re.fullmatch(r'-?\d+(\.\d+)?', num_extraction_str):
logger.debug(f"Kein gültiger numerischer String nach Trennzeichenbehandlung: '{num_extraction_str}' (Original: '{raw_value_str_original}')")
return 0.0 if is_umsatz else 0
num_as_float = float(num_extraction_str)
scaled_num = num_as_float
original_lower = raw_value_str_original.lower()
if is_umsatz:
if re.search(r'\bmrd\s*\b|\bmilliarden\s*\b|\bbillion\s*\b', original_lower):
scaled_num = num_as_float * 1000.0
elif re.search(r'\btsd\s*\b|\btausend\s*\b', original_lower):
scaled_num = num_as_float / 1000.0
else:
if re.search(r'\bmrd\s*\b|\bmilliarden\s*\b|\bbillion\s*\b', original_lower): scaled_num = num_as_float * 1000000000.0
elif re.search(r'\bmio\s*\b|\bmillionen\s*\b|\bmill[.]?\s*\b', original_lower): scaled_num = num_as_float * 1000000.0
elif re.search(r'\btsd\s*\b|\btausend\s*\b', original_lower): scaled_num = num_as_float * 1000.0
return scaled_num if scaled_num > 0 else (0.0 if is_umsatz else 0)
except ValueError as e:
logger.debug(f"ValueError '{e}' bei Konvertierung (get_numeric_filter_value) von '{num_extraction_str if 'num_extraction_str' in locals() and isinstance(num_extraction_str, str) else raw_value_str_original[:30]}...'")
return 0.0 if is_umsatz else 0
except Exception as e_general:
logger.error(f"Unerwarteter Fehler in get_numeric_filter_value für '{raw_value_str_original[:50]}...': {e_general}")
logger.debug(traceback.format_exc())
return 0.0 if is_umsatz else 0
@retry_on_failure
def _call_genderize_api(name, api_key):
"""
Interne Hilfsfunktion, um die Genderize API mit Retry-Logik aufzurufen.
Wird von get_gender aufgerufen.
"""
params = {"name": name, "apikey": api_key, "country_id": "DE"}
response = requests.get("https://api.genderize.io", params=params, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15))
response.raise_for_status()
data = response.json()
return data
def get_gender(firstname):
"""Ermittelt Geschlecht via gender-guesser und Fallback Genderize API."""
logger = logging.getLogger(__name__)
if not firstname or not isinstance(firstname, str): return "unknown"
firstname_clean = str(firstname).strip().split(" ")[0]
if not firstname_clean: return "unknown"
# 1. Versuch: gender-guesser (lokal)
result_gg = "unknown"
if gender_detector:
try:
result_gg = gender_detector.get_gender(firstname_clean)
except Exception as e_gg:
logger.warning(f"Fehler bei gender-guesser fuer '{firstname_clean}': {e_gg}")
result_gg = "unknown"
# 2. Fallback: Genderize API (nur wenn gender-guesser unsicher ist)
if result_gg in ["andy", "unknown", "mostly_male", "mostly_female"]:
genderize_key = Config.API_KEYS.get('genderize')
if not genderize_key:
return result_gg if result_gg.startswith("mostly_") else "unknown"
try:
genderize_data = _call_genderize_api(firstname_clean, genderize_key)
api_gender = genderize_data.get("gender")
probability = genderize_data.get("probability", 0)
count = genderize_data.get("count", 0)
if api_gender and probability is not None and probability > 0.7 and count is not None and count > 0:
logger.debug(f" -> Uebernehme Genderize Ergebnis '{api_gender}' (Prob: {probability}, Count: {count}) fuer '{firstname_clean}'")
return api_gender
else:
return result_gg if result_gg.startswith("mostly_") else "unknown"
except Exception as e:
logger.error(f"FEHLER bei der Genderize API-Anfrage fuer '{firstname_clean}' nach Retries: {e}")
return result_gg if result_gg.startswith("mostly_") else "unknown"
else:
return result_gg
def get_email_address(firstname, lastname, website):
"""
Generiert eine moegliche E-Mail-Adresse im Format vorname.nachname@domain.tld.
Normalisiert Namen und extrahiert die Domain aus der Website-URL.
"""
logger = logging.getLogger(__name__)
if not all([firstname, lastname, website]) or not all(isinstance(x, str) and x.strip() for x in [firstname, lastname, website]):
logger.debug("get_email_address skipped: Fehlende oder ungueltige Eingabe (Name, Website).")
return ""
domain = simple_normalize_url(website)
if domain == "k.A." or '.' not in domain:
logger.debug(f"get_email_address skipped: Ungueltige Domain extrahiert aus '{website}'.")
return ""
normalized_first = normalize_string(firstname).lower()
normalized_last = normalize_string(lastname).lower()
normalized_first = re.sub(r'\s+', '-', normalized_first)
normalized_last = re.sub(r'\s+', '-', normalized_last)
normalized_first = re.sub(r'[^\w\-]+', '', normalized_first)
normalized_last = re.sub(r'[^\w\-]+', '', normalized_last)
normalized_first = normalized_first.strip('-')
normalized_last = normalized_last.strip('-')
if normalized_first and normalized_last and domain:
email_address = f"{normalized_first}.{normalized_last}@{domain}"
return email_address
else:
logger.debug("get_email_address skipped: Vorname oder Nachname leer nach Bereinigung.")
return ""
# ==============================================================================
# 7. SCHEMA LOADING UTILITY
# ==============================================================================
def initialize_target_schema():
"""
Initialisiert das Ziel-Branchenschema aus der Config und gibt es als Dictionary zurück.
Setzt KEINE globalen Variablen mehr.
"""
logger = logging.getLogger(__name__)
from config import Config
logger.info("Lade Ziel-Schema aus Config.BRANCH_GROUP_MAPPING...")
if not hasattr(Config, 'BRANCH_GROUP_MAPPING') or not isinstance(Config.BRANCH_GROUP_MAPPING, dict):
logger.critical("FEHLER: Config.BRANCH_GROUP_MAPPING ist nicht vorhanden oder kein Dictionary.")
return None
allowed_branches = sorted(list(Config.BRANCH_GROUP_MAPPING.keys()), key=str.lower)
if not allowed_branches:
logger.error("Keine Branchen im BRANCH_GROUP_MAPPING in der Config gefunden.")
return None
# Baue den Prompt-String zusammen
schema_lines = ["Ziel-Branchenschema: Folgende Branchenbereiche sind gueltig (Kurzformen):"]
schema_lines.extend(f"- {branch}" for branch in allowed_branches)
schema_lines.append("\nBitte ordne das Unternehmen ausschliesslich in einen dieser Bereiche ein.")
schema_lines.append("Antworte ausschliesslich im folgenden Format (keine Einleitung, kein Schlusssatz):")
schema_lines.append("Branche: <Exakter Kurzname der Branche aus der Liste>")
schema_lines.append("Konfidenz: <Hoch, Mittel oder Niedrig>")
schema_lines.append("Begruendung: <Sehr kurze Begruendung fuer deinen Branchenvorschlag>")
schema_string = "\n".join(schema_lines)
logger.info(f"Ziel-Schema initialisiert: {len(allowed_branches)} eindeutige Zielbranchen gefunden.")
# Gib ein sauberes Dictionary zurück
return {
"allowed_branches": allowed_branches,
"schema_prompt_string": schema_string
}
# ==============================================================================
# 8. OPENAI API WRAPPERS (CHAT & SUMMARY)
# ==============================================================================
@retry_on_failure
def call_openai_chat(prompt, temperature=0.3, model=None):
"""
Zentrale Funktion fuer OpenAI Chat API Aufrufe.
Wird von anderen globalen Helfern oder DataProcessor Methoden aufgerufen.
Args:
prompt (str): Der Prompt-Text an die API.
temperature (float, optional): Die Temperatur fuer die Textgenerierung. Defaults to 0.3.
model (str, optional): Das zu verwendende OpenAI Modell. Defaults to Config.TOKEN_MODEL.
Returns:
str: Der bereinigte Antwortstring von der API.
Wirft Exception bei API-Fehlern nach Retries.
"""
logger = logging.getLogger(__name__)
if not Config.API_KEYS.get('openai'):
logger.error("Fehler: OpenAI API Key nicht konfiguriert.")
raise openai.error.AuthenticationError("OpenAI API Key nicht konfiguriert.")
if not prompt or not isinstance(prompt, str) or not prompt.strip():
logger.error("Fehler: Leerer Prompt fuer OpenAI.")
raise ValueError("Leerer Prompt fuer OpenAI.")
current_model = model if model else getattr(Config, 'TOKEN_MODEL', 'gpt-3.5-turbo')
try:
# Optional: Token-Zählung für Debugging
# prompt_tokens = token_count(prompt, model=current_model)
# logger.debug(f"Sende Prompt an OpenAI ({current_model}, geschaetzt {prompt_tokens} Tokens)...")
response = openai.ChatCompletion.create(
model=current_model,
messages=[{"role": "user", "content": prompt}],
temperature=temperature
)
if not response or not hasattr(response, 'choices') or not response.choices:
logger.error(f"OpenAI Call erfolgreich, aber keine Choices in der Antwort erhalten. Response: {str(response)[:200]}...")
raise openai.error.APIError("Keine Choices in OpenAI Antwort erhalten.")
result = response.choices[0].message.content.strip() if hasattr(response.choices[0], 'message') and hasattr(response.choices[0].message, 'content') else ""
if not result:
logger.warning(f"OpenAI Call erfolgreich, erhielt aber leeren Inhalt in der Antwort. Prompt Anfang: {prompt[:100]}...")
return ""
return result
except Exception as e:
# Wird vom @retry_on_failure Decorator gefangen und behandelt.
# Wir heben die Exception erneut auf, damit der Decorator sie sehen kann.
raise e
def summarize_website_content(raw_text):
"""
Erstellt eine Zusammenfassung eines Website-Rohtextes ueber OpenAI.
Args:
raw_text (str): Der rohe Textinhalt der Website.
Returns:
str: Die generierte Zusammenfassung oder ein Fehlerwert ("k.A.", etc.).
"""
logger = logging.getLogger(__name__)
if not raw_text or str(raw_text).strip() == "" or str(raw_text).strip().lower() in ["k.a.", "k.a. (nur cookie-banner erkannt)", "k.a. (fehler)"]:
logger.debug("summarize_website_content skipped: No valid raw text provided.")
return "k.A."
max_raw_length = 3000
if len(str(raw_text)) > max_raw_length:
logger.debug(f"Kuerze Rohtext fuer Zusammenfassung von {len(str(raw_text))} auf {max_raw_length} Zeichen.")
raw_text = str(raw_text)[:max_raw_length]
prompt = (
"Du bist ein KI-Assistent, der Webinhalte analysiert.\n"
"Fasse den folgenden Text einer Unternehmenswebsite praegnant zusammen. "
"Konzentriere dich dabei auf:\n"
"- Haupttaetigkeitsfeld des Unternehmens\n"
"- Wichtigste Produkte und/oder Dienstleistungen\n"
"- Zielgruppe (falls erkennbar)\n\n"
f"Website-Text:\n```\n{raw_text}\n```\n\n"
"Zusammenfassung (max. 100 Woerter):"
)
try:
summary = call_openai_chat(prompt, temperature=0.2)
return summary if summary and summary.strip() else "k.A. (Keine Zusammenfassung erhalten)"
except Exception as e:
logger.error(f"FEHLER bei Website Zusammenfassung nach Retries: {e}")
return f"k.A. (Fehler Zusammenfassung: {str(e)[:50]}...)"
@retry_on_failure
def summarize_batch_openai(tasks_data):
"""
Fasst eine Liste von Rohtexten in einem einzigen OpenAI API Call zusammen.
Args:
tasks_data (list): Eine Liste von Dictionaries, jedes enthaelt:
{'row_num': int, 'raw_text': str}
Returns:
dict: Ein Dictionary, das Zeilennummern auf ihre Zusammenfassungen mappt.
"""
logger = logging.getLogger(__name__)
if not tasks_data: return {}
valid_tasks = [t for t in tasks_data if t.get("raw_text") and str(t["raw_text"]).strip().lower() not in ["k.a.", "k.a. (nur cookie-banner erkannt)", "k.a. (fehler)"]]
if not valid_tasks:
logger.debug("Keine gueltigen Rohtexte fuer Batch-Zusammenfassung gefunden.")
return {t['row_num']: "k.A. (Kein gueltiger Rohtext im Batch)" for t in tasks_data}
logger.debug(f"Starte Batch-Zusammenfassung fuer {len(valid_tasks)} gueltige Texte (Zeilen: {[t['row_num'] for t in valid_tasks]})...")
prompt_parts = [
"Du bist ein KI-Assistent, der Webinhalte analysiert.",
"Fasse fuer JEDEN der folgenden Texte einer Unternehmenswebsite praegnant zusammen. ",
"Konzentriere dich dabei auf:\n"
"- Haupttaetigkeitsfeld des Unternehmens\n"
"- Wichtigste Produkte und/oder Dienstleistungen\n"
"- Zielgruppe (falls erkennbar)\n\n"
"Gib das Ergebnis fuer JEDEN Text im folgenden Format aus, auf einer neuen Zeile:\n"
"RESULTAT <Zeilennummer>: <Zusammenfassung fuer diese Zeilennummer>\n\n",
"Halte jede Zusammenfassung kurz, max. 100 Woerter.\n\n",
"--- Texte zur Zusammenfassung ---"
]
text_block = ""
row_numbers_in_batch = []
max_chars_per_single_text_in_batch = 1500
for task in valid_tasks:
row_num = task['row_num']
raw_text = str(task['raw_text'])
raw_text_short = raw_text[:max_chars_per_single_text_in_batch]
entry_text = f"\n--- TEXT Zeile {row_num} ---\n{raw_text_short}\n--- ENDE TEXT Zeile {row_num} ---\n"
text_block += entry_text
row_numbers_in_batch.append(row_num)
if not row_numbers_in_batch:
logger.debug("Keine Zeilen uebrig fuer OpenAI Prompt nach Filterung/Kuerzung im Batch.")
return {t['row_num']: "k.A. (Kein Rohtext im Batch)" for t in tasks_data}
prompt_parts.append(text_block)
prompt_parts.append("\n--- Ende der Texte ---")
prompt_parts.append("\nBitte gib NUR die 'RESULTAT <Zeilennummer>: ...' Zeilen zurueck.")
final_prompt = "\n".join(prompt_parts)
chat_response = None
try:
chat_response = call_openai_chat(final_prompt, temperature=0.2)
if not chat_response:
raise openai.error.APIError("Keine Antwort von OpenAI erhalten fuer Batch-Zusammenfassung.")
except Exception as e:
logger.error(f"Endgueltiger FEHLER beim OpenAI-Batch-Aufruf fuer Zusammenfassung: {e}")
return {row_num: f"FEHLER API: {str(e)[:100]}" for row_num in row_numbers_in_batch}
summaries = {}
lines = chat_response.strip().split('\n')
parsed_count = 0
for line in lines:
match = re.match(r"RESULTAT (\d+): (.*)", line.strip())
if match:
row_num = int(match.group(1))
summary_text = match.group(2).strip()
if row_num in row_numbers_in_batch:
summaries[row_num] = summary_text
parsed_count += 1
logger.debug(f"Batch-Zusammenfassung: {parsed_count} von {len(row_numbers_in_batch)} Zeilen erfolgreich geparst.")
if parsed_count < len(row_numbers_in_batch):
logger.warning(f"Nicht alle Zeilen aus dem Batch ({len(row_numbers_in_batch)}) konnten in der OpenAI-Antwort ({len(lines)} Zeilen) geparst werden.")
logger.debug(f"Unerwartete Antwortteile (erste 500 Zeichen): {chat_response[:500]}")
for row_num in row_numbers_in_batch:
if row_num not in summaries:
summaries[row_num] = "FEHLER: Antwort nicht geparst"
original_row_nums = {t['row_num'] for t in tasks_data}
for row_num in original_row_nums:
if row_num not in summaries:
summaries[row_num] = "k.A. (Kein gueltiger Rohtext im Batch)"
return summaries
# ==============================================================================
# 9. OPENAI API WRAPPER (BRANCH EVALUATION)
# ==============================================================================
@retry_on_failure
def evaluate_branche_chatgpt(crm_branche, beschreibung, wiki_branche, wiki_kategorien, website_summary, schema_data):
"""
Bewertet die Branche eines Unternehmens. Stellt die stabile System-Prompt- und Parsing-Logik
aus v1.7.9 wieder her und integriert den intelligenten Fallback.
"""
logger = logging.getLogger(__name__)
if not schema_data or not schema_data.get("allowed_branches"):
return {"branch": "FEHLER - SCHEMA FEHLT", "confidence": "N/A", "consistency": "error_schema_missing", "justification": "Fehler: Schema-Daten fehlen."}
allowed_branches = schema_data["allowed_branches"]
allowed_branches_lookup = {b.lower(): b for b in allowed_branches}
system_prompt_content = (
"Du bist ein Wirtschaftsanalyst, der die Branche eines Unternehmens bewertet.\n"
"Deine Aufgabe ist es, aus den untenstehenden Informationen die am besten passende Branche aus dem 'Ziel-Branchenschema' auszuwählen.\n"
"Bewerte Wikipedia-Daten und externe Branchenbeschreibungen höher als allgemeine Website-Texte.\n"
"Antworte NUR mit den folgenden drei Zeilen im exakten Format:\n"
"Branche: <Exakter Name der Branche aus der Liste>\n"
"Konfidenz: <Hoch, Mittel oder Niedrig>\n"
"Begruendung: <Sehr kurze Begründung für deine Wahl basierend auf den Quelldaten>"
)
user_prompt_parts = [
"--- ZIEL-BRANCHENSCHEMA ---",
"\n".join(f"- {b}" for b in allowed_branches),
"\n--- UNTERNEHMENSDATEN ---"
]
if crm_branche and str(crm_branche).strip() and str(crm_branche).strip().lower() != "k.a.": user_prompt_parts.append(f"- CRM-Branche (Referenz): {str(crm_branche).strip()}")
if wiki_branche and str(wiki_branche).strip() and str(wiki_branche).strip().lower() != "k.a.":
if beschreibung and str(beschreibung).strip() and str(beschreibung).strip().lower() != "k.a.": user_prompt_parts.append(f"- Beschreibung (CRM): {str(beschreibung).strip()[:500]}...")
if website_summary and str(website_summary).strip() and str(website_summary).strip().lower() != "k.a." and not str(website_summary).strip().startswith("k.A. (Fehler"): user_prompt_parts.append(f"- Website-Zusammenfassung: {str(website_summary).strip()[:500]}...")
user_prompt_parts.append(f"- Wikipedia-Branche: {str(wiki_branche).strip()[:300]}...")
if wiki_kategorien and str(wiki_kategorien).strip() and str(wiki_kategorien).strip().lower() != "k.a.": user_prompt_parts.append(f"- Wikipedia-Kategorien: {str(wiki_kategorien).strip()[:500]}...")
else:
if website_summary and str(website_summary).strip() and str(website_summary).strip().lower() != "k.a." and not str(website_summary).strip().startswith("k.A. (Fehler"): user_prompt_parts.append(f"- Website-Zusammenfassung (als Hauptbeschreibung): {str(website_summary).strip()[:800]}...")
elif beschreibung and str(beschreibung).strip() and str(beschreibung).strip().lower() != "k.a.": user_prompt_parts.append(f"- Beschreibung (CRM, als Hauptbeschreibung): {str(beschreibung).strip()[:800]}...")
full_prompt = system_prompt_content + "\n\n" + "\n".join(user_prompt_parts)
try:
chat_response = call_openai_chat(full_prompt, temperature=0.1)
if not chat_response: raise APIError("Keine Antwort von OpenAI erhalten.")
except Exception as e:
return {"branch": "FEHLER API", "confidence": "N/A", "consistency": "error_api_failed", "justification": f"Fehler API: {str(e)[:100]}"}
lines = chat_response.strip().split("\n")
result = {"confidence": "N/A", "justification": ""}
suggested_branch = ""
for line in lines:
if line.lower().strip().startswith("branche:"):
suggested_branch = line.split(":", 1)[1].strip().strip('"\'')
break
if not suggested_branch and lines:
suggested_branch = lines[0].strip().split(":", 1)[-1].strip().strip('"\'')
for line in lines:
if line.lower().strip().startswith("konfidenz:"): result["confidence"] = line.split(":", 1)[1].strip()
elif line.lower().strip().startswith("begruendung:"): result["justification"] = line.split(":", 1)[1].strip()
if not suggested_branch:
return {"branch": "FEHLER PARSING", "confidence": "N/A", "consistency": "error_parsing", "justification": f"Antwort unklar: {chat_response[:100]}"}
final_branch = None
suggested_branch_lower = suggested_branch.lower()
if suggested_branch_lower in allowed_branches_lookup:
final_branch = allowed_branches_lookup[suggested_branch_lower]
else:
best_suggestion_match = next((val for key, val in allowed_branches_lookup.items() if suggested_branch_lower in key.lower()), None)
if best_suggestion_match: final_branch = best_suggestion_match
if final_branch:
result["branch"] = final_branch
result["consistency"] = "ok" if final_branch.lower() == crm_branche.strip().lower() else "X"
else:
crm_short_branch_lower = crm_branche.strip().lower()
best_crm_fallback = next((val for key, val in allowed_branches_lookup.items() if crm_short_branch_lower and crm_short_branch_lower in key.lower()), None)
if best_crm_fallback:
result.update({"branch": best_crm_fallback, "consistency": "fallback_crm_substring", "justification": f"Fallback: KI-Vorschlag ungültig. CRM-Branche '{crm_branche}' passt zu '{best_crm_fallback}'.", "confidence": "N/A (Fallback)"})
else:
result.update({"branch": "FEHLER - UNGUELTIGE ZUWEISUNG", "consistency": "fallback_invalid", "justification": f"Fehler: Weder KI ('{suggested_branch}') noch CRM ('{crm_branche}') passen.", "confidence": "N/A (Fehler)"})
logger.debug(f"Finale Branch-Evaluation: {result}")
return result
# ==============================================================================
# Chat GPT FSM Pitch
# ==============================================================================
@retry_on_failure
def generate_fsm_argument(company_name, crm_branche, website_summary, wiki_absatz, anzahl_ma, anzahl_techniker):
"""
Generiert einen maßgeschneiderten, nicht-werblichen Satz, warum das Unternehmen FSM einsetzen sollte.
Nutzt eine "Chain-of-Thought"-Anweisung für spezifischere Ergebnisse.
"""
logger = logging.getLogger(__name__)
techniker_info = ""
if anzahl_techniker and str(anzahl_techniker).strip().lower() not in ['0', 'k.a.', '']:
techniker_info = f"bei rund {anzahl_techniker} Servicetechnikern im Außendienst"
elif anzahl_ma and str(anzahl_ma).strip().lower() not in ['0', 'k.a.', '']:
techniker_info = f"bei über {anzahl_ma} Mitarbeitern"
else:
techniker_info = "in einem Unternehmen Ihrer Größe"
# Nimm die bessere Beschreibung, wenn vorhanden
beschreibung = website_summary if website_summary and website_summary.lower() != 'k.a.' else wiki_absatz
prompt_parts = [
"Du bist ein B2B-Stratege, der die verborgenen operativen Herausforderungen eines Unternehmens erkennt und präzise auf den Punkt bringt.",
"Aufgabe: Formuliere EINEN EINZIGEN, flüssig lesbaren Satz (ca. 20-35 Wörter), der als hochpersonalisierter Einstieg in einer E-Mail dient.",
"Stil-Regeln:",
"- Absolut NICHT werblich klingen. Keine Produktnamen, keine direkten Lösungsangebote.",
"- Formuliere es als eine scharfsinnige Beobachtung über die operative Tätigkeit des Unternehmens.",
"- Der Satz muss spezifische Keywords aus der Unternehmensbeschreibung aufgreifen.",
"\n--- Denkprozess (Schritt für Schritt): ---",
"1. Analysiere die untenstehenden Unternehmensdaten.",
"2. Identifiziere die **spezifischste genannte Dienstleistung oder das spezifischste Produkt** (z.B. 'Installation von Wärmepumpen', 'Wartung von Aufzügen', 'Verlegung von Glasfaser').",
"3. Leite daraus die **konkrete operative Tätigkeit** ab, die von mobilen Teams durchgeführt wird (z.B. 'Installations-Termine', 'Störungsbehebungen', 'Wartungsarbeiten').",
"4. Formuliere den finalen Satz, der diese spezifische Tätigkeit und die Personalinfo elegant verbindet.",
"\n--- Unternehmenskontext ---",
f"Unternehmen: {company_name}",
f"Branche: {crm_branche}",
f"Beschreibung: {beschreibung}",
f"Personalinfo für den Satz: {techniker_info}",
"\n--- Beispiele für den gewünschten Output-Stil ---",
"Beispiel 1 (Kontext: Branche 'Energie', Beschreibung '...baut Ladeinfrastruktur aus...'): Angesichts des beschleunigten Ausbaus der Ladeinfrastruktur ist die reibungslose Koordination der Installationstermine Ihrer mobilen Teams entscheidend für den Projekterfolg.",
"Beispiel 2 (Kontext: Branche 'Anlagenbau', Beschreibung '...Service für Produktionsanlagen...'): Bei der Wartung komplexer Produktionsanlagen hängt die Kundenzufriedenheit direkt von der pünktlichen und effizienten Durchführung der Serviceeinsätze ab.",
"\n--- Deine Aufgabe ---",
"Führe den Denkprozess durch und gib NUR den finalen, perfekten Satz für das oben genannte Unternehmen aus:",
]
prompt = "\n".join(prompt_parts)
try:
fsm_pitch = call_openai_chat(prompt, temperature=0.7)
return fsm_pitch.strip().replace('"', '') if fsm_pitch else "k.A. (Pitch-Generierung fehlgeschlagen)"
except Exception as e:
logger.error(f"Fehler bei der Generierung des FSM-Pitches für {company_name}: {e}")
return "k.A. (Fehler bei Pitch-Generierung)"
@retry_on_failure
def serp_website_lookup(company_name):
"""
Ermittelt die offizielle Website eines Unternehmens ueber SerpAPI (Google Suche).
Gibt die URL als String oder "k.A." zurück.
"""
logger = logging.getLogger(__name__)
serp_key = Config.API_KEYS.get('serpapi')
if not serp_key:
logger.warning("SerpAPI Key nicht konfiguriert. Website-Suche via SerpAPI übersprungen.")
return "k.A. (SerpAPI Key fehlt)"
if not company_name or str(company_name).strip() == "":
logger.warning("serp_website_lookup: Kein Firmenname angegeben.")
return "k.A. (Kein Firmenname)"
blacklist = ["bloomberg.com", "northdata.de", "finanzen.net", "handelsblatt.com", "wikipedia.org", "linkedin.com", "xing.com", "youtube.com", "facebook.com", "twitter.com", "instagram.com", "glassdoor.com", "kununu.com"]
query = f'{company_name} offizielle Website'
logger.info(f"Starte SerpAPI Website-Suche fuer '{company_name}' mit Query: '{query[:100]}...'")
params = {
"engine": "google", "q": query, "api_key": serp_key, "hl": "de", "gl": "de", "safe": "active"
}
api_url = "https://serpapi.com/search"
try:
response = requests.get(api_url, params=params, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15))
response.raise_for_status()
data = response.json()
if "knowledge_graph" in data and "website" in data["knowledge_graph"]:
kg_url = data["knowledge_graph"].get("website")
if kg_url and isinstance(kg_url, str):
if any(bad_domain in kg_url.lower() for bad_domain in blacklist):
logger.debug(f" -> SerpAPI Website Lookup: KG URL '{kg_url[:100]}...' auf Blacklist. Uebersprungen.")
else:
normalized_url = simple_normalize_url(kg_url)
if normalized_url != "k.A.":
logger.info(f"SERP Lookup: Website '{normalized_url}' aus Knowledge Graph fuer '{company_name}' gefunden.")
return normalized_url
if "organic_results" in data:
for result in data["organic_results"][:5]:
url = result.get("link", "")
title = result.get("title", "")
snippet = result.get("snippet", "")
if url and isinstance(url, str) and url.lower().startswith(("http://", "https://")) and not any(bad_domain in url.lower() for bad_domain in blacklist):
normalized_url = simple_normalize_url(url)
if normalized_url != "k.A.":
normalized_company = normalize_company_name(company_name)
domain_part_normalized = normalized_url.replace('www.', '').split('.')[0]
title_lower = title.lower()
snippet_lower = snippet.lower()
domain_name_match = domain_part_normalized in normalized_company
name_in_result_text = normalized_company in title_lower or normalized_company in snippet_lower
if domain_name_match or name_in_result_text:
logger.info(f"SERP Lookup: Website '{normalized_url}' aus Organic Results fuer '{company_name}' gefunden.")
return normalized_url
logger.info(f"SERP Lookup: Keine passende Website fuer '{company_name}' gefunden nach Pruefung KG und Top Organic Results.")
return "k.A."
except Exception as e:
logger.error(f"FEHLER bei der SerpAPI Website Suche fuer '{company_name}': {e}")
return f"k.A. (Fehler Suche: {str(e)[:100]}...)"
@retry_on_failure
def search_linkedin_contacts(company_name, website, position_query, crm_kurzform, num_results=10):
"""
Sucht LinkedIn Kontakte fuer ein Unternehmen und eine Position ueber SerpAPI (Google).
"""
logger = logging.getLogger(__name__)
serp_key = Config.API_KEYS.get('serpapi')
if not serp_key:
logger.error("Fehler: SerpAPI Key nicht verfuegbar fuer LinkedIn Suche.")
raise ConnectionRefusedError("SerpAPI Key nicht konfiguriert.")
if not all([company_name, position_query, crm_kurzform]) or not all(isinstance(x, str) and x.strip() for x in [company_name, position_query, crm_kurzform]):
logger.warning(f"search_linkedin_contacts: Fehlende oder ungueltige Eingabedaten.")
raise ValueError("Fehlende oder ungueltige Eingabedaten fuer LinkedIn Suche.")
query = f'site:linkedin.com/in/ "{position_query.strip()}" "{crm_kurzform.strip()}"'
logger.info(f"Starte SerpAPI LinkedIn-Suche fuer '{crm_kurzform}' (Position: '{position_query}') mit Query: '{query[:100]}...'")
params = {
"engine": "google", "q": query, "api_key": serp_key, "hl": "de", "gl": "de", "num": num_results
}
api_url = "https://serpapi.com/search"
found_contacts = []
try:
response = requests.get(api_url, params=params, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15))
response.raise_for_status()
data = response.json()
if "organic_results" in data:
for result in data["organic_results"]:
title = result.get("title", "")
linkedin_url = result.get("link", "")
snippet = result.get("snippet", "")
if not linkedin_url or not isinstance(linkedin_url, str) or "linkedin.com/in/" not in linkedin_url.lower() or "/sales/" in linkedin_url.lower():
continue
title_lower = title.lower()
snippet_lower = snippet.lower()
crm_kurzform_lower = crm_kurzform.lower()
position_query_lower = position_query.lower()
is_relevant_result = (crm_kurzform_lower in title_lower or crm_kurzform_lower in snippet_lower) or \
(position_query_lower in title_lower or position_query_lower in snippet_lower)
if not is_relevant_result:
continue
name_part = ""
pos_part = position_query
separators = [" ", " - ", " | ", " at ", " bei "]
title_cleaned = title.replace("...", "").strip()
found_sep = False
for sep in separators:
if sep in title_cleaned:
parts = title_cleaned.split(sep, 1)
name_part = parts[0].strip()
potential_pos_company = parts[1].strip()
potential_pos_company = re.sub(r'[\s|\-]*LinkedIn[\s|\-]*Profile.*$', '', potential_pos_company, flags=re.IGNORECASE).strip()
potential_pos_company = re.sub(r'[\s|\-]*LinkedIn$', '', potential_pos_company, flags=re.IGNORECASE).strip()
if crm_kurzform_lower in potential_pos_company.lower():
pos_company_cleaned = re.sub(r'\b' + re.escape(crm_kurzform_lower) + r'\b', '', potential_pos_company, flags=re.IGNORECASE).strip()
pos_company_cleaned = re.sub(r'\s+', ' ', pos_company_cleaned).strip()
else:
pos_company_cleaned = potential_pos_company
pos_part = pos_company_cleaned if pos_company_cleaned else position_query
found_sep = True
break
if not found_sep:
if position_query_lower in title_lower:
name_before_pos = title_lower.split(position_query_lower, 1)[0].strip()
name_part = title_cleaned[:len(name_before_pos)].strip()
pos_part = position_query
else:
name_part = re.sub(r'[\s|\-]*LinkedIn[\s|\-]*Profile.*$', '', title_cleaned, flags=re.IGNORECASE).strip()
name_part = re.sub(r'[\s|\-]*LinkedIn$', '', name_part, flags=re.IGNORECASE).strip()
pos_part = position_query
firstname, lastname = "", ""
name_parts = name_part.split()
if len(name_parts) > 1:
firstname = name_parts[0]
lastname = " ".join(name_parts[1:])
elif len(name_parts) == 1:
firstname = name_parts[0]
if not firstname or not name_part.strip():
continue
contact_data = {
"Firmenname": company_name,
"CRM Kurzform": crm_kurzform,
"Website": website,
"Vorname": firstname,
"Nachname": lastname,
"Position": pos_part,
"LinkedInURL": linkedin_url
}
found_contacts.append(contact_data)
logger.info(f"LinkedIn Suche fuer '{position_query}' bei '{crm_kurzform}' ergab {len(found_contacts)} Kontakte.")
return found_contacts
except Exception as e:
logger.error(f"FEHLER bei der SerpAPI LinkedIn Suche (Query: '{position_query}', Firma: '{crm_kurzform}'): {e}")
return []
# ==============================================================================
# 11. WEBSITE SCRAPING & VALIDATION UTILITIES
# ==============================================================================
@retry_on_failure
def get_website_raw(url, max_length=20000): # verify_cert wird entfernt
"""
Holt Textinhalt von einer Website, versucht Cookie-Banner zu umgehen.
Versucht zuerst eine sichere Verbindung, bei SSL-Fehler einen unsicheren Fallback.
"""
logger = logging.getLogger(__name__)
if not url or not isinstance(url, str) or url.strip().lower() in ["k.a.", "kein artikel gefunden", "fehler bei suche", "http:"]:
logger.debug(f"get_website_raw skipped: Ungueltige oder leere URL '{url}'.")
return "k.A."
if not url.lower().startswith(("http://", "https://")):
url = "https://" + url
headers = {"User-Agent": random.choice(USER_AGENTS)}
response = None
error_reason = "Unbekannter Fehler"
return_marker = False
try:
# Erster Versuch: Immer mit Zertifikatsprüfung (sicher)
logger.debug(f"Versuche Website sicher abzurufen: {url[:100]}... (verify=True)")
response = requests.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 20), headers=headers, verify=True, allow_redirects=True, stream=False)
response.raise_for_status()
error_reason = None
except requests.exceptions.SSLError:
# Zweiter Versuch bei SSL-Fehler: Ohne Zertifikatsprüfung
logger.warning(f"SSL-Fehler fuer {url[:100]}... Versuche erneut mit verify=False.")
try:
response = requests.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 20), headers=headers, verify=False, allow_redirects=True, stream=False)
response.raise_for_status()
error_reason = None # Fehler wurde behoben
except Exception as e_fallback:
# Wenn auch der Fallback fehlschlägt, ist es ein anderer Fehler
error_reason = f"Fallback-Request fehlgeschlagen nach SSLError: {type(e_fallback).__name__}"
logger.error(f"{error_reason} fuer {url[:100]}")
except requests.exceptions.Timeout as e_timeout:
error_reason = f"Timeout ({getattr(Config, 'REQUEST_TIMEOUT', 20)}s)"
logger.warning(f"{error_reason} fuer {url[:100]}...")
except requests.exceptions.ConnectionError as e_conn:
error_reason = f"Connection Error: {str(e_conn)[:100]}..."
logger.warning(f"{error_reason} fuer {url[:100]}...")
if "[Errno -2]" in str(e_conn) or "[Errno -3]" in str(e_conn) or "[Errno 111]" in str(e_conn) or "[Errno 113]" in str(e_conn) or "Failed to establish" in str(e_conn):
return_marker = True
except requests.exceptions.HTTPError as e_http:
status_code = e_http.response.status_code
error_reason = f"HTTP Error {status_code} ({e_http.response.reason})"
logger.warning(f"{error_reason} fuer {url[:100]}...")
if status_code == 404:
return_marker = True
except Exception as e_gen:
error_reason = f"Allg. Fehler: {type(e_gen).__name__} - {str(e_gen)[:100]}..."
logger.error(f"Allgemeiner Fehler beim Abrufen von {url[:100]}...: {e_gen}")
logger.debug(traceback.format_exc())
if return_marker:
logger.warning(f"Markiere URL {url[:100]}... zur erneuten Prüfung (Grund: {error_reason}).")
return URL_CHECK_MARKER
elif response is None or error_reason:
return f"k.A. ({error_reason})"
try:
response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser'))
content_selectors = ['main', 'article', '#content', '#main-content', '.main-content', '.content', 'div[role="main"]', 'div.page-content', 'div.container']
content_area = None
for selector in content_selectors:
content_area = soup.select_one(selector)
if content_area: break
if not content_area:
content_area = soup.find('body')
if content_area:
banner_selectors = ['[id*="cookie"]', '[class*="cookie"]', '[id*="consent"]', '[class*="consent"]', '.cookie-banner', '.consent-banner', '.modal', '#modal', '.popup', '#popup', '[role="dialog"]', '[aria-modal="true"]']
banners_removed_count = 0
for selector in banner_selectors:
try:
potential_banners = content_area.select(selector)
for banner in potential_banners:
banner_text = banner.get_text(" ", strip=True).lower()
keywords = ["cookie", "zustimm", "ablehnen", "einverstanden", "datenschutz", "privacy", "akzeptier", "einstellung", "partner", "analyse", "marketing"]
element_id_class = (banner.get('id', '') + ' ' + ' '.join(banner.get('class', []))).lower()
if any(keyword in banner_text for keyword in keywords) or any(keyword in element_id_class for keyword in keywords):
banner.decompose()
banners_removed_count += 1
except Exception as e_select:
logger.debug(f"Fehler beim Versuch Banner mit Selektor '{selector}' zu entfernen: {e_select}")
if banners_removed_count > 0:
logger.debug(f"{banners_removed_count} potenzielle Banner-Elemente fuer {url[:100]}... entfernt.")
if content_area:
for script_or_style in content_area(["script", "style"]):
script_or_style.decompose()
text = content_area.get_text(separator=' ', strip=True)
text = re.sub(r'\s+', ' ', text).strip()
banner_keywords_strict = ["cookie", "zustimmen", "ablehnen", "einverstanden", "datenschutz", "privacy", "akzeptier", "einstellung", "partner", "marketing"]
text_lower = text.lower()
keyword_hits = sum(1 for keyword in banner_keywords_strict if keyword in text_lower)
if len(text) < 500 and keyword_hits >= 3:
logger.warning(f"WARNUNG: Extrahierter Text fuer {url[:100]}... scheint nur Cookie-Banner zu sein (Laenge {len(text)}, {keyword_hits} Keywords). Verwerfe Text.")
return "k.A. (Nur Cookie-Banner erkannt)"
result = text[:max_length]
logger.debug(f"Website {url[:100]}... erfolgreich gescrapt. Extrahierter Text (Laenge {len(result)}).")
return result if result else "k.A. (Extraktion leer)"
else:
logger.warning(f"Kein <body> oder spezifischer Inhaltsbereich gefunden in {url[:100]}...")
return "k.A. (Kein Body gefunden)"
except Exception as e_parse:
logger.error(f"Fehler beim Parsen von HTML von {url[:100]}...: {type(e_parse).__name__} - {e_parse}")
logger.debug(traceback.format_exc())
return f"k.A. (Fehler Parsing: {str(e_parse)[:50]}...)"
def scrape_website_details(url):
"""
EXPERIMENTELL: Scrapt eine Website und extrahiert spezifische Details.
"""
logger = logging.getLogger(__name__)
if not url or not isinstance(url, str) or url.strip().lower() in ["k.a.", "kein artikel gefunden", "fehler bei suche", "http:"]:
logger.debug(f"scrape_website_details skipped: Ungueltige oder leere URL '{url}'.")
return "k.A."
logger.warning(f"Ausführe 'scrape_website_details' fuer URL {url[:100]}...")
@retry_on_failure
def get_soup_for_details(target_url):
if not target_url or not isinstance(target_url, str):
raise ValueError(f"Ungültige URL für get_soup_for_details: {target_url}")
if not target_url.lower().startswith(("http://", "https://")):
target_url = "https://" + target_url
response = requests.get(target_url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15), verify=True)
response.raise_for_status()
response.encoding = response.apparent_encoding
return BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser'))
try:
soup = get_soup_for_details(url)
if soup:
title = soup.find('title')
meta_desc = soup.find('meta', attrs={'name': 'description'})
h1 = soup.find('h1')
details_list = []
if title: details_list.append(f"Title: {clean_text(title.get_text())}")
if meta_desc and meta_desc.get('content'): details_list.append(f"Description: {clean_text(meta_desc['content'])}")
if h1: details_list.append(f"H1: {clean_text(h1.get_text())}")
if details_list:
result_string = " | ".join(details_list)
logger.debug(f"Details fuer {url[:100]}... extrahiert: {result_string[:100]}...")
return result_string
else:
logger.debug(f"Keine Standard-Details (Title, Description, H1) gefunden fuer {url[:100]}...")
return "k.A. (Keine Standard-Details gefunden)"
else:
logger.error(f"Scraping fuer Details fehlgeschlagen nach Retries fuer {url[:100]}...")
return "k.A. (Scraping fehlgeschlagen)"
except Exception as e:
logger.error(f"FEHLER in scrape_website_details fuer {url[:100]}...: {type(e).__name__} - {e}")
logger.debug(traceback.format_exc())
return f"k.A. (Fehler: {str(e)[:100]}...)"
def is_valid_wikipedia_article_url(url_to_check, lang=None):
"""
Prueft, ob eine gegebene URL zu einem gueltigen, existierenden Wikipedia-Artikel fuehrt.
"""
logger = logging.getLogger(__name__)
if not url_to_check or not isinstance(url_to_check, str) or "wikipedia.org/wiki/" not in url_to_check.lower():
logger.debug(f"is_valid_wikipedia_article_url: Ungueltige URL-Struktur: {url_to_check[:100]}...")
return False
original_lang = None
if lang:
try:
original_lang = wikipedia.get_lang()
wikipedia.set_lang(lang)
logger.debug(f"Temporaer Wikipedia-Sprache auf '{lang}' gesetzt für Validierung.")
except Exception as e_lang:
logger.warning(f"Konnte Wikipedia-Sprache nicht auf '{lang}' setzen für Validierung: {e_lang}")
is_valid = False
try:
title_part = url_to_check.split('/wiki/', 1)[1].split('#')[0]
title = unquote(title_part).replace('_', ' ')
logger.debug(f"Validiere Wikipedia-Artikel: '{title[:100]}...' (URL: {url_to_check[:100]}...)")
page = wikipedia.page(title, auto_suggest=False, preload=True)
is_valid = True
logger.debug(f" -> Artikel '{title[:100]}...' scheint valide zu sein (Seite geladen).")
except wikipedia.exceptions.PageError:
logger.debug(f" -> Seite '{title[:100]}...' nicht gefunden (PageError).")
is_valid = False
except wikipedia.exceptions.DisambiguationError:
logger.debug(f" -> Seite '{title[:100]}...' ist eine Begriffsklaerungsseite.")
is_valid = False
except Exception as e:
logger.error(f" -> Unerwarteter Fehler bei Validierung von '{title[:100]}...': {type(e).__name__} - {e}")
logger.debug(traceback.format_exc())
is_valid = False
finally:
if original_lang:
try:
wikipedia.set_lang(original_lang)
logger.debug(f"Wikipedia-Sprache zurueck auf '{original_lang}' gesetzt.")
except Exception as e_lang_reset:
logger.warning(f"Konnte Wikipedia-Sprache nicht zurueck auf '{original_lang}' setzen: {e_lang_reset}")
return is_valid
# ==============================================================================
# 12. ALIGNMENT DEMO UTILITY
# ==============================================================================
def alignment_demo(sheet):
"""
Schreibt die Header-Struktur (v2.1.4, 71 Spalten) ins Google Sheet zur Dokumentation.
Dies ist die Single Source of Truth für die Spaltenbedeutung. Version mit ungekürzten Texten.
"""
logger = logging.getLogger(__name__)
logger.info("Starte Alignment Demo für das Hauptblatt (v2.1.4)...")
new_headers = [
[ # Zeile 1: Spaltenname (71 Spalten, A-BS)
"ReEval Flag", "CRM Name", "CRM Kurzform", "Parent Account Name", "CRM Website", "CRM Ort", "CRM Land", "CRM Beschreibung", "CRM Branche", "CRM Beschreibung Branche extern", "CRM Anzahl Techniker", "CRM Umsatz", "CRM Anzahl Mitarbeiter", "CRM Vorschlag Wiki URL", "System Vorschlag Parent Account", "Parent Vorschlag Status", "Parent Vorschlag Timestamp", "Wiki URL", "Wiki Sitz Stadt", "Wiki Sitz Land", "Wiki Absatz", "Wiki Branche", "Wiki Umsatz", "Wiki Mitarbeiter", "Wiki Kategorien", "Wikipedia Timestamp", "Wiki Verif. Timestamp", "SerpAPI Wiki Search Timestamp", "Chat Wiki Konsistenzpruefung", "Chat Begründung Wiki Inkonsistenz", "Chat Vorschlag Wiki Artikel", "Begründung bei Abweichung", "Website Rohtext", "Website Zusammenfassung", "Website Meta-Details", "Website Scrape Timestamp", "URL Prüfstatus", "Chat Vorschlag Branche", "Chat Branche Konfidenz", "Chat Konsistenz Branche", "Chat Begruendung Abweichung Branche", "Chat Prüfung FSM Relevanz", "Chat Begründung für FSM Relevanz", "Chat Schätzung Anzahl Mitarbeiter", "Chat Konsistenzprüfung Mitarbeiterzahl", "Chat Begruendung Abweichung Mitarbeiterzahl", "Chat Einschätzung Anzahl Servicetechniker", "Chat Begründung Abweichung Anzahl Servicetechniker", "Chat Schätzung Umsatz", "Chat Begründung Abweichung Umsatz", "FSM Pitch", "FSM Pitch Timestamp", "Linked Serviceleiter gefunden", "Linked It-Leiter gefunden", "Linked Management gefunden", "Linked Disponent gefunden", "Contact Search Timestamp", "Finaler Umsatz (Wiki>CRM)", "Finaler Mitarbeiter (Wiki>CRM)", "Geschaetzter Techniker Bucket", "Plausibilität Umsatz", "Plausibilität Mitarbeiter", "Plausibilität Umsatz/MA Ratio", "Abweichung Umsatz CRM/Wiki", "Abweichung MA CRM/Wiki", "Plausibilität Begründung", "Plausibilität Prüfdatum", "Timestamp letzte Pruefung", "Version", "Tokens", "CRM ID"
],
[ # Zeile 2: Quelle der Daten
"CRM", "CRM", "CRM", "CRM/Manuell", "CRM", "CRM", "CRM/Manuell", "CRM", "CRM", "CRM", "CRM", "CRM", "CRM", "CRM", "System", "Manuell/System", "System", "Wikipediascraper/SerpAPI/ChatGPT/Manuell", "Wikipediascraper", "Wikipediascraper", "Wikipediascraper", "Wikipediascraper", "Wikipediascraper", "Wikipediascraper", "Wikipediascraper", "System", "System", "System", "ChatGPT API", "ChatGPT API", "ChatGPT API", "System/Manuell", "Web Scraper", "ChatGPT API", "Web Scraper", "System", "System/Web Scraper", "ChatGPT API", "ChatGPT API", "System/ChatGPT API", "ChatGPT API", "ChatGPT API", "ChatGPT API", "ChatGPT API", "System/ChatGPT API", "ChatGPT API", "ChatGPT API", "ChatGPT API", "ChatGPT API", "ChatGPT API", "ChatGPT API", "System", "LinkedIn (via SerpApi)", "LinkedIn (via SerpApi)", "LinkedIn (via SerpApi)", "LinkedIn (via SerpApi)", "System", "Skript (Wiki/CRM Logik)", "Skript (Wiki/CRM Logik)", "ML Modell / Skript", "Skript (Plausi-Check)", "Skript (Plausi-Check)", "Skript (Plausi-Check)", "Skript (Plausi-Check)", "Skript (Plausi-Check)", "Skript (Plausi-Check)", "System (Plausi-Check TS)", "System", "System", "System", "CRM"
],
[ # Zeile 3: Feldkategorie
"Prozess", "Firmenname", "Firmenname", "Konzernstruktur", "Website", "Ort", "Land", "Beschreibung (Text)", "Branche", "Branche", "Anzahl Servicetechniker", "Umsatz", "Anzahl Mitarbeiter", "Wikipedia Artikel URL", "Konzernstruktur (Vorschlag)", "Konzernstruktur (Status)", "Timestamp", "Wikipedia Artikel URL", "Ort", "Land", "Beschreibung (Text)", "Branche", "Umsatz", "Anzahl Mitarbeiter", "Kategorien (Text)", "Timestamp", "Timestamp", "Timestamp", "Verifizierung Wiki-Artikel", "Begründung Verifizierung", "Wikipedia Artikel URL (Vorschlag)", "Begründung URL-Abweichung", "Website-Content", "Website-Content (Zusammenfassung)", "Website-Content (Meta)", "Timestamp", "Prozess-Status", "Branche (Vorschlag KI)", "Branche (Konfidenz KI)", "Branche (Konsistenz)", "Branche (Begründung KI)", "FSM Relevanz (KI)", "FSM Relevanz (Begründung KI)", "Anzahl Mitarbeiter (KI)", "Anzahl Mitarbeiter (Konsistenz KI)", "Anzahl Mitarbeiter (Begründung KI)", "Anzahl Servicetechniker (KI)", "Anzahl Servicetechniker (Begründung KI)", "Umsatz (KI)", "Umsatz (Begründung KI)", "Argumentation", "Timestamp", "Kontakte (Anzahl)", "Kontakte (Anzahl)", "Kontakte (Anzahl)", "Kontakte (Anzahl)", "Timestamp", "Umsatz (Konsolidiert)", "Anzahl Mitarbeiter (Konsolidiert)", "Anzahl Servicetechniker (Bucket ML)", "Plausibilität", "Plausibilität", "Plausibilität", "Datenqualitäts-Indikator", "Datenqualitäts-Indikator", "Plausibilität (Text)", "Timestamp (Plausi)", "Timestamp", "Skript Version", "API Tokens", "System-ID"
],
[ # Zeile 4: Kurze Beschreibung (UNGEKÜRZT)
"Systemspalte, irrelevant für den Prompt. Wird genutzt um die manuelle Neuprüfung dieses Accounts durchzuführen.", "Enthält den Firmennamen nach bestem Gewissen. Firmennamen sind manchmal herausfordernd, insbesondere was unterschiedliche Schreibweisen, Firmierung, Tochter/Mutterfirmen etc. anbelangt. Zur besseren Trefferquote in der Wikipedia-Suche normalisieren wir den Firmennamen und entfernen sämtliche Firmenformen, wie z.B. AG, GmbH, SE etc.", "Enthält eine manuell gepflegte (normalisierte) Kurzform des Firmennamens, wie auch ein Mensch die Firma nennen würde. Dies bedeutet insbesondere, dass die Firmenform wie z.B. GmbH oder AG aus dem Namen entfernt wird. Meist entspricht die Kurzform den ersten beiden Worten des Firmennamens. Manchmal sind auch Worte nötig, wenn die ersten beiden worte zu wenig Aussagekraft haben. Beispiele dafür sind beispielsweise Firmen wie 'Schmidt & Söhne', bei denen 'Schmidt &' wenig Sinn machen würde, oder 'Philip Morris Tabakwaren' - weil in diesem Fall 'Philip Morris' zu generisch wäre bzw. wenig eindeutig.", "Name der direkten Muttergesellschaft / des Hauptkonzerns (falls zutreffend). Manuell gepflegt oder aus CRM. Beeinflusst Konsolidierung und Plausi-Checks.", "Von uns ermittelte Website des Unternehmens, sofern verfügbar.", "von uns ermittelter Ort des Unternehmens", "Land des Unternehmenssitzes laut CRM oder manueller Recherche. Wichtig für regionale Analysen (z.B. DACH).", "Kurze Beschreibung der Haupttätigkeit des Unternehmens aus dem CRM-System. Dient als Input für KI-Analysen.", "Branchenzuweisung aus dem CRM-System. Entspricht idealerweise einer Branche aus dem Ziel-Branchenschema.", "Von externen Datenanbietern (z.B. Dealfront) gelieferte Beschreibung der Branche des Unternehmens. Diese Branchenbeschreibung sollte in den allermeisten Fällen sehr zutreffend sein und ist vermutlich verlässlicher als die aktuelle Branche aus Spalte I.", "Bekannte Anzahl der Servicetechniker des Unternehmens (aus CRM oder Recherche). Dient als Ground Truth für ML.", "Umsatz des Unternehmens in Millionen Euro laut CRM oder Recherche.", "Anzahl der Mitarbeiter des Unternehmens laut CRM oder Recherche.", "Enthält aus einer alten Recherche Vorschläge für die Wikipedia URL zum Unternehmen. Dieser muss aber nicht stimmen. Sollte als Ausgangs- und Vergleichspunkt für die nachgelagerte Wikipedia-Suche dienen. Der Wert soll mit den üblichen Methoden geprüft werden z.B. kommt die normalisierte Website vor, Ähnlichkeitsprüfung des Firmennamens mit dem Artikelnamen von Wikipedia etc.", "Vom System heuristisch ermittelter Vorschlag für den Parent Account (basierend auf Namensähnlichkeiten, Wiki-Infos etc.).", "Status des System-Vorschlags für Parent Account (z.B. 'x' für akzeptiert, '-' für abgelehnt, '?' für unklar zur manuellen Prüfung).", "Zeitstempel der letzten Generierung/Änderung des Parent-Vorschlags/-Status.", "Wikipedia URL aus der Recherche im laufenden Prozess", "Aus Wikipedia-Infobox extrahierte Stadt des Unternehmenssitzes.", "Aus Wikipedia-Infobox extrahiertes Land des Unternehmenssitzes.", "Erster Absatz des Wikipedia-Artikels", "Branche aus Wikipedia-Artikel soweit verfügbar", "Umsatz aus Wikipediaartikel soweit verfügbar.", "Anzahl Mitarbeiter laut Wikipedia sofern verfügbar.", "Komma-separierte Liste der Kategorien, denen der Artikel in Wikipedia zugewiesen wurde. Hier ist auch häufig eine Branche enthalten, häufig auch noch weitere Informationen etwa zur Gründung, ob sie etwa im DAX gelistet ist etc. Guter Anhaltspunkt zur Differenzierung von Unternehmenseinträgen und Wikipedia-Seiten, die kein Unternehmen beschreiben und fälschlicherweise zugewiesen wurden. \nBei jeder Unternehmensseite MUSS das Wort unternehmen in irgendeiner Art und Weise vorkommen.\nNEGATIVSIGNAL: EHEMALIGES UNTERNEHMEN -> Weist darauf hin, dass das Unternehmen nicht mehr besteht.", "Zeitstempel der letzten Wikipedia-Suche und Datenextraktion für diese Zeile (jetzt für Spalten R-Y).", "Zeitstempel der letzten Wikipedia-Artikel-Verifizierung durch ChatGPT (Ergebnis in Spalten AC-AE).", "Zeitstempel des letzten Versuchs, eine fehlende Wiki-URL (R) über SerpAPI zu suchen.", "\"OK\" wird bei Firmen eingetragen, wo Firma und Wikipedia-Eintrag zusammenpassen. \"X\" wird bei Firmen eingetragen, wo Firma und Wikipedia-Eintrag nicht zusammenpassen.", "Begründung welche Inkonsistenz aus den Daten hervorgeht.", "URL des durch ChatGPT recherchierten Wikipedia-Artikels", "XXX derzeit nicht verwendet, wird vermutlich gelöscht xxx", "Roh extrahierter Textinhalt der Firmenwebsite. Basis für Zusammenfassung und KI-Analysen.", "KI-generierte Zusammenfassung des Website-Rohtextes (AG). Input für Branchenbewertung.", "Extrahierte Meta-Daten der Website (Title, Description, H-Tags). Für schnelle Analyse & Validierung.", "Zeitstempel des letzten Website-Scraping/Summarization-Versuchs (für AG-AI).", "Status der URL-Prüfung (z.B. 'URL_CHECK_NEEDED', 'URL_OK', 'FEHLER_SSL'). Wird von 'check_urls' Modus gesetzt/genutzt.", "Durch ChatGPT ermittelte Branche des Unternehmens", "Konfidenz des ChatGPT-Branchenvorschlags (AL), z.B. Hoch/Mittel/Niedrig.", "\"OK\" wird bei Firmen eingetragen, wo die Einschätzung zur Branche mit der CRM Branche übereinstimmt. \"X\" wird ausgegeben, wenn die Einschätzungen nicht zusammenpassen.", "Begründung für Abweichung der Branche von CRM Branche", "\"OK\" wird bei Firmen eingetragen, für die FSM relevant ist, \"X\" für Firmen, für die FSM irrelevant ist.", "Begründung für die Beurteilung in Spalte Chat Begründung für FSM Relevanz", "Anzahl der Mitarbeiter durch ChatGPT geschätzt.", "\"OK\" wird bei Firmen eingetragen, für die Anzahl der Mitarbeiter grob mit der aus Spalte CRM Anzahl Mitarbeiter bzw. der Spalte Wiki Mitarbeiter übereinstimmt. \"X\" für Firmen, bei denen dies nicht zutrifft.", "Begründung für Abweichende Mitarbeiterzahl", "Anzahl der Servicetechniker geschätzt durch Chat GPT", "Begründung für Abweichungen zur Anzahl der Techniker", "Umsatz durch ChatGTP geschätzt", "Begründung für Abweichungen zum Umsatz", "Ein maßgeschneiderter Satz (ca. 20-35 Wörter), der den Nutzen von FSM im spezifischen Unternehmenskontext beleuchtet. Ideal für E-Mail-Automationen.", "Timestamp, wann der FSM Pitch generiert wurde. Steuert die Wiederholung.", "Anzahl der Kontakte die zur Suche 'Serviceleiter', 'Leiter Service', 'technischer Leiter', 'Service Manager', 'Leiter Kundendienst' gefunden wurden", "Anzahl der Kontakte die zur Suche 'Leiter IT', 'IT Leiter', 'Head of IT', 'IT-Leiter', 'CIO' gefunden wurden", "Anzahl der Kontakte die zur Suche 'Geschäftsführer', 'Geschäftsführung', 'GF', 'CEO', 'Geschäftsführerin', 'Managing Director', 'Geschäftsführender Gesellschafter' gefunden wurden", "Anzahl der Kontakte die zur Suche 'Disponent', 'Einsatzplaner' gefunden wurden", "Timestamp des Zeitpunkts zu dem die Kontaktsuche fertiggestellt wurde", "Konsolidierter Umsatzwert in Millionen Euro. Priorisiert Wiki (W) > CRM (L). Berücksichtigt Parent-Account (D).", "Konsolidierte Mitarbeiterzahl (absolut). Priorisiert Wiki (X) > CRM (M). Berücksichtigt Parent-Account (D).", "Ergebnis der Schätzung durch das trainierte Machine-Learning-Modell (Techniker-Bucket).", "Plausibilitätsstatus für den finalen Umsatzwert (BD) (z.B. OK, WARNUNG_HOCH, FEHLER_FORMAT).", "Plausibilitätsstatus für die finale Mitarbeiterzahl (BE) (z.B. OK, WARNUNG_NIEDRIG).", "Plausibilitätsstatus für die Umsatz-pro-Mitarbeiter-Ratio (BD/BE).", "Indikator für Abweichung (>30%) zwischen CRM-Umsatz (L) und Wiki-Umsatz (W). Berücksichtigt Parent-Logik.", "Indikator für Abweichung (>30%) zwischen CRM-MA (M) und Wiki-MA (X). Berücksichtigt Parent-Logik.", "Gesammelte Begründungen für Plausibilitätswarnungen oder -fehler aus den Spalten BG-BK.", "Zeitstempel des letzten Laufs der Plausibilitäts-Checks für diese Zeile.", "Timestamp des Zeitpunkts zu dem die Validierung durch ChatGPT durchgeführt wurde", "Systemspalte zur Ausgabe der Skriptversion die das Ergebnis generiert hat", "Zeigt an, wie viele Tokens für den Request benötigt wurden", "Die eindeutige ID des Accounts aus dem CRM (z.B. Dynamics 365)."
],
[ # Zeile 5: Aufgabe / Funktion (UNGEKÜRZT)
"Datenquelle/Prozesssteuerung: 'x' markiert Zeile für Re-Evaluation.", "Datenquelle", "Datenquelle/Such-Input", "Datenquelle/Konsolidierungs-Logik", "Datenquelle/Scraping-Ziel", "Datenquelle", "Datenquelle", "Datenquelle/KI-Input", "Datenquelle/Referenz", "Datenquelle/KI-Input", "Datenquelle/ML-Groundtruth", "Datenquelle", "Datenquelle", "Quelle (Priorität 1)", "Ziel/System", "Prozesssteuerung", "System", "Ziel/Quelle (Priorität 2)", "Ziel", "Ziel", "Ziel/KI-Input", "Ziel/KI-Input", "Ziel/Konsolidierung", "Ziel/Konsolidierung", "Ziel/KI-Input", "System", "System", "System", "Ziel", "Ziel", "Ziel", "Manuell", "Quelle/Ziel", "Ziel/KI-Input", "Ziel", "System", "System/Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel (E-Mail-Automation)", "System", "Ziel", "Ziel", "Ziel", "Ziel", "System", "Ziel/ML-Input", "Ziel/ML-Input", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "Ziel", "System", "System", "System", "System", "Datenquelle"
]
]
num_cols = len(new_headers[0])
if not all(len(row) == num_cols for row in new_headers):
logger.critical(f"FEHLER in alignment_demo: Inkonsistente Spaltenanzahl! Erwartet {num_cols}. Längen: {[len(r) for r in new_headers]}")
return
end_col_letter = self.sheet_handler._get_col_letter(num_cols)
header_range = f"A1:{end_col_letter}{len(new_headers)}"
logger.info(f"Schreibe Alignment-Demo Header in Bereich {header_range}...")
try:
sheet.update(values=new_headers, range_name=header_range, value_input_option='USER_ENTERED')
logger.info("Alignment-Demo Header erfolgreich geschrieben.")
except Exception as e:
logger.error(f"FEHLER beim Schreiben der Alignment-Demo Header: {e}")