Files
Brancheneinstufung2/company-explorer/backend/services/classification.py
Floke b4595ef974 feat(app): Add wiki re-evaluation and fix wolfra bug
- Implemented a "Re-evaluate Wikipedia" button in the UI.

- Added a backend endpoint to trigger targeted Wikipedia metric extraction.

- Hardened the LLM metric extraction prompt to prevent hallucinations.

- Corrected several database path errors that caused data loss.

- Updated application version to 0.6.4 and documented the ongoing issue.
2026-01-23 16:05:44 +00:00

354 lines
18 KiB
Python

import json
import logging
import re
from datetime import datetime
from typing import Optional, Dict, Any, List
from sqlalchemy.orm import Session
from backend.database import Company, Industry, RoboticsCategory, EnrichmentData
from backend.lib.core_utils import call_gemini_flash, safe_eval_math, run_serp_search
from backend.services.scraping import scrape_website_content
from backend.lib.metric_parser import MetricParser
logger = logging.getLogger(__name__)
class ClassificationService:
    """Assign companies to an industry and derive industry-specific metrics.

    Industry classification runs an LLM prompt over scraped website text.
    Metric extraction walks a source cascade (website, stored Wikipedia text,
    SERP snippets), asks the LLM for the configured metric plus an area in m²,
    and post-processes the answer with ``MetricParser``.
    """

    # Area-unit spellings that may appear in a standardization formula.
    _AREA_UNIT_PATTERN = re.compile(r"(?i)m[²2]|qm")
    # Placeholder names that stand for the raw metric value inside a formula.
    _VALUE_PLACEHOLDER_PATTERN = re.compile(r"(?i)wert|value")
    _AREA_UNIT = "m²"
    _GENERIC_UNIT = "Einheiten"

    def __init__(self):
        # Industries are loaded per call because no DB session is available here.
        pass

    def _load_industry_definitions(self, db: Session) -> List[Industry]:
        """Return all ``Industry`` rows; logs a warning when none exist."""
        industries = db.query(Industry).all()
        if not industries:
            logger.warning("No industry definitions found in DB. Classification might be limited.")
        return industries

    def _get_wikipedia_content(self, db: Session, company_id: int) -> Optional[str]:
        """Return the ``full_text`` of the newest Wikipedia enrichment record.

        Returns ``None`` when the company has no Wikipedia record or the record
        has no content.
        """
        enrichment = db.query(EnrichmentData).filter(
            EnrichmentData.company_id == company_id,
            EnrichmentData.source_type == "wikipedia"
        ).order_by(EnrichmentData.created_at.desc()).first()
        if enrichment and enrichment.content:
            # NOTE(review): assumes `content` is a dict-like JSON column — confirm.
            wiki_data = enrichment.content
            return wiki_data.get('full_text')
        return None

    def _run_llm_classification_prompt(self, website_text: str, company_name: str, industry_definitions: List[Dict[str, str]]) -> Optional[str]:
        """Ask the LLM to pick one industry name for the company.

        Only the first 10000 characters of ``website_text`` are sent.

        Returns:
            The stripped LLM answer, or ``None`` when the call fails.
        """
        prompt = r"""
Du bist ein präziser Branchen-Klassifizierer für Unternehmen.
Deine Aufgabe ist es, das vorliegende Unternehmen basierend auf seinem Website-Inhalt
einer der untenstehenden Branchen zuzuordnen.
--- UNTERNEHMEN ---
Name: {company_name}
Website-Inhalt (Auszug):
{website_text_excerpt}
--- ZU VERWENDENDE BRANCHEN-DEFINITIONEN (STRIKT) ---
Wähle EINE der folgenden Branchen. Jede Branche hat eine Definition.
{industry_definitions_json}
--- AUFGABE ---
Analysiere den Website-Inhalt. Wähle die Branchen-Definition, die am besten zum Unternehmen passt.
Wenn keine der Definitionen zutrifft oder du unsicher bist, wähle "Others".
Gib NUR den Namen der zugeordneten Branche zurück, als reinen String, nichts anderes.
Beispiel Output: Hotellerie
""".format(
            company_name=company_name,
            website_text_excerpt=website_text[:10000],
            industry_definitions_json=json.dumps(industry_definitions, ensure_ascii=False)
        )
        try:
            response = call_gemini_flash(prompt, temperature=0.1, json_mode=False)
            return response.strip()
        except Exception as e:
            logger.error(f"LLM classification failed for {company_name}: {e}")
            return None

    def _run_llm_metric_extraction_prompt(self, text_content: str, search_term: str, industry_name: str) -> Optional[Dict[str, Any]]:
        """Ask the LLM to extract the metric ``search_term`` and an area in m².

        Only the first 15000 characters of ``text_content`` are sent.

        Returns:
            The decoded JSON object, or ``None`` when the call fails, the
            answer is not valid JSON, or the JSON is not an object.
        """
        prompt = r"""
Du bist ein Datenextraktions-Spezialist für Unternehmens-Kennzahlen.
Analysiere den folgenden Text, um spezifische Werte zu extrahieren.
--- KONTEXT ---
Branche: {industry_name}
Primär gesuchte Metrik: '{search_term}'
--- TEXT ---
{text_content_excerpt}
--- AUFGABE ---
1. Finde den numerischen Wert für die primäre Metrik '{search_term}'.
2. EXTREM WICHTIG: Suche im gesamten Text nach einer Angabe zur Gesamtfläche, Nutzfläche, Grundstücksfläche oder Verkaufsfläche in Quadratmetern (m²).
In Branchen wie Freizeitparks, Flughäfen oder Thermen ist dies oft separat im Fließtext versteckt (z.B. "Die Therme verfügt über eine Gesamtfläche von 4.000 m²").
3. Achte auf deutsche Zahlenformate (z.B. 1.005 für tausend-fünf).
4. Regel: Extrahiere IMMER den umgebenden Satz oder die Zeile in 'raw_text_segment'. Rate NIEMALS einen numerischen Wert, ohne den Beweis dafür zu liefern.
Gib NUR ein JSON-Objekt zurück:
'raw_text_segment': Das Snippet für '{search_term}' (z.B. "ca. 1.500 Besucher (2020)"). MUSS IMMER AUSGEFÜLLT SEIN WENN EIN WERT GEFUNDEN WURDE.
'raw_value': Der numerische Wert für '{search_term}'. null, falls nicht gefunden.
'raw_unit': Die Einheit (z.B. "Besucher", "Passagiere"). null, falls nicht gefunden.
'area_text_segment': Das Snippet, das eine Fläche (m²) erwähnt (z.B. "4.000 m² Gesamtfläche"). null, falls nicht gefunden.
'area_value': Der gefundene Wert der Fläche in m² (als Zahl). null, falls nicht gefunden.
'metric_name': '{search_term}'.
""".format(
            industry_name=industry_name,
            search_term=search_term,
            text_content_excerpt=text_content[:15000]
        )
        try:
            response = call_gemini_flash(prompt, temperature=0.05, json_mode=True)
            parsed = json.loads(response)
        except Exception as e:
            logger.error(f"LLM metric extraction failed for '{search_term}': {e}")
            return None
        if not isinstance(parsed, dict):
            # Callers use .get(); a JSON list or scalar would only fail later.
            logger.error(f"LLM metric extraction failed for '{search_term}': response is not a JSON object")
            return None
        return parsed

    @classmethod
    def _derive_standardized_unit(cls, standardization_logic: Optional[str]) -> str:
        """Return ``"m²"`` if the formula mentions an area unit, else ``"Einheiten"``.

        The previous inline check tested ``"" in formula``, which is always
        true, so the unit was always the empty string.
        """
        if standardization_logic and cls._AREA_UNIT_PATTERN.search(standardization_logic):
            return cls._AREA_UNIT
        return cls._GENERIC_UNIT

    def _parse_standardization_logic(self, formula: str, raw_value: float) -> Optional[float]:
        """Evaluate a standardization formula for ``raw_value``.

        The placeholders ``wert`` / ``value`` (any letter case) are replaced by
        the value, area-unit noise such as ``m²``, ``m2`` or ``qm`` is removed
        (e.g. "wert * 25m2" -> "wert * 25"), and the remaining expression is
        handed to ``safe_eval_math``, which performs the final safety check.

        Returns:
            The evaluated result, or ``None`` when the formula is empty,
            ``raw_value`` is ``None``, or evaluation raises.
        """
        if not formula or raw_value is None:
            return None
        value_text = str(raw_value)
        # A function replacement keeps the value text free of regex escape handling.
        formula_cleaned = self._VALUE_PLACEHOLDER_PATTERN.sub(lambda _match: value_text, formula)
        formula_cleaned = re.sub(r'(?i)m[²2]', '', formula_cleaned)
        formula_cleaned = re.sub(r'(?i)qm', '', formula_cleaned)
        try:
            return safe_eval_math(formula_cleaned)
        except Exception as e:
            logger.error(f"Failed to parse standardization logic '{formula}' with value {raw_value}: {e}")
            return None

    def _load_serp_snippets(self, company_name: str, search_term: str, industry_name: str) -> Optional[str]:
        """Run one SERP search and join the organic result snippets with spaces.

        Returns ``None`` when the search yields a falsy response.
        """
        serp_response = run_serp_search(f"{company_name} {search_term} {industry_name}")
        if not serp_response:
            return None
        return " ".join(
            res.get("snippet") or "" for res in serp_response.get("organic_results", [])
        )

    def _resolve_metric_value(self, llm_result: Dict[str, Any], search_term: str) -> Any:
        """Pick the metric value from an LLM result.

        Order of preference: ``MetricParser`` on ``raw_text_segment``, then
        ``MetricParser`` on the stringified ``raw_value``, then the untouched
        ``raw_value`` (which may be ``None``).
        """
        lowered_term = search_term.lower()
        is_revenue = "umsatz" in lowered_term or "revenue" in lowered_term

        final_value = None
        segment = llm_result.get("raw_text_segment")
        if segment:
            final_value = MetricParser.extract_numeric_value(segment, is_revenue=is_revenue)
            if final_value is not None:
                logger.info(f"Successfully parsed '{segment}' to {final_value} using MetricParser.")

        raw_value = llm_result.get("raw_value")
        if final_value is None and raw_value:
            # Also clean the LLM's own number, to catch errors like "802020".
            final_value = MetricParser.extract_numeric_value(str(raw_value), is_revenue=is_revenue)
            if final_value is not None:
                logger.info(f"Successfully cleaned LLM raw_value '{raw_value}' to {final_value}")

        if final_value is None:
            final_value = raw_value
        return final_value

    def _resolve_area_value(self, llm_result: Dict[str, Any]) -> Any:
        """Return the area from an LLM result, refined from its text segment when parseable."""
        area_val = llm_result.get("area_value")
        segment = llm_result.get("area_text_segment")
        if segment:
            refined_area = MetricParser.extract_numeric_value(segment, is_revenue=False)
            if refined_area is not None:
                area_val = refined_area
                logger.info(f"Refined area to {area_val} from segment '{segment}'")
        return area_val

    def _extract_and_calculate_metric_cascade(
        self,
        db: Session,
        company: Company,
        industry_name: str,
        search_term: str,
        standardization_logic: Optional[str],
        standardized_unit: Optional[str]
    ) -> Dict[str, Any]:
        """Try website, Wikipedia and SERP snippets in order until one yields data.

        The first source whose LLM result contains a metric value or any area
        information wins. A stage that raises is logged and skipped.

        Returns:
            Dict with the keys ``calculated_metric_name``,
            ``calculated_metric_value``, ``calculated_metric_unit``,
            ``standardized_metric_value``, ``standardized_metric_unit`` and
            ``metric_source``. Values stay ``None`` (unit: ``standardized_unit``)
            when no source produced anything.
        """
        results = {
            "calculated_metric_name": search_term,
            "calculated_metric_value": None,
            "calculated_metric_unit": None,
            "standardized_metric_value": None,
            "standardized_metric_unit": standardized_unit,
            "metric_source": None
        }
        # CASCADE: Website -> Wikipedia -> SerpAPI
        sources = [
            ("website", lambda: scrape_website_content(company.website)),
            ("wikipedia", lambda: self._get_wikipedia_content(db, company.id)),
            ("serpapi", lambda: self._load_serp_snippets(company.name, search_term, industry_name)),
        ]
        for source_name, content_loader in sources:
            logger.info(f"Checking {source_name} for '{search_term}' for {company.name}")
            try:
                content = content_loader()
                logger.debug("Content length for %s: %d", source_name, len(content) if content else 0)
                if not content:
                    continue
                llm_result = self._run_llm_metric_extraction_prompt(content, search_term, industry_name)
                logger.debug("LLM result for %s: %s", source_name, llm_result)
                if not llm_result:
                    # Extraction failed or returned nothing; move on to the next source.
                    continue

                final_value = self._resolve_metric_value(llm_result, search_term)
                has_area_hint = (
                    llm_result.get("area_value") is not None
                    or bool(llm_result.get("area_text_segment"))
                )
                if final_value is None and not has_area_hint:
                    continue

                results["calculated_metric_value"] = final_value
                results["calculated_metric_unit"] = llm_result.get("raw_unit")
                results["metric_source"] = source_name

                area_val = self._resolve_area_value(llm_result)
                if area_val is not None:
                    # The prompt asks for the area in m², so label it accordingly.
                    results["standardized_metric_value"] = area_val
                    results["standardized_metric_unit"] = self._AREA_UNIT
                elif final_value is not None and standardization_logic:
                    results["standardized_metric_value"] = self._parse_standardization_logic(standardization_logic, final_value)
                return results
            except Exception as e:
                logger.error(f"Error in {source_name} stage: {e}")
        return results

    def extract_metrics_for_industry(self, company: Company, db: Session, industry: Industry) -> Company:
        """Run the metric cascade for ``industry`` and persist the result on ``company``.

        Split out from ``classify_company_potential`` to allow manual overrides.
        Returns ``company`` unchanged when the industry is missing or has no
        ``scraper_search_term``; otherwise the metric fields are written and
        the session is committed.
        """
        if not industry or not industry.scraper_search_term:
            logger.warning(f"No metric configuration for industry '{industry.name if industry else 'None'}'")
            return company
        std_unit = self._derive_standardized_unit(industry.standardization_logic)
        metrics = self._extract_and_calculate_metric_cascade(
            db, company, industry.name, industry.scraper_search_term, industry.standardization_logic, std_unit
        )
        company.calculated_metric_name = metrics["calculated_metric_name"]
        company.calculated_metric_value = metrics["calculated_metric_value"]
        company.calculated_metric_unit = metrics["calculated_metric_unit"]
        company.standardized_metric_value = metrics["standardized_metric_value"]
        company.standardized_metric_unit = metrics["standardized_metric_unit"]
        company.metric_source = metrics["metric_source"]
        # Keep track of refinement (naive UTC timestamp, as before).
        company.last_classification_at = datetime.utcnow()
        db.commit()
        return company

    def reevaluate_wikipedia_metric(self, company: Company, db: Session, industry: Industry) -> Company:
        """Re-run metric extraction against the stored Wikipedia text only.

        The company is updated and committed only when a metric value is
        found. Errors during extraction are logged, not raised.

        Returns:
            ``company`` (updated or unchanged).
        """
        logger.info(f"Starting Wikipedia re-evaluation for '{company.name}'")
        if not industry or not industry.scraper_search_term:
            # `industry` may be None here, so its name must not be read unguarded.
            logger.warning(f"Cannot re-evaluate: No metric configuration for industry '{industry.name if industry else 'None'}'")
            return company
        search_term = industry.scraper_search_term
        content = self._get_wikipedia_content(db, company.id)
        if not content:
            logger.warning("No Wikipedia content found to re-evaluate.")
            return company
        try:
            llm_result = self._run_llm_metric_extraction_prompt(content, search_term, industry.name)
            if not llm_result:
                raise ValueError("LLM metric extraction returned empty result.")

            final_value = self._resolve_metric_value(llm_result, search_term)
            if final_value is None:
                logger.warning(f"Re-evaluation for {company.name} did not yield a metric value.")
                return company

            company.calculated_metric_name = search_term
            company.calculated_metric_value = final_value
            company.calculated_metric_unit = llm_result.get("raw_unit")
            company.metric_source = "wikipedia_reevaluated"

            # Standardization: an extracted area wins over the formula.
            area_val = self._resolve_area_value(llm_result)
            if area_val is not None:
                company.standardized_metric_value = area_val
                company.standardized_metric_unit = self._AREA_UNIT
            else:
                company.standardized_metric_unit = self._derive_standardized_unit(industry.standardization_logic)
                if industry.standardization_logic:
                    company.standardized_metric_value = self._parse_standardization_logic(industry.standardization_logic, final_value)
                else:
                    company.standardized_metric_value = None

            company.last_classification_at = datetime.utcnow()
            db.commit()
            logger.info(f"Successfully re-evaluated and updated metrics for {company.name} from Wikipedia.")
        except Exception as e:
            logger.error(f"Error during Wikipedia re-evaluation for {company.name}: {e}")
        return company

    def classify_company_potential(self, company: Company, db: Session) -> Company:
        """Classify the company's industry, then extract that industry's metric.

        An existing ``industry_ai`` that is a known industry other than
        "Others" is kept. Otherwise the website is scraped and the LLM
        classifies it; anything that is not a known industry name becomes
        "Others". The session is committed after the classification step.
        """
        logger.info(f"Starting complete classification for {company.name}")
        # 1. Load industries
        industries = self._load_industry_definitions(db)
        industry_defs = [{"name": i.name, "description": i.description} for i in industries]
        valid_industry_names = {i.name for i in industries}

        # 2. Industry classification (website-based)
        # STRICT: the AI may only classify when the industry is still "Others" or unset.
        if company.industry_ai and company.industry_ai != "Others" and company.industry_ai in valid_industry_names:
            logger.info(f"KEEPING manual/existing industry '{company.industry_ai}' for {company.name}")
        else:
            website_content = scrape_website_content(company.website)
            if website_content:
                industry_name = self._run_llm_classification_prompt(website_content, company.name, industry_defs)
                company.industry_ai = industry_name if industry_name in valid_industry_names else "Others"
                logger.info(f"AI CLASSIFIED {company.name} as '{company.industry_ai}'")
            else:
                company.industry_ai = "Others"
                logger.warning(f"No website content for {company.name}, setting industry to Others")
        db.commit()

        # 3. Metric extraction
        if company.industry_ai != "Others":
            industry = next((i for i in industries if i.name == company.industry_ai), None)
            if industry:
                self.extract_metrics_for_industry(company, db, industry)
        return company