Files
Brancheneinstufung2/company-explorer/backend/lib/core_utils.py

340 lines
12 KiB
Python

import ast
import logging
import operator
import os
import random
import re
import time
import unicodedata
from functools import wraps
from typing import Optional, Union, List, Dict, Any
from urllib.parse import urlparse

import requests # Added for SerpAPI
from thefuzz import fuzz
# Try new Google GenAI Lib (v1.0+)
try:
from google import genai
from google.genai import types
HAS_NEW_GENAI = True
except ImportError:
HAS_NEW_GENAI = False
# Fallback to old Lib
try:
import google.generativeai as old_genai
HAS_OLD_GENAI = True
except ImportError:
HAS_OLD_GENAI = False
from ..config import settings
logger = logging.getLogger(__name__)
# ==============================================================================
# 1. DECORATORS
# ==============================================================================
def retry_on_failure(max_retries: int = 3, delay: float = 2.0):
    """
    Decorator factory that retries a function with exponential backoff.

    The wrapped function is called up to ``max_retries`` times (always at
    least once). After a failed attempt that is not the last one, the wrapper
    sleeps ``delay * 2 ** attempt`` seconds plus up to one second of random
    jitter and then tries again.

    A ``ValueError`` whose message mentions an API key (e.g. "API Key" or
    "GEMINI_API_KEY") is treated as a configuration problem and re-raised
    immediately, because retrying cannot fix it.

    Args:
        max_retries: Maximum number of attempts; values below 1 are treated as 1.
        delay: Base delay in seconds for the exponential backoff.

    Returns:
        A decorator that wraps a callable with the retry logic.

    Raises:
        The exception of the last failed attempt once all attempts are used up.
    """
    # Guarantee one real attempt so there is always an exception to re-raise.
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    # Normalise case and underscores so "API Key", "api key"
                    # and "..._API_KEY" are all recognised as key problems.
                    message = str(e).lower().replace("_", " ")
                    if isinstance(e, ValueError) and "api key" in message:
                        raise
                    if attempt + 1 >= attempts:
                        # No further attempt follows, so sleeping would only
                        # delay the failure.
                        break
                    wait_time = delay * (2 ** attempt) + random.uniform(0, 1)
                    logger.warning(
                        "Retry %d/%d for '%s' after error: %s. Waiting %.1fs",
                        attempt + 1, attempts, func.__name__, e, wait_time,
                    )
                    time.sleep(wait_time)
            logger.error("Function '%s' failed after %d attempts.", func.__name__, attempts)
            raise last_exception
        return wrapper
    return decorator
# ==============================================================================
# 2. TEXT TOOLS
# ==============================================================================
def clean_text(text: str) -> str:
    """
    Normalise a text snippet to a single clean line.

    Steps, in order:
      1. NFKC-normalise (e.g. full-width letters and non-breaking spaces
         become their plain equivalents).
      2. Collapse every whitespace run (including newlines and tabs) to one
         space. This happens *before* control characters are dropped, because
         newline/tab are themselves control characters and would otherwise be
         deleted, gluing neighbouring words together.
      3. Drop all remaining characters whose Unicode category starts with "C"
         (control, format, private use, surrogate, unassigned).
      4. Collapse spaces again and trim the ends.

    Args:
        text: Input value; non-string truthy values are converted with ``str``.

    Returns:
        The cleaned string, or "" for falsy input.
    """
    if not text:
        return ""
    normalized = unicodedata.normalize('NFKC', str(text))
    normalized = re.sub(r'\s+', ' ', normalized)
    normalized = "".join(
        ch for ch in normalized if unicodedata.category(ch)[0] != "C"
    )
    # Removing a control character that sat between two spaces can leave a
    # double space behind, so collapse once more before trimming.
    return re.sub(r' {2,}', ' ', normalized).strip()
def normalize_string(s: str) -> str:
    """Return *s* lower-cased and trimmed; any falsy input yields ""."""
    if not s:
        return ""
    lowered = s.lower()
    return lowered.strip()
