- Konsolidiert Dockerfiles in . - Verschiebt Datenbank- und Log-Dateien in . - Organisiert Konfigurations- und Modelldateien in . - Fasst Shell-Skripte in zusammen. - Verschiebt nach . - Verschiebt nach . - Das Verzeichnis wurde in verschoben. - Behält Kern-Dateien (, , , , etc.) im Root-Verzeichnis, um die Lauffähigkeit zu gewährleisten.
412 lines
16 KiB
Python
412 lines
16 KiB
Python
#!/usr/bin/env python3
"""
helpers.py

Collection of global, reusable helper functions for the
"Automatisierte Unternehmensbewertung" (automated company valuation) project.
Contains decorators, text normalisation, API wrappers and other utilities.
"""

__version__ = "v2.4.0_Final_Fix"

# NOTE(review): currently always empty — presumably filled elsewhere or kept
# for compatibility; confirm whether any caller still reads this.
ALLOWED_TARGET_BRANCHES = []
# ==============================================================================
|
|
# 1. IMPORTS
|
|
# ==============================================================================
|
|
# Standardbibliotheken
import base64
import csv
import functools
import json
import logging
import os
import random
import re
import sys
import time
import traceback
import unicodedata
from datetime import datetime
from difflib import SequenceMatcher
from urllib.parse import urlparse, unquote
|
|
|
# Externe Bibliotheken
|
|
try:
|
|
import gspread
|
|
GSPREAD_AVAILABLE = True
|
|
except ImportError:
|
|
GSPREAD_AVAILABLE = False
|
|
gspread = None
|
|
try:
|
|
import wikipedia
|
|
WIKIPEDIA_AVAILABLE = True
|
|
except ImportError:
|
|
WIKIPEDIA_AVAILABLE = False
|
|
wikipedia = None
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
try:
|
|
import pandas as pd
|
|
PANDAS_AVAILABLE = True
|
|
except Exception as e:
|
|
logging.warning(f"Pandas import failed: {e}")
|
|
PANDAS_AVAILABLE = False
|
|
pd = None
|
|
|
|
# --- KI UMSCHALTUNG: Google Generative AI (Dual Support) ---
|
|
HAS_NEW_GENAI = False
|
|
HAS_OLD_GENAI = False
|
|
|
|
# 1. Neue Bibliothek (google-genai)
|
|
try:
|
|
from google import genai
|
|
from google.genai import types
|
|
HAS_NEW_GENAI = True
|
|
logging.info("Bibliothek 'google.genai' (v1.0+) geladen.")
|
|
except ImportError:
|
|
logging.warning("Bibliothek 'google.genai' nicht gefunden. Versuche Fallback.")
|
|
|
|
# 2. Alte Bibliothek (google-generativeai)
|
|
try:
|
|
import google.generativeai as old_genai
|
|
HAS_OLD_GENAI = True
|
|
logging.info("Bibliothek 'google.generativeai' (Legacy) geladen.")
|
|
except ImportError:
|
|
logging.warning("Bibliothek 'google.generativeai' nicht gefunden.")
|
|
|
|
HAS_GEMINI = HAS_NEW_GENAI or HAS_OLD_GENAI
|
|
|
|
# OpenAI Imports (Legacy)
|
|
try:
|
|
import openai
|
|
from openai.error import AuthenticationError, OpenAIError, RateLimitError, APIError, Timeout, InvalidRequestError, ServiceUnavailableError
|
|
OPENAI_AVAILABLE = True
|
|
except ImportError:
|
|
OPENAI_AVAILABLE = False
|
|
class AuthenticationError(Exception): pass
|
|
class OpenAIError(Exception): pass
|
|
class RateLimitError(Exception): pass
|
|
class APIError(Exception): pass
|
|
class Timeout(Exception): pass
|
|
class InvalidRequestError(Exception): pass
|
|
class ServiceUnavailableError(Exception): pass
|
|
|
|
from config import (Config, BRANCH_MAPPING_FILE, URL_CHECK_MARKER, USER_AGENTS, LOG_DIR)
|
|
from config import Config, COLUMN_MAP, COLUMN_ORDER
|
|
|
|
# Optionale Bibliotheken
|
|
try:
|
|
import tiktoken
|
|
except ImportError:
|
|
tiktoken = None
|
|
|
|
# Gender-detection backends are not loaded in this build; the names are kept
# at module level — presumably so dependent code can feature-check them
# against None (TODO confirm against callers).
gender = None
gender_detector = None
|
|
|
|
def get_col_idx(key):
    """Return the positional index of *key* within COLUMN_ORDER.

    Returns None when the key is not a known column.
    """
    if key in COLUMN_ORDER:
        return COLUMN_ORDER.index(key)
    return None
