Files
Brancheneinstufung2/company-explorer/backend/lib/core_utils.py
Floke 4a336f6374 fix(ce): Resolve database schema mismatch and restore docs
- Fixed a critical schema-mismatch error in the company-explorer by forcing a database re-initialization with a new database file. This ensures the application code is in sync with the database schema.
- Documented the schema mismatch incident and its resolution in MIGRATION_PLAN.md.

- Restored and enhanced BUILDER_APPS_MIGRATION.md by recovering extensive, valuable content from the git history that was accidentally deleted. The guide now again includes detailed troubleshooting steps and code templates for common migration pitfalls.
2026-01-15 15:54:45 +00:00

299 lines
11 KiB
Python

import time
import logging
import random
import os
import re
import unicodedata
from urllib.parse import urlparse
from functools import wraps
from typing import Optional, Union, List
from thefuzz import fuzz
# Try new Google GenAI Lib (v1.0+)
try:
from google import genai
from google.genai import types
HAS_NEW_GENAI = True
except ImportError:
HAS_NEW_GENAI = False
# Fallback to old Lib
try:
import google.generativeai as old_genai
HAS_OLD_GENAI = True
except ImportError:
HAS_OLD_GENAI = False
from ..config import settings
logger = logging.getLogger(__name__)
# ==============================================================================
# 1. DECORATORS
# ==============================================================================
def retry_on_failure(max_retries: int = 3, delay: float = 2.0):
    """
    Decorator factory that retries a function with exponential backoff.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated as 1
            so the wrapped function is always called at least once.
        delay: Base delay in seconds. After failed attempt ``n`` (0-based) the
            wrapper sleeps ``delay * 2**n`` plus up to one second of jitter.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result as soon as one attempt succeeds.

    Raises:
        Exception: The exception of the final attempt once all attempts failed.
            Errors that a retry cannot fix (a ``ValueError`` mentioning a
            missing API key, or an ``ImportError``) are re-raised immediately.
    """
    attempts = max(1, max_retries)

    def _is_fatal(error: Exception) -> bool:
        """Return True for errors where retrying is pointless."""
        if isinstance(error, ImportError):
            return True
        if isinstance(error, ValueError):
            # Match both "API Key" and identifiers such as "GEMINI_API_KEY".
            return "api key" in str(error).lower().replace("_", " ")
        return False

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if _is_fatal(e):
                        raise
                    if attempt == attempts - 1:
                        # Last attempt: fail right away instead of sleeping
                        # through one more (useless) backoff interval.
                        logger.error(f"Function '{func.__name__}' failed after {attempts} attempts.")
                        raise
                    wait_time = delay * (2 ** attempt) + random.uniform(0, 1)
                    logger.warning(f"Retry {attempt + 1}/{attempts} for '{func.__name__}' after error: {e}. Waiting {wait_time:.1f}s")
                    time.sleep(wait_time)
        return wrapper
    return decorator
# ==============================================================================
# 2. TEXT TOOLS
# ==============================================================================
def clean_text(text: str) -> str:
    """
    Normalize a piece of text to a single clean line.

    Applies Unicode NFKC normalization, turns every whitespace character
    (including newlines and tabs) into a plain space, drops all remaining
    characters of Unicode category "C" (control, format, ...), collapses runs
    of spaces and trims the result.

    Args:
        text: Input value; non-string values are converted with ``str()``.

    Returns:
        The cleaned text, or ``""`` for falsy input.
    """
    if not text:
        return ""
    text = unicodedata.normalize('NFKC', str(text))
    # Newlines/tabs are category "Cc" themselves: convert whitespace to spaces
    # BEFORE stripping control characters, otherwise "foo\nbar" -> "foobar".
    text = re.sub(r'\s', ' ', text)
    text = "".join(ch for ch in text if unicodedata.category(ch)[0] != "C")
    # Collapse and trim last, so spaces left around removed characters vanish.
    return re.sub(r' +', ' ', text).strip()
def normalize_string(s: str) -> str:
    """Return ``s`` lowercased and trimmed; falsy input yields an empty string."""
    if not s:
        return ""
    lowered = s.lower()
    return lowered.strip()
def simple_normalize_url(url: str) -> str:
    """
    Normalize a URL to its bare host name.

    Example: ``'https://www.example.com/foo'`` -> ``'example.com'``.
    Scheme, credentials, port, path and a leading ``www.`` are removed and the
    result is lowercased.

    Args:
        url: URL or bare domain; surrounding whitespace is ignored and
            non-string values are converted with ``str()``.

    Returns:
        The normalized host, or ``"k.A."`` when the input is empty, one of the
        placeholders ("k.A.", "nan", "none") or contains no usable host.
    """
    if not url:
        return "k.A."
    url = str(url).strip()
    if not url or url.lower() in ("k.a.", "nan", "none"):
        return "k.A."
    # urlparse only fills in the host when a scheme is present; the check is
    # case-insensitive so "HTTPS://..." is not prefixed a second time.
    if not re.match(r'[a-z][a-z0-9+.\-]*://', url, flags=re.IGNORECASE):
        url = 'http://' + url
    try:
        # .hostname is already lowercased and excludes credentials and port.
        domain = urlparse(url).hostname or ""
    except ValueError:
        return "k.A."
    if domain.startswith('www.'):
        domain = domain[4:]
    return domain or "k.A."
