#!/usr/bin/env python3
"""
helpers.py

Collection of global, reusable helper functions for the project
"Automatisierte Unternehmensbewertung" (automated company valuation).
Contains decorators, text normalization, API wrappers and other utilities.
"""

__version__ = "v2.4.0_Final_Fix"

ALLOWED_TARGET_BRANCHES = []

# ==============================================================================
# 1. IMPORTS
# ==============================================================================
# Standard library
import os
import time
import re
import csv
import json
import random
import logging
import traceback
import unicodedata
from datetime import datetime
from urllib.parse import urlparse, unquote
from difflib import SequenceMatcher
import base64
import sys

# Third-party libraries. Optional ones set an *_AVAILABLE flag and a ``None``
# placeholder so the rest of the module can test for them without NameError.
try:
    import gspread
    GSPREAD_AVAILABLE = True
except ImportError:
    GSPREAD_AVAILABLE = False
    gspread = None

try:
    import wikipedia
    WIKIPEDIA_AVAILABLE = True
except ImportError:
    WIKIPEDIA_AVAILABLE = False
    wikipedia = None

import requests
from bs4 import BeautifulSoup

try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except Exception as e:
    # Broad on purpose: any failure while importing pandas (not only
    # ImportError) downgrades to "pandas not available".
    logging.warning(f"Pandas import failed: {e}")
    PANDAS_AVAILABLE = False
    pd = None

# --- AI switch: Google Generative AI (dual support) ---
HAS_NEW_GENAI = False
HAS_OLD_GENAI = False

# 1. New library (google-genai)
try:
    from google import genai
    from google.genai import types
    HAS_NEW_GENAI = True
    logging.info("Bibliothek 'google.genai' (v1.0+) geladen.")
except ImportError:
    # Placeholders keep the names defined, mirroring gspread/wikipedia/pd.
    genai = None
    types = None
    logging.warning("Bibliothek 'google.genai' nicht gefunden. Versuche Fallback.")

# 2. Legacy library (google-generativeai)
try:
    import google.generativeai as old_genai
    HAS_OLD_GENAI = True
    logging.info("Bibliothek 'google.generativeai' (Legacy) geladen.")
except ImportError:
    old_genai = None
    logging.warning("Bibliothek 'google.generativeai' nicht gefunden.")

HAS_GEMINI = HAS_NEW_GENAI or HAS_OLD_GENAI

# OpenAI imports (legacy). When the package (or its ``openai.error`` module)
# is missing, stand-in exception classes keep ``except`` clauses importable.
try:
    import openai
    from openai.error import AuthenticationError, OpenAIError, RateLimitError, APIError, Timeout, InvalidRequestError, ServiceUnavailableError
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    openai = None

    class AuthenticationError(Exception):
        pass

    class OpenAIError(Exception):
        pass

    class RateLimitError(Exception):
        pass

    class APIError(Exception):
        pass

    class Timeout(Exception):
        pass

    class InvalidRequestError(Exception):
        pass

    class ServiceUnavailableError(Exception):
        pass

# Project configuration (single merged import; ``Config`` was imported twice).
from config import (
    Config,
    BRANCH_MAPPING_FILE,
    URL_CHECK_MARKER,
    USER_AGENTS,
    LOG_DIR,
    COLUMN_MAP,
    COLUMN_ORDER,
)

# Optional libraries
try:
    import tiktoken
except ImportError:
    tiktoken = None

gender = None
gender_detector = None


def get_col_idx(key):
    """Return the zero-based position of *key* in ``COLUMN_ORDER``.

    Returns:
        The index as an ``int``, or ``None`` when *key* is not present.
    """
    try:
        return COLUMN_ORDER.index(key)
    except ValueError:
        return None
# ==============================================================================
# 2. RETRY DECORATOR
# ==============================================================================
decorator_logger = logging.getLogger(__name__ + ".Retry")


