- fixed Year-Prefix Bug in MetricParser - added metric_confidence and metric_proof_text to database - added Entity-Check and Annual-Priority to LLM prompt - improved UI: added confidence traffic light and mouse-over proof tooltip - restored missing API endpoints (create, bulk, wiki-override)
301 lines
11 KiB
Python
301 lines
11 KiB
Python
import time
|
|
import logging
|
|
import random
|
|
import os
|
|
import re
|
|
import unicodedata
|
|
from urllib.parse import urlparse
|
|
from functools import wraps
|
|
from typing import Optional, Union, List, Dict, Any
|
|
from thefuzz import fuzz
|
|
import requests # Added for SerpAPI
|
|
|
|
# Try new Google GenAI Lib (v1.0+)
|
|
try:
|
|
from google import genai
|
|
from google.genai import types
|
|
HAS_NEW_GENAI = True
|
|
except ImportError:
|
|
HAS_NEW_GENAI = False
|
|
|
|
# Fallback to old Lib
|
|
try:
|
|
import google.generativeai as old_genai
|
|
HAS_OLD_GENAI = True
|
|
except ImportError:
|
|
HAS_OLD_GENAI = False
|
|
|
|
from ..config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ==============================================================================
|
|
# 1. DECORATORS
|
|
# ==============================================================================
|
|
|
|
def retry_on_failure(max_retries: int = 3, delay: float = 2.0):
    """
    Decorator factory that retries a function with exponential backoff.

    The wrapped function is called up to ``max_retries`` times (at least
    once). After a failed attempt the wrapper sleeps ``delay * 2**attempt``
    seconds plus up to one second of random jitter before trying again.
    No sleep happens after the final attempt: the error is raised right away.

    A ``ValueError`` whose message mentions an API key (for example
    "API Key" or "GEMINI_API_KEY") is treated as a configuration problem and
    re-raised immediately, because retrying cannot fix it.

    Args:
        max_retries: Maximum number of attempts; values below 1 are treated as 1.
        delay: Base wait time in seconds used for the first retry.

    Returns:
        A decorator that preserves the wrapped function's metadata.

    Raises:
        Whatever exception the last failed attempt raised.
    """
    # Always try at least once; otherwise there would be no exception to raise.
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # Normalise case and underscores so that both "API Key"
                    # and "..._API_KEY" style messages are recognised.
                    message = str(e).lower().replace("_", " ")
                    if isinstance(e, ValueError) and "api key" in message:
                        raise

                    if attempt == attempts - 1:
                        # Out of attempts: do not sleep, just report and re-raise.
                        logger.error(f"Function '{func.__name__}' failed after {attempts} attempts.")
                        raise

                    wait_time = delay * (2 ** attempt) + random.uniform(0, 1)
                    logger.warning(f"Retry {attempt + 1}/{attempts} for '{func.__name__}' after error: {e}. Waiting {wait_time:.1f}s")
                    time.sleep(wait_time)

        return wrapper

    return decorator
|
|
|
|
# ==============================================================================
|
|
# 2. TEXT TOOLS
|
|
# ==============================================================================
|
|
|
|
def clean_text(text: str) -> str:
    """
    Normalize a text snippet: NFKC-normalize, drop control characters and
    collapse whitespace.

    Whitespace (including newlines and tabs) is converted to plain spaces
    *before* control characters are removed. Newline and tab belong to
    Unicode category "Cc", so removing control characters first would glue
    the neighbouring words together ("a\\nb" -> "ab").

    Args:
        text: Input value; non-string values are converted with ``str()``.

    Returns:
        The cleaned text, or "" for falsy input.
    """
    if not text:
        return ""

    text = unicodedata.normalize('NFKC', str(text))
    # Every whitespace character becomes a regular space so word boundaries survive.
    text = re.sub(r'\s', ' ', text)
    # Drop all characters from the Unicode "C" (control/format/etc.) categories.
    text = "".join(ch for ch in text if unicodedata.category(ch)[0] != "C")
    # Collapse runs of spaces and trim edges exposed by removed characters.
    return re.sub(r' +', ' ', text).strip()