def simple_normalize_url(url: str) -> str:
    """
    Normalize a URL to its bare host name.

    Example: 'https://www.example.com/foo' -> 'example.com'.

    The input is trimmed, a missing http/https scheme is added (checked
    case-insensitively), and the host is taken from the parsed URL. The
    result is lower-cased, carries no port or user info, and has a leading
    'www.' removed.

    Args:
        url: Raw URL or domain; non-string values are converted with ``str``.

    Returns:
        The normalized host, or "k.A." when the input is empty, one of the
        placeholders "k.a."/"nan"/"none" (any case), or has no parsable host.
    """
    if not url:
        return "k.A."
    candidate = str(url).strip()
    if not candidate or candidate.lower() in ("k.a.", "nan", "none"):
        return "k.A."
    # urlparse only fills in the network location when a scheme is present.
    if not candidate.lower().startswith(('http://', 'https://')):
        candidate = 'http://' + candidate
    try:
        # .hostname is already lower-cased and excludes port and credentials.
        host = urlparse(candidate).hostname
    except ValueError:
        # Raised by urlparse for malformed bracketed (IPv6-style) hosts.
        return "k.A."
    if not host:
        return "k.A."
    if host.startswith('www.'):
        host = host[4:]
    return host or "k.A."
# Legal forms and generic filler words that are stripped from company names
# before comparison.
_COMPANY_NAME_STOPWORDS = (
    'gmbh & co. kg', 'gmbh', 'ag', 'kg', 'ohg', 'ug', 'ltd',
    'llc', 'inc', 'corp', 'co', '& co', 'e.v.',
    'sa', 'se', 's.a.', 'gesellschaft', 'gp', 'lp',
    'service', 'services', 'group', 'solutions', 'systeme',
    'handel', 'marketing', 'technology', 'international',
    'holding', 'verwaltung', 'foundation',
)
# One alternation, longest entry first, so multi-word forms win over their
# parts. Look-arounds replace \b because \b cannot match next to the
# punctuation that starts or ends entries such as "e.v." or "& co".
_COMPANY_NAME_STOPWORD_RE = re.compile(
    r'(?<!\w)(?:'
    + '|'.join(
        re.escape(word)
        for word in sorted(_COMPANY_NAME_STOPWORDS, key=len, reverse=True)
    )
    + r')(?!\w)'
)


def normalize_company_name(name: str) -> str:
    """
    Normalize a company name for robust comparison.

    The name is lower-cased, then legal forms and generic filler words
    (GmbH, AG, e.V., Group, Solutions, ...) are removed as whole tokens,
    digit groups separated only by whitespace are joined, remaining
    punctuation is dropped and whitespace is collapsed.

    Examples:
        "Müller GmbH & Co. KG" -> "müller"
        "11 88 0 Solutions"    -> "11880"

    Args:
        name: Raw company name.

    Returns:
        The normalized name, or "" for falsy input.
    """
    if not name:
        return ""
    name = name.lower()
    name = _COMPANY_NAME_STOPWORD_RE.sub('', name)
    # Look-arounds keep the digits out of the match, so every gap in a run
    # like "1 2 3" is closed (a consuming pattern would skip every other gap).
    name = re.sub(r'(?<=\d)\s+(?=\d)', '', name)
    name = re.sub(r'[^\w\s]', '', name)
    name = re.sub(r'\s+', ' ', name).strip()
    return name