def retry_on_failure(func):
    """Decorate *func* so that failing calls are retried with exponential backoff.

    The number of attempts is read from ``Config.MAX_RETRIES`` (default 3) and
    the base delay in seconds from ``Config.RETRY_DELAY`` (default 5) on every
    call. The wait before retry ``n`` (0-based) is
    ``base_delay * 2**n`` plus up to one second of random jitter.

    ``ValueError`` and, when gspread is available,
    ``gspread.exceptions.SpreadsheetNotFound`` are treated as permanent and
    re-raised immediately. After the last attempt the original exception is
    re-raised. If ``MAX_RETRIES`` is <= 0 the function is called once without
    any retry handling.
    """
    # Local import so this block does not depend on the file's import header.
    import functools

    @functools.wraps(func)  # keep __name__/__doc__ of the decorated function
    def wrapper(*args, **kwargs):
        func_name = func.__name__
        # Heuristic for bound-method calls: the first positional argument
        # exposes an attribute named like the function, so treat it as `self`
        # and prefix the class name in log messages.
        owner = args[0] if args and hasattr(args[0], func_name) else None
        if owner is not None:
            effective_func_name = f"{owner.__class__.__name__}.{func_name}"
        else:
            effective_func_name = func_name

        max_retries_config = getattr(Config, 'MAX_RETRIES', 3)
        base_delay = getattr(Config, 'RETRY_DELAY', 5)

        if max_retries_config <= 0:
            return func(*args, **kwargs)

        for attempt in range(max_retries_config):
            try:
                if attempt > 0:
                    decorator_logger.warning(f"Wiederhole Versuch {attempt + 1}/{max_retries_config} fuer '{effective_func_name}'...")
                return func(*args, **kwargs)
            except Exception as e:
                permanent_errors = [ValueError]
                if GSPREAD_AVAILABLE:
                    permanent_errors.append(gspread.exceptions.SpreadsheetNotFound)
                # Permanent errors and the final attempt propagate unchanged;
                # bare `raise` keeps the original traceback intact.
                if isinstance(e, tuple(permanent_errors)):
                    raise
                if attempt >= max_retries_config - 1:
                    raise
                wait_time = base_delay * (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)

        # Defensive: the loop above always returns or raises.
        raise RuntimeError(f"Retry loop error for {effective_func_name}")

    return wrapper


# ==============================================================================
# 3. LOGGING & UTILS
# ==============================================================================
def token_count(text, model=None):
    """Return a rough token estimate: the number of whitespace-separated words.

    Returns 0 for empty or non-string input. *model* is accepted for interface
    compatibility but is not used by this implementation.
    """
    if not text or not isinstance(text, str):
        return 0
    return len(text.split())


def log_module_versions(modules_to_log):
    """Placeholder; currently does nothing."""
    pass


def create_log_filename(mode):
    """Build a log file path inside ``LOG_DIR`` for the given run *mode*.

    The file name has the form ``<YYYY-MM-DD_HH-MM>_<version>_Modus-<mode>.txt``
    where ``<version>`` is ``Config.VERSION`` (default ``'unknown'``) with the
    dots removed.

    Returns:
        The path as a string, or ``None`` if anything goes wrong while
        building it (best effort; errors are swallowed deliberately).
    """
    try:
        now = datetime.now().strftime("%Y-%m-%d_%H-%M")
        ver_short = getattr(Config, 'VERSION', 'unknown').replace(".", "")
        return os.path.join(LOG_DIR, f"{now}_{ver_short}_Modus-{mode}.txt")
    except Exception:
        return None
# ==============================================================================
# 4. TEXT, STRING & URL UTILITIES
# ==============================================================================
def simple_normalize_url(url):
    """Return *url* unchanged, or ``"k.A."`` when it is empty/falsy."""
    return url if url else "k.A."


def normalize_string(s):
    """Placeholder; returns *s* unchanged."""
    return s


def clean_text(text):
    """Return ``str(text)`` stripped of surrounding whitespace, or ``"k.A."`` for falsy input."""
    return str(text).strip() if text else "k.A."


def normalize_company_name(name):
    """Return *name* lower-cased and stripped, or ``""`` for falsy input."""
    return name.lower().strip() if name else ""


def _get_col_letter(col_num):
    """Placeholder; always returns an empty string."""
    return ""


def fuzzy_similarity(str1, str2):
    """Placeholder; always returns ``0.0``."""
    return 0.0


def extract_numeric_value(raw_value, is_umsatz=False):
    """Placeholder; always returns ``"k.A."``."""
    return "k.A."


def get_numeric_filter_value(value_str, is_umsatz=False):
    """Placeholder; always returns ``0.0``."""
    return 0.0


@retry_on_failure
def _call_genderize_api(name, api_key):
    """Placeholder; always returns an empty dict."""
    return {}


