Files
Brancheneinstufung2/company-explorer/backend/lib/core_utils.py
Floke 88c9d487be feat(company-explorer): add impressum scraping, robust json parsing, and enhanced ui polling
- Implemented Impressum scraping with Root-URL fallback and enhanced keyword detection.
- Added 'clean_json_response' helper to strip Markdown from LLM outputs, preventing JSONDecodeErrors.
- Improved numeric extraction for German formatting (thousands separators vs decimals).
- Updated Inspector UI with Polling logic for auto-refresh and display of AI Dossier and Legal Data.
- Added Manual Override for Website URL.
2026-01-08 16:14:01 +01:00

288 lines
10 KiB
Python

import time
import logging
import random
import os
import re
import unicodedata
from urllib.parse import urlparse
from functools import wraps
from typing import Optional, Union, List
from thefuzz import fuzz
# Versuche neue Google GenAI Lib (v1.0+)
try:
from google import genai
from google.genai import types
HAS_NEW_GENAI = True
except ImportError:
HAS_NEW_GENAI = False
# Fallback auf alte Lib
try:
import google.generativeai as old_genai
HAS_OLD_GENAI = True
except ImportError:
HAS_OLD_GENAI = False
from ..config import settings
logger = logging.getLogger(__name__)
# ==============================================================================
# 1. DECORATORS
# ==============================================================================
def retry_on_failure(max_retries: int = 3, delay: float = 2.0):
    """
    Decorator factory that retries a function with exponential backoff.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated
            as 1 so the wrapped function is always called at least once.
        delay: Base delay in seconds. The wait before attempt ``k + 1`` is
            ``delay * 2 ** (k - 1)`` plus up to one second of random jitter.

    Returns:
        A decorator that wraps the target function.

    Raises:
        Whatever the wrapped function raised on its final attempt.
        A ``ValueError`` that mentions the API key is re-raised immediately,
        because retrying cannot fix a configuration problem.
    """
    # Guard against 0 / negative values, which would otherwise skip the call
    # entirely and end in ``raise None``.
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # Don't retry on certain fatal errors (can be extended).
                    # Normalise case and underscores so that both "API Key"
                    # and "GEMINI_API_KEY" style messages are recognised.
                    message = str(e).lower().replace("_", " ")
                    if isinstance(e, ValueError) and "api key" in message:
                        raise
                    last_exception = e
                    if attempt == attempts:
                        # No point in sleeping when nothing is left to retry.
                        break
                    wait_time = delay * (2 ** (attempt - 1)) + random.uniform(0, 1)
                    logger.warning(f"Retry {attempt}/{attempts} for '{func.__name__}' after error: {e}. Waiting {wait_time:.1f}s")
                    time.sleep(wait_time)
            logger.error(f"Function '{func.__name__}' failed after {attempts} attempts.")
            raise last_exception
        return wrapper
    return decorator
# ==============================================================================
# 2. TEXT TOOLS
# ==============================================================================
def clean_text(text: str) -> str:
    """
    Normalize text: NFKC-normalize, collapse whitespace, drop control chars.

    Args:
        text: Input value; non-string values are converted with ``str()``.

    Returns:
        The cleaned text with single spaces between words and no leading or
        trailing whitespace. Falsy input yields an empty string.
    """
    if not text:
        return ""
    # Normalize unicode characters (e.g. ligatures, non-breaking spaces)
    text = unicodedata.normalize('NFKC', str(text))
    # Collapse whitespace BEFORE removing control characters: newlines and
    # tabs are in Unicode category "Cc", so stripping them first would glue
    # adjacent words together ("a\nb" -> "ab").
    text = re.sub(r'\s+', ' ', text)
    # Remove the remaining control / format characters
    text = "".join(ch for ch in text if unicodedata.category(ch)[0] != "C")
    # Removing control characters can expose doubled or leading/trailing spaces
    return re.sub(r'\s+', ' ', text).strip()
def normalize_string(s: str) -> str:
    """Return ``s`` lowercased and stripped; falsy input gives an empty string."""
    if not s:
        return ""
    lowered = s.lower()
    return lowered.strip()
def simple_normalize_url(url: str) -> str:
    """
    Normalize a URL to its core domain.

    Example: 'https://www.example.com/foo' -> 'example.com'.

    Args:
        url: URL or bare domain. Surrounding whitespace is ignored, the
            scheme and host are matched case-insensitively, and non-string
            values are converted with ``str()``.

    Returns:
        The lowercase host name without a leading 'www.', port or user
        info, or 'k.A.' when the input is empty, a placeholder
        ('k.A.', 'nan', 'none') or has no usable host.
    """
    if not url:
        return "k.A."
    url = str(url).strip()
    if not url or url.lower() in ("k.a.", "nan", "none"):
        return "k.A."
    # Ensure a scheme so urlparse puts the host into netloc. The check is
    # case-insensitive so 'HTTPS://...' is not prefixed a second time.
    if not re.match(r'[a-z][a-z0-9+.\-]*://', url, flags=re.IGNORECASE):
        url = 'http://' + url
    try:
        # hostname is already lowercased and excludes port and credentials
        domain = urlparse(url).hostname
    except ValueError:
        # Raised by urlparse for malformed hosts (e.g. broken IPv6 literal)
        return "k.A."
    if not domain:
        return "k.A."
    # Remove www.
    if domain.startswith('www.'):
        domain = domain[4:]
    return domain or "k.A."
