#!/usr/bin/env python3
"""
helpers.py

Collection of global, reusable helper functions for the
"Automatisierte Unternehmensbewertung" project.
Contains decorators, text normalization, API wrappers and other utilities.
"""

__version__ = "v2.2.0_Gemini_Switch"

ALLOWED_TARGET_BRANCHES = []

# ==============================================================================
# 1. IMPORTS
# ==============================================================================

# Standard library
import csv
import functools
import json
import logging
import os
import random
import re
import time
import traceback
import unicodedata
from datetime import datetime
from difflib import SequenceMatcher
from urllib.parse import unquote, urlparse

# External libraries (optional ones are guarded so the module always imports)
try:
    import gspread
    GSPREAD_AVAILABLE = True
except ImportError:
    GSPREAD_AVAILABLE = False
    gspread = None  # defined so later references don't raise NameError

try:
    import wikipedia
    WIKIPEDIA_AVAILABLE = True
except ImportError:
    WIKIPEDIA_AVAILABLE = False
    wikipedia = None  # defined so later references don't raise NameError

import requests
from bs4 import BeautifulSoup

try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False
    pd = None  # defined so later references don't raise NameError

# --- AI SWITCH: Google Generative AI instead of OpenAI ---
# BUGFIX: this module calls genai.configure() and genai.GenerativeModel(),
# which is the API of the 'google-generativeai' package. The previous
# 'import google.genai' targeted the new SDK, whose API is client-based
# (genai.Client), so every AI call would have failed at runtime.
try:
    import google.generativeai as genai
    HAS_GEMINI = True
except ImportError:
    HAS_GEMINI = False
    genai = None  # make sure the name exists
    logging.warning("google-generativeai Bibliothek nicht gefunden. KI-Funktionen deaktiviert.")

# OpenAI imports are kept optional to avoid confusion during the migration.
try:
    import openai
    from openai.error import (
        APIError,
        AuthenticationError,
        InvalidRequestError,
        OpenAIError,
        RateLimitError,
        ServiceUnavailableError,
        Timeout,
    )
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False

    # Dummy exception classes so existing 'except' clauses don't crash.
    class AuthenticationError(Exception):
        pass

    class OpenAIError(Exception):
        pass

    class RateLimitError(Exception):
        pass

    class APIError(Exception):
        pass

    class Timeout(Exception):
        pass

    class InvalidRequestError(Exception):
        pass

    class ServiceUnavailableError(Exception):
        pass

# Project configuration.
# FIX: previously imported from config three times with overlapping names;
# collapsed into one import carrying the union of all required symbols.
from config import (
    BRANCH_MAPPING_FILE,
    COLUMN_MAP,
    COLUMN_ORDER,
    Config,
    LOG_DIR,
    URL_CHECK_MARKER,
    USER_AGENTS,
)

# Optional libraries
try:
    import tiktoken
except ImportError:
    tiktoken = None
    logging.warning("tiktoken nicht gefunden. Token-Zaehlung wird geschaetzt.")

gender = None
gender_detector = None


def get_col_idx(key):
    """Return the 0-based column index of *key* in COLUMN_ORDER.

    Returns None (and logs an error) when the key is unknown, so callers
    can handle missing columns gracefully instead of crashing.
    """
    try:
        return COLUMN_ORDER.index(key)
    except ValueError:
        logging.getLogger(__name__).error(
            f"Spalten-Schlüssel '{key}' konnte in COLUMN_ORDER nicht gefunden werden!"
        )
        return None


# ==============================================================================
# 2. RETRY DECORATOR
# ==============================================================================

decorator_logger = logging.getLogger(__name__ + ".Retry")