def get_gender(firstname):
    """Placeholder; always returns ``"unknown"``."""
    return "unknown"


def get_email_address(firstname, lastname, website):
    """Placeholder; always returns an empty string."""
    return ""


# ==============================================================================
# 8. GEMINI API WRAPPERS
# ==============================================================================
def _get_gemini_api_key():
    """Return the API key used for Gemini calls.

    Lookup order: ``Config.API_KEYS['gemini']``, ``Config.API_KEYS['openai']``,
    then the environment variables ``GEMINI_API_KEY`` and ``OPENAI_API_KEY``.

    Raises:
        ValueError: if none of the sources yields a non-empty key.
    """
    # NOTE(review): the 'openai' entries are used as a fallback for Gemini;
    # presumably a leftover of the OpenAI -> Gemini migration - confirm.
    api_key = Config.API_KEYS.get('gemini') or Config.API_KEYS.get('openai')
    if api_key:
        return api_key
    api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("OPENAI_API_KEY")
    if api_key:
        return api_key
    raise ValueError("API Key missing.")


def _build_gemini_generation_config(temperature, json_mode):
    """Return a fresh generation-config dict shared by both Gemini client libraries."""
    generation_config = {
        "temperature": temperature,
        "top_p": 0.95,
        "top_k": 40,
        "max_output_tokens": 8192,
    }
    if json_mode:
        generation_config["response_mime_type"] = "application/json"
    return generation_config


@retry_on_failure
def call_gemini_flash(prompt, system_instruction=None, temperature=0.3, json_mode=False):
    """Call Gemini for text generation using the ``gemini-2.0-flash`` model.

    The legacy ``google.generativeai`` library is tried first when installed;
    if it fails and the new ``google.genai`` library is installed, the call is
    repeated with the new library.

    Args:
        prompt: A prompt string, or an already prepared ``contents`` value
            that is passed to the library unchanged.
        system_instruction: Optional system instruction, applied on both
            library paths.
        temperature: Sampling temperature.
        json_mode: When true, requests ``application/json`` as response MIME type.

    Returns:
        The stripped response text.

    Raises:
        ValueError: if no API key is configured.
        ImportError: if neither Gemini library is installed.
        Exception: whatever the last attempted library raised.
    """
    logger = logging.getLogger(__name__)
    api_key = _get_gemini_api_key()
    contents = [prompt] if isinstance(prompt, str) else prompt

    # Priority 1: legacy library (proven for text in this setup).
    if HAS_OLD_GENAI:
        try:
            old_genai.configure(api_key=api_key)
            # IMPORTANT: use 2.0, because 1.5 was not available.
            model = old_genai.GenerativeModel(
                model_name="gemini-2.0-flash",
                generation_config=_build_gemini_generation_config(temperature, json_mode),
                system_instruction=system_instruction
            )
            response = model.generate_content(contents)
            return response.text.strip()
        except Exception as e:
            logger.error(f"Fehler mit alter GenAI Lib: {e}")
            if not HAS_NEW_GENAI:
                raise
            # Otherwise fall through to the new library.

    # Priority 2: new library.
    if HAS_NEW_GENAI:
        try:
            client = genai.Client(api_key=api_key)
            config = _build_gemini_generation_config(temperature, json_mode)
            # Previously dropped on this path: forward the system instruction
            # so both libraries behave the same.
            if system_instruction:
                config["system_instruction"] = system_instruction
            response = client.models.generate_content(
                model="gemini-2.0-flash",
                contents=contents,
                config=config
            )
            return response.text.strip()
        except Exception as e:
            logger.error(f"Fehler mit neuer GenAI Lib: {e}")
            raise

    raise ImportError("Keine Gemini Bibliothek verfügbar.")


