Files
Brancheneinstufung2/helpers.py

399 lines
15 KiB
Python

#!/usr/bin/env python3
"""
helpers.py
Sammlung von globalen, wiederverwendbaren Hilfsfunktionen für das Projekt
"Automatisierte Unternehmensbewertung". Enthält Decorators, Text-Normalisierung,
API-Wrapper und andere Dienstprogramme.
"""
__version__ = "v2.4.0_Final_Fix"
ALLOWED_TARGET_BRANCHES = []
# ==============================================================================
# 1. IMPORTS
# ==============================================================================
# Standardbibliotheken
import os
import time
import re
import csv
import json
import random
import logging
import traceback
import unicodedata
from datetime import datetime
from urllib.parse import urlparse, unquote
from difflib import SequenceMatcher
import base64
import sys
# Externe Bibliotheken
try:
import gspread
GSPREAD_AVAILABLE = True
except ImportError:
GSPREAD_AVAILABLE = False
gspread = None
try:
import wikipedia
WIKIPEDIA_AVAILABLE = True
except ImportError:
WIKIPEDIA_AVAILABLE = False
wikipedia = None
import requests
from bs4 import BeautifulSoup
try:
import pandas as pd
PANDAS_AVAILABLE = True
except Exception as e:
logging.warning(f"Pandas import failed: {e}")
PANDAS_AVAILABLE = False
pd = None
# --- KI UMSCHALTUNG: Google Generative AI (Dual Support) ---
HAS_NEW_GENAI = False
HAS_OLD_GENAI = False
# 1. Neue Bibliothek (google-genai)
try:
from google import genai
from google.genai import types
HAS_NEW_GENAI = True
logging.info("Bibliothek 'google.genai' (v1.0+) geladen.")
except ImportError:
logging.warning("Bibliothek 'google.genai' nicht gefunden. Versuche Fallback.")
# 2. Alte Bibliothek (google-generativeai)
try:
import google.generativeai as old_genai
HAS_OLD_GENAI = True
logging.info("Bibliothek 'google.generativeai' (Legacy) geladen.")
except ImportError:
logging.warning("Bibliothek 'google.generativeai' nicht gefunden.")
HAS_GEMINI = HAS_NEW_GENAI or HAS_OLD_GENAI
# OpenAI Imports (Legacy)
try:
import openai
from openai.error import AuthenticationError, OpenAIError, RateLimitError, APIError, Timeout, InvalidRequestError, ServiceUnavailableError
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
class AuthenticationError(Exception): pass
class OpenAIError(Exception): pass
class RateLimitError(Exception): pass
class APIError(Exception): pass
class Timeout(Exception): pass
class InvalidRequestError(Exception): pass
class ServiceUnavailableError(Exception): pass
from config import (Config, BRANCH_MAPPING_FILE, URL_CHECK_MARKER, USER_AGENTS, LOG_DIR)
from config import Config, COLUMN_MAP, COLUMN_ORDER
# Optionale Bibliotheken
try:
import tiktoken
except ImportError:
tiktoken = None
gender = None
gender_detector = None
def get_col_idx(key):
    """Return the zero-based position of ``key`` in ``COLUMN_ORDER``.

    Returns ``None`` when ``key`` is not contained in ``COLUMN_ORDER``.
    """
    try:
        position = COLUMN_ORDER.index(key)
    except ValueError:
        return None
    return position
# ==============================================================================
# 2. RETRY DECORATOR
# ==============================================================================
decorator_logger = logging.getLogger(__name__ + ".Retry")