def retry_on_failure(func):
    """Decorator that retries *func* on transient errors.

    Uses exponential backoff with jitter (Config.RETRY_DELAY base,
    Config.MAX_RETRIES attempts). Permanent errors (ValueError,
    gspread SpreadsheetNotFound) are re-raised immediately.
    """

    # BUGFIX: functools.wraps preserves __name__/__doc__ of the wrapped
    # function; without it the wrapper's own self_arg detection (which
    # relies on func.__name__) breaks for doubly-decorated functions.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        func_name = func.__name__
        # Best-effort detection of a bound-method call to improve log output.
        self_arg = args[0] if args and hasattr(args[0], func_name) and isinstance(args[0], object) else None
        effective_func_name = f"{self_arg.__class__.__name__}.{func_name}" if self_arg else func_name

        max_retries_config = getattr(Config, 'MAX_RETRIES', 3)
        base_delay = getattr(Config, 'RETRY_DELAY', 5)

        if max_retries_config <= 0:
            try:
                return func(*args, **kwargs)
            except Exception as e:
                decorator_logger.error(
                    f"FEHLER bei '{effective_func_name}' (keine Retries konfiguriert). "
                    f"{type(e).__name__} - {str(e)[:150]}..."
                )
                raise e

        for attempt in range(max_retries_config):
            try:
                if attempt > 0:
                    decorator_logger.warning(
                        f"Wiederhole Versuch {attempt + 1}/{max_retries_config} fuer '{effective_func_name}'..."
                    )
                return func(*args, **kwargs)
            except Exception as e:  # catch-all so Gemini SDK errors are included
                # Errors that will never succeed on retry.
                permanent_errors = [ValueError]
                if GSPREAD_AVAILABLE:
                    permanent_errors.append(gspread.exceptions.SpreadsheetNotFound)
                if any(isinstance(e, error_type) for error_type in permanent_errors):
                    decorator_logger.critical(
                        f"❌ ENDGUELTIGER FEHLER bei '{effective_func_name}': Permanentes Problem erkannt. "
                        f"{type(e).__name__} - {str(e)[:150]}..."
                    )
                    raise e

                error_msg = str(e)
                error_type = type(e).__name__
                if attempt < max_retries_config - 1:
                    # Exponential backoff plus jitter to de-synchronize retries.
                    wait_time = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    decorator_logger.warning(
                        f"♻️ WIEDERHOLBARER FEHLER ({error_type}) bei '{effective_func_name}' "
                        f"(Versuch {attempt+1}/{max_retries_config}). {error_msg[:150]}... "
                        f"Warte {wait_time:.2f}s..."
                    )
                    time.sleep(wait_time)
                else:
                    decorator_logger.error(
                        f"❌ ENDGUELTIGER FEHLER bei '{effective_func_name}' nach {max_retries_config} Versuchen."
                    )
                    raise e

        # Unreachable by construction: every loop iteration returns or raises.
        raise RuntimeError(
            f"Retry decorator logic error: Loop completed unexpectedly for {effective_func_name}. "
            "This should not happen."
        )

    return wrapper


# ==============================================================================
# 3. LOGGING & TOKEN COUNT HELPERS
# ==============================================================================

def token_count(text, model=None):
    """Count tokens in *text* via tiktoken, or estimate by whitespace split.

    BUGFIX: the docstring always promised tiktoken support, but the old
    implementation unconditionally returned the whitespace estimate.
    """
    if not text or not isinstance(text, str):
        return 0
    try:
        import tiktoken  # optional dependency; local import keeps fallback cheap
        if model:
            encoding = tiktoken.encoding_for_model(model)
        else:
            encoding = tiktoken.get_encoding("cl100k_base")
        return len(encoding.encode(text))
    except Exception:
        # tiktoken missing or unknown model name -> rough estimate
        return len(text.split())


def log_module_versions(modules_to_log):
    """Log the __version__ attribute of each entry in *modules_to_log*.

    *modules_to_log* is a dict mapping display name -> module object;
    modules without a __version__ are reported as 'N/A'.
    """
    logger = logging.getLogger(__name__)
    version_infos = []
    for name, module in modules_to_log.items():
        version = getattr(module, '__version__', 'N/A')
        version_infos.append(f"- {name}: {version}")
    if version_infos:
        logger.info("Geladene Modul-Versionen:\n" + "\n".join(version_infos))


def create_log_filename(mode):
    """Build a timestamped log file path inside LOG_DIR.

    Falls back to the current directory when LOG_DIR cannot be created,
    and returns None if no filename could be built at all.
    """
    logger = logging.getLogger(__name__)
    log_dir_path = LOG_DIR
    if not os.path.exists(log_dir_path):
        try:
            os.makedirs(log_dir_path, exist_ok=True)
        except Exception as e:
            logger.error(f"FEHLER: Konnte Log-Verzeichnis '{log_dir_path}' nicht erstellen: {e}")
            log_dir_path = "."
    try:
        now = datetime.now().strftime("%Y-%m-%d_%H-%M")
        ver_short = getattr(Config, 'VERSION', 'unknown').replace(".", "")
        filename = f"{now}_{ver_short}_Modus-{mode}.txt"
        return os.path.join(log_dir_path, filename)
    except Exception as e:
        # FIX: previously failed silently; log why no filename was produced.
        logger.error(f"FEHLER beim Erstellen des Log-Dateinamens: {e}")
        return None