def extract_numeric_value(raw_value: str, is_umsatz: bool = False) -> str:
    """
    Extract the first number from a free-text value and scale it by its unit word.

    Unit words are detected by substring match on the lower-cased text:
      * mrd / milliard / billion / bn  -> x 1000 (is_umsatz) or x 1e9
      * mio / million / mn             -> x 1    (is_umsatz) or x 1e6
      * tsd / tausend / thousand       -> x 0.001 (is_umsatz) or x 1e3
    With ``is_umsatz=True`` the result is therefore scaled relative to the
    "Mio" unit; without a unit word the number is returned unscaled.

    Separator rules for the number itself:
      * Both '.' and ',' present: whichever comes last is the decimal mark.
      * Only '.': treated as thousands separator when the last group has
        exactly three digits (for is_umsatz only if there are several dots,
        so "1.500" stays 1.5); otherwise as decimal point.
      * Only ',': one comma is a decimal comma (German style), several commas
        are thousands separators.

    Args:
        raw_value: Text containing a number; converted with ``str``.
        is_umsatz: Selects the scaling table and the output format above.

    Returns:
        For is_umsatz a decimal string with at most two decimals and no
        trailing zeros; otherwise an integer string (fraction truncated).
        "k.A." when no number can be extracted.
    """
    if not raw_value:
        return "k.A."
    raw_value = str(raw_value).strip().lower()
    if raw_value in ("k.a.", "nan", "none"):
        return "k.A."

    # NOTE: plain substring checks; short tokens such as "bn"/"mn" can also
    # hit inside unrelated words.
    multiplier = 1.0
    if any(token in raw_value for token in ('mrd', 'milliard', 'billion', 'bn')):
        multiplier = 1000.0 if is_umsatz else 1000000000.0
    elif any(token in raw_value for token in ('mio', 'million', 'mn')):
        multiplier = 1.0 if is_umsatz else 1000000.0
    elif any(token in raw_value for token in ('tsd', 'tausend', 'thousand')):
        multiplier = 0.001 if is_umsatz else 1000.0

    # Digits with any number of separator-delimited groups. A separator must
    # be followed by a digit, so sentence punctuation after the number
    # ("12.5, up from ...") is not mistaken for part of it.
    match = re.search(r'\d+(?:[.,]\d+)*', raw_value)
    if not match:
        return "k.A."
    num_str = match.group(0)

    has_dot = '.' in num_str
    has_comma = ',' in num_str
    if has_dot and has_comma:
        if num_str.rfind(',') > num_str.rfind('.'):
            # German style: 1.234,56
            num_str = num_str.replace('.', '').replace(',', '.')
        else:
            # English style: 1,234.56
            num_str = num_str.replace(',', '')
    elif has_dot:
        last_group = num_str.rsplit('.', 1)[1]
        if len(last_group) == 3 and (not is_umsatz or num_str.count('.') > 1):
            num_str = num_str.replace('.', '')
    elif has_comma:
        if num_str.count(',') > 1:
            # A number has only one decimal mark, so these are thousands separators.
            num_str = num_str.replace(',', '')
        else:
            num_str = num_str.replace(',', '.')

    try:
        val = float(num_str) * multiplier
        if is_umsatz:
            return f"{val:.2f}".rstrip('0').rstrip('.')
        # Round away binary floating-point noise before truncating so a value
        # like 2299.9999999999995 does not become 2299.
        return str(int(round(val, 6)))
    except (ValueError, OverflowError):
        # ValueError: unparsable remainder such as "1.2.3";
        # OverflowError: int() of an infinite float from an absurdly long number.
        return "k.A."
def fuzzy_similarity(str1: str, str2: str) -> float:
    """
    Score how alike two strings are on a 0.0-1.0 scale.

    The score is ``fuzz.ratio`` divided by 100. If either argument is falsy
    the result is 0.0 and no comparison is made.
    """
    if str1 and str2:
        score = fuzz.ratio(str1, str2)
        return score / 100.0
    return 0.0
def clean_json_response(response_text: str) -> str:
    """
    Strip Markdown code fences from an LLM response so it can be parsed as JSON.

    Removes an opening fence at the start of any line, with or without a
    "json" language tag in any letter case, and a closing fence at the end of
    any line. The remaining text is trimmed but not validated as JSON.

    Args:
        response_text: Raw model output.

    Returns:
        The unfenced text, or "{}" when the input is empty or nothing but
        whitespace/fences remains, so callers always get a parsable default.
    """
    if not response_text:
        return "{}"
    # Opening fence (and any line-leading fence), optional "json"/"JSON" tag.
    cleaned = re.sub(
        r'^```(?:json)?\s*', '', response_text,
        flags=re.MULTILINE | re.IGNORECASE,
    )
    # Closing fence that trails content on the same line.
    cleaned = re.sub(r'\s*```$', '', cleaned, flags=re.MULTILINE)
    cleaned = cleaned.strip()
    return cleaned or "{}"