def retry_on_failure(func):
    """Decorator that re-runs ``func`` with exponential backoff when it raises.

    On every call the number of attempts is read from ``Config.MAX_RETRIES``
    (default 3) and the base delay in seconds from ``Config.RETRY_DELAY``
    (default 5). The pause after failed attempt ``n`` (zero-based) is
    ``base_delay * 2**n`` plus up to one second of random jitter.

    ``ValueError`` and, when gspread is installed,
    ``gspread.exceptions.SpreadsheetNotFound`` are treated as permanent errors
    and re-raised immediately without another attempt. If ``MAX_RETRIES`` is
    zero or negative, ``func`` is called exactly once with no retry handling.

    The returned wrapper keeps the metadata (``__name__``, ``__doc__``, ...) of
    the wrapped function.
    """
    # Local import keeps this block independent of the file-level import list.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        func_name = func.__name__

        # If the first positional argument exposes an attribute named like the
        # function, treat the call as a method call and prefix the class name
        # in log output.
        self_arg = args[0] if args and hasattr(args[0], func_name) else None
        if self_arg is not None:
            effective_func_name = f"{self_arg.__class__.__name__}.{func_name}"
        else:
            effective_func_name = func_name

        max_retries_config = getattr(Config, 'MAX_RETRIES', 3)
        base_delay = getattr(Config, 'RETRY_DELAY', 5)

        if max_retries_config <= 0:
            return func(*args, **kwargs)

        for attempt in range(max_retries_config):
            try:
                if attempt > 0:
                    decorator_logger.warning(f"Wiederhole Versuch {attempt + 1}/{max_retries_config} fuer '{effective_func_name}'...")
                return func(*args, **kwargs)
            except Exception as e:
                # Errors that another attempt cannot fix are passed on at once.
                permanent_errors = (ValueError,)
                if GSPREAD_AVAILABLE:
                    permanent_errors += (gspread.exceptions.SpreadsheetNotFound,)
                if isinstance(e, permanent_errors):
                    raise

                # Last attempt exhausted: surface the original exception.
                if attempt >= max_retries_config - 1:
                    raise

                wait_time = base_delay * (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)

        # Defensive guard: every loop iteration above either returns or raises.
        raise RuntimeError(f"Retry loop error for {effective_func_name}")

    return wrapper
# ==============================================================================
# 3. LOGGING & UTILS
# ==============================================================================
def token_count(text, model=None):
    """Approximate the token count of ``text`` by counting whitespace-separated words.

    ``model`` is accepted but not used. Empty or non-string input yields 0.
    """
    if isinstance(text, str) and text:
        words = str(text).split()
        return len(words)
    return 0
def log_module_versions(modules_to_log):
    """No-op: ``modules_to_log`` is ignored and ``None`` is returned."""
    return None
def create_log_filename(mode):
    """Build the path of a log file inside ``LOG_DIR`` for the given run mode.

    The file name has the form ``<YYYY-MM-DD_HH-MM>_<version>_Modus-<mode>.txt``,
    where the version is ``Config.VERSION`` with all dots removed ("unknown"
    when the attribute is missing). Returns ``None`` if anything goes wrong
    while building the path.
    """
    try:
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
        version_tag = getattr(Config, 'VERSION', 'unknown').replace(".", "")
        file_name = f"{timestamp}_{version_tag}_Modus-{mode}.txt"
        return os.path.join(LOG_DIR, file_name)
    except Exception:
        return None
# ==============================================================================
# 4. TEXT, STRING & URL UTILITIES
# ==============================================================================
def simple_normalize_url(url):
    """Return ``url`` unchanged when it is truthy, otherwise the placeholder "k.A."."""
    if url:
        return url
    return "k.A."
def normalize_string(s):
    """Return ``s`` exactly as passed in (identity; no normalization is applied)."""
    result = s
    return result
def clean_text(text):
    """Return ``str(text)`` without surrounding whitespace, or "k.A." for falsy input."""
    if not text:
        return "k.A."
    return str(text).strip()
def normalize_company_name(name):
    """Return ``name`` lower-cased and stripped, or "" for falsy input."""
    if not name:
        return ""
    lowered = name.lower()
    return lowered.strip()
def _get_col_letter(col_num):
    """Stub: ignores ``col_num`` and always returns an empty string."""
    letter = ""
    return letter
def fuzzy_similarity(str1, str2):
    """Stub: ignores both strings and always returns ``0.0``."""
    score = 0.0
    return score
def extract_numeric_value(raw_value, is_umsatz=False):
    """Stub: ignores its arguments and always returns the placeholder "k.A."."""
    placeholder = "k.A."
    return placeholder
def get_numeric_filter_value(value_str, is_umsatz=False):
    """Stub: ignores its arguments and always returns ``0.0``."""
    neutral_value = 0.0
    return neutral_value
