feat(gtm): upgrade to google-genai, fix image gen & frontend crash

This commit is contained in:
2026-01-04 16:02:23 +00:00
parent a5bd0b272b
commit 704a656c40
242 changed files with 6075 additions and 448 deletions

View File

@@ -7,7 +7,7 @@ Sammlung von globalen, wiederverwendbaren Hilfsfunktionen für das Projekt
API-Wrapper und andere Dienstprogramme.
"""
__version__ = "v2.2.0_Gemini_Switch"
__version__ = "v2.4.0_Final_Fix"
ALLOWED_TARGET_BRANCHES = []
@@ -27,6 +27,8 @@ import unicodedata
from datetime import datetime
from urllib.parse import urlparse, unquote
from difflib import SequenceMatcher
import base64
import sys
# Externe Bibliotheken
try:
@@ -34,46 +36,53 @@ try:
GSPREAD_AVAILABLE = True
except ImportError:
GSPREAD_AVAILABLE = False
gspread = None # Define to avoid runtime errors on reference
gspread = None
try:
import wikipedia
WIKIPEDIA_AVAILABLE = True
except ImportError:
WIKIPEDIA_AVAILABLE = False
wikipedia = None # Define to avoid runtime errors on reference
wikipedia = None
import requests
from bs4 import BeautifulSoup
try:
import pandas as pd
PANDAS_AVAILABLE = True
except ImportError:
except Exception as e:
logging.warning(f"Pandas import failed: {e}")
PANDAS_AVAILABLE = False
pd = None # Define to avoid runtime errors on reference
pd = None
# --- KI UMSCHALTUNG: Google Generative AI statt OpenAI ---
# --- KI UMSCHALTUNG: Google Generative AI (Dual Support) ---
HAS_NEW_GENAI = False
HAS_OLD_GENAI = False
# 1. Neue Bibliothek (google-genai)
try:
# Versuche, die neue, empfohlene Bibliothek zu importieren
import google.genai as genai
HAS_GEMINI = True
from google import genai
from google.genai import types
HAS_NEW_GENAI = True
logging.info("Bibliothek 'google.genai' (v1.0+) geladen.")
except ImportError:
try:
# Fallback auf die ältere Bibliothek, falls die neue nicht da ist
import google.generativeai as genai
HAS_GEMINI = True
logging.warning("Veraltetes Paket 'google.generativeai' wird verwendet. Bitte auf 'google-genai' aktualisieren.")
except ImportError:
HAS_GEMINI = False
genai = None # Sicherstellen, dass genai definiert ist
logging.warning("Keine Google-KI-Bibliothek (weder google.genai noch google.generativeai) gefunden.")
logging.warning("Bibliothek 'google.genai' nicht gefunden. Versuche Fallback.")
# OpenAI Imports entfernen wir oder machen sie optional, um Verwirrung zu vermeiden
# 2. Alte Bibliothek (google-generativeai)
try:
import google.generativeai as old_genai
HAS_OLD_GENAI = True
logging.info("Bibliothek 'google.generativeai' (Legacy) geladen.")
except ImportError:
logging.warning("Bibliothek 'google.generativeai' nicht gefunden.")
HAS_GEMINI = HAS_NEW_GENAI or HAS_OLD_GENAI
# OpenAI Imports (Legacy)
try:
import openai
from openai.error import AuthenticationError, OpenAIError, RateLimitError, APIError, Timeout, InvalidRequestError, ServiceUnavailableError
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
# Define dummy exception classes so the code doesn't crash if it tries to catch them
class AuthenticationError(Exception): pass
class OpenAIError(Exception): pass
class RateLimitError(Exception): pass
@@ -83,31 +92,21 @@ except ImportError:
class ServiceUnavailableError(Exception): pass
from config import (Config, BRANCH_MAPPING_FILE, URL_CHECK_MARKER, USER_AGENTS, LOG_DIR)
from config import Config, COLUMN_MAP, COLUMN_ORDER
# Optionale Bibliotheken
try:
import tiktoken
except ImportError:
tiktoken = None
logging.warning("tiktoken nicht gefunden. Token-Zaehlung wird geschaetzt.")
gender = None
gender_detector = None
# Import der Config-Klasse und Konstanten
from config import Config, BRANCH_MAPPING_FILE, URL_CHECK_MARKER, USER_AGENTS
from config import Config, COLUMN_MAP, COLUMN_ORDER
def get_col_idx(key):
    """Look up the 0-based position of a column key in ``COLUMN_ORDER``.

    Args:
        key: Column name to search for in ``COLUMN_ORDER`` (imported from
            ``config``).

    Returns:
        The index of ``key`` in ``COLUMN_ORDER``, or ``None`` when the key is
        not present. A missing key is logged as an error instead of raising.
    """
    try:
        position = COLUMN_ORDER.index(key)
    except ValueError:
        log = logging.getLogger(__name__)
        log.error(f"Spalten-Schlüssel '{key}' konnte in COLUMN_ORDER nicht gefunden werden!")
        position = None
    return position
# ==============================================================================
@@ -116,10 +115,6 @@ def get_col_idx(key):
decorator_logger = logging.getLogger(__name__ + ".Retry")
def retry_on_failure(func):
"""
Decorator, der eine Funktion bei bestimmten Fehlern mehrmals wiederholt.
Implementiert exponentiellen Backoff mit Jitter.
"""
def wrapper(*args, **kwargs):
func_name = func.__name__
self_arg = args[0] if args and hasattr(args[0], func_name) and isinstance(args[0], object) else None
@@ -129,11 +124,7 @@ def retry_on_failure(func):
base_delay = getattr(Config, 'RETRY_DELAY', 5)
if max_retries_config <= 0:
try:
return func(*args, **kwargs)
except Exception as e:
decorator_logger.error(f"FEHLER bei '{effective_func_name}' (keine Retries konfiguriert). {type(e).__name__} - {str(e)[:150]}...")
raise e
return func(*args, **kwargs)
for attempt in range(max_retries_config):
try:
@@ -141,369 +132,268 @@ def retry_on_failure(func):
decorator_logger.warning(f"Wiederhole Versuch {attempt + 1}/{max_retries_config} fuer '{effective_func_name}'...")
return func(*args, **kwargs)
except Exception as e: # Catch all to include Gemini errors
# Define permanent errors that should not be retried
except Exception as e:
permanent_errors = [ValueError]
if GSPREAD_AVAILABLE:
permanent_errors.append(gspread.exceptions.SpreadsheetNotFound)
if any(isinstance(e, error_type) for error_type in permanent_errors):
decorator_logger.critical(f"❌ ENDGUELTIGER FEHLER bei '{effective_func_name}': Permanentes Problem erkannt. {type(e).__name__} - {str(e)[:150]}...")
raise e
# Handle retryable errors
error_msg = str(e)
error_type = type(e).__name__
if attempt < max_retries_config - 1:
wait_time = base_delay * (2 ** attempt) + random.uniform(0, 1)
decorator_logger.warning(f"♻️ WIEDERHOLBARER FEHLER ({error_type}) bei '{effective_func_name}' (Versuch {attempt+1}/{max_retries_config}). {error_msg[:150]}... Warte {wait_time:.2f}s...")
time.sleep(wait_time)
else:
decorator_logger.error(f"❌ ENDGUELTIGER FEHLER bei '{effective_func_name}' nach {max_retries_config} Versuchen.")
raise e
raise RuntimeError(f"Retry decorator logic error: Loop completed unexpectedly for {effective_func_name}. This should not happen.")
raise RuntimeError(f"Retry loop error for {effective_func_name}")
return wrapper
# ==============================================================================
# 3. LOGGING & TOKEN COUNT HELPERS
# 3. LOGGING & UTILS
# ==============================================================================
def token_count(text, model=None):
    """Estimate the token count of ``text`` by counting whitespace-separated words.

    Args:
        text: Text to measure. Anything that is not a non-empty ``str``
            counts as 0 tokens.
        model: Unused; kept so callers that pass a model name keep working.

    Returns:
        Number of whitespace-delimited chunks in ``text``.
    """
    if not text or not isinstance(text, str):
        return 0
    return len(text.split())
def log_module_versions(modules_to_log):
"""Collect the ``__version__`` attribute of the given modules and log them.

