Files
Brancheneinstufung2/company-explorer/backend/lib/core_utils.py
Floke a43b01bb6e feat(company-explorer): add wikipedia integration, robotics settings, and manual overrides
- Ported robust Wikipedia extraction logic (categories, first paragraph) from legacy system.
- Implemented database-driven Robotics Category configuration with frontend settings UI.
- Updated Robotics Potential analysis to use Chain-of-Thought infrastructure reasoning.
- Added Manual Override features for Wikipedia URL (with locking) and Website URL (with re-scrape trigger).
- Enhanced Inspector UI with Wikipedia profile, category tags, and action buttons.
2026-01-08 16:14:01 +01:00

248 lines
8.0 KiB
Python

import time
import logging
import random
import os
import re
import unicodedata
from urllib.parse import urlparse
from functools import wraps
from typing import Optional, Union, List
from thefuzz import fuzz
# Try the new Google GenAI library (v1.0+) first
try:
from google import genai
from google.genai import types
HAS_NEW_GENAI = True
except ImportError:
HAS_NEW_GENAI = False
# Fall back to the legacy library
try:
import google.generativeai as old_genai
HAS_OLD_GENAI = True
except ImportError:
HAS_OLD_GENAI = False
from ..config import settings
logger = logging.getLogger(__name__)
# ==============================================================================
# 1. DECORATORS
# ==============================================================================
def retry_on_failure(max_retries: int = 3, delay: float = 2.0):
    """
    Decorator factory that retries the wrapped function with exponential backoff.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated as 1
            so the wrapped function is always called at least once.
        delay: Base wait in seconds. After failed attempt ``n`` (0-based) the
            wrapper sleeps ``delay * 2**n`` plus up to one second of jitter.

    Returns:
        A decorator. The decorated function returns the first successful result.

    Raises:
        The last exception raised by the wrapped function once all attempts
        are used up. A ``ValueError`` whose message mentions an API key is
        considered fatal and is re-raised immediately without retrying.
    """
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    # Don't retry on certain fatal errors (can be extended).
                    # Compare case-insensitively and treat "_" as a space so that
                    # messages like "GEMINI_API_KEY is missing" are recognised too.
                    if isinstance(e, ValueError) and "api key" in str(e).lower().replace("_", " "):
                        raise
                    # Sleeping after the final attempt would only delay the error.
                    if attempt + 1 >= attempts:
                        break
                    wait_time = delay * (2 ** attempt) + random.uniform(0, 1)
                    logger.warning(f"Retry {attempt + 1}/{attempts} for '{func.__name__}' after error: {e}. Waiting {wait_time:.1f}s")
                    time.sleep(wait_time)
            logger.error(f"Function '{func.__name__}' failed after {attempts} attempts.")
            raise last_exception
        return wrapper
    return decorator
# ==============================================================================
# 2. TEXT TOOLS
# ==============================================================================
def clean_text(text: str) -> str:
    """
    Collapse whitespace and drop control characters.

    Args:
        text: Input value; it is converted with ``str()``. Falsy input
            (``None``, ``""``) yields ``""``.

    Returns:
        The NFKC-normalized text with every run of whitespace replaced by a
        single space, all characters of Unicode category "C*" removed, and no
        leading or trailing whitespace.
    """
    if not text:
        return ""
    # Normalize unicode characters
    text = unicodedata.normalize("NFKC", str(text))
    # Newlines, tabs and carriage returns are themselves control characters
    # (category "Cc"). Convert all whitespace to plain spaces *before* the
    # control-character filter, otherwise "a\nb" would collapse to "ab".
    text = re.sub(r"\s", " ", text)
    # Remove control characters
    text = "".join(ch for ch in text if unicodedata.category(ch)[0] != "C")
    # Collapse last and strip at the end: removing a control character can
    # expose new adjacent or trailing spaces.
    return re.sub(r" +", " ", text).strip()
def normalize_string(s: str) -> str:
    """Return *s* lower-cased without surrounding whitespace ("" for falsy input)."""
    if not s:
        return ""
    lowered = s.lower()
    return lowered.strip()