# ==============================================================================
# 4. TEXT, STRING & URL UTILITIES
# ==============================================================================

def simple_normalize_url(url):
    """Normalize *url* down to a bare lowercase registrable domain.

    Examples: 'www.Example.com/path' -> 'example.com'.
    Returns 'k.A.' (or a 'k.A. (...)' variant) when no usable domain
    can be extracted.
    """
    logger = logging.getLogger(__name__)
    if not url or not isinstance(url, str):
        return "k.A."
    # Strip zero-width space / soft hyphen artifacts from copy-paste input.
    url = url.replace('\u200b', '').replace('\xad', '').strip()
    if not url or url.lower() == 'k.a.':
        return "k.A."
    if not re.match(r'^(http|https)://', url):
        url = "https://" + url
    try:
        parsed = urlparse(url)
        domain_part = parsed.netloc
        if not domain_part:
            return "k.A."
        domain_part = domain_part.split(":", 1)[0]  # drop port
        if '@' in domain_part:
            domain_part = domain_part.split('@', 1)[1]  # drop userinfo
        domain_part = re.sub(r'\.+', '.', domain_part)  # collapse repeated dots
        domain_part = domain_part.strip('.')
        if not domain_part:
            return "k.A."
        try:
            # IDNA-encode internationalized domains (also lowercases labels).
            domain_part_encoded = domain_part.encode('idna')
            domain_part = domain_part_encoded.decode('ascii')
        except UnicodeError:
            return "k.A. (Unicode-Fehler)"
        domain_part = domain_part.lower()
        if domain_part.startswith("www."):
            domain_part = domain_part[4:]
        if domain_part and '.' in domain_part:
            return domain_part
        else:
            return "k.A."
    except Exception:
        return "k.A. (Fehler bei Normalisierung)"


def normalize_string(s):
    """Transliterate *s* to plain ASCII, mapping German umlauts to ae/oe/ue/ss."""
    if not s or not isinstance(s, str):
        return ""
    replacements = {
        'Ä': 'Ae', 'Ö': 'Oe', 'Ü': 'Ue', 'ß': 'ss',
        'ä': 'ae', 'ö': 'oe', 'ü': 'ue'
    }
    # BUGFIX: apply the German replacements BEFORE the NFKD/ASCII fold.
    # Previously the fold ran first and reduced 'ä' to 'a' (and dropped 'ß'),
    # so the intended 'ä' -> 'ae' / 'ß' -> 'ss' mappings never fired.
    for src, target in replacements.items():
        s = s.replace(src, target)
    try:
        s = unicodedata.normalize('NFKD', s).encode('ascii', 'ignore').decode('ascii')
    except Exception:
        pass
    return s


def clean_text(text):
    """Normalize whitespace and strip Wikipedia-style '[n]' citation markers.

    Returns 'k.A.' for None, empty, or whitespace-only input.
    """
    if text is None:
        return "k.A."
    try:
        text = str(text)
        if not text.strip():
            return "k.A."
        text = unicodedata.normalize("NFC", text)
        text = re.sub(r'\[\d+\]', '', text)  # remove citation markers like [12]
        text = re.sub(r'\s+', ' ', text).strip()
        return text if text else "k.A."
    except Exception:
        return "k.A."
def normalize_company_name(name):
    """Normalize a company name for fuzzy comparisons.

    Strips legal-form suffixes (GmbH, AG, ...), punctuation and surplus
    whitespace; returns a lowercase ASCII string.
    """
    if not name:
        return ""
    normalized = normalize_string(clean_text(name))
    normalized = re.sub(r'\b(gmbh|ag|kg|co|ltd|inc)\b', '', normalized, flags=re.IGNORECASE)
    normalized = re.sub(r'[.,;:]', '', normalized)
    return re.sub(r'\s+', ' ', normalized).strip().lower()


def _get_col_letter(col_num):
    """Convert a 1-based column number to its spreadsheet letter (1 -> 'A', 27 -> 'AA')."""
    letters = []
    while col_num > 0:
        col_num, remainder = divmod(col_num - 1, 26)
        letters.append(chr(65 + remainder))
    return ''.join(reversed(letters))


def fuzzy_similarity(str1, str2):
    """Case-insensitive similarity ratio between two strings (0.0 .. 1.0)."""
    if not str1 or not str2:
        return 0.0
    left = str(str1).lower()
    right = str(str2).lower()
    return SequenceMatcher(None, left, right).ratio()


def extract_numeric_value(raw_value, is_umsatz=False):
    # Placeholder for full parsing logic if needed; kept simple to focus on the AI fix.
    return "k.A."