|
|
|
|
def normalize_string(s: str) -> str:
    """Return ``s`` lowercased with surrounding whitespace removed ("" for falsy input)."""
    if not s:
        return ""
    lowered = s.lower()
    return lowered.strip()
|
|
|
|
def simple_normalize_url(url: str) -> str:
    """
    Normalize a URL to its lowercase host part without a leading "www.".

    Example: 'https://www.example.com/foo' -> 'example.com'.

    Args:
        url: URL or bare domain. Surrounding whitespace is ignored and the
            scheme / "www." prefix are matched case-insensitively.

    Returns:
        The normalized domain, or "k.A." when the input is empty, one of the
        placeholder values ("k.a.", "nan", "none"), unparsable, or yields no
        domain at all.
    """
    if not url:
        return "k.A."

    # str() keeps non-string placeholders (e.g. a float NaN) from crashing on .lower().
    url = str(url).strip()
    if not url or url.lower() in ("k.a.", "nan", "none"):
        return "k.A."

    # Compare the scheme case-insensitively; "HTTPS://..." must not get a second scheme.
    if not url.lower().startswith(('http://', 'https://')):
        url = 'http://' + url

    try:
        parsed = urlparse(url)
    except ValueError:
        return "k.A."

    # Lowercase before stripping "www." so that "WWW.Example.com" is handled too.
    domain = (parsed.netloc or parsed.path).lower()
    if domain.startswith('www.'):
        domain = domain[4:]
    return domain or "k.A."
|
|
|
|
# Legal forms and generic filler words that are stripped from company names.
_LEGAL_FORM_TOKENS = (
    "gmbh & co. kg", "gmbh", "ag", "kg", "ohg", "ug", "ltd",
    "llc", "inc", "corp", "co", "& co", "e.v.",
    "sa", "se", "s.a.", "gesellschaft", "gp", "lp",
    "service", "services", "group", "solutions", "systeme",
    "handel", "marketing", "technology", "international",
    "holding", "verwaltung", "foundation",
)

# One alternation, longest token first so that "gmbh & co. kg" wins over "gmbh".
# Lookarounds are used instead of \b because \b never matches next to "." or "&"
# when the neighbouring character is a space or the string boundary.
_LEGAL_FORM_RE = re.compile(
    r"(?<!\w)(?:"
    + "|".join(re.escape(token) for token in sorted(_LEGAL_FORM_TOKENS, key=len, reverse=True))
    + r")(?!\w)"
)


def normalize_company_name(name: str) -> str:
    """
    Normalize a company name for robust comparison.

    Steps: lowercase, remove legal forms and generic filler words (GmbH, AG,
    e.V., Group, Solutions, ...), join digit groups separated by whitespace,
    drop punctuation, and collapse whitespace.

    Examples:
        "Müller GmbH & Co. KG" -> "müller"
        "11 88 0 Solutions"    -> "11880"

    Args:
        name: Raw company name.

    Returns:
        The normalized name, or "" for falsy input.
    """
    if not name:
        return ""

    name = _LEGAL_FORM_RE.sub('', name.lower())

    # Lookarounds keep the digits themselves unconsumed so that every gap in
    # a sequence like "1 2 3" is closed, not just every other one.
    name = re.sub(r'(?<=\d)\s+(?=\d)', '', name)
    name = re.sub(r'[^\w\s]', '', name)
    name = re.sub(r'\s+', ' ', name).strip()

    return name
|
|
|
|
def extract_numeric_value(raw_value: str, is_umsatz: bool = False) -> str:
    """
    Extract a number from free text and return it as a string.

    The actual parsing is delegated to ``MetricParser.extract_numeric_value``
    (called with ``is_revenue=is_umsatz``); see that class for the supported
    units and number formats.

    Args:
        raw_value: Text that may contain a numeric value.
        is_umsatz: True when the value is a revenue figure. Revenue is
            formatted with up to two decimals (trailing zeros removed);
            everything else is truncated to an integer.

    Returns:
        The formatted number, or "k.A." when the parser returns None.
    """
    # Imported inside the function, exactly as the original does.
    from .metric_parser import MetricParser

    parsed = MetricParser.extract_numeric_value(raw_value, is_revenue=is_umsatz)
    if parsed is None:
        return "k.A."

    if not is_umsatz:
        return str(int(parsed))

    two_decimals = f"{parsed:.2f}"
    return two_decimals.rstrip('0').rstrip('.')