# ==============================================================================
# 3. LLM WRAPPER (GEMINI)
# ==============================================================================
@retry_on_failure(max_retries=3)
def call_gemini_flash(
    prompt: Union[str, List[str]],
    model_name: str = "gemini-2.0-flash",
    temperature: float = 0.3,
    json_mode: bool = False,
    system_instruction: Optional[str] = None
) -> str:
    """
    Unified caller for the Gemini API. Prefers the new `google.genai` library
    and falls back to the legacy `google.generativeai` library.

    Args:
        prompt: A single prompt string or a list of content parts.
        model_name: Gemini model identifier.
        temperature: Sampling temperature.
        json_mode: When True, requests an "application/json" response.
        system_instruction: Optional system prompt; forwarded to whichever
            library handles the call.

    Returns:
        The stripped response text.

    Raises:
        ValueError: If GEMINI_API_KEY is not configured.
        ImportError: If neither Gemini client library is installed.
        Exception: Whatever the last library tried raised. An error in the
            new library (including an empty response) is logged and the
            legacy library is tried next when it is installed.
    """
    api_key = settings.GEMINI_API_KEY
    if not api_key:
        raise ValueError("GEMINI_API_KEY is missing in configuration.")

    # Sampling settings shared by both client libraries.
    generation_config = {
        "temperature": temperature,
        "top_p": 0.95,
        "top_k": 40,
        "max_output_tokens": 8192,
    }
    if json_mode:
        generation_config["response_mime_type"] = "application/json"

    # Option A: New Library (google-genai)
    if HAS_NEW_GENAI:
        try:
            client = genai.Client(api_key=api_key)
            config = dict(generation_config)
            if system_instruction:
                # In google-genai the system prompt is part of the request
                # config; without this the argument would be ignored here.
                config["system_instruction"] = system_instruction
            response = client.models.generate_content(
                model=model_name,
                contents=[prompt] if isinstance(prompt, str) else prompt,
                config=config,
            )
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error with google-genai lib: {e}")
            if not HAS_OLD_GENAI:
                raise
            # Fallthrough to Option B

    # Option B: Old Library (google-generativeai)
    if HAS_OLD_GENAI:
        try:
            old_genai.configure(api_key=api_key)
            model = old_genai.GenerativeModel(
                model_name=model_name,
                generation_config=dict(generation_config),
                system_instruction=system_instruction
            )
            response = model.generate_content(prompt)
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error with google-generativeai lib: {e}")
            raise

    raise ImportError("No Google GenAI library installed (neither google-genai nor google-generativeai).")
# ==============================================================================
# 4. MATH UTILS
# ==============================================================================
# Operators accepted by safe_eval_math, mapped to their implementations.
_MATH_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Pow: operator.pow,
}
_MATH_UNARY_OPS = {
    ast.UAdd: operator.pos,
    ast.USub: operator.neg,
}


def _eval_math_node(node):
    """Recursively evaluate a whitelisted arithmetic AST node in float arithmetic."""
    if isinstance(node, ast.Expression):
        return _eval_math_node(node.body)
    if isinstance(node, ast.Constant) and type(node.value) in (int, float):
        # Working in floats keeps '**' bounded: an oversized power raises
        # OverflowError immediately instead of building a gigantic integer.
        return float(node.value)
    if isinstance(node, ast.BinOp) and type(node.op) in _MATH_BINARY_OPS:
        left = _eval_math_node(node.left)
        right = _eval_math_node(node.right)
        return _MATH_BINARY_OPS[type(node.op)](left, right)
    if isinstance(node, ast.UnaryOp) and type(node.op) in _MATH_UNARY_OPS:
        return _MATH_UNARY_OPS[type(node.op)](_eval_math_node(node.operand))
    raise ValueError(f"Unsupported element in math expression: {type(node).__name__}")