def get_numeric_filter_value(value_str, is_umsatz=False):
    # Placeholder
    return 0.0


@retry_on_failure
def _call_genderize_api(name, api_key):
    """Query genderize.io for the likely gender of a German first name."""
    request_timeout = getattr(Config, 'REQUEST_TIMEOUT', 15)
    query = {"name": name, "apikey": api_key, "country_id": "DE"}
    response = requests.get("https://api.genderize.io", params=query, timeout=request_timeout)
    response.raise_for_status()
    return response.json()


def get_gender(firstname):
    # Placeholder
    return "unknown"


def get_email_address(firstname, lastname, website):
    # Placeholder
    return ""


# ==============================================================================
# 8. GEMINI API WRAPPERS (REPLACING OPENAI)
# ==============================================================================

def _get_gemini_api_key():
    """Retrieve the Gemini API key from Config or the environment.

    Lookup order (kept for backwards compatibility with the OpenAI era):
      1. Config.API_KEYS['openai']  (legacy slot)
      2. env OPENAI_API_KEY
      3. env GEMINI_API_KEY, then Config.API_KEYS['gemini']

    Raises:
        ValueError: if no key is found in any of the locations.
    """
    logger = logging.getLogger(__name__)
    # SECURITY BUGFIX: removed leftover debug print() calls that wrote the
    # API key values to stdout.
    api_key = Config.API_KEYS.get('openai')  # Legacy slot
    if not api_key:
        # Fallback: environment variable when Config is empty
        api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        # Fallback 2: the dedicated Gemini key
        api_key = os.environ.get("GEMINI_API_KEY") or Config.API_KEYS.get('gemini')
    if not api_key:
        logger.error("Fehler: Kein API Key gefunden (weder als 'openai' noch 'gemini').")
        raise ValueError("API Key missing.")
    return api_key


def _build_generation_config(temperature, json_mode):
    """Build the Gemini generation config shared by both wrapper functions.

    Extracted helper: the identical dict was previously duplicated in
    call_gemini_flash and call_openai_chat.
    """
    generation_config = {
        "temperature": temperature,
        "top_p": 0.95,
        "top_k": 40,
        "max_output_tokens": 8192,
    }
    if json_mode:
        generation_config["response_mime_type"] = "application/json"
    return generation_config


@retry_on_failure
def call_gemini_flash(prompt, system_instruction=None, temperature=0.3, json_mode=False):
    """Call Gemini 1.5 Flash with optional system-instruction support.

    Used by the GTM Architect orchestrator. Returns the stripped response
    text; raises ImportError when the SDK is missing and ValueError on an
    invalid API key.
    """
    # BUGFIX: 'logger' was used below but never defined in this function,
    # so every error path raised NameError instead of logging.
    logger = logging.getLogger(__name__)
    if not HAS_GEMINI:
        logger.error("Fehler: google-genai Bibliothek fehlt.")
        raise ImportError("google-genai not installed.")

    api_key = _get_gemini_api_key()
    genai.configure(api_key=api_key)

    model_name = "gemini-1.5-flash"
    generation_config = _build_generation_config(temperature, json_mode)

    try:
        model_instance = genai.GenerativeModel(
            model_name=model_name,
            generation_config=generation_config,
            system_instruction=system_instruction,
        )
        # generate_content is stateless and cleaner for this use case than start_chat.
        response = model_instance.generate_content(prompt)
        return response.text.strip()
    except Exception as e:
        logger.error(f"Fehler beim Gemini-Flash-Aufruf: {e}")
        if "API_KEY_INVALID" in str(e) or "403" in str(e):
            raise ValueError(f"Invalid API Key: {str(e)}")
        raise e


@retry_on_failure
def call_openai_chat(prompt, temperature=0.3, model=None, response_format_json=False):
    """Central AI API entry point, now backed by Google Gemini.

    Keeps the name 'call_openai_chat' for compatibility with the rest of
    the codebase. The *model* parameter is currently ignored; the default
    model is Gemini 1.5 Flash (speed/cost trade-off).
    """
    logger = logging.getLogger(__name__)
    if not HAS_GEMINI:
        logger.error("Fehler: google-genai Bibliothek fehlt.")
        raise ImportError("google-genai not installed.")

    api_key = _get_gemini_api_key()
    genai.configure(api_key=api_key)

    model_name = "gemini-1.5-flash"
    generation_config = _build_generation_config(temperature, response_format_json)

    try:
        model_instance = genai.GenerativeModel(
            model_name=model_name,
            generation_config=generation_config,
        )
        chat_session = model_instance.start_chat(history=[])
        response = chat_session.send_message(prompt)
        return response.text.strip()
    except Exception as e:
        logger.error(f"Fehler beim Gemini-Aufruf: {e}")
        # Surface an invalid key as a readable, non-retryable error.
        if "API_KEY_INVALID" in str(e) or "403" in str(e):
            raise ValueError(f"Invalid API Key: {str(e)}")
        raise e