# Legal-form tokens that are stripped from company names. Lookarounds are used
# instead of \b because \b cannot anchor next to punctuation: r'\be\.v\.\b'
# never matches "e.v." at the end of a name or before a space.
# "& co" needs no entry of its own: "co" is listed and "&" is removed together
# with the other special characters.
_LEGAL_FORM_RE = re.compile(
    r'(?<!\w)(?:gmbh|ag|kg|ohg|ug|ltd|llc|inc|corp|co|e\.\s?v\.?)(?!\w)'
)


def normalize_company_name(name: str) -> str:
    """
    Normalize a company name by removing legal forms and special characters.

    Args:
        name: Raw company name.

    Returns:
        The lowercase name without legal-form tokens (GmbH, AG, KG, e.V., ...)
        and punctuation, with whitespace collapsed to single spaces.
        Falsy input yields an empty string.
    """
    if not name:
        return ""
    name = name.lower()
    # Remove common legal forms
    name = _LEGAL_FORM_RE.sub('', name)
    # Remove special chars and extra spaces
    name = re.sub(r'[^\w\s]', '', name)
    name = re.sub(r'\s+', ' ', name).strip()
    return name
def extract_numeric_value(raw_value: str, is_umsatz: bool = False) -> str:
    """
    Extract a numeric value from a string, handling 'Mio', 'Mrd', etc.

    Handles German number formatting (1.000 = 1000, 1,5 = 1.5) as well as
    English formatting when both separators are present (1,000.50).

    Args:
        raw_value: Text containing a number, optionally with a unit word
            (Mrd/Milliarden/billion/bn, Mio/million/mn, Tsd/Tausend/thousand).
        is_umsatz: When True the value is treated as revenue and returned in
            millions with up to two decimals; otherwise it is returned as an
            absolute integer (e.g. an employee count).

    Returns:
        String representation of the number, or 'k.A.' when nothing usable
        could be extracted.
    """
    if not raw_value:
        return "k.A."
    text = str(raw_value).strip().lower()
    if text in ("k.a.", "nan", "none"):
        return "k.A."

    def mentions(*units: str) -> bool:
        return any(unit in text for unit in units)

    # Simple multiplier handling. Revenue is expressed in millions, so its
    # multipliers are relative to 1 million.
    if mentions('mrd', 'milliard', 'billion', 'bn'):
        multiplier = 1000.0 if is_umsatz else 1000000000.0
    elif mentions('mio', 'million', 'mn'):
        multiplier = 1.0 if is_umsatz else 1000000.0
    elif mentions('tsd', 'tausend', 'thousand'):
        multiplier = 0.001 if is_umsatz else 1000.0
    else:
        multiplier = 1.0

    # First number candidate: "1.000,50" or "1,000.50" or "1000"
    match = re.search(r'\d+[\.,]?\d*[\.,]?\d*', text)
    if match is None:
        return "k.A."
    num_str = match.group(0)
    has_dot = '.' in num_str
    has_comma = ',' in num_str

    if has_dot and has_comma:
        # With both separators present, the last one is the decimal mark.
        if num_str.rfind(',') > num_str.rfind('.'):
            # German: 1.000,00 -> remove dots, replace comma with dot
            num_str = num_str.replace('.', '').replace(',', '.')
        else:
            # English: 1,000.00 -> remove commas
            num_str = num_str.replace(',', '')
    elif has_dot:
        # Ambiguous: 1.005 could be 1005 or 1.005.
        # Counts (employees) are integers, so a trailing 3-digit group is
        # read as a thousands separator. For revenue a single dot stays a
        # decimal point (375.6, but also 1.000 -> 1.0); only several dots
        # are unambiguously thousands separators.
        last_group = num_str.rsplit('.', 1)[-1]
        if len(last_group) == 3 and (not is_umsatz or num_str.count('.') > 1):
            num_str = num_str.replace('.', '')
    elif has_comma:
        if re.fullmatch(r'\d{1,3}(?:,\d{3}){2,}', num_str):
            # Several comma groups can only be thousands: 1,000,000
            num_str = num_str.replace(',', '')
        else:
            # German decimal: 1,5 -> 1.5
            num_str = num_str.replace(',', '.')

    try:
        val = float(num_str) * multiplier
        if is_umsatz:
            # Return in millions, e.g. "250.5"
            return f"{val:.2f}".rstrip('0').rstrip('.')
        # Round instead of truncating: 1.005 * 1000 is 1004.9999999999999
        # in binary floating point and must still yield 1005.
        return str(round(val))
    except (ValueError, OverflowError):
        return "k.A."