@retry_on_failure
def _call_genderize_api(name, api_key):
    """Stub: ignores ``name`` and ``api_key`` and returns a new empty dict.

    The call is wrapped by ``retry_on_failure``.
    """
    empty_result = {}
    return empty_result
def get_gender(firstname):
    """Stub: ignores ``firstname`` and always returns "unknown"."""
    gender_label = "unknown"
    return gender_label
def get_email_address(firstname, lastname, website):
    """Stub: ignores all arguments and always returns an empty string."""
    address = ""
    return address
# ==============================================================================
# 8. GEMINI API WRAPPERS
# ==============================================================================
def _get_gemini_api_key():
    """Look up the API key used for Gemini calls.

    Lookup order: ``Config.API_KEYS['gemini']``, ``Config.API_KEYS['openai']``,
    then the environment variables ``GEMINI_API_KEY`` and ``OPENAI_API_KEY``.
    The first truthy value wins.

    Raises:
        ValueError: If none of the sources provides a key.
    """
    configured_keys = Config.API_KEYS
    key = configured_keys.get('gemini') or configured_keys.get('openai')
    if not key:
        # Nothing configured: fall back to the process environment.
        key = os.environ.get("GEMINI_API_KEY") or os.environ.get("OPENAI_API_KEY")
    if not key:
        raise ValueError("API Key missing.")
    return key
@retry_on_failure
def call_gemini_flash(prompt, system_instruction=None, temperature=0.3, json_mode=False):
    """Send a text prompt to Gemini ("gemini-2.0-flash") and return the answer text.

    The legacy ``google.generativeai`` library is tried first when it is
    available. If it fails and the newer ``google.genai`` library is installed,
    the request is repeated through the new client; if only the new library is
    installed it is used directly.

    Args:
        prompt: Prompt string (wrapped into a one-element list) or an already
            assembled ``contents`` value, which is passed through unchanged.
        system_instruction: Optional system prompt. It is forwarded on BOTH
            library paths, so a fallback does not change the model's behavior.
        temperature: Sampling temperature.
        json_mode: When true, "application/json" is requested as response
            MIME type.

    Returns:
        The response text with surrounding whitespace removed.

    Raises:
        ValueError: If no API key can be found (raised by ``_get_gemini_api_key``).
        ImportError: If neither Gemini library is available.
        Exception: Whatever the library that handled the last attempt raised.
    """
    logger = logging.getLogger(__name__)
    api_key = _get_gemini_api_key()

    # gemini-2.0-flash is used because 1.5 was not available in this setup.
    model_name = "gemini-2.0-flash"
    contents = [prompt] if isinstance(prompt, str) else prompt

    base_config = {
        "temperature": temperature,
        "top_p": 0.95,
        "top_k": 40,
        "max_output_tokens": 8192,
    }
    if json_mode:
        base_config["response_mime_type"] = "application/json"

    # Priority 1: legacy library (proven for text generation in this setup).
    if HAS_OLD_GENAI:
        try:
            old_genai.configure(api_key=api_key)
            model = old_genai.GenerativeModel(
                model_name=model_name,
                generation_config=dict(base_config),
                system_instruction=system_instruction,
            )
            response = model.generate_content(contents)
            return response.text.strip()
        except Exception as e:
            logger.error(f"Fehler mit alter GenAI Lib: {e}")
            if not HAS_NEW_GENAI:
                raise
            # Otherwise fall through and retry with the new library.

    # Priority 2: new library.
    if HAS_NEW_GENAI:
        try:
            client = genai.Client(api_key=api_key)
            config = dict(base_config)
            # The new SDK takes the system prompt as part of the request config;
            # without this the instruction would be silently dropped here.
            if system_instruction:
                config["system_instruction"] = system_instruction
            response = client.models.generate_content(
                model=model_name,
                contents=contents,
                config=config,
            )
            return response.text.strip()
        except Exception as e:
            logger.error(f"Fehler mit neuer GenAI Lib: {e}")
            raise

    raise ImportError("Keine Gemini Bibliothek verfügbar.")
