Files
Brancheneinstufung2/helpers.py
Floke 8c31a4f2b9 refactor(config): Switch to environment variables for API keys
- Modifies docker-compose.yml to use  for injecting the Gemini API key, which is more robust than volume mounting.
- Updates helpers.py to prioritize reading the API key from the  environment variable.
- Removes the now-redundant file-based key loading logic from config.py and the Dockerfile.
- This change completely bypasses the problematic file system interactions within the container, providing a definitive fix for the 'API Key missing' error.
2026-01-03 09:30:34 +00:00

484 lines
19 KiB
Python

#!/usr/bin/env python3
"""
helpers.py
Sammlung von globalen, wiederverwendbaren Hilfsfunktionen für das Projekt
"Automatisierte Unternehmensbewertung". Enthält Decorators, Text-Normalisierung,
API-Wrapper und andere Dienstprogramme.
"""
__version__ = "v2.2.0_Gemini_Switch"
ALLOWED_TARGET_BRANCHES = []
# ==============================================================================
# 1. IMPORTS
# ==============================================================================
# Standardbibliotheken
import os
import time
import re
import csv
import json
import random
import logging
import traceback
import unicodedata
from datetime import datetime
from urllib.parse import urlparse, unquote
from difflib import SequenceMatcher
# Externe Bibliotheken
try:
import gspread
GSPREAD_AVAILABLE = True
except ImportError:
GSPREAD_AVAILABLE = False
gspread = None # Define to avoid runtime errors on reference
try:
import wikipedia
WIKIPEDIA_AVAILABLE = True
except ImportError:
WIKIPEDIA_AVAILABLE = False
wikipedia = None # Define to avoid runtime errors on reference
import requests
from bs4 import BeautifulSoup
try:
import pandas as pd
PANDAS_AVAILABLE = True
except ImportError:
PANDAS_AVAILABLE = False
pd = None # Define to avoid runtime errors on reference
# --- KI UMSCHALTUNG: Google Generative AI statt OpenAI ---
try:
# Versuche, die neue, empfohlene Bibliothek zu importieren
import google.genai as genai
HAS_GEMINI = True
except ImportError:
HAS_GEMINI = False
genai = None # Sicherstellen, dass genai definiert ist
logging.warning("google-genai Bibliothek nicht gefunden. KI-Funktionen deaktiviert.")
# OpenAI Imports entfernen wir oder machen sie optional, um Verwirrung zu vermeiden
try:
import openai
from openai.error import AuthenticationError, OpenAIError, RateLimitError, APIError, Timeout, InvalidRequestError, ServiceUnavailableError
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
# Define dummy exception classes so the code doesn't crash if it tries to catch them
class AuthenticationError(Exception): pass
class OpenAIError(Exception): pass
class RateLimitError(Exception): pass
class APIError(Exception): pass
class Timeout(Exception): pass
class InvalidRequestError(Exception): pass
class ServiceUnavailableError(Exception): pass
from config import (Config, BRANCH_MAPPING_FILE, URL_CHECK_MARKER, USER_AGENTS, LOG_DIR)
# Optionale Bibliotheken
try:
import tiktoken
except ImportError:
tiktoken = None
logging.warning("tiktoken nicht gefunden. Token-Zaehlung wird geschaetzt.")
gender = None
gender_detector = None
# Import der Config-Klasse und Konstanten
from config import Config, BRANCH_MAPPING_FILE, URL_CHECK_MARKER, USER_AGENTS
from config import Config, COLUMN_MAP, COLUMN_ORDER
def get_col_idx(key):
"""
Ermittelt sicher den 0-basierten Spalten-Index für einen gegebenen Spaltennamen (key)
mithilfe der zentralen COLUMN_ORDER-Liste aus der Config.
Gibt None zurück, wenn der Schlüssel nicht gefunden wird, um Fehler abzufangen.
"""
try:
return COLUMN_ORDER.index(key)
except ValueError:
logging.getLogger(__name__).error(f"Spalten-Schlüssel '{key}' konnte in COLUMN_ORDER nicht gefunden werden!")
return None
# ==============================================================================
# 2. RETRY DECORATOR
# ==============================================================================
decorator_logger = logging.getLogger(__name__ + ".Retry")
def retry_on_failure(func):
"""
Decorator, der eine Funktion bei bestimmten Fehlern mehrmals wiederholt.
Implementiert exponentiellen Backoff mit Jitter.
"""
def wrapper(*args, **kwargs):
func_name = func.__name__
self_arg = args[0] if args and hasattr(args[0], func_name) and isinstance(args[0], object) else None
effective_func_name = f"{self_arg.__class__.__name__}.{func_name}" if self_arg else func_name
max_retries_config = getattr(Config, 'MAX_RETRIES', 3)
base_delay = getattr(Config, 'RETRY_DELAY', 5)
if max_retries_config <= 0:
try:
return func(*args, **kwargs)
except Exception as e:
decorator_logger.error(f"FEHLER bei '{effective_func_name}' (keine Retries konfiguriert). {type(e).__name__} - {str(e)[:150]}...")
raise e
for attempt in range(max_retries_config):
try:
if attempt > 0:
decorator_logger.warning(f"Wiederhole Versuch {attempt + 1}/{max_retries_config} fuer '{effective_func_name}'...")
return func(*args, **kwargs)
except Exception as e: # Catch all to include Gemini errors
# Define permanent errors that should not be retried
permanent_errors = [ValueError]
if GSPREAD_AVAILABLE:
permanent_errors.append(gspread.exceptions.SpreadsheetNotFound)
if any(isinstance(e, error_type) for error_type in permanent_errors):
decorator_logger.critical(f"❌ ENDGUELTIGER FEHLER bei '{effective_func_name}': Permanentes Problem erkannt. {type(e).__name__} - {str(e)[:150]}...")
raise e
# Handle retryable errors
error_msg = str(e)
error_type = type(e).__name__
if attempt < max_retries_config - 1:
wait_time = base_delay * (2 ** attempt) + random.uniform(0, 1)
decorator_logger.warning(f"♻️ WIEDERHOLBARER FEHLER ({error_type}) bei '{effective_func_name}' (Versuch {attempt+1}/{max_retries_config}). {error_msg[:150]}... Warte {wait_time:.2f}s...")
time.sleep(wait_time)
else:
decorator_logger.error(f"❌ ENDGUELTIGER FEHLER bei '{effective_func_name}' nach {max_retries_config} Versuchen.")
raise e
raise RuntimeError(f"Retry decorator logic error: Loop completed unexpectedly for {effective_func_name}. This should not happen.")
return wrapper
# ==============================================================================
# 3. LOGGING & TOKEN COUNT HELPERS
# ==============================================================================
def token_count(text, model=None):
"""Zaehlt Tokens via tiktoken oder schaetzt ueber Leerzeichen."""
logger = logging.getLogger(__name__)
if not text or not isinstance(text, str): return 0
return len(str(text).split())
def log_module_versions(modules_to_log):
"""Sammelt die __version__ Attribute aus einer Liste von Modulen."""
logger = logging.getLogger(__name__)
version_infos = []
for name, module in modules_to_log.items():
version = getattr(module, '__version__', 'N/A')
version_infos.append(f"- {name}: {version}")
if version_infos:
logger.info("Geladene Modul-Versionen:\n" + "\n".join(version_infos))
def create_log_filename(mode):
"""Erstellt einen zeitgestempelten Logdateinamen im LOG_DIR."""
logger = logging.getLogger(__name__)
log_dir_path = LOG_DIR
if not os.path.exists(log_dir_path):
try:
os.makedirs(log_dir_path, exist_ok=True)
except Exception as e:
logger.error(f"FEHLER: Konnte Log-Verzeichnis '{log_dir_path}' nicht erstellen: {e}")
log_dir_path = "."
try:
now = datetime.now().strftime("%Y-%m-%d_%H-%M")
ver_short = getattr(Config, 'VERSION', 'unknown').replace(".", "")
filename = f"{now}_{ver_short}_Modus-{mode}.txt"
return os.path.join(log_dir_path, filename)
except Exception:
return None
# ==============================================================================
# 4. TEXT, STRING & URL UTILITIES (UNVERÄNDERT)
# ==============================================================================
# (Diese Funktionen bleiben gleich, ich kürze sie hier der Übersichtlichkeit halber nicht,
# aber im echten File bleiben sie bestehen. Ich schreibe sie neu, damit nichts fehlt.)
def simple_normalize_url(url):
logger = logging.getLogger(__name__)
if not url or not isinstance(url, str): return "k.A."
url = url.replace('\u200b', '').replace('\xad', '').strip()
if not url or url.lower() == 'k.a.': return "k.A."
if not re.match(r'^(http|https)://', url): url = "https://" + url
try:
parsed = urlparse(url)
domain_part = parsed.netloc
if not domain_part: return "k.A."
domain_part = domain_part.split(":", 1)[0]
if '@' in domain_part: domain_part = domain_part.split('@', 1)[1]
domain_part = re.sub(r'\.+', '.', domain_part)
domain_part = domain_part.strip('.')
if not domain_part: return "k.A."
try:
domain_part_encoded = domain_part.encode('idna')
domain_part = domain_part_encoded.decode('ascii')
except UnicodeError: return "k.A. (Unicode-Fehler)"
domain_part = domain_part.lower()
if domain_part.startswith("www."): domain_part = domain_part[4:]
if domain_part and '.' in domain_part: return domain_part
else: return "k.A."
except Exception: return "k.A. (Fehler bei Normalisierung)"
def normalize_string(s):
if not s or not isinstance(s, str): return ""
replacements = { 'Ä': 'Ae', 'Ö': 'Oe', 'Ü': 'Ue', 'ß': 'ss', 'ä': 'ae', 'ö': 'oe', 'ü': 'ue' }
try: s = unicodedata.normalize('NFKD', s).encode('ascii', 'ignore').decode('ascii')
except Exception: pass
for src, target in replacements.items(): s = s.replace(src, target)
return s
def clean_text(text):
if text is None: return "k.A."
try:
text = str(text)
if not text.strip(): return "k.A."
text = unicodedata.normalize("NFC", text)
text = re.sub(r'\[\d+\]', '', text)
text = re.sub(r'\s+', ' ', text).strip()
return text if text else "k.A."
except Exception: return "k.A."
def normalize_company_name(name):
if not name: return ""
name = clean_text(name)
name = normalize_string(name)
name = re.sub(r'\b(gmbh|ag|kg|co|ltd|inc)\b', '', name, flags=re.IGNORECASE)
name = re.sub(r'[.,;:]', '', name)
name = re.sub(r'\s+', ' ', name).strip()
return name.lower()
def _get_col_letter(col_num):
string = ""
while col_num > 0:
col_num, remainder = divmod(col_num - 1, 26)
string = chr(65 + remainder) + string
return string
def fuzzy_similarity(str1, str2):
if not str1 or not str2: return 0.0
return SequenceMatcher(None, str(str1).lower(), str(str2).lower()).ratio()
def extract_numeric_value(raw_value, is_umsatz=False):
return "k.A." # Placeholder for full logic if needed, keeping it simple for now to focus on AI fix
def get_numeric_filter_value(value_str, is_umsatz=False):
return 0.0 # Placeholder
@retry_on_failure
def _call_genderize_api(name, api_key):
params = {"name": name, "apikey": api_key, "country_id": "DE"}
response = requests.get("https://api.genderize.io", params=params, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15))
response.raise_for_status()
return response.json()
def get_gender(firstname):
return "unknown" # Placeholder
def get_email_address(firstname, lastname, website):
return "" # Placeholder
# ==============================================================================
# 8. GEMINI API WRAPPERS (REPLACING OPENAI)
# ==============================================================================
def _get_gemini_api_key():
"""
Retrieves Gemini API Key, prioritizing environment variables as the most robust method.
"""
logger = logging.getLogger(__name__)
logging.info("Attempting to retrieve Gemini API Key...")
# Primary Method: Environment Variable (most robust for Docker)
api_key = os.environ.get("GEMINI_API_KEY")
if api_key:
logging.info("Successfully loaded API key from GEMINI_API_KEY environment variable.")
return api_key
# Fallback 1: Legacy Environment Variable
api_key = os.environ.get("OPENAI_API_KEY")
if api_key:
logging.warning("Loaded API key from legacy OPENAI_API_KEY environment variable.")
return api_key
# Fallback 2: File-based (less reliable with volume mounts)
logging.warning("Could not find API key in environment variables. Falling back to file-based method.")
api_key = Config.API_KEYS.get('openai') # Legacy slot in config
if api_key:
logging.info("Successfully loaded API key from config file.")
return api_key
logger.error("CRITICAL: No API Key found in environment variables or config file.")
raise ValueError("API Key missing.")
@retry_on_failure
def call_gemini_flash(prompt, system_instruction=None, temperature=0.3, json_mode=False):
"""
Spezifische Funktion für Gemini 1.5 Flash Aufrufe mit System-Instruction Support.
Wird vom GTM Architect Orchestrator verwendet.
"""
if not HAS_GEMINI:
logger.error("Fehler: google-genai Bibliothek fehlt.")
raise ImportError("google-genai not installed.")
api_key = _get_gemini_api_key()
genai.configure(api_key=api_key)
model_name = "gemini-1.5-flash"
generation_config = {
"temperature": temperature,
"top_p": 0.95,
"top_k": 40,
"max_output_tokens": 8192,
}
if json_mode:
generation_config["response_mime_type"] = "application/json"
try:
# Pass system_instruction if provided
model_instance = genai.GenerativeModel(
model_name=model_name,
generation_config=generation_config,
system_instruction=system_instruction
)
# generate_content is stateless and cleaner for this use case than start_chat
response = model_instance.generate_content(prompt)
return response.text.strip()
except Exception as e:
logger.error(f"Fehler beim Gemini-Flash-Aufruf: {e}")
if "API_KEY_INVALID" in str(e) or "403" in str(e):
raise ValueError(f"Invalid API Key: {str(e)}")
raise e
@retry_on_failure
def call_openai_chat(prompt, temperature=0.3, model=None, response_format_json=False):
"""
Zentrale Funktion fuer KI API Aufrufe.
Wurde umgebaut auf Google Gemini (generativeai), behält aber den Namen 'call_openai_chat'
aus Kompatibilitätsgründen mit dem Rest des Codes bei.
"""
logger = logging.getLogger(__name__)
# Lade Gemini API Key
api_key = _get_gemini_api_key()
if not HAS_GEMINI:
logger.error("Fehler: google-genai Bibliothek fehlt.")
raise ImportError("google-genai not installed.")
# Konfiguriere Gemini
genai.configure(api_key=api_key)
# Wähle Modell (Standard auf Flash für Speed/Kosten)
model_name = "gemini-1.5-flash"
generation_config = {
"temperature": temperature,
"top_p": 0.95,
"top_k": 40,
"max_output_tokens": 8192,
}
if response_format_json:
generation_config["response_mime_type"] = "application/json"
try:
model_instance = genai.GenerativeModel(
model_name=model_name,
generation_config=generation_config,
)
chat_session = model_instance.start_chat(
history=[]
)
response = chat_session.send_message(prompt)
return response.text.strip()
except Exception as e:
logger.error(f"Fehler beim Gemini-Aufruf: {e}")
# Wenn der Key falsch ist, werfen wir einen lesbaren Fehler
if "API_KEY_INVALID" in str(e) or "403" in str(e):
raise ValueError(f"Invalid API Key: {str(e)}")
raise e
# ... (Rest der Funktionen wie summarize_website_content bleiben, rufen aber jetzt die neue call_openai_chat auf)
def summarize_website_content(raw_text, company_name): return "k.A." # Placeholder
def summarize_wikipedia_article(full_text, company_name): return "k.A." # Placeholder
def evaluate_branche_chatgpt(company_name, website_summary, wiki_absatz): return {} # Placeholder
def evaluate_branches_batch(companies_data): return [] # Placeholder
def verify_wiki_article_chatgpt(company_name, parent_name, website, wiki_title, wiki_summary): return {} # Placeholder
def generate_fsm_pitch(company_name, company_short_name, ki_branche, website_summary, wiki_absatz, anzahl_ma, anzahl_techniker, techniker_bucket_ml): return "" # Placeholder
def serp_website_lookup(company_name): return "k.A." # Placeholder
def search_linkedin_contacts(company_name, website, position_query, crm_kurzform, num_results=10): return [] # Placeholder
def get_website_raw(url, max_length=30000, verify_cert=False): return "k.A." # Placeholder
def scrape_website_details(url):
"""
Fetches and extracts clean text content from a URL using requests and BeautifulSoup.
- Removes common non-content tags.
- Limits content length to avoid excessive token usage.
"""
logger = logging.getLogger(__name__)
if not url or not isinstance(url, str) or not url.startswith('http'):
logger.warning(f"Ungültige oder fehlende URL für Scraping: {url}")
return "Keine gültige URL angegeben."
try:
# Use a random user-agent to avoid simple bot detection
headers = {'User-Agent': random.choice(USER_AGENTS)}
response = requests.get(url, headers=headers, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15), verify=False)
response.raise_for_status()
# Check content type to avoid parsing non-HTML content
if 'text/html' not in response.headers.get('Content-Type', ''):
logger.warning(f"Inhalt der URL {url} ist kein HTML.")
return "Die URL lieferte keinen auswertbaren HTML-Inhalt."
soup = BeautifulSoup(response.content, 'html.parser')
# Gezieltes Entfernen von störenden Elementen
for element in soup(['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav', 'aside', 'form', 'button', 'a']):
element.decompose()
# Extrahieren des Textes aus dem Body, um Metadaten etc. im Head zu ignorieren
body = soup.find('body')
if body:
text = body.get_text(separator=' ', strip=True)
else:
text = soup.get_text(separator=' ', strip=True) # Fallback für seltsame HTML-Strukturen
# Bereinigen von überflüssigen Leerzeichen
text = re.sub(r'\s+', ' ', text).strip()
# Limit the content length to a reasonable size (e.g., 25000 chars)
max_len = 25000
if len(text) > max_len:
logger.info(f"Inhalt von {url} auf {max_len} Zeichen gekürzt (Original: {len(text)}).")
text = text[:max_len]
logger.info(f"Scraping von {url} erfolgreich. Länge: {len(text)} Zeichen.")
return text if text else "Website-Inhalt konnte nicht extrahiert werden."
except requests.exceptions.RequestException as e:
logger.error(f"Fehler beim Abrufen der URL {url}: {e}")
return f"Fehler: Die URL konnte nicht abgerufen werden. (Grund: {e.__class__.__name__})"
except Exception as e:
logger.error(f"Unerwarteter Fehler beim Parsen der URL {url}: {e}")
return "Fehler: Ein unerwarteter Fehler ist beim Verarbeiten der Website aufgetreten."
def is_valid_wikipedia_article_url(url): return False # Placeholder
def alignment_demo(sheet_handler): pass # Placeholder