# File listing metadata: 325 lines, 15 KiB, Python
import re
|
||
import logging
|
||
from typing import Optional, Union
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class MetricParser:
    """
    Robust parser for extracting numeric values from text, specialized for
    German formats and business metrics (Revenue, Employees).

    Handles thousands separators (dot, apostrophe, space), decimal commas,
    unit words (Tsd / Mio / Mrd ...), year-like numbers and numbers that were
    accidentally concatenated with a year (e.g. "802020").

    Known limitation: the unit multiplier is detected once for the whole text
    and applied to whichever number is selected, so a text mixing
    "1.000 Mitarbeiter" and "5 Mio Umsatz" scales the selected number by "Mio".
    """

    # Same logger object as a module-level ``logging.getLogger(__name__)``:
    # ``getLogger`` returns one instance per name.
    _log = logging.getLogger(__name__)

    # Approximation words that may precede the number (anchored at text start).
    _PREFIX_PATTERNS = (
        r'ca\.?\s*', r'circa\s*', r'rund\s*', r'etwa\s*', r'über\s*', r'unter\s*',
        r'mehr als\s*', r'weniger als\s*', r'bis zu\s*', r'about\s*', r'over\s*',
        r'approx\.?\s*', r'around\s*', r'up to\s*', r'~\s*', r'rd\.?\s*',
    )

    # Currency markers removed anywhere in the text (this also turns "kEUR" into "k").
    _CURRENCY_PATTERNS = (
        r'€', r'EUR', r'US\$', r'USD', r'CHF', r'GBP', r'£', r'¥', r'JPY',
    )

    # Unit words, checked in this order (largest scale wins).
    _BILLION_UNITS = ('mrd', 'milliarden', 'milliarde', 'billion', 'billions', 'bn')
    _MILLION_UNITS = ('mio', 'millionen', 'million', 'millions', 'mn')
    _THOUSAND_UNITS = ('tsd', 'tausend', 'k')

    # Substrings stripped from an LLM-provided expected value before parsing.
    # Order matters: longer tokens must precede tokens they contain
    # ('sqm' before 'qm', 'euro' before 'eur'), otherwise a stray letter is
    # left behind and the value no longer parses.
    _EXPECTED_NOISE = (
        'm²', 'sqm', 'qm', 'mitarbeiter', 'employees',
        'euro', 'eur', 'usd', 'chf', '€', '$', '£', '¥',
        'milliarden', 'milliarde', 'millionen', 'million', 'billion',
        'mrd', 'mio', 'tausend', 'tsd', 'bn', 'mn', 'k',
        "'", '’',
    )

    # A numeric run starts at a digit (optionally with a directly attached
    # leading separator as in ".5") and continues over digits, separators and
    # whitespace. Requiring the digit keeps list punctuation such as the
    # comma in "KG, 120" out of the number.
    _NUMBER_RUN = re.compile(r"[.,]?\d[\d.,'\s]*")

    # Years that are stripped when found glued to the end of a 5-7 digit number.
    _CONCATENATED_YEARS = frozenset(str(year) for year in range(2018, 2027))

    @staticmethod
    def extract_numeric_value(text: str, is_revenue: bool = False, expected_value: Optional[str] = None) -> Optional[float]:
        """
        Extracts a float value from a string.

        If expected_value is provided (from LLM), the number in the text that
        matches it (exactly, or off by a factor of 1000 either way) is preferred.
        Otherwise the first parseable number that does not look like a year is
        used; a year-like number (1900-2100) is only returned when it is the
        only usable number.

        Args:
            text: Raw text containing the metric. Converted with ``str()``.
            is_revenue: If True the result is normalized to millions
                (Mrd -> x1000, Mio -> x1, Tsd -> x0.001) and a single dot is
                read as a decimal point. If False the result is scaled to an
                absolute number and "1.005" is read as 1005.
            expected_value: Optional hint (typically produced by an LLM) that
                identifies which number in the text is meant.

        Returns:
            The parsed value, or None if the text is empty or contains no
            parseable number.
        """
        if not text:
            return None

        log = MetricParser._log

        # 1. Pre-cleaning
        text_processed = str(text).strip()
        log.info("[MetricParser] Processing: '%s' (Expected: %s)", text_processed, expected_value)

        # Normalize typographic quotes so Swiss thousands separators are uniform.
        text_processed = text_processed.replace("’", "'").replace("‘", "'")

        # 2. Remove noise: citations [1] and year/date notes in parentheses (2020).
        text_processed = re.sub(r'\(.*?\)|\[.*?\]', ' ', text_processed).strip()

        # 3. Remove common prefixes and currency symbols.
        for prefix in MetricParser._PREFIX_PATTERNS:
            text_processed = re.sub('^' + prefix, '', text_processed, flags=re.IGNORECASE).strip()
        for currency in MetricParser._CURRENCY_PATTERNS:
            text_processed = re.sub(currency, '', text_processed, flags=re.IGNORECASE).strip()

        # 4. Detect the unit multiplier (Tsd / Mio / Mrd) for the whole text.
        multiplier = MetricParser._detect_multiplier(text_processed.lower(), is_revenue)

        # 5. Collect number tokens and choose one.
        target_val = MetricParser._parse_expected(expected_value, is_revenue)
        tokens = MetricParser._number_tokens(text_processed)
        last_index = len(tokens) - 1

        selected = None        # token confirmed by expected_value
        fallback = None        # first parseable token that is not year-like
        year_fallback = None   # year-like token, used only as a last resort

        for index, token in enumerate(tokens):
            year_like = MetricParser._is_year_like(token)

            # Smart year skip (legacy logic): without a target, a year is
            # passed over as long as another number follows it.
            if year_like and target_val is None and index < last_index:
                log.info("[MetricParser] Skipping year-like candidate '%s' because another number follows.", token)
                continue

            try:
                current = MetricParser._parse_robust_number(token, is_revenue)
            except ValueError:
                continue  # malformed separator mix such as "1.2,3.4"

            if target_val is not None and (
                abs(current - target_val) < 0.1
                or abs(current - target_val / 1000) < 0.1
                or abs(current - target_val * 1000) < 0.1
            ):
                selected = token
                log.info("[MetricParser] Found candidate '%s' matching expected '%s'", token, expected_value)
                break

            if year_like:
                if year_fallback is None:
                    year_fallback = token
            elif fallback is None:
                fallback = token

        if selected is not None:
            candidate = selected
        elif fallback is not None:
            candidate = fallback
        elif year_fallback is not None:
            candidate = year_fallback
        else:
            return None

        # 6. Concatenated year detection (bug fix for "802020"): a separator-free
        # number of 5-7 digits ending in a recent year is assumed to be
        # "<value><year>" glued together; only the value part is kept.
        if candidate.isdigit() and 5 <= len(candidate) <= 7:
            year = candidate[-4:]
            if year in MetricParser._CONCATENATED_YEARS:
                val_str = candidate[:-4]
                log.warning("[MetricParser] Caught concatenated year BUG: '%s' -> '%s' (Year %s)", candidate, val_str, year)
                candidate = val_str

        try:
            val = MetricParser._parse_robust_number(candidate, is_revenue)
        except ValueError as e:
            log.debug("Failed to parse number string '%s': %s", candidate, e)
            return None

        final = val * multiplier
        log.info("[MetricParser] Candidate: '%s' -> Multiplier: %s -> Value: %s", candidate, multiplier, final)
        return final

    @staticmethod
    def _has_unit(lowered_text: str, units) -> bool:
        """Return True if any unit occurs in lowered_text as a standalone word.

        "Standalone" means not directly adjacent to another letter. Digits may
        be adjacent, so "50k" and "2,5mio" are recognized, which a plain
        ``\\b`` word boundary would miss.
        """
        for unit in units:
            pattern = r'(?<![^\W\d_])' + re.escape(unit) + r'(?![^\W\d_])'
            if re.search(pattern, lowered_text):
                return True
        return False

    @staticmethod
    def _detect_multiplier(lowered_text: str, is_revenue: bool) -> float:
        """Return the scale factor implied by unit words in lowered_text.

        Revenue is normalized to millions; every other metric is scaled to an
        absolute number. Without a unit word the factor is 1.0.
        """
        scales = (
            (MetricParser._BILLION_UNITS, 1000.0, 1_000_000_000.0),
            (MetricParser._MILLION_UNITS, 1.0, 1_000_000.0),
            (MetricParser._THOUSAND_UNITS, 0.001, 1000.0),
        )
        for units, revenue_factor, absolute_factor in scales:
            if MetricParser._has_unit(lowered_text, units):
                return revenue_factor if is_revenue else absolute_factor
        return 1.0

    @staticmethod
    def _parse_expected(expected_value, is_revenue: bool) -> Optional[float]:
        """Parse the expected-value hint into a float, or None if unusable.

        Units and multiplier words are stripped without scaling: the hint is
        matched against the raw number as written in the text.
        """
        if not expected_value:
            return None
        cleaned = str(expected_value).lower()
        for noise in MetricParser._EXPECTED_NOISE:
            cleaned = cleaned.replace(noise, "")
        cleaned = "".join(cleaned.split())  # drops every kind of whitespace
        try:
            return MetricParser._parse_robust_number(cleaned, is_revenue)
        except ValueError:
            return None

    @staticmethod
    def _number_tokens(text: str) -> list:
        """Return all number tokens found in text, in order of appearance.

        Apostrophe thousands separators are removed. Space-grouped thousands
        ("1 000 000") are merged, but only when the leading group has 1-3
        digits, so "2020 500" stays two numbers. Trailing punctuation
        (sentence period, list comma) is dropped from each token.
        """
        tokens = []
        for run in MetricParser._NUMBER_RUN.finditer(text):
            parts = run.group(0).replace("'", "").split()
            pos = 0
            while pos < len(parts):
                token = parts[pos]
                pos += 1
                if token.isdigit() and len(token) <= 3:
                    while pos < len(parts):
                        group = parts[pos].rstrip(".,")
                        if len(group) != 3 or not group.isdigit():
                            break
                        token += group
                        pos += 1
                        if group != parts[pos - 1]:
                            break  # punctuation after the group ends the number
                token = token.rstrip(".,")
                if any(ch.isdigit() for ch in token):
                    tokens.append(token)
        return tokens

    @staticmethod
    def _is_year_like(token: str) -> bool:
        """Return True for a bare 4-digit number between 1900 and 2100.

        Tokens containing separators ("2.000") are not years.
        """
        return len(token) == 4 and token.isdigit() and 1900 <= int(token) <= 2100

    @staticmethod
    def _parse_robust_number(s: str, is_revenue: bool) -> float:
        """
        Parses a number string dealing with ambiguous separators.
        Standardizes to Python float.

        Rules, in order:
          * dot and comma both present: the one occurring last is the decimal
            separator ("1.234,56" and "1,234.56" both give 1234.56);
          * several dots or several commas: thousands separators;
          * a single comma: decimal comma ("1,5" -> 1.5);
          * a single dot followed by exactly three digits: thousands separator
            for non-revenue ("1.005" -> 1005), decimal point for revenue
            ("1.005" -> 1.005);
          * anything else is passed to ``float()`` unchanged.

        Raises:
            ValueError: if the cleaned string is not a valid float literal.
        """
        dots = s.count('.')
        commas = s.count(',')

        if dots > 0 and commas > 0:
            if s.rfind('.') > s.rfind(','):
                # US style: 1,234.56
                return float(s.replace(',', ''))
            # German style: 1.234,56
            return float(s.replace('.', '').replace(',', '.'))

        if dots > 1:
            # Thousands: 1.000.000
            return float(s.replace('.', ''))

        if commas > 1:
            # Unusual, but treated as thousands: 1,000,000
            return float(s.replace(',', ''))

        if commas == 1:
            # A lone comma is read as a decimal comma (German convention).
            return float(s.replace(',', '.'))

        if dots == 1 and len(s.split('.')[1]) == 3 and not is_revenue:
            # "1.005 Mitarbeiter" means 1005, not 1.005.
            return float(s.replace('.', ''))

        return float(s)