def _generate_image_from_reference(client, prompt, reference_image_b64, aspect_ratio, logger):
    """Image-to-image generation with ``gemini-2.5-flash-image``; returns base64 image data."""
    try:
        from PIL import Image
        import io
    except ImportError as e:
        raise ImportError("Pillow (PIL) fehlt. Bitte 'pip install Pillow' ausführen.") from e

    logger.info(f"Start Image-to-Image Generation mit gemini-2.5-flash-image. Seitenverhältnis: {aspect_ratio or 'default'}")

    # Base64 -> PIL image. A "data:...;base64," prefix is cut off at the comma.
    try:
        if "," in reference_image_b64:
            reference_image_b64 = reference_image_b64.split(",")[1]
        image_data = base64.b64decode(reference_image_b64)
        raw_image = Image.open(io.BytesIO(image_data))
    except Exception as e:
        logger.error(f"Fehler beim Laden des Referenzbildes: {e}")
        raise ValueError("Ungültiges Referenzbild.") from e

    # Strict prompt: the product in the reference image must not be altered.
    full_prompt = (
        "Use the provided reference image as the absolute truth. "
        f"Place EXACTLY this product into the scene: {prompt}. "
        "Do NOT alter the product's design, shape, or colors. "
        "Keep the product 100% identical to the reference. "
        "Only adjust lighting and perspective to match the scene."
    )
    # The aspect ratio cannot be set directly here because it depends on the
    # reference image, so it is only requested through the prompt.
    if aspect_ratio:
        full_prompt += f" The final image composition should have an aspect ratio of {aspect_ratio}."

    response = client.models.generate_content(
        model='gemini-2.5-flash-image',
        contents=[raw_image, full_prompt]
    )
    if response.candidates and response.candidates[0].content.parts:
        for part in response.candidates[0].content.parts:
            if part.inline_data:
                return base64.b64encode(part.inline_data.data).decode('utf-8')
    raise ValueError("Gemini 2.5 hat kein Bild zurückgeliefert.")


def _generate_image_from_prompt(client, prompt, aspect_ratio, logger):
    """Text-to-image generation with the Imagen 4 models; returns base64 JPEG data."""
    img_config = {
        "number_of_images": 1,
        "output_mime_type": "image/jpeg",
    }
    # Only a fixed set of aspect ratios is forwarded; anything else is ignored.
    if aspect_ratio in ["16:9", "9:16", "1:1", "4:3"]:
        img_config["aspect_ratio"] = aspect_ratio
        logger.info(f"Seitenverhältnis auf {aspect_ratio} gesetzt.")

    # Apply the central corporate design style.
    final_prompt = f"{Config.CORPORATE_DESIGN_PROMPT}\n\nTask: {prompt}"

    method = getattr(client.models, 'generate_images', None)
    if not method:
        available_methods = [m for m in dir(client.models) if not m.startswith('_')]
        raise AttributeError(f"Client hat keine Image-Methode. Verfügbar: {available_methods}")

    candidates = [
        'imagen-4.0-generate-001',
        'imagen-4.0-fast-generate-001',
        'imagen-4.0-ultra-generate-001'
    ]
    last_error = None
    # Try the models in order; the first one that returns an image wins.
    for model_name in candidates:
        try:
            logger.info(f"Versuche Text-zu-Bild mit Modell: {model_name}")
            response = method(
                model=model_name,
                prompt=final_prompt,
                config=img_config
            )
            if response.generated_images:
                image_bytes = response.generated_images[0].image.image_bytes
                return base64.b64encode(image_bytes).decode('utf-8')
        except Exception as e:
            logger.warning(f"Modell {model_name} fehlgeschlagen: {e}")
            last_error = e

    if last_error:
        raise last_error
    raise ValueError("Kein Modell konnte Bilder generieren.")


@retry_on_failure
def call_gemini_image(prompt, reference_image_b64=None, aspect_ratio=None):
    """Generate an image and return it as a base64-encoded string.

    - With a reference image: ``gemini-2.5-flash-image`` (image-to-image).
      *aspect_ratio* is only mentioned in the prompt.
    - Without a reference image: Imagen 4.0 models are tried in order.
      *aspect_ratio* is forwarded when it is one of "16:9", "9:16", "1:1",
      "4:3", and ``Config.CORPORATE_DESIGN_PROMPT`` is prepended to the prompt.

    Args:
        prompt: Description of the desired image / scene.
        reference_image_b64: Optional base64 image, with or without a
            ``data:`` URL prefix.
        aspect_ratio: Optional aspect ratio such as "16:9".

    Raises:
        ImportError: if the new ``google-genai`` library (or Pillow, for the
            reference-image path) is not installed.
        ValueError: if the reference image is invalid or no image was returned.
    """
    logger = logging.getLogger(__name__)
    api_key = _get_gemini_api_key()

    if not HAS_NEW_GENAI:
        logger.error("Image Generation erfordert die neue 'google-genai' Bibliothek.")
        raise ImportError("Installieren Sie 'google-genai' für Bildgenerierung.")

    try:
        client = genai.Client(api_key=api_key)
        if reference_image_b64:
            # Case A: reference image present (Gemini 2.5).
            return _generate_image_from_reference(client, prompt, reference_image_b64, aspect_ratio, logger)
        # Case B: no reference image (Imagen 4).
        return _generate_image_from_prompt(client, prompt, aspect_ratio, logger)
    except Exception as e:
        logger.error(f"Fehler bei Image Gen: {e}")
        raise