def simple_normalize_url(url: str) -> str:
    """
    Normalize a URL to its core domain.

    Example: 'https://www.example.com/foo' -> 'example.com'.

    Args:
        url: URL or bare domain. Scheme and host are matched
            case-insensitively; surrounding whitespace is ignored.

    Returns:
        The lower-cased host name without a leading 'www.', port or user
        info, or the sentinel "k.A." when the input is empty, one of the
        placeholders 'k.a.' / 'nan' / 'none', or contains no host.
    """
    if not url:
        return "k.A."
    url = str(url).strip()
    if not url or url.lower() in ("k.a.", "nan", "none"):
        return "k.A."
    # Ensure protocol for urlparse (scheme comparison must ignore case,
    # otherwise "HTTPS://x" would get a second scheme prepended)
    if not url.lower().startswith(("http://", "https://")):
        url = "http://" + url
    try:
        # .hostname is already lower-cased and excludes port and credentials
        domain = urlparse(url).hostname or ""
    except ValueError:
        return "k.A."
    # Remove www.
    if domain.startswith("www."):
        domain = domain[4:]
    return domain or "k.A."
# Legal-form tokens removed by normalize_company_name (matched on lower-cased text).
# "e.V." needs its own alternative: a trailing "\b" after the final "." can never
# match before a space or at the end of the string, so a lookahead is used instead.
_LEGAL_FORMS_RE = re.compile(
    r"\b(?:gmbh|ag|kg|ohg|ug|ltd|llc|inc|corp|co)\b"
    r"|\be\.\s?v\.?(?!\w)"
)


def normalize_company_name(name: str) -> str:
    """
    Normalize a company name by removing legal forms and special characters.

    Args:
        name: Raw company name. Falsy input yields ``""``.

    Returns:
        The lower-cased name without the legal-form tokens (GmbH, AG, KG, OHG,
        UG, Ltd, LLC, Inc, Corp, Co, e.V.), without punctuation, and with
        whitespace collapsed to single spaces.
    """
    if not name:
        return ""
    name = name.lower()
    # Remove common legal forms
    name = _LEGAL_FORMS_RE.sub("", name)
    # Remove special chars (this also drops the "&" left over from "& Co")
    name = re.sub(r"[^\w\s]", "", name)
    # Collapse extra spaces
    return re.sub(r"\s+", " ", name).strip()
def _parse_localized_number(token: str, integral: bool) -> float:
    """
    Convert a digit token that may contain '.' / ',' separators into a float.

    Args:
        token: Digits separated by single '.' or ',' characters (e.g. "1.234,56").
        integral: True when the value is known to be a plain whole-number count;
            a lone separator followed by exactly three digits is then read as a
            thousands separator ("1.500" -> 1500) rather than a decimal point.

    Raises:
        ValueError: If the token cannot be converted.
    """
    parts = re.split(r"[.,]", token)
    seps = [ch for ch in token if ch in ".,"]
    if not seps:
        return float(token)
    head = parts[0]
    if len(seps) == 1:
        tail = parts[1]
        if integral and len(tail) == 3 and len(head) <= 3 and not head.startswith("0"):
            return float(head + tail)
        return float(f"{head}.{tail}")
    # Several separators: only accept proper 3-digit grouping, so that things
    # like dates ("31.12.2023") are not glued together into one huge number.
    well_grouped = len(head) <= 3 and all(len(p) == 3 for p in parts[1:-1])
    if well_grouped:
        if len(set(seps)) == 1 and len(parts[-1]) == 3:
            # 1.000.000 / 1,000,000 -> pure thousands grouping
            return float("".join(parts))
        if len(set(seps[:-1])) == 1 and seps[-1] != seps[0]:
            # 1.234,56 / 1,234.56 -> the last, different separator is the decimal mark
            return float("".join(parts[:-1]) + "." + parts[-1])
    # Fallback: leading number with a single decimal separator
    return float(f"{head}.{parts[1]}")