@retry_on_failure
def call_gemini_image(prompt, reference_image_b64=None):
    """Generate an image and return it as a base64-encoded string.

    - With ``reference_image_b64``: the decoded reference image and a strict
      prompt are sent to "gemini-2.5-flash-image".
    - Without a reference image: the Imagen 4.0 models are tried one after the
      other until one of them returns an image.

    The API key is looked up first; afterwards an ``ImportError`` is raised if
    the ``google.genai`` library is not available (``HAS_NEW_GENAI`` is false).
    Every error raised during generation is logged and re-raised.
    """
    logger = logging.getLogger(__name__)
    api_key = _get_gemini_api_key()

    if not HAS_NEW_GENAI:
        logger.error("Image Generation erfordert die neue 'google-genai' Bibliothek.")
        raise ImportError("Installieren Sie 'google-genai' für Bildgenerierung.")

    try:
        client = genai.Client(api_key=api_key)

        # --- Case A: reference image supplied (Gemini 2.5) ---
        if reference_image_b64:
            try:
                from PIL import Image
                import io
            except ImportError:
                raise ImportError("Pillow (PIL) fehlt. Bitte 'pip install Pillow' ausführen.")

            logger.info("Start Image-to-Image Generation mit gemini-2.5-flash-image...")

            # Turn the base64 payload into a PIL image.
            try:
                encoded = reference_image_b64
                if "," in encoded:
                    # Keep only the part after the first comma (data-URL prefix).
                    encoded = encoded.split(",")[1]
                image_data = base64.b64decode(encoded)
                raw_image = Image.open(io.BytesIO(image_data))
            except Exception as e:
                logger.error(f"Fehler beim Laden des Referenzbildes: {e}")
                raise ValueError("Ungültiges Referenzbild.")

            # Stricter prompt that pins the product to the reference image.
            full_prompt = (
                "Use the provided reference image as the absolute truth. "
                f"Place EXACTLY this product into the scene: {prompt}. "
                "Do NOT alter the product's design, shape, or colors. "
                "Keep the product 100% identical to the reference. "
                "Only adjust lighting and perspective to match the scene."
            )

            # No config with response_mime_type="application/json" here;
            # that causes errors.
            response = client.models.generate_content(
                model='gemini-2.5-flash-image',
                contents=[raw_image, full_prompt]
            )

            result_candidates = response.candidates
            if result_candidates and result_candidates[0].content.parts:
                for part in result_candidates[0].content.parts:
                    if part.inline_data:
                        return base64.b64encode(part.inline_data.data).decode('utf-8')

            raise ValueError("Gemini 2.5 hat kein Bild zurückgeliefert.")

        # --- Case B: no reference image (Imagen 4) ---
        img_config = {
            "number_of_images": 1,
            "output_mime_type": "image/jpeg"
        }

        method = getattr(client.models, 'generate_images', None)
        if not method:
            available_methods = [m for m in dir(client.models) if not m.startswith('_')]
            raise AttributeError(f"Client hat keine Image-Methode. Verfügbar: {available_methods}")

        imagen_models = (
            'imagen-4.0-generate-001',
            'imagen-4.0-fast-generate-001',
            'imagen-4.0-ultra-generate-001',
        )

        last_error = None
        for model_name in imagen_models:
            try:
                logger.info(f"Versuche Text-zu-Bild mit Modell: {model_name}")
                response = method(
                    model=model_name,
                    prompt=prompt,
                    config=img_config
                )
                if response.generated_images:
                    image_bytes = response.generated_images[0].image.image_bytes
                    return base64.b64encode(image_bytes).decode('utf-8')
            except Exception as e:
                logger.warning(f"Modell {model_name} fehlgeschlagen: {e}")
                last_error = e

        # No model delivered an image: prefer the most recent concrete error.
        if last_error:
            raise last_error
        raise ValueError("Kein Modell konnte Bilder generieren.")

    except Exception as e:
        logger.error(f"Fehler bei Image Gen: {e}")
        raise e
def call_openai_chat(prompt, temperature=0.3, model=None, response_format_json=False):
    """Legacy entry point that forwards chat requests to ``call_gemini_flash``.

    Args:
        prompt: Passed through as the prompt.
        temperature: Passed through as the sampling temperature.
        model: Accepted for backward compatibility; not used.
        response_format_json: Passed through as ``json_mode``.

    Returns:
        Whatever ``call_gemini_flash`` returns for the request.
    """
    # Deliberately not wrapped in @retry_on_failure: call_gemini_flash already
    # retries, and a second retry layer here would multiply the number of
    # attempts and the backoff delays.
    return call_gemini_flash(
        prompt=prompt,
        system_instruction=None,
        temperature=temperature,
        json_mode=response_format_json,
    )