def fuzzy_similarity(str1: str, str2: str) -> float:
    """
    Compute the fuzzy similarity of two strings on a 0.0 to 1.0 scale.

    The score is ``fuzz.ratio`` divided by 100. If either argument is falsy
    the result is 0.0 and no comparison is performed.
    """
    if str1 and str2:
        score = fuzz.ratio(str1, str2)
        return score / 100.0
    return 0.0
def clean_json_response(response_text: str) -> str:
    """
    Clean an LLM response so that it can be passed to a JSON parser.

    Removes Markdown code fences (```json ... ```), matching the language
    tag case-insensitively. When a complete fenced block is present, only
    its content is returned, so prose before or after the block is dropped.
    A single stray fence (e.g. a truncated response) is stripped.

    Args:
        response_text: Raw text returned by the model.

    Returns:
        The stripped payload, or "{}" when nothing is left.
    """
    if not response_text:
        return "{}"
    text = str(response_text).strip()
    # A complete fenced block: the opening fence starts a line and the
    # closing fence ends one. Anchoring to line boundaries keeps backticks
    # inside JSON string values from being mistaken for fences, and the
    # greedy body extends to the last closing fence.
    fenced = re.search(
        r'^[ \t]*```(?:json)?\s*(.*)```[ \t]*$',
        text,
        flags=re.MULTILINE | re.DOTALL | re.IGNORECASE,
    )
    if fenced:
        cleaned = fenced.group(1)
    else:
        # No complete block: remove a lone opening or closing fence.
        flags = re.MULTILINE | re.IGNORECASE
        cleaned = re.sub(r'^\s*```(?:json)?\s*', '', text, flags=flags)
        cleaned = re.sub(r'\s*```\s*$', '', cleaned, flags=flags)
    return cleaned.strip() or "{}"
# ==============================================================================
# 3. LLM WRAPPER (GEMINI)
# ==============================================================================
@retry_on_failure(max_retries=3)
def call_gemini(
    prompt: Union[str, List[str]],
    model_name: str = "gemini-2.0-flash",
    temperature: float = 0.3,
    json_mode: bool = False,
    system_instruction: Optional[str] = None
) -> str:
    """
    Unified caller for the Gemini API. Prefers the new `google.genai` library
    and falls back to the legacy `google.generativeai` library when the new
    one is unavailable or its call fails.

    Args:
        prompt: A single prompt string or a list of content parts.
        model_name: Gemini model identifier.
        temperature: Sampling temperature.
        json_mode: When True, requests an "application/json" response.
        system_instruction: Optional system prompt; forwarded to whichever
            library handles the request.

    Returns:
        The stripped response text.

    Raises:
        ValueError: If GEMINI_API_KEY is not configured or the model returns
            an empty response.
        ImportError: If neither Gemini client library is installed.
        Exception: Errors from the client library are logged and re-raised.
    """
    api_key = settings.GEMINI_API_KEY
    if not api_key:
        # The wording "API Key" is what retry_on_failure checks for to treat
        # an error as fatal; without it a missing key is retried with backoff.
        raise ValueError("Gemini API Key (GEMINI_API_KEY) is missing in configuration.")

    # Generation settings shared by both client libraries
    generation_config = {
        "temperature": temperature,
        "top_p": 0.95,
        "top_k": 40,
        "max_output_tokens": 8192,
    }
    if json_mode:
        generation_config["response_mime_type"] = "application/json"

    # Option A: New Library (google-genai)
    if HAS_NEW_GENAI:
        try:
            client = genai.Client(api_key=api_key)
            config = dict(generation_config)
            if system_instruction:
                # In google-genai the system prompt is part of the request
                # config; previously it was dropped on this code path.
                config["system_instruction"] = system_instruction
            response = client.models.generate_content(
                model=model_name,
                contents=[prompt] if isinstance(prompt, str) else prompt,
                config=config,
            )
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error with google-genai lib: {e}")
            if not HAS_OLD_GENAI:
                raise
            # Fallthrough to Option B

    # Option B: Old Library (google-generativeai)
    if HAS_OLD_GENAI:
        try:
            old_genai.configure(api_key=api_key)
            model = old_genai.GenerativeModel(
                model_name=model_name,
                generation_config=generation_config,
                system_instruction=system_instruction
            )
            response = model.generate_content(prompt)
            # Same contract as Option A: an empty answer is an error.
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error with google-generativeai lib: {e}")
            raise

    raise ImportError("No Google GenAI library installed (neither google-genai nor google-generativeai).")