# The functions below keep their signatures; their callers now go through
# the Gemini-backed call_openai_chat above.

def summarize_website_content(raw_text, company_name):
    # Placeholder
    return "k.A."


def summarize_wikipedia_article(full_text, company_name):
    # Placeholder
    return "k.A."


def evaluate_branche_chatgpt(company_name, website_summary, wiki_absatz):
    # Placeholder
    return {}


def evaluate_branches_batch(companies_data):
    # Placeholder
    return []


def verify_wiki_article_chatgpt(company_name, parent_name, website, wiki_title, wiki_summary):
    # Placeholder
    return {}


def generate_fsm_pitch(company_name, company_short_name, ki_branche, website_summary,
                       wiki_absatz, anzahl_ma, anzahl_techniker, techniker_bucket_ml):
    # Placeholder
    return ""


def serp_website_lookup(company_name):
    # Placeholder
    return "k.A."
def search_linkedin_contacts(company_name, website, position_query, crm_kurzform, num_results=10):
    # Placeholder
    return []


def get_website_raw(url, max_length=30000, verify_cert=False):
    # Placeholder
    return "k.A."


def scrape_website_details(url):
    """Fetch and extract clean text content from *url*.

    Uses requests + BeautifulSoup:
    - removes common non-content tags (scripts, nav, forms, links, ...),
    - limits the extracted text to 25,000 characters to cap token usage,
    - returns a German error string instead of raising on failure.
    """
    logger = logging.getLogger(__name__)
    if not url or not isinstance(url, str) or not url.startswith('http'):
        logger.warning(f"Ungültige oder fehlende URL für Scraping: {url}")
        return "Keine gültige URL angegeben."
    try:
        # Random user-agent to avoid simple bot detection.
        headers = {'User-Agent': random.choice(USER_AGENTS)}
        # NOTE(security): verify=False disables TLS certificate validation;
        # kept to preserve existing crawling behavior, but worth revisiting.
        response = requests.get(url, headers=headers,
                                timeout=getattr(Config, 'REQUEST_TIMEOUT', 15),
                                verify=False)
        response.raise_for_status()

        # Skip anything that is not HTML.
        if 'text/html' not in response.headers.get('Content-Type', ''):
            logger.warning(f"Inhalt der URL {url} ist kein HTML.")
            return "Die URL lieferte keinen auswertbaren HTML-Inhalt."

        soup = BeautifulSoup(response.content, 'html.parser')

        # Drop structural / non-content elements before text extraction.
        for element in soup(['script', 'style', 'noscript', 'iframe', 'svg',
                             'header', 'footer', 'nav', 'aside', 'form',
                             'button', 'a']):
            element.decompose()

        # Prefer the <body> so <head> metadata is ignored.
        body = soup.find('body')
        if body:
            text = body.get_text(separator=' ', strip=True)
        else:
            text = soup.get_text(separator=' ', strip=True)  # fallback for odd HTML

        text = re.sub(r'\s+', ' ', text).strip()

        # Limit the content length to a reasonable size.
        max_len = 25000
        if len(text) > max_len:
            # BUGFIX: the log message previously claimed "34,460 Zeichen"
            # although the text is truncated at max_len characters.
            logger.info(f"Inhalt von {url} auf {max_len} Zeichen gekürzt (Original: {len(text)}).")
            text = text[:max_len]

        logger.info(f"Scraping von {url} erfolgreich. Länge: {len(text)} Zeichen.")
        return text if text else "Website-Inhalt konnte nicht extrahiert werden."
    except requests.exceptions.RequestException as e:
        logger.error(f"Fehler beim Abrufen der URL {url}: {e}")
        return f"Fehler: Die URL konnte nicht abgerufen werden. (Grund: {e.__class__.__name__})"
    except Exception as e:
        logger.error(f"Unerwarteter Fehler beim Parsen der URL {url}: {e}")
        return "Fehler: Ein unerwarteter Fehler ist beim Verarbeiten der Website aufgetreten."


def is_valid_wikipedia_article_url(url):
    # Placeholder
    return False


def alignment_demo(sheet_handler):
    # Placeholder
    pass