186 lines
7.2 KiB
Python
186 lines
7.2 KiB
Python
import re
|
||
import logging
|
||
from typing import Optional, Union
|
||
|
||
logger = logging.getLogger(__name__)


class MetricParser:
    """
    Robust parser for extracting numeric values from free text, specialized
    for German number formats and business metrics (revenue, employees).

    Handles thousands separators (".", ",", "'", spaces), decimal commas,
    unit words (Tsd / Mio / Mrd and English equivalents) and skips
    stand-alone calendar years so that "2020" in "Umsatz 2020: 5 Mio" is not
    mistaken for the metric itself.
    """

    # A run of digits and separator characters, without whitespace.
    _NUMBER_RE = re.compile(r"[\d\.,\']+")
    # Same, but whitespace is allowed so that "1 200 000" stays in one match.
    _CANDIDATE_RE = re.compile(r"[\d\.,\'\s]+")
    _NON_DIGIT_RE = re.compile(r"[^0-9]")
    # Citations "[1]" and parenthesised remarks such as "(2020)".
    _NOISE_RE = re.compile(r"\(.*?\)|\[.*?\]")
    # Approximation words that may precede the number; applied in this order,
    # each one anchored at the start of the text.
    _PREFIX_RES = tuple(
        re.compile("^" + pattern, re.IGNORECASE)
        for pattern in (
            r"ca\.?:?\s*", r"circa\s*", r"rund\s*", r"etwa\s*", r"über\s*",
            r"unter\s*", r"mehr als\s*", r"weniger als\s*", r"bis zu\s*",
            r"about\s*", r"over\s*", r"approx\.?:?\s*", r"around\s*",
            r"up to\s*", r"~\s*", r"rd\.?:?\s*",
        )
    )
    _CURRENCY_RE = re.compile(r"€|EUR|US\$|USD|CHF|GBP|£|¥|JPY", re.IGNORECASE)
    # (unit pattern, factor when is_revenue, factor otherwise), checked in
    # this order against the lower-cased text. Revenue is expressed in
    # millions, which is why "mio" maps to 1.0 there. The look-arounds only
    # forbid adjacent *letters*, so "500k" and "5mio" are recognised while
    # "kilo" or "market" are not.
    _UNIT_SCALES = (
        (
            re.compile(r"(?<![^\W\d_])(?:mrd|milliarden?|billions?|bn)(?![^\W\d_])"),
            1000.0,
            1_000_000_000.0,
        ),
        (
            re.compile(r"(?<![^\W\d_])(?:mio|million(?:en|s)?|mn)(?![^\W\d_])"),
            1.0,
            1_000_000.0,
        ),
        (
            re.compile(r"(?<![^\W\d_])(?:tsd|tausend|thousand|k)(?![^\W\d_])"),
            0.001,
            1000.0,
        ),
    )

    @staticmethod
    def extract_numeric_value(text: str, is_revenue: bool = False, expected_value: Optional[str] = None) -> Optional[float]:
        """
        Extract a float value from a string.

        Args:
            text: Raw text that contains the metric. Falsy input yields None.
            is_revenue: When True the result is expressed in millions
                ("1,5 Mrd" -> 1500.0) and a single dot followed by three
                digits is read as a decimal point; otherwise unit words
                scale to absolute numbers and such a dot is a thousands
                separator.
            expected_value: Optional hint (e.g. from an LLM). If its first
                number occurs in the text (compared digit-by-digit, ignoring
                separators), that occurrence is parsed and returned.

        Returns:
            The parsed value, or None if no usable number was found.

        NOTE(review): a value found via ``expected_value`` is returned
        without applying the unit multiplier, exactly as before; confirm
        with callers whether that is intended.
        """
        if not text:
            return None

        # Normalise typographic apostrophes up front so that both the hint
        # lookup and the fallback see "1'000" rather than "1’000".
        text_processed = MetricParser._normalize_quotes(str(text).strip())
        logger.info(
            "[MetricParser] Processing text (len: %s) (Hint: %s)",
            len(text_processed),
            expected_value,
        )

        if expected_value:
            # Best effort only: any problem with the hint must not prevent
            # the classic parsing below from running.
            try:
                hint = MetricParser._normalize_quotes(str(expected_value))
                hinted = MetricParser._match_hint(text_processed, hint, is_revenue)
            except Exception as e:
                logger.error("Error while parsing with hint: %s", e)
                hinted = None
            if hinted is not None:
                return hinted

        # Fallback: classic robust parsing.
        # Remove noise: citations [1] and year/date remarks in parentheses.
        cleaned = MetricParser._NOISE_RE.sub(" ", text_processed).strip()

        # Remove approximation prefixes and currency symbols.
        for prefix_re in MetricParser._PREFIX_RES:
            cleaned = prefix_re.sub("", cleaned).strip()
        cleaned = MetricParser._CURRENCY_RE.sub("", cleaned).strip()

        multiplier = MetricParser._detect_multiplier(cleaned.lower(), is_revenue)

        # The first candidate that parses wins.
        for cand in MetricParser._iter_candidates(cleaned):
            val = MetricParser._parse_robust_number(cand, is_revenue)
            if val is not None:
                final = val * multiplier
                logger.info("[MetricParser] Found value: '%s' -> %s", cand, final)
                return final

        return None

    @staticmethod
    def _normalize_quotes(value: str) -> str:
        """Map typographic single quotes to the ASCII apostrophe."""
        return value.replace("’", "'").replace("‘", "'")

    @staticmethod
    def _match_hint(text: str, hint: str, is_revenue: bool) -> Optional[float]:
        """Return the first number in *text* whose digits equal the hint's first number."""
        hint_match = MetricParser._NUMBER_RE.search(hint)
        if not hint_match:
            return None

        # Only the FIRST number of the hint is the target ("352" from "352 Betten").
        target_digits = MetricParser._NON_DIGIT_RE.sub("", hint_match.group(0))
        if not target_digits:
            return None

        for num_str in MetricParser._NUMBER_RE.findall(text):
            if MetricParser._NON_DIGIT_RE.sub("", num_str) != target_digits:
                continue
            val = MetricParser._parse_robust_number(num_str, is_revenue)
            if val is not None:
                logger.info(
                    "[MetricParser] Found targeted value via hint: '%s' -> %s",
                    num_str,
                    val,
                )
                return val
        return None

    @staticmethod
    def _detect_multiplier(lower_text: str, is_revenue: bool) -> float:
        """Return the scale factor implied by the first matching unit group, else 1.0."""
        for unit_re, revenue_scale, absolute_scale in MetricParser._UNIT_SCALES:
            if unit_re.search(lower_text):
                return revenue_scale if is_revenue else absolute_scale
        return 1.0

    @staticmethod
    def _looks_like_year(token: str) -> bool:
        """Tell whether *token* is a bare 4-digit year between 1900 and 2100.

        Only punctuation at the edges is ignored ("2020," is a year); a token
        with an inner separator such as "2.020" is a formatted number.
        """
        core = token.strip(".,'")
        return len(core) == 4 and core.isdigit() and 1900 <= int(core) <= 2100

    @staticmethod
    def _iter_candidates(text: str):
        """Yield number candidates from *text* in reading order.

        Whitespace-separated groups of exactly three digits are glued onto
        the preceding token ("1 200 000" -> "1200000"). Tokens without any
        digit (stray punctuation) and bare years are skipped, and scanning
        continues with the tokens that follow them.
        """
        for match in MetricParser._CANDIDATE_RE.finditer(text):
            # str.split() also splits on non-breaking spaces, which are a
            # common thousands separator in typeset text.
            tokens = match.group(0).split()
            idx = 0
            while idx < len(tokens):
                lead = tokens[idx]
                idx += 1
                if not re.search(r"\d", lead):
                    continue
                if MetricParser._looks_like_year(lead):
                    continue
                merged = lead
                while idx < len(tokens) and len(tokens[idx]) == 3 and tokens[idx].isdigit():
                    merged += tokens[idx]
                    idx += 1
                yield merged

    @staticmethod
    def _parse_robust_number(s: str, is_revenue: bool) -> Optional[float]:
        """
        Parse a number string with ambiguous separators into a float.

        Rules, in order:
          * both "." and ",": the one occurring last is the decimal mark;
          * several dots or several commas: all are thousands separators;
          * a single comma: decimal comma (German style);
          * a single dot followed by exactly three digits: decimal point when
            ``is_revenue`` is True, thousands separator otherwise;
          * anything else is handed to ``float`` unchanged.

        Apostrophes are always treated as thousands separators. Returns None
        for empty or unparsable input.
        """
        s = s.strip().replace("'", "")
        if not s:
            return None

        dots = s.count(".")
        commas = s.count(",")

        try:
            if dots > 0 and commas > 0:
                if s.rfind(".") > s.rfind(","):  # US style: 1,234.56
                    return float(s.replace(",", ""))
                # German style: 1.234,56
                return float(s.replace(".", "").replace(",", "."))

            if dots > 1:
                return float(s.replace(".", ""))

            if commas > 1:
                return float(s.replace(",", ""))

            if commas == 1:
                return float(s.replace(",", "."))

            if dots == 1:
                fraction = s.split(".")[1]
                if len(fraction) == 3 and not is_revenue:
                    return float(s.replace(".", ""))
                return float(s)

            return float(s)
        except ValueError:
            return None