|
|
|
|
def fuzzy_similarity(str1: str, str2: str) -> float:
    """
    Compute the fuzzy similarity of two strings, scaled to 0.0 - 1.0.

    Uses ``fuzz.ratio`` and divides its result by 100. If either string is
    empty or None, the similarity is 0.0.
    """
    if str1 and str2:
        score = fuzz.ratio(str1, str2)
        return score / 100.0
    return 0.0
|
|
|
|
def clean_json_response(response_text: str) -> str:
    """
    Strip Markdown code fences from an LLM response so it can be parsed as JSON.

    Removes opening fences (```json or ```, tag matched case-insensitively)
    at the start of a line and closing fences (```) at the end of a line.

    Args:
        response_text: Raw text returned by the model.

    Returns:
        The text without fences and surrounding whitespace. "{}" is returned
        when the input is empty or nothing remains after cleaning, so callers
        always receive a non-empty string.
    """
    if not response_text:
        return "{}"

    # IGNORECASE: models sometimes emit "```JSON"; without it the tag would be left behind.
    cleaned = re.sub(r'^```json\s*', '', response_text, flags=re.MULTILINE | re.IGNORECASE)
    cleaned = re.sub(r'^```\s*', '', cleaned, flags=re.MULTILINE)
    cleaned = re.sub(r'\s*```$', '', cleaned, flags=re.MULTILINE)

    # Whitespace-only or fence-only responses behave like an empty response.
    return cleaned.strip() or "{}"
|
|
|
|
# ==============================================================================
|
|
# 3. LLM WRAPPER (GEMINI)
|
|
# ==============================================================================
|
|
|
|
@retry_on_failure(max_retries=3)
def call_gemini_flash(
    prompt: Union[str, List[str]],
    model_name: str = "gemini-2.0-flash",
    temperature: float = 0.3,
    json_mode: bool = False,
    system_instruction: Optional[str] = None
) -> str:
    """
    Unified caller for the Gemini API.

    Prefers the new ``google.genai`` library. If that call fails and the
    legacy ``google.generativeai`` library is installed, the request is
    repeated through the legacy library.

    Args:
        prompt: A single prompt string or a list of prompt parts.
        model_name: Name of the Gemini model to call.
        temperature: Sampling temperature passed to the model.
        json_mode: When True, requests "application/json" as response MIME type.
        system_instruction: Optional system instruction; forwarded on both
            library paths.

    Returns:
        The stripped response text.

    Raises:
        ValueError: If ``settings.GEMINI_API_KEY`` is not set, or (new library,
            no legacy fallback installed) the response text is empty.
        ImportError: If neither Gemini library is installed.
        Exception: Any error raised by the library that was used last.
    """
    api_key = settings.GEMINI_API_KEY
    if not api_key:
        raise ValueError("GEMINI_API_KEY is missing in configuration.")

    # Option A: New Library (google-genai)
    if HAS_NEW_GENAI:
        try:
            client = genai.Client(api_key=api_key)
            config = {
                "temperature": temperature,
                "top_p": 0.95,
                "top_k": 40,
                "max_output_tokens": 8192,
            }
            if json_mode:
                config["response_mime_type"] = "application/json"
            if system_instruction:
                # Forward the system instruction here as well; previously only
                # the legacy path honoured it.
                config["system_instruction"] = system_instruction

            response = client.models.generate_content(
                model=model_name,
                contents=[prompt] if isinstance(prompt, str) else prompt,
                config=config,
            )
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error with google-genai lib: {e}")
            if not HAS_OLD_GENAI:
                raise
            # Fallthrough to Option B

    # Option B: Old Library (google-generativeai)
    if HAS_OLD_GENAI:
        try:
            old_genai.configure(api_key=api_key)
            generation_config = {
                "temperature": temperature,
                "top_p": 0.95,
                "top_k": 40,
                "max_output_tokens": 8192,
            }
            if json_mode:
                generation_config["response_mime_type"] = "application/json"

            model = old_genai.GenerativeModel(
                model_name=model_name,
                generation_config=generation_config,
                system_instruction=system_instruction
            )
            response = model.generate_content(prompt)
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error with google-generativeai lib: {e}")
            raise

    raise ImportError("No Google GenAI library installed (neither google-genai nor google-generativeai).")