def summarize_website_content(raw_text, company_name):
    """Stub: ignores its arguments and always returns the placeholder "k.A."."""
    summary = "k.A."
    return summary
def summarize_wikipedia_article(full_text, company_name):
    """Stub: ignores its arguments and always returns the placeholder "k.A."."""
    summary = "k.A."
    return summary
def evaluate_branche_chatgpt(company_name, website_summary, wiki_absatz):
    """Stub: ignores its arguments and returns a new empty dict."""
    evaluation = {}
    return evaluation
def evaluate_branches_batch(companies_data):
    """Stub: ignores ``companies_data`` and returns a new empty list."""
    results = []
    return results
def verify_wiki_article_chatgpt(company_name, parent_name, website, wiki_title, wiki_summary):
    """Stub: ignores its arguments and returns a new empty dict."""
    verdict = {}
    return verdict
def generate_fsm_pitch(
    company_name,
    company_short_name,
    ki_branche,
    website_summary,
    wiki_absatz,
    anzahl_ma,
    anzahl_techniker,
    techniker_bucket_ml,
):
    """Stub: ignores all arguments and always returns an empty string."""
    pitch = ""
    return pitch
def serp_website_lookup(company_name):
    """Stub: ignores ``company_name`` and always returns the placeholder "k.A."."""
    website = "k.A."
    return website
def search_linkedin_contacts(company_name, website, position_query, crm_kurzform, num_results=10):
    """Stub: ignores all arguments and returns a new empty list."""
    contacts = []
    return contacts
def get_website_raw(url, max_length=30000, verify_cert=False):
    """Stub: ignores all arguments and always returns the placeholder "k.A."."""
    raw_text = "k.A."
    return raw_text
def scrape_website_details(url):
    """Download ``url`` and return its visible text, truncated to 25000 characters.

    Script, style, navigation, form, link and similar elements are removed
    before the text of ``<body>`` (or of the whole document when there is no
    body) is extracted and its whitespace collapsed.

    Instead of raising, the function returns a short status string:
    "Keine gültige URL angegeben." for input that is not a string starting with
    "http", "Kein HTML." for non-HTML responses, "Leer." when no text remains,
    and "Fehler beim Scraping." (after logging) for any error during the
    request or parsing.
    """
    logger = logging.getLogger(__name__)

    if not (url and isinstance(url, str) and url.startswith('http')):
        return "Keine gültige URL angegeben."

    try:
        headers = {'User-Agent': random.choice(USER_AGENTS)}
        timeout_seconds = getattr(Config, 'REQUEST_TIMEOUT', 15)
        # Certificate verification is switched off for this request.
        response = requests.get(url, headers=headers, timeout=timeout_seconds, verify=False)
        response.raise_for_status()

        content_type = response.headers.get('Content-Type', '')
        if 'text/html' not in content_type:
            return "Kein HTML."

        soup = BeautifulSoup(response.content, 'html.parser')

        # Drop elements whose content should not end up in the extracted text.
        removable_tags = [
            'script', 'style', 'noscript', 'iframe', 'svg', 'header',
            'footer', 'nav', 'aside', 'form', 'button', 'a',
        ]
        for element in soup(removable_tags):
            element.decompose()

        body = soup.find('body')
        text_source = body if body else soup
        text = text_source.get_text(separator=' ', strip=True)
        text = re.sub(r'\s+', ' ', text).strip()

        if not text:
            return "Leer."
        return text[:25000]
    except Exception as e:
        logger.error(f"Fehler URL {url}: {e}")
        return "Fehler beim Scraping."
def is_valid_wikipedia_article_url(url):
    """Stub: ignores ``url`` and always returns ``False``."""
    is_valid = False
    return is_valid
def alignment_demo(sheet_handler):
    """No-op: ``sheet_handler`` is ignored and ``None`` is returned."""
    return None