|
|
|
|
# ==============================================================================
# 2. RETRY DECORATOR
# ==============================================================================
decorator_logger = logging.getLogger(__name__ + ".Retry")


def retry_on_failure(func):
    """Decorator: retry *func* with exponential backoff plus jitter.

    Reads ``Config.MAX_RETRIES`` (default 3) and ``Config.RETRY_DELAY``
    (default 5 seconds) at call time. Permanent errors (ValueError and, when
    gspread is installed, SpreadsheetNotFound) are re-raised immediately;
    any other exception is retried until the attempt budget is exhausted,
    after which the last exception propagates to the caller.
    """

    @functools.wraps(func)  # preserve __name__/__doc__ of the wrapped function
    def wrapper(*args, **kwargs):
        func_name = func.__name__
        # If the first positional argument exposes this function as an
        # attribute, it is (very likely) the bound instance; include its class
        # name in log output. The original `isinstance(args[0], object)` check
        # was a tautology and has been dropped.
        self_arg = args[0] if args and hasattr(args[0], func_name) else None
        effective_func_name = (
            f"{self_arg.__class__.__name__}.{func_name}" if self_arg else func_name
        )

        max_retries_config = getattr(Config, 'MAX_RETRIES', 3)
        base_delay = getattr(Config, 'RETRY_DELAY', 5)

        # Retries disabled: call through without any wrapping logic.
        if max_retries_config <= 0:
            return func(*args, **kwargs)

        for attempt in range(max_retries_config):
            try:
                if attempt > 0:
                    decorator_logger.warning(f"Wiederhole Versuch {attempt + 1}/{max_retries_config} fuer '{effective_func_name}'...")
                return func(*args, **kwargs)

            except Exception as e:
                # Errors that cannot succeed on retry are raised immediately.
                permanent_errors = [ValueError]
                if GSPREAD_AVAILABLE:
                    permanent_errors.append(gspread.exceptions.SpreadsheetNotFound)

                if any(isinstance(e, error_type) for error_type in permanent_errors):
                    raise  # bare raise keeps the original traceback intact

                if attempt < max_retries_config - 1:
                    # Exponential backoff with random jitter to de-synchronise
                    # concurrent retriers.
                    wait_time = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    time.sleep(wait_time)
                else:
                    raise

        # Unreachable in practice; guards against a logic error above.
        raise RuntimeError(f"Retry loop error for {effective_func_name}")

    return wrapper
|
|
|
|
# ==============================================================================
|
|
# 3. LOGGING & UTILS
|
|
# ==============================================================================
|
|
|
|
def token_count(text, model=None):
    """Approximate a token count by whitespace word-splitting.

    The *model* parameter is accepted for API compatibility but unused.
    Returns 0 for empty or non-string input.
    """
    if not isinstance(text, str) or not text:
        return 0
    return len(text.split())
|
|
|
|
def log_module_versions(modules_to_log):
    """No-op placeholder: module-version logging was removed in this build."""
    return None
|
|
|
|
def create_log_filename(mode):
    """Build a timestamped log-file path under LOG_DIR for the given run mode.

    Best-effort helper: returns None when the path cannot be constructed.
    """
    try:
        stamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
        version_tag = getattr(Config, 'VERSION', 'unknown').replace(".", "")
        filename = f"{stamp}_{version_tag}_Modus-{mode}.txt"
        return os.path.join(LOG_DIR, filename)
    except Exception:
        return None