def safe_eval_math(expression: str) -> Optional[float]:
    """
    Safely evaluates simple mathematical expressions.

    Accepts numbers, parentheses, unary +/- and the operators +, -, *, /
    (plus // and **, which the permitted characters can also form). The
    expression is parsed into an AST and only those node types are
    evaluated; nothing is passed to ``eval``.

    The placeholder 'wert' is tolerated by the character check only. It must
    be substituted by the caller beforehand; an expression that still
    contains it evaluates to None.

    Args:
        expression: The expression text.

    Returns:
        The result as float, or None for non-string/empty input, disallowed
        characters, syntax errors, division by zero, numeric overflow or a
        non-real result. Failures after the input check are logged.
    """
    if not isinstance(expression, str) or not expression:
        return None
    # Allowed characters: digits, ., +, -, *, /, (, ) and whitespace.
    allowed_pattern = re.compile(r"^[0-9.+\-*/()\s]+$")
    # Replace 'wert' with a dummy digit for the character check only.
    temp_expression = expression.lower().replace("wert", "1")
    if not allowed_pattern.fullmatch(temp_expression):
        logger.error(f"Math expression contains disallowed characters: {expression}")
        return None
    try:
        # strip(): leading whitespace would otherwise be parsed as an
        # unexpected indent.
        tree = ast.parse(expression.strip(), mode="eval")
        return float(_eval_math_node(tree))
    except Exception as e:
        logger.error(f"Error evaluating math expression '{expression}': {e}", exc_info=True)
        return None
# ==============================================================================
# 5. SEARCH UTILS
# ==============================================================================
@retry_on_failure(max_retries=2, delay=5.0)
def run_serp_search(query: str, num_results: int = 5, timeout: float = 30.0) -> Optional[Dict[str, Any]]:
    """
    Performs a Google search using SerpAPI and returns the parsed JSON result.
    Requires SERP_API_KEY in settings.

    The search is sent with country "de" and interface language "de".

    Request errors (connection problems, timeouts, HTTP error status) and
    unparsable responses are logged and turned into a None result. Because
    they are handled here, the surrounding retry decorator only comes into
    play for other, unexpected exceptions.

    Args:
        query: Search query string.
        num_results: Number of organic results to request.
        timeout: Seconds to wait for SerpAPI before giving up, so a stalled
            connection cannot block the caller indefinitely.

    Returns:
        The decoded response dictionary, or None when the API key is missing,
        the request fails, or the body is not a JSON object.
    """
    api_key = settings.SERP_API_KEY
    if not api_key:
        logger.error("SERP_API_KEY is missing in configuration. Cannot run SerpAPI search.")
        return None

    url = "https://serpapi.com/search.json"
    params = {
        "api_key": api_key,
        "engine": "google",
        "q": query,
        "num": num_results,  # Number of organic results
        "gl": "de",  # Geo-targeting to Germany
        "hl": "de"  # Interface language to German
    }

    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()  # Raise an exception for HTTP errors
    except requests.exceptions.RequestException as e:
        # NOTE(review): the exception text may contain the full request URL,
        # which carries api_key as a query parameter - confirm that these
        # logs are not exposed to untrusted readers.
        logger.error(f"SerpAPI request failed for query '{query}': {e}", exc_info=True)
        return None

    try:
        results = response.json()
    except ValueError as e:
        # JSON decode errors are ValueError subclasses, so no dependency on
        # which JSON backend requests uses.
        logger.error(f"Failed to parse SerpAPI JSON response for query '{query}': {e}", exc_info=True)
        return None

    if not isinstance(results, dict):
        logger.error(
            "Failed to parse SerpAPI JSON response for query '%s': expected a JSON object, got %s",
            query, type(results).__name__,
        )
        return None

    logger.info("SerpAPI search for '%s' successful. Found %s organic results.", query, len(results.get("organic_results", [])))
    return results