@retry_on_failure
def call_openai_chat(prompt, temperature=0.3, model=None, response_format_json=False):
    """Legacy entry point that forwards to :func:`call_gemini_flash`.

    *model* is accepted for backward compatibility and ignored.
    """
    # NOTE(review): call_gemini_flash is itself wrapped in retry_on_failure,
    # so retries are nested here - confirm this multiplication is intended.
    return call_gemini_flash(
        prompt=prompt,
        temperature=temperature,
        json_mode=response_format_json,
        system_instruction=None
    )


def summarize_website_content(raw_text, company_name):
    """Placeholder; always returns ``"k.A."``."""
    return "k.A."


def summarize_wikipedia_article(full_text, company_name):
    """Placeholder; always returns ``"k.A."``."""
    return "k.A."


def evaluate_branche_chatgpt(company_name, website_summary, wiki_absatz):
    """Placeholder; always returns an empty dict."""
    return {}


def evaluate_branches_batch(companies_data):
    """Placeholder; always returns an empty list."""
    return []


def verify_wiki_article_chatgpt(company_name, parent_name, website, wiki_title, wiki_summary):
    """Placeholder; always returns an empty dict."""
    return {}


def generate_fsm_pitch(company_name, company_short_name, ki_branche, website_summary, wiki_absatz, anzahl_ma, anzahl_techniker, techniker_bucket_ml):
    """Placeholder; always returns an empty string."""
    return ""


def serp_website_lookup(company_name):
    """Placeholder; always returns ``"k.A."``."""
    return "k.A."


def search_linkedin_contacts(company_name, website, position_query, crm_kurzform, num_results=10):
    """Placeholder; always returns an empty list."""
    return []


def get_website_raw(url, max_length=30000, verify_cert=False):
    """Placeholder; always returns ``"k.A."``."""
    return "k.A."


def scrape_website_details(url):
    """Download *url* and return its visible body text (at most 25000 characters).

    Script, style, navigation, header/footer, form, button and link elements
    are removed before the text is extracted, and whitespace is collapsed.
    This function never raises; problems are reported through the return
    value:

    - ``"Keine gültige URL angegeben."`` if *url* is not a string starting with ``http``
    - ``"Kein HTML."`` if the response is not ``text/html``
    - ``"Leer."`` if no text remains
    - ``"Fehler beim Scraping."`` on any request/parsing error (also logged)
    """
    logger = logging.getLogger(__name__)
    if not url or not isinstance(url, str) or not url.startswith('http'):
        return "Keine gültige URL angegeben."
    try:
        headers = {'User-Agent': random.choice(USER_AGENTS)}
        # NOTE(review): verify=False disables TLS certificate verification.
        # Kept to preserve behavior for sites with broken certificates -
        # confirm this trade-off is intended.
        response = requests.get(
            url,
            headers=headers,
            timeout=getattr(Config, 'REQUEST_TIMEOUT', 15),
            verify=False,
        )
        response.raise_for_status()
        if 'text/html' not in response.headers.get('Content-Type', ''):
            return "Kein HTML."
        soup = BeautifulSoup(response.content, 'html.parser')
        # Drop non-content elements so only the page's running text remains.
        for element in soup(['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav', 'aside', 'form', 'button', 'a']):
            element.decompose()
        body = soup.find('body')
        text = body.get_text(separator=' ', strip=True) if body else soup.get_text(separator=' ', strip=True)
        text = re.sub(r'\s+', ' ', text).strip()
        return text[:25000] if text else "Leer."
    except Exception as e:
        logger.error(f"Fehler URL {url}: {e}")
        return "Fehler beim Scraping."


def is_valid_wikipedia_article_url(url):
    """Placeholder; always returns ``False``."""
    return False


def alignment_demo(sheet_handler):
    """Placeholder; currently does nothing."""
    pass