|
|
|
|
# ==============================================================================
# 4. TEXT, STRING & URL UTILITIES
# ==============================================================================
# NOTE(review): the functions below are placeholder stubs returning fixed
# defaults — presumably left over from the repository reorganisation; confirm
# whether the real implementations should be restored.

def simple_normalize_url(url): return url if url else "k.A."

def normalize_string(s): return s

def clean_text(text): return str(text).strip() if text else "k.A."

def normalize_company_name(name): return name.lower().strip() if name else ""

def _get_col_letter(col_num): return ""

def fuzzy_similarity(str1, str2): return 0.0

def extract_numeric_value(raw_value, is_umsatz=False): return "k.A."

def get_numeric_filter_value(value_str, is_umsatz=False): return 0.0

@retry_on_failure
def _call_genderize_api(name, api_key): return {}

def get_gender(firstname): return "unknown"

def get_email_address(firstname, lastname, website): return ""
|
|
|
|
# ==============================================================================
# 8. GEMINI API WRAPPERS
# ==============================================================================

def _get_gemini_api_key():
    """Resolve the Gemini API key.

    Checks Config first (the 'openai' slot is a legacy fallback), then the
    environment. Raises ValueError when no key is available.
    """
    configured = Config.API_KEYS.get('gemini') or Config.API_KEYS.get('openai')
    if configured:
        return configured
    from_env = os.environ.get("GEMINI_API_KEY") or os.environ.get("OPENAI_API_KEY")
    if from_env:
        return from_env
    raise ValueError("API Key missing.")
|
|
|
|
@retry_on_failure
def call_gemini_flash(prompt, system_instruction=None, temperature=0.3, json_mode=False):
    """Call Gemini for text generation (default model: gemini-2.0-flash).

    Tries the legacy 'google.generativeai' library first; on failure falls
    back to the new 'google-genai' client. Returns the stripped response
    text. Raises ImportError when neither library is installed, otherwise
    re-raises the last API error (the @retry_on_failure decorator retries
    transient failures).
    """
    logger = logging.getLogger(__name__)
    api_key = _get_gemini_api_key()

    # Priority 1: legacy library (proven for text generation in this setup)
    if HAS_OLD_GENAI:
        try:
            old_genai.configure(api_key=api_key)
            generation_config = {
                "temperature": temperature,
                "top_p": 0.95,
                "top_k": 40,
                "max_output_tokens": 8192,
            }
            if json_mode:
                # Ask the API for strict JSON output.
                generation_config["response_mime_type"] = "application/json"

            # IMPORTANT: use 2.0 here, since 1.5 was not available
            model = old_genai.GenerativeModel(
                model_name="gemini-2.0-flash",
                generation_config=generation_config,
                system_instruction=system_instruction
            )
            # The API accepts a list of content parts; wrap a bare string.
            contents = [prompt] if isinstance(prompt, str) else prompt
            response = model.generate_content(contents)
            return response.text.strip()
        except Exception as e:
            logger.error(f"Fehler mit alter GenAI Lib: {e}")
            if not HAS_NEW_GENAI: raise e
            # Fall through to the new library below.

    # Priority 2: new library
    if HAS_NEW_GENAI:
        try:
            client = genai.Client(api_key=api_key)
            config = {
                "temperature": temperature,
                "top_p": 0.95,
                "top_k": 40,
                "max_output_tokens": 8192,
            }
            if json_mode:
                config["response_mime_type"] = "application/json"

            response = client.models.generate_content(
                model="gemini-2.0-flash",
                contents=[prompt] if isinstance(prompt, str) else prompt,
                config=config
            )
            return response.text.strip()
        except Exception as e:
            logger.error(f"Fehler mit neuer GenAI Lib: {e}")
            raise e

    raise ImportError("Keine Gemini Bibliothek verfügbar.")