def extract_numeric_value(raw_value: str, is_umsatz: bool = False) -> str:
    """
    Extracts a numeric value from a string, handling 'Mio', 'Mrd', etc.

    Args:
        raw_value: Free text containing a number, e.g. "ca. 2,5 Mio" or
            "1.000.000". It is converted with ``str()`` and lower-cased.
        is_umsatz: When True the value is treated as revenue and expressed in
            millions (Mrd/billion x1000, Mio/million x1, Tsd/thousand x0.001).
            When False the multipliers scale to an absolute integer count.

    Returns:
        String representation of the number ("250.5" style with at most two
        decimals for revenue, an integer string otherwise) or 'k.A.' when no
        number can be extracted.
    """
    if not raw_value:
        return "k.A."
    raw_value = str(raw_value).strip().lower()
    if raw_value in ["k.a.", "nan", "none"]:
        return "k.A."
    # Simple multiplier handling
    multiplier = 1.0
    if 'mrd' in raw_value or 'billion' in raw_value:
        multiplier = 1000.0 if is_umsatz else 1000000000.0
    elif 'mio' in raw_value or 'million' in raw_value:
        multiplier = 1.0 if is_umsatz else 1000000.0
    elif 'tsd' in raw_value or 'thousand' in raw_value:
        multiplier = 0.001 if is_umsatz else 1000.0
    # Extract number: digits with any number of '.'/',' separated groups, so
    # that "1.000.000" and "1.234,56" are captured as one token.
    matches = re.findall(r'\d+(?:[.,]\d+)*', raw_value)
    if not matches:
        return "k.A."
    try:
        # Take the first number found. A bare count without a unit word cannot
        # be fractional, which disambiguates "1.500" as 1500.
        plain_count = not is_umsatz and multiplier == 1.0
        val = _parse_localized_number(matches[0], integral=plain_count) * multiplier
    except ValueError:
        return "k.A."
    # Round appropriately
    if is_umsatz:
        # Return in millions, e.g. "250.5"
        return f"{val:.2f}".rstrip('0').rstrip('.')
    # Return integer for employees. round() first removes binary float noise
    # (1.001 * 1000 == 1000.9999999999999) while int() still truncates real
    # fractions as before.
    return str(int(round(val, 6)))
def fuzzy_similarity(str1: str, str2: str) -> float:
    """
    Compute a fuzzy similarity score for two strings, scaled to 0.0 - 1.0.

    If either argument is empty or None the result is 0.0; otherwise the
    0-100 score from ``fuzz.ratio`` is divided by 100.
    """
    if str1 and str2:
        score = fuzz.ratio(str1, str2)
        return score / 100.0
    return 0.0
# ==============================================================================
# 3. LLM WRAPPER (GEMINI)
# ==============================================================================
@retry_on_failure(max_retries=3)
def call_gemini(
    prompt: Union[str, List[str]],
    model_name: str = "gemini-2.0-flash",
    temperature: float = 0.3,
    json_mode: bool = False,
    system_instruction: Optional[str] = None
) -> str:
    """
    Unified caller for Gemini API. Prefers new `google.genai` library.

    Args:
        prompt: A single prompt string or a list of content strings.
        model_name: Gemini model identifier passed to the library.
        temperature: Sampling temperature.
        json_mode: When True, requests an ``application/json`` response.
        system_instruction: Optional system prompt; forwarded to whichever
            library ends up handling the request.

    Returns:
        The stripped response text.

    Raises:
        ValueError: If ``settings.GEMINI_API_KEY`` is not set, or the new
            library returns an empty response and no fallback is installed.
        ImportError: If neither Google library is installed.
        Exception: Any error raised by the library that handled the request
            last is logged and re-raised.
    """
    api_key = settings.GEMINI_API_KEY
    if not api_key:
        raise ValueError("GEMINI_API_KEY is missing in configuration.")

    # Generation parameters shared by both libraries
    generation_config = {
        "temperature": temperature,
        "top_p": 0.95,
        "top_k": 40,
        "max_output_tokens": 8192,
    }
    if json_mode:
        generation_config["response_mime_type"] = "application/json"

    # Option A: New Library (google-genai)
    if HAS_NEW_GENAI:
        try:
            client = genai.Client(api_key=api_key)
            config = dict(generation_config)
            # The new SDK takes the system prompt as part of the request config;
            # without this the parameter would only work on the legacy path.
            if system_instruction:
                config["system_instruction"] = system_instruction
            response = client.models.generate_content(
                model=model_name,
                contents=[prompt] if isinstance(prompt, str) else prompt,
                config=config,
            )
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error with google-genai lib: {e}")
            if not HAS_OLD_GENAI:
                raise
            # Fallthrough to Option B

    # Option B: Old Library (google-generativeai)
    if HAS_OLD_GENAI:
        try:
            old_genai.configure(api_key=api_key)
            model = old_genai.GenerativeModel(
                model_name=model_name,
                generation_config=generation_config,
                system_instruction=system_instruction
            )
            response = model.generate_content(prompt)
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error with google-generativeai lib: {e}")
            raise

    raise ImportError("No Google GenAI library installed (neither google-genai nor google-generativeai).")