# Legal forms and generic filler words that are stripped from company names.
_COMPANY_NAME_STOP_TERMS = (
    'gmbh & co. kg', 'gmbh', 'ag', 'kg', 'ohg', 'ug', 'ltd',
    'llc', 'inc', 'corp', 'co', '& co', 'e.v.',
    'sa', 'se', 's.a.', 'gesellschaft', 'gp', 'lp',
    'service', 'services', 'group', 'solutions', 'systeme',
    'handel', 'marketing', 'technology', 'international',
    'holding', 'verwaltung', 'foundation',
)
# Lookarounds instead of \b: "\b" cannot match after a trailing "." (as in
# "e.v.") or before "&". Longest terms first so compound forms win.
_COMPANY_NAME_STOP_RE = re.compile(
    r'(?<!\w)(?:'
    + '|'.join(re.escape(term) for term in sorted(_COMPANY_NAME_STOP_TERMS, key=len, reverse=True))
    + r')(?!\w)'
)


def normalize_company_name(name: str) -> str:
    """
    Normalize a company name for robust comparison.

    Lowercases the name, joins digit groups separated by whitespace, removes
    common legal forms and generic filler words, drops special characters and
    collapses whitespace.

    Example: ``"11 88 0 Solutions AG"`` -> ``"11880"``.

    Args:
        name: Raw company name.

    Returns:
        The normalized name, or ``""`` for falsy input.
    """
    if not name:
        return ""
    name = name.lower()
    # Condense numbers first ("11 88 0" -> "11880"). The lookahead leaves the
    # second digit unconsumed so that "1 2 3" collapses completely, and doing
    # it before term removal keeps "54 gmbh 2020" from merging into "542020".
    name = re.sub(r'(\d)\s+(?=\d)', r'\1', name)
    name = _COMPANY_NAME_STOP_RE.sub('', name)
    # Remove special characters (word characters incl. digits are kept).
    name = re.sub(r'[^\w\s]', '', name)
    return re.sub(r'\s+', ' ', name).strip()
def extract_numeric_value(raw_value: str, is_umsatz: bool = False) -> str:
    """
    Extract the first number from a free-text value.

    Understands unit words ('Mrd'/'billion'/'bn', 'Mio'/'million'/'mn',
    'Tsd'/'thousand') and both German ("1.000,50") and English ("1,000.50")
    number formatting.

    Args:
        raw_value: Text containing a number, e.g. "ca. 1.005 Mitarbeiter".
        is_umsatz: True for revenue. Revenue is returned in millions with at
            most two decimals; otherwise an absolute integer is returned.

    Returns:
        The number as a string, or ``"k.A."`` when nothing parseable is found.
    """
    if not raw_value:
        return "k.A."
    raw_value = str(raw_value).strip().lower()
    if raw_value in ("k.a.", "nan", "none"):
        return "k.A."

    # Unit multiplier: revenue is standardized to millions, everything else
    # to absolute numbers.
    if 'mrd' in raw_value or 'billion' in raw_value or 'bn' in raw_value:
        multiplier = 1000.0 if is_umsatz else 1000000000.0
    elif 'mio' in raw_value or 'million' in raw_value or 'mn' in raw_value:
        multiplier = 1.0 if is_umsatz else 1000000.0
    elif 'tsd' in raw_value or 'thousand' in raw_value:
        multiplier = 0.001 if is_umsatz else 1000.0
    else:
        multiplier = 1.0

    # First number candidate with any amount of '.'/',' separated groups.
    match = re.search(r'\d+(?:[.,]\d+)*', raw_value)
    if not match:
        return "k.A."
    num_str = match.group(0)
    has_dot = '.' in num_str
    has_comma = ',' in num_str

    if has_dot and has_comma:
        # Both separators present: the last one is the decimal separator.
        if num_str.rfind(',') > num_str.rfind('.'):
            # German: 1.000,00
            num_str = num_str.replace('.', '').replace(',', '.')
        else:
            # English: 1,000.00
            num_str = num_str.replace(',', '')
    elif has_dot or has_comma:
        separator = '.' if has_dot else ','
        groups = num_str.split(separator)
        three_digit_groups = all(len(group) == 3 for group in groups[1:])
        if len(groups) > 2:
            # A repeated separator can only be a thousands separator.
            if not three_digit_groups:
                return "k.A."
            num_str = ''.join(groups)
        elif separator == '.' and three_digit_groups and not is_umsatz:
            # Ambiguous "1.005": for head counts treat it as thousands.
            num_str = ''.join(groups)
        else:
            # Single dot (revenue / non-3-digit tail) or single comma: decimal.
            num_str = '.'.join(groups)

    try:
        val = float(num_str) * multiplier
        if is_umsatz:
            # Millions, e.g. "250.5"
            return f"{val:.2f}".rstrip('0').rstrip('.')
        # round() removes float artefacts (1.005 * 1000 == 1004.9999999999999)
        # before truncating to an integer.
        return str(int(round(val, 6)))
    except (ValueError, OverflowError):
        return "k.A."