|
|
|
|
# ==============================================================================
|
|
# 4. MATH UTILS
|
|
# ==============================================================================
|
|
|
|
def safe_eval_math(expression: str) -> Optional[float]:
    """
    Evaluate a simple arithmetic expression and return the result as float.

    Only digits, ".", the operators + - * /, parentheses and whitespace are
    accepted. The exponent operator "**" is rejected explicitly: it consists
    solely of allowed characters but lets a short input such as
    "9**9**9**9" consume unbounded CPU time and memory.

    The placeholder "wert" is tolerated by the character check only. It must
    be replaced with a number by the caller before evaluation; an expression
    that still contains it fails to evaluate and yields None.

    Args:
        expression: The arithmetic expression to evaluate.

    Returns:
        The numeric result, or None if the input is not a non-empty string,
        contains disallowed characters/operators, or cannot be evaluated
        (syntax error, division by zero, ...).
    """
    if not isinstance(expression, str) or not expression:
        return None

    # Allowed characters: digits, ., +, -, *, /, (, ) and whitespace
    allowed_pattern = re.compile(r"^[0-9.+\-*/()\s]+$")

    # Replace the 'wert' placeholder with a dummy digit for the character check only.
    temp_expression = expression.lower().replace("wert", "1")

    if not allowed_pattern.fullmatch(temp_expression):
        logger.error(f"Math expression contains disallowed characters: {expression}")
        return None

    if "**" in expression:
        logger.error(f"Math expression uses unsupported operator '**': {expression}")
        return None

    try:
        code = compile(expression, '<string>', 'eval')
        # NOTE(security): eval is only reached after the whitelist above has
        # excluded names, attribute access and calls; builtins are removed as
        # an additional safeguard.
        return float(eval(code, {"__builtins__": {}}, {}))
    except Exception as e:
        logger.error(f"Error evaluating math expression '{expression}': {e}", exc_info=True)
        return None
|
|
|
|
# ==============================================================================
|
|
# 5. SEARCH UTILS
|
|
# ==============================================================================
|
|
|
|
@retry_on_failure(max_retries=2, delay=5.0)
def run_serp_search(query: str, num_results: int = 5) -> Optional[Dict[str, Any]]:
    """
    Run a Google search through SerpAPI and return the parsed JSON response.

    The search is geo-targeted to Germany (``gl=de``) with German interface
    language (``hl=de``). Requires ``SERP_API_KEY`` in settings.

    Request and JSON-parsing errors are handled here and reported as None,
    so the retry decorator only comes into play for unexpected exceptions.

    Args:
        query: The search query.
        num_results: Number of organic results to request.

    Returns:
        The decoded SerpAPI response, or None if the API key is missing,
        the HTTP request fails, or the response body is not valid JSON.
    """
    api_key = settings.SERP_API_KEY
    if not api_key:
        logger.error("SERP_API_KEY is missing in configuration. Cannot run SerpAPI search.")
        return None

    url = "https://serpapi.com/search.json"
    params = {
        "api_key": api_key,
        "engine": "google",
        "q": query,
        "num": num_results,  # Number of organic results
        "gl": "de",  # Geo-targeting to Germany
        "hl": "de"  # Interface language to German
    }

    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()  # Raise an exception for HTTP errors
        results = response.json()
        logger.info("SerpAPI search for '%s' successful. Found %s organic results.", query, len(results.get("organic_results", [])))
        return results
    except requests.exceptions.RequestException as e:
        # requests error messages can contain the full request URL, which
        # carries the API key as a query parameter. Redact it, and skip the
        # traceback because it would repeat the unredacted message.
        safe_message = str(e).replace(str(api_key), "***")
        logger.error(f"SerpAPI request failed for query '{query}': {safe_message}")
        return None
    except ValueError as e:
        # JSON decode errors are ValueError subclasses; catching ValueError
        # avoids depending on the `json` module, which this file does not import.
        logger.error(f"Failed to parse SerpAPI JSON response for query '{query}': {e}", exc_info=True)
        return None
|