|
|
|
|
@retry_on_failure
def call_gemini_image(prompt, reference_image_b64=None, aspect_ratio=None):
    """Generate an image and return it base64-encoded (UTF-8 string).

    - With a reference image: image-to-image via Gemini 2.5 Flash Image.
    - Without a reference image: text-to-image via Imagen 4.0 (with a
      fallback chain over Imagen model variants) and the central corporate
      design prompt prepended.
    - `aspect_ratio` (e.g. "16:9") is passed to Imagen when supported; in the
      reference-image path it can only be hinted via the prompt.

    Requires the new 'google-genai' library; raises ImportError otherwise.
    """
    logger = logging.getLogger(__name__)
    api_key = _get_gemini_api_key()

    if HAS_NEW_GENAI:
        try:
            client = genai.Client(api_key=api_key)

            # --- CASE A: REFERENCE IMAGE PRESENT (Gemini 2.5) ---
            if reference_image_b64:
                # Pillow is only needed on this path, hence the local import.
                try:
                    from PIL import Image
                    import io
                except ImportError:
                    raise ImportError("Pillow (PIL) fehlt. Bitte 'pip install Pillow' ausführen.")

                logger.info(f"Start Image-to-Image Generation mit gemini-2.5-flash-image. Seitenverhältnis: {aspect_ratio or 'default'}")

                # Decode the base64 payload into a PIL image.
                try:
                    # Strip a possible data-URL prefix ("data:image/...;base64,").
                    if "," in reference_image_b64:
                        reference_image_b64 = reference_image_b64.split(",")[1]
                    image_data = base64.b64decode(reference_image_b64)
                    raw_image = Image.open(io.BytesIO(image_data))
                except Exception as e:
                    logger.error(f"Fehler beim Laden des Referenzbildes: {e}")
                    raise ValueError("Ungültiges Referenzbild.")

                # Strict prompt: the reference product must stay unchanged.
                full_prompt = (
                    "Use the provided reference image as the absolute truth. "
                    f"Place EXACTLY this product into the scene: {prompt}. "
                    "Do NOT alter the product's design, shape, or colors. "
                    "Keep the product 100% identical to the reference. "
                    "Only adjust lighting and perspective to match the scene."
                )

                # The aspect ratio cannot be controlled directly here — it
                # follows the reference image — so it is hinted in the prompt.
                if aspect_ratio:
                    full_prompt += f" The final image composition should have an aspect ratio of {aspect_ratio}."

                response = client.models.generate_content(
                    model='gemini-2.5-flash-image',
                    contents=[raw_image, full_prompt]
                )

                # Return the first inline image part, if any.
                if response.candidates and response.candidates[0].content.parts:
                    for part in response.candidates[0].content.parts:
                        if part.inline_data:
                            return base64.b64encode(part.inline_data.data).decode('utf-8')

                raise ValueError("Gemini 2.5 hat kein Bild zurückgeliefert.")

            # --- CASE B: NO REFERENCE IMAGE (Imagen 4) ---
            else:
                img_config = {
                    "number_of_images": 1,
                    "output_mime_type": "image/jpeg",
                }
                # Forward the aspect ratio only for supported values.
                if aspect_ratio in ["16:9", "9:16", "1:1", "4:3"]:
                    img_config["aspect_ratio"] = aspect_ratio
                    logger.info(f"Seitenverhältnis auf {aspect_ratio} gesetzt.")

                # Prepend the central corporate-design style prompt.
                final_prompt = f"{Config.CORPORATE_DESIGN_PROMPT}\n\nTask: {prompt}"

                # Probe for the image-generation entry point; older SDK builds
                # may not expose it.
                method = getattr(client.models, 'generate_images', None)
                if not method:
                    available_methods = [m for m in dir(client.models) if not m.startswith('_')]
                    raise AttributeError(f"Client hat keine Image-Methode. Verfügbar: {available_methods}")

                # Model fallback chain: standard, then fast, then ultra.
                candidates = [
                    'imagen-4.0-generate-001',
                    'imagen-4.0-fast-generate-001',
                    'imagen-4.0-ultra-generate-001'
                ]

                last_error = None
                for model_name in candidates:
                    try:
                        logger.info(f"Versuche Text-zu-Bild mit Modell: {model_name}")
                        response = method(
                            model=model_name,
                            prompt=final_prompt,
                            config=img_config
                        )

                        if response.generated_images:
                            image_bytes = response.generated_images[0].image.image_bytes
                            return base64.b64encode(image_bytes).decode('utf-8')
                    except Exception as e:
                        # Try the next model variant; remember the last error.
                        logger.warning(f"Modell {model_name} fehlgeschlagen: {e}")
                        last_error = e

                if last_error: raise last_error
                raise ValueError("Kein Modell konnte Bilder generieren.")

        except Exception as e:
            logger.error(f"Fehler bei Image Gen: {e}")
            raise e
    else:
        logger.error("Image Generation erfordert die neue 'google-genai' Bibliothek.")
        raise ImportError("Installieren Sie 'google-genai' für Bildgenerierung.")