``modules_to_log`` is iterated with ``.items()`` as name -> module pairs.
A module without a ``__version__`` attribute is reported as 'N/A'.
Nothing is logged when the mapping is empty.
"""
logger = logging.getLogger(__name__)
version_infos = []
for name, module in modules_to_log.items():
version = getattr(module, '__version__', 'N/A')
version_infos.append(f"- {name}: {version}")
if version_infos:
logger.info("Geladene Modul-Versionen:\n" + "\n".join(version_infos))
pass
def create_log_filename(mode):
"""Build a timestamped log file path for the given run mode.

The file name has the form ``<YYYY-MM-DD_HH-MM>_<version>_Modus-<mode>.txt``,
where the version is ``Config.VERSION`` with the dots removed ('unknown' if
the attribute is missing). Returns ``None`` if building the name fails.
"""
logger = logging.getLogger(__name__)
log_dir_path = LOG_DIR
if not os.path.exists(log_dir_path):
try:
os.makedirs(log_dir_path, exist_ok=True)
except Exception as e:
logger.error(f"FEHLER: Konnte Log-Verzeichnis '{log_dir_path}' nicht erstellen: {e}")
# Fall back to the current working directory when LOG_DIR cannot be created.
log_dir_path = "."
try:
now = datetime.now().strftime("%Y-%m-%d_%H-%M")
ver_short = getattr(Config, 'VERSION', 'unknown').replace(".", "")
filename = f"{now}_{ver_short}_Modus-{mode}.txt"
return os.path.join(log_dir_path, filename)
return os.path.join(LOG_DIR, f"{now}_{ver_short}_Modus-{mode}.txt")
except Exception:
return None
# ==============================================================================
# 4. TEXT, STRING & URL UTILITIES (UNVERÄNDERT)
# 4. TEXT, STRING & URL UTILITIES
# ==============================================================================
# (Diese Funktionen bleiben gleich, ich kürze sie hier der Übersichtlichkeit halber nicht,
# aber im echten File bleiben sie bestehen. Ich schreibe sie neu, damit nichts fehlt.)
def simple_normalize_url(url):
    """Reduce a URL to its bare, lowercase host name.

    Scheme, userinfo, port, path, a leading ``www.`` and surrounding or
    repeated dots are removed; internationalised names are IDNA-encoded.

    Args:
        url: URL or bare domain. A missing scheme is treated as ``https://``.

    Returns:
        The normalised domain (it must contain at least one dot), or one of
        the sentinels ``"k.A."``, ``"k.A. (Unicode-Fehler)"`` or
        ``"k.A. (Fehler bei Normalisierung)"``.
    """
    if not url or not isinstance(url, str):
        return "k.A."
    # Remove zero-width spaces and soft hyphens before inspecting the value.
    url = url.replace('\u200b', '').replace('\xad', '').strip()
    if not url or url.lower() == 'k.a.':
        return "k.A."
    # URL schemes are case-insensitive; "HTTP://..." must not get a second
    # scheme prepended.
    if not re.match(r'^https?://', url, flags=re.IGNORECASE):
        url = "https://" + url
    try:
        host = urlparse(url).netloc
        if not host:
            return "k.A."
        # Drop userinfo first: it may contain ':' itself ("user:pw@host"),
        # which would otherwise be mistaken for the port separator.
        host = host.rsplit('@', 1)[-1]
        host = host.split(':', 1)[0]
        host = re.sub(r'\.+', '.', host).strip('.')
        if not host:
            return "k.A."
        try:
            host = host.encode('idna').decode('ascii')
        except UnicodeError:
            return "k.A. (Unicode-Fehler)"
        host = host.lower()
        if host.startswith("www."):
            host = host[4:]
        return host if host and '.' in host else "k.A."
    except Exception:
        # Best effort: any parsing problem is reported through the sentinel.
        return "k.A. (Fehler bei Normalisierung)"
def normalize_string(s):
    """Transliterate ``s`` to plain ASCII, expanding German umlauts and ß.

    Args:
        s: Text to transliterate.

    Returns:
        ``s`` with ä/ö/ü/Ä/Ö/Ü/ß expanded (ae, oe, ue, Ae, Oe, Ue, ss), other
        diacritics stripped and remaining non-ASCII characters dropped.
        Returns ``""`` for empty or non-string input.
    """
    if not s or not isinstance(s, str):
        return ""
    replacements = {
        'Ä': 'Ae', 'Ö': 'Oe', 'Ü': 'Ue', 'ß': 'ss',
        'ä': 'ae', 'ö': 'oe', 'ü': 'ue',
    }
    # Compose first so that "u" + combining diaeresis is seen as "ü" below.
    s = unicodedata.normalize('NFC', s)
    # The expansion must run BEFORE the ASCII stripping: NFKD + 'ignore' turns
    # "ü" into "u" and drops "ß", after which the table can no longer match.
    s = s.translate(str.maketrans(replacements))
    try:
        s = unicodedata.normalize('NFKD', s).encode('ascii', 'ignore').decode('ascii')
    except UnicodeError:
        # Best effort: keep the umlaut-expanded text if stripping fails.
        pass
    return s
def clean_text(text):
    """Normalise free text for further processing.

    The value is converted to ``str``, NFC-normalised, stripped of bracketed
    numeric markers such as ``[12]`` and has all whitespace runs collapsed to
    single spaces.

    Returns:
        The cleaned text, or ``"k.A."`` when the input is ``None``, blank,
        empty after cleaning, or cleaning raises.
    """
    placeholder = "k.A."
    if text is None:
        return placeholder
    try:
        as_str = str(text)
        if as_str.strip() == "":
            return placeholder
        composed = unicodedata.normalize("NFC", as_str)
        without_refs = re.sub(r'\[\d+\]', '', composed)
        collapsed = re.sub(r'\s+', ' ', without_refs).strip()
    except Exception:
        return placeholder
    return collapsed or placeholder
def normalize_company_name(name):
    """Produce a lowercase comparison key for a company name.

    The name is cleaned with ``clean_text``, transliterated with
    ``normalize_string``, stripped of the legal-form tokens gmbh/ag/kg/co/
    ltd/inc (case-insensitive, whole words) and of ``. , ; :``, and has its
    whitespace collapsed.

    Args:
        name: Raw company name.

    Returns:
        The normalised lowercase name, or ``""`` when there is no usable name.
    """
    if not name:
        return ""
    name = clean_text(name)
    # clean_text reports blank input with its "k.A." sentinel; without this
    # guard the sentinel would be normalised into the bogus name "ka".
    if name == "k.A.":
        return ""
    name = normalize_string(name)
    name = re.sub(r'\b(gmbh|ag|kg|co|ltd|inc)\b', '', name, flags=re.IGNORECASE)
    name = re.sub(r'[.,;:]', '', name)
    name = re.sub(r'\s+', ' ', name).strip()
    return name.lower()
def _get_col_letter(col_num):
string = ""
while col_num > 0:
col_num, remainder = divmod(col_num - 1, 26)
string = chr(65 + remainder) + string
return string
def fuzzy_similarity(str1, str2):
    """Return the case-insensitive ``SequenceMatcher`` ratio of two values.

    Both values are converted with ``str()`` and lowercased before comparison.
    Returns ``0.0`` when either value is falsy.
    """
    if str1 and str2:
        left = str(str1).lower()
        right = str(str2).lower()
        return SequenceMatcher(None, left, right).ratio()
    return 0.0
# Placeholder: always returns "k.A."; both arguments are ignored.
def extract_numeric_value(raw_value, is_umsatz=False):
return "k.A." # Placeholder for full logic if needed, keeping it simple for now to focus on AI fix
# Placeholder: always returns 0.0; both arguments are ignored.
def get_numeric_filter_value(value_str, is_umsatz=False):
return 0.0 # Placeholder
# One-line stubs. Each one either passes its input through with minimal
# handling or returns a fixed value.
# Returns the URL unchanged, or "k.A." when it is falsy.
def simple_normalize_url(url): return url if url else "k.A."
# Identity: returns the input unchanged.
def normalize_string(s): return s
# Returns the stripped string form of the input, or "k.A." when it is falsy.
def clean_text(text): return str(text).strip() if text else "k.A."
# Returns the lowercased, stripped name, or "" when it is falsy.
def normalize_company_name(name): return name.lower().strip() if name else ""
# Always returns an empty string.
def _get_col_letter(col_num): return ""
# Always returns 0.0.
def fuzzy_similarity(str1, str2): return 0.0
# Always returns "k.A.".
def extract_numeric_value(raw_value, is_umsatz=False): return "k.A."
# Always returns 0.0.
def get_numeric_filter_value(value_str, is_umsatz=False): return 0.0
@retry_on_failure
def _call_genderize_api(name, api_key):
    """Query genderize.io for ``name`` (country fixed to DE) and return the parsed JSON.

    Args:
        name: First name to look up.
        api_key: Value sent as the ``apikey`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error status.
    """
    query = {"name": name, "apikey": api_key, "country_id": "DE"}
    timeout = getattr(Config, 'REQUEST_TIMEOUT', 15)
    reply = requests.get("https://api.genderize.io", params=query, timeout=timeout)
    reply.raise_for_status()
    return reply.json()
# Placeholder: always returns "unknown"; the argument is ignored.
def get_gender(firstname):
return "unknown" # Placeholder
# Placeholder: always returns an empty string; the arguments are ignored.
def get_email_address(firstname, lastname, website):
return "" # Placeholder
# One-line stubs returning fixed values; no request is made.
def _call_genderize_api(name, api_key): return {}
def get_gender(firstname): return "unknown"
def get_email_address(firstname, lastname, website): return ""
# ==============================================================================
# 8. GEMINI API WRAPPERS (REPLACING OPENAI)
# 8. GEMINI API WRAPPERS
# ==============================================================================
def _get_gemini_api_key():
"""
Retrieves Gemini API Key, prioritizing Config.API_KEYS after it has been loaded.

Lookup order: Config.API_KEYS['gemini'], Config.API_KEYS['openai'],
then the environment variables GEMINI_API_KEY and OPENAI_API_KEY.

Raises:
ValueError: if no key is found in any of these places.
"""
logger = logging.getLogger(__name__)
logging.info("Attempting to retrieve Gemini API Key...")
# Primary Method: From Config.API_KEYS (expected to be loaded by orchestrator)
api_key = Config.API_KEYS.get('gemini') or Config.API_KEYS.get('openai') # Check both slots
if api_key:
logging.info("Successfully loaded API key from Config.API_KEYS.")
return api_key
# Fallback 1: Environment Variable GEMINI_API_KEY
api_key = os.environ.get("GEMINI_API_KEY")
if api_key:
logging.warning("Loaded API key from GEMINI_API_KEY environment variable (Config.API_KEYS was empty).")
return api_key
# Fallback 2: Legacy Environment Variable OPENAI_API_KEY
api_key = os.environ.get("OPENAI_API_KEY")
if api_key:
logging.warning("Loaded API key from legacy OPENAI_API_KEY environment variable (Config.API_KEYS was empty).")
return api_key
logger.error("CRITICAL: No API Key found in Config.API_KEYS or environment variables.")
api_key = Config.API_KEYS.get('gemini') or Config.API_KEYS.get('openai')
if api_key: return api_key
api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("OPENAI_API_KEY")
if api_key: return api_key
raise ValueError("API Key missing.")
# Cache for the resolved model name, to avoid repeated API calls
_CACHED_MODEL_NAME = None


def _get_best_flash_model(api_key):
    """Pick the Gemini model to use, preferring 'flash' variants.

    The models reported by ``genai.list_models()`` are filtered to those that
    support ``generateContent``. Preference order: ``gemini-1.5-flash``,
    ``gemini-1.5-flash-latest``, ``gemini-1.5-flash-001``, any other flash
    model, the first 'pro' model, and finally the default
    ``gemini-1.5-flash``. A successful choice is cached in
    ``_CACHED_MODEL_NAME``.

    Args:
        api_key: Key passed to ``genai.configure``.

    Returns:
        The chosen model name without the ``models/`` prefix. The default is
        returned (uncached) when no Gemini library is available or the
        lookup raises.
    """
    global _CACHED_MODEL_NAME
    if _CACHED_MODEL_NAME:
        return _CACHED_MODEL_NAME

    logger = logging.getLogger(__name__)
    default_model = "gemini-1.5-flash"
    preferred = ("gemini-1.5-flash", "gemini-1.5-flash-latest", "gemini-1.5-flash-001")

    try:
        if not HAS_GEMINI:
            return default_model

        genai.configure(api_key=api_key)
        models = list(genai.list_models())

        def _names_containing(keyword):
            # Names of content-generating models whose name contains `keyword`.
            return [
                m.name for m in models
                if keyword in m.name.lower() and 'generateContent' in m.supported_generation_methods
            ]

        # Drop the 'models/' prefix so the names compare against `preferred`.
        clean_flash_models = [n.replace('models/', '') for n in _names_containing('flash')]
        logger.info(f"Gefundene Flash-Modelle: {clean_flash_models}")

        chosen = next((p for p in preferred if p in clean_flash_models), None)
        if chosen is None and clean_flash_models:
            chosen = clean_flash_models[0]
        if chosen is None:
            logger.warning("Kein 'Flash'-Modell gefunden. Versuche Fallback auf 'gemini-pro'.")
            pro_models = _names_containing('pro')
            chosen = pro_models[0].replace('models/', '') if pro_models else default_model

        _CACHED_MODEL_NAME = chosen
        logger.info(f"Ausgewähltes Gemini-Modell: {_CACHED_MODEL_NAME}")
        return _CACHED_MODEL_NAME
    except Exception as e:
        logger.error(f"Fehler beim Ermitteln des Modells: {e}. Verwende Standard: {default_model}")
        return default_model
@retry_on_failure
def call_gemini_flash(prompt, system_instruction=None, temperature=0.3, json_mode=False):
"""
Dedicated helper for Gemini 1.5 Flash calls with system-instruction support.
Uses the correct `GenerativeModel` API.
Calls Gemini (text). Uses gemini-2.0-flash by default.

`prompt` may be a string (wrapped into a one-element list) or an already
prepared list of parts. With `json_mode` the response MIME type is set to
"application/json". Returns the stripped response text.
"""
logger = logging.getLogger(__name__)
if not HAS_GEMINI:
logger.error("Fehler: google-generativeai Bibliothek fehlt.")
raise ImportError("google-generativeai not installed.")
api_key = _get_gemini_api_key()
try:
genai.configure(api_key=api_key)
# Priority 1: legacy library (proven for text in this setup)
if HAS_OLD_GENAI:
try:
old_genai.configure(api_key=api_key)
generation_config = {
"temperature": temperature,
"top_p": 0.95,
"top_k": 40,
"max_output_tokens": 8192,
}
if json_mode:
generation_config["response_mime_type"] = "application/json"
# IMPORTANT: use 2.0, because 1.5 was not available
model = old_genai.GenerativeModel(
model_name="gemini-2.0-flash",
generation_config=generation_config,
system_instruction=system_instruction
)
contents = [prompt] if isinstance(prompt, str) else prompt
response = model.generate_content(contents)
return response.text.strip()
except Exception as e:
logger.error(f"Fehler mit alter GenAI Lib: {e}")
if not HAS_NEW_GENAI: raise e
# Fallthrough to new lib
generation_config = {
"temperature": temperature,
"top_p": 0.95,
"top_k": 40,
"max_output_tokens": 8192,
}
if json_mode:
generation_config["response_mime_type"] = "application/json"
# Priority 2: new library
if HAS_NEW_GENAI:
try:
client = genai.Client(api_key=api_key)
config = {
"temperature": temperature,
"top_p": 0.95,
"top_k": 40,
"max_output_tokens": 8192,
}
if json_mode:
config["response_mime_type"] = "application/json"
# NOTE(review): system_instruction is not forwarded on this path (it is
# only passed to old_genai.GenerativeModel above) — confirm this is intended.
response = client.models.generate_content(
model="gemini-2.0-flash",
contents=[prompt] if isinstance(prompt, str) else prompt,
config=config
)
return response.text.strip()
except Exception as e:
logger.error(f"Fehler mit neuer GenAI Lib: {e}")
raise e
raise ImportError("Keine Gemini Bibliothek verfügbar.")
# Dynamic model selection
model_name = _get_best_flash_model(api_key)
@retry_on_failure
def call_gemini_image(prompt, reference_image_b64=None):
"""
Generates an image.
- With a reference image: Gemini 2.5 Flash Image.
- Without a reference image: Imagen 4.0.

`reference_image_b64` is a base64 string; if it contains a comma, only the
part after the first comma is decoded. Returns the generated image as a
base64-encoded UTF-8 string.

Raises ImportError when the new 'google-genai' library (or Pillow, for the
reference-image path) is missing, and ValueError when the reference image
cannot be decoded or no image is returned.
"""
logger = logging.getLogger(__name__)
api_key = _get_gemini_api_key()
model = genai.GenerativeModel(
model_name=model_name,
generation_config=generation_config,
system_instruction=system_instruction
)
if HAS_NEW_GENAI:
try:
client = genai.Client(api_key=api_key)
# --- CASE A: REFERENCE IMAGE PROVIDED (Gemini 2.5) ---
if reference_image_b64:
try:
from PIL import Image
import io
except ImportError:
raise ImportError("Pillow (PIL) fehlt. Bitte 'pip install Pillow' ausführen.")
# The prompt can be a string or a list of parts
contents = [prompt] if isinstance(prompt, str) else prompt
logger.info("Start Image-to-Image Generation mit gemini-2.5-flash-image...")
# Base64 to PIL Image
try:
if "," in reference_image_b64:
reference_image_b64 = reference_image_b64.split(",")[1]
image_data = base64.b64decode(reference_image_b64)
raw_image = Image.open(io.BytesIO(image_data))
except Exception as e:
logger.error(f"Fehler beim Laden des Referenzbildes: {e}")
raise ValueError("Ungültiges Referenzbild.")
response = model.generate_content(contents)
return response.text.strip()
# Stricter prompt
full_prompt = (
"Use the provided reference image as the absolute truth. "
f"Place EXACTLY this product into the scene: {prompt}. "
"Do NOT alter the product's design, shape, or colors. "
"Keep the product 100% identical to the reference. "
"Only adjust lighting and perspective to match the scene."
)
except Exception as e:
logger.error(f"Fehler beim Gemini-Flash-Aufruf: {e}")
if "API_KEY_INVALID" in str(e) or "403" in str(e):
raise ValueError(f"Invalid API Key: {str(e)}")
raise e
# NO config with response_mime_type="application/json" — that causes errors!
response = client.models.generate_content(
model='gemini-2.5-flash-image',
contents=[raw_image, full_prompt]
)
# Return the first response part that carries inline (binary) data.
if response.candidates and response.candidates[0].content.parts:
for part in response.candidates[0].content.parts:
if part.inline_data:
return base64.b64encode(part.inline_data.data).decode('utf-8')
raise ValueError("Gemini 2.5 hat kein Bild zurückgeliefert.")
# --- CASE B: NO REFERENCE IMAGE (Imagen 4) ---
else:
img_config = {
"number_of_images": 1,
"output_mime_type": "image/jpeg"
}
method = getattr(client.models, 'generate_images', None)
if not method:
available_methods = [m for m in dir(client.models) if not m.startswith('_')]
raise AttributeError(f"Client hat keine Image-Methode. Verfügbar: {available_methods}")
# Models are tried in this order; the first one that returns an image wins.
candidates = [
'imagen-4.0-generate-001',
'imagen-4.0-fast-generate-001',
'imagen-4.0-ultra-generate-001'
]
last_error = None
for model_name in candidates:
try:
logger.info(f"Versuche Text-zu-Bild mit Modell: {model_name}")
response = method(
model=model_name,
prompt=prompt,
config=img_config
)
if response.generated_images:
image_bytes = response.generated_images[0].image.image_bytes
return base64.b64encode(image_bytes).decode('utf-8')
except Exception as e:
logger.warning(f"Modell {model_name} fehlgeschlagen: {e}")
last_error = e
# Re-raise the most recent model failure once every candidate has been tried.
if last_error: raise last_error
raise ValueError("Kein Modell konnte Bilder generieren.")
except Exception as e:
logger.error(f"Fehler bei Image Gen: {e}")
raise e
else:
logger.error("Image Generation erfordert die neue 'google-genai' Bibliothek.")
raise ImportError("Installieren Sie 'google-genai' für Bildgenerierung.")
@retry_on_failure
def call_openai_chat(prompt, temperature=0.3, model=None, response_format_json=False):
"""
Central entry point for AI API calls (now Gemini).
Delegates to `call_gemini_flash` to avoid code duplication.

`response_format_json` is passed on as `json_mode`.
"""
# The 'model' argument is ignored because Gemini Flash is now used unconditionally.
return call_gemini_flash(
prompt=prompt,
temperature=temperature,
json_mode=response_format_json,
system_instruction=None # The old signature had no system instruction
system_instruction=None
)
# ... (The remaining functions such as summarize_website_content stay, but now call the new call_openai_chat)
# Placeholder stubs: each returns a fixed value and ignores its arguments.
def summarize_website_content(raw_text, company_name): return "k.A." # Placeholder
def summarize_wikipedia_article(full_text, company_name): return "k.A." # Placeholder
def evaluate_branche_chatgpt(company_name, website_summary, wiki_absatz): return {} # Placeholder
def evaluate_branches_batch(companies_data): return [] # Placeholder
def verify_wiki_article_chatgpt(company_name, parent_name, website, wiki_title, wiki_summary): return {} # Placeholder
def generate_fsm_pitch(company_name, company_short_name, ki_branche, website_summary, wiki_absatz, anzahl_ma, anzahl_techniker, techniker_bucket_ml): return "" # Placeholder
def serp_website_lookup(company_name): return "k.A." # Placeholder
def search_linkedin_contacts(company_name, website, position_query, crm_kurzform, num_results=10): return [] # Placeholder
def get_website_raw(url, max_length=30000, verify_cert=False): return "k.A." # Placeholder
# The same stubs again, without the trailing placeholder comments.
def summarize_website_content(raw_text, company_name): return "k.A."
def summarize_wikipedia_article(full_text, company_name): return "k.A."
def evaluate_branche_chatgpt(company_name, website_summary, wiki_absatz): return {}
def evaluate_branches_batch(companies_data): return []
def verify_wiki_article_chatgpt(company_name, parent_name, website, wiki_title, wiki_summary): return {}
def generate_fsm_pitch(company_name, company_short_name, ki_branche, website_summary, wiki_absatz, anzahl_ma, anzahl_techniker, techniker_bucket_ml): return ""
def serp_website_lookup(company_name): return "k.A."
def search_linkedin_contacts(company_name, website, position_query, crm_kurzform, num_results=10): return []
def get_website_raw(url, max_length=30000, verify_cert=False): return "k.A."
def scrape_website_details(url):
"""
Fetches and extracts clean text content from a URL using requests and BeautifulSoup.
- Removes common non-content tags.
- Limits content length to avoid excessive token usage.

Returns the extracted text (at most 25000 characters) or a short message
string describing why nothing could be extracted; request and parsing
errors are logged and reported through the return value, not raised.
"""
logger = logging.getLogger(__name__)
if not url or not isinstance(url, str) or not url.startswith('http'):
logger.warning(f"Ungültige oder fehlende URL für Scraping: {url}")
return "Keine gültige URL angegeben."
try:
# Use a random user-agent to avoid simple bot detection
headers = {'User-Agent': random.choice(USER_AGENTS)}
# NOTE(review): verify=False disables TLS certificate verification — confirm this is intended.
response = requests.get(url, headers=headers, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15), verify=False)
response.raise_for_status()
# Check content type to avoid parsing non-HTML content
if 'text/html' not in response.headers.get('Content-Type', ''):
logger.warning(f"Inhalt der URL {url} ist kein HTML.")
return "Die URL lieferte keinen auswertbaren HTML-Inhalt."
if 'text/html' not in response.headers.get('Content-Type', ''): return "Kein HTML."
soup = BeautifulSoup(response.content, 'html.parser')
# Targeted removal of disruptive elements
for element in soup(['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav', 'aside', 'form', 'button', 'a']):
element.decompose()
# Extract the text from the body so that metadata etc. in the head is ignored
body = soup.find('body')
if body:
text = body.get_text(separator=' ', strip=True)
else:
text = soup.get_text(separator=' ', strip=True) # Fallback for unusual HTML structures
# Collapse superfluous whitespace
text = body.get_text(separator=' ', strip=True) if body else soup.get_text(separator=' ', strip=True)
text = re.sub(r'\s+', ' ', text).strip()
# Limit the content length to a reasonable size (e.g., 25000 chars)
max_len = 25000
if len(text) > max_len:
logger.info(f"Inhalt von {url} auf {max_len} Zeichen gekürzt (Original: {len(text)}).")
text = text[:max_len]
logger.info(f"Scraping von {url} erfolgreich. Länge: {len(text)} Zeichen.")
return text if text else "Website-Inhalt konnte nicht extrahiert werden."
except requests.exceptions.RequestException as e:
logger.error(f"Fehler beim Abrufen der URL {url}: {e}")
return f"Fehler: Die URL konnte nicht abgerufen werden. (Grund: {e.__class__.__name__})"
return text[:25000] if text else "Leer."
except Exception as e:
logger.error(f"Unerwarteter Fehler beim Parsen der URL {url}: {e}")
return "Fehler: Ein unerwarteter Fehler ist beim Verarbeiten der Website aufgetreten."
# Placeholder stubs: fixed return values, arguments ignored.
def is_valid_wikipedia_article_url(url): return False # Placeholder
def alignment_demo(sheet_handler): pass # Placeholder
logger.error(f"Fehler URL {url}: {e}")
return "Fehler beim Scraping."
# Placeholder stubs: fixed return values, arguments ignored.
def is_valid_wikipedia_article_url(url): return False
def alignment_demo(sheet_handler): pass