def fuzzy_similarity(str1: str, str2: str) -> float:
    """Compute the ``fuzz.ratio`` of two strings scaled to the range 0.0-1.0.

    Returns 0.0 when either string is empty or None.
    """
    if str1 and str2:
        score = fuzz.ratio(str1, str2)
        return score / 100.0
    return 0.0
# Opening fence with optional language tag. A tag only counts when it is
# followed by a line break (or is literally "json"), so inline payloads such
# as ```true``` are not mistaken for a tag.
_JSON_FENCE_OPEN = r'```(?:[\w+-]+(?=[ \t]*\r?\n)|json)?'
# Complete fenced block: opening fence at a line start, closing fence at a
# line end. Greedy, so backticks inside the JSON payload stay untouched.
_JSON_FENCED_BLOCK_RE = re.compile(
    r'^[ \t]*' + _JSON_FENCE_OPEN + r'(.*)```[ \t\r]*$',
    re.DOTALL | re.MULTILINE | re.IGNORECASE,
)
_JSON_STRAY_OPEN_RE = re.compile(r'^[ \t]*' + _JSON_FENCE_OPEN, re.MULTILINE | re.IGNORECASE)
_JSON_STRAY_CLOSE_RE = re.compile(r'[ \t]*```[ \t\r]*$', re.MULTILINE)


def clean_json_response(response_text: str) -> str:
    """
    Clean an LLM response so that only the JSON payload remains.

    If the response contains a Markdown code block (```json ... ```), the
    content of that block is returned and any prose around it is discarded.
    Unbalanced fences (e.g. a truncated response) are stripped individually.

    Args:
        response_text: Raw text returned by the model.

    Returns:
        The cleaned payload, or ``"{}"`` for empty/whitespace-only input.
    """
    if not response_text or not response_text.strip():
        return "{}"
    fenced = _JSON_FENCED_BLOCK_RE.search(response_text)
    if fenced:
        return fenced.group(1).strip()
    # No complete block: remove whatever single fences are present.
    cleaned = _JSON_STRAY_OPEN_RE.sub('', response_text)
    cleaned = _JSON_STRAY_CLOSE_RE.sub('', cleaned)
    return cleaned.strip()
# ==============================================================================
# 3. LLM WRAPPER (GEMINI)
# ==============================================================================
@retry_on_failure(max_retries=3)
def call_gemini(
    prompt: Union[str, List[str]],
    model_name: str = "gemini-2.0-flash",
    temperature: float = 0.3,
    json_mode: bool = False,
    system_instruction: Optional[str] = None
) -> str:
    """
    Unified caller for the Gemini API. Prefers the new `google.genai` library.

    If the call through the new library fails and the legacy
    `google.generativeai` library is installed, the request is repeated once
    through the legacy library before the error is propagated.

    Args:
        prompt: A single prompt string or a list of prompt parts.
        model_name: Name of the Gemini model to call.
        temperature: Sampling temperature.
        json_mode: If True, request an ``application/json`` response.
        system_instruction: Optional system instruction; passed to whichever
            library handles the request.

    Returns:
        The stripped response text.

    Raises:
        ValueError: If the API key is not configured or the model returned an
            empty response.
        ImportError: If neither Gemini client library is installed.
        Exception: Any error raised by the client library that handled the
            final attempt.
    """
    api_key = settings.GEMINI_API_KEY
    if not api_key:
        # The wording contains "API Key" so that retry_on_failure recognizes
        # the error as fatal instead of retrying it with backoff.
        raise ValueError("Gemini API Key (GEMINI_API_KEY) is missing in configuration.")

    generation_config = {
        "temperature": temperature,
        "top_p": 0.95,
        "top_k": 40,
        "max_output_tokens": 8192,
    }
    if json_mode:
        generation_config["response_mime_type"] = "application/json"

    # Option A: New Library (google-genai)
    if HAS_NEW_GENAI:
        try:
            client = genai.Client(api_key=api_key)
            config = dict(generation_config)
            if system_instruction:
                # In google-genai the system instruction is part of the
                # generation config; without this it was silently dropped.
                config["system_instruction"] = system_instruction
            response = client.models.generate_content(
                model=model_name,
                contents=[prompt] if isinstance(prompt, str) else prompt,
                config=config,
            )
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error with google-genai lib: {e}")
            if not HAS_OLD_GENAI:
                raise
            # Fallthrough to Option B

    # Option B: Old Library (google-generativeai)
    if HAS_OLD_GENAI:
        try:
            old_genai.configure(api_key=api_key)
            model = old_genai.GenerativeModel(
                model_name=model_name,
                generation_config=generation_config,
                system_instruction=system_instruction
            )
            response = model.generate_content(prompt)
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error with google-generativeai lib: {e}")
            raise

    raise ImportError("No Google GenAI library installed (neither google-genai nor google-generativeai).")