|
|
|
|
@retry_on_failure
def call_openai_chat(prompt, temperature=0.3, model=None, response_format_json=False):
    """Legacy OpenAI-style entry point, rerouted to Gemini.

    The *model* argument is accepted for backward compatibility but ignored;
    every request is served by call_gemini_flash without a system instruction.
    """
    return call_gemini_flash(
        prompt,
        system_instruction=None,
        temperature=temperature,
        json_mode=response_format_json,
    )
|
|
|
|
# NOTE(review): placeholder stubs returning fixed defaults — the real
# implementations were presumably removed during the reorganisation; confirm
# before relying on any of these.

def summarize_website_content(raw_text, company_name): return "k.A."

def summarize_wikipedia_article(full_text, company_name): return "k.A."

def evaluate_branche_chatgpt(company_name, website_summary, wiki_absatz): return {}

def evaluate_branches_batch(companies_data): return []

def verify_wiki_article_chatgpt(company_name, parent_name, website, wiki_title, wiki_summary): return {}

def generate_fsm_pitch(company_name, company_short_name, ki_branche, website_summary, wiki_absatz, anzahl_ma, anzahl_techniker, techniker_bucket_ml): return ""

def serp_website_lookup(company_name): return "k.A."

def search_linkedin_contacts(company_name, website, position_query, crm_kurzform, num_results=10): return []

def get_website_raw(url, max_length=30000, verify_cert=False): return "k.A."
|
|
|
|
def scrape_website_details(url):
    """Fetch *url* and return its visible body text (capped at 25,000 chars).

    Strips scripts, styles, page chrome and links, then collapses whitespace.
    On any failure a short German status string is returned instead of
    raising, so callers can treat the result uniformly as text.

    NOTE(review): TLS certificate verification is disabled (verify=False) —
    this looks deliberate (cf. get_website_raw's verify_cert=False default)
    but is a MITM risk; confirm it is intended.
    """
    logger = logging.getLogger(__name__)
    # Only plain http(s) URLs are scraped.
    if not url or not isinstance(url, str) or not url.startswith('http'):
        return "Keine gültige URL angegeben."
    try:
        # Rotate the User-Agent to look less like a bot.
        headers = {'User-Agent': random.choice(USER_AGENTS)}
        response = requests.get(url, headers=headers, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15), verify=False)
        response.raise_for_status()
        if 'text/html' not in response.headers.get('Content-Type', ''): return "Kein HTML."
        soup = BeautifulSoup(response.content, 'html.parser')
        # Drop non-content elements, including navigation chrome and links.
        for element in soup(['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav', 'aside', 'form', 'button', 'a']):
            element.decompose()
        body = soup.find('body')
        text = body.get_text(separator=' ', strip=True) if body else soup.get_text(separator=' ', strip=True)
        # Collapse whitespace runs into single spaces.
        text = re.sub(r'\s+', ' ', text).strip()
        return text[:25000] if text else "Leer."
    except Exception as e:
        logger.error(f"Fehler URL {url}: {e}")
        return "Fehler beim Scraping."
|
|
|
|
# NOTE(review): placeholder stubs — confirm whether real implementations
# should be restored.

def is_valid_wikipedia_article_url(url): return False

def alignment_demo(sheet_handler): pass