Files
Brancheneinstufung2/company-explorer/backend/services/classification.py
Floke 11d2bc03bf [30e88f42] ✦ In dieser Sitzung haben wir den End-to-End-Test der SuperOffice-Schnittstelle erfolgreich von der automatisierten Simulation bis zum produktiven Live-Lauf
✦ In dieser Sitzung haben wir den End-to-End-Test der SuperOffice-Schnittstelle erfolgreich von der automatisierten Simulation bis zum produktiven Live-Lauf
  mit Echtdaten abgeschlossen.
2026-02-22 08:20:28 +00:00

309 lines
16 KiB
Python

from typing import Tuple
import json
import logging
import re
from datetime import datetime
from typing import Optional, Dict, Any, List
from sqlalchemy.orm import Session, joinedload
from backend.database import Company, Industry, RoboticsCategory, EnrichmentData
from backend.lib.core_utils import call_gemini_flash, safe_eval_math, run_serp_search
from backend.services.scraping import scrape_website_content
from backend.lib.metric_parser import MetricParser
logger = logging.getLogger(__name__)
class ClassificationService:
def __init__(self):
pass
def _load_industry_definitions(self, db: Session) -> List[Industry]:
industries = db.query(Industry).options(
joinedload(Industry.primary_category),
joinedload(Industry.secondary_category)
).all()
if not industries:
logger.warning("No industry definitions found in DB.")
return industries
def _get_wikipedia_content(self, db: Session, company_id: int) -> Optional[Dict[str, Any]]:
enrichment = db.query(EnrichmentData).filter(
EnrichmentData.company_id == company_id,
EnrichmentData.source_type == "wikipedia"
).order_by(EnrichmentData.created_at.desc()).first()
return enrichment.content if enrichment and enrichment.content else None
def _run_llm_classification_prompt(self, website_text: str, company_name: str, industry_definitions: List[Dict[str, str]]) -> Optional[str]:
prompt = f"""
Act as a strict B2B Industry Classifier.
Company: {company_name}
Context: {website_text[:3000]}
Available Industries:
{json.dumps(industry_definitions, indent=2)}
Task: Select the ONE industry that best matches the company.
If the company is a Hospital/Klinik, select 'Healthcare - Hospital'.
If none match well, select 'Others'.
Return ONLY the exact name of the industry.
"""
try:
response = call_gemini_flash(prompt)
if not response: return "Others"
cleaned = response.strip().replace('"', '').replace("'", "")
valid_names = [i['name'] for i in industry_definitions] + ["Others"]
if cleaned in valid_names: return cleaned
for name in valid_names:
if name in cleaned: return name
return "Others"
except Exception as e:
logger.error(f"Classification Prompt Error: {e}")
return "Others"
def _run_llm_metric_extraction_prompt(self, text_content: str, search_term: str, industry_name: str) -> Optional[Dict[str, Any]]:
prompt = f"""
Extract the following metric for the company in industry '{industry_name}':
Target Metric: "{search_term}"
Source Text:
{text_content[:6000]}
Return a JSON object with:
- "raw_value": The number found (e.g. 352 or 352.0). If not found, null.
- "raw_unit": The unit found (e.g. "Betten", "").
- "proof_text": A short quote from the text proving this value.
JSON ONLY.
"""
try:
response = call_gemini_flash(prompt, json_mode=True)
if not response: return None
if isinstance(response, str):
try:
data = json.loads(response.replace("```json", "").replace("```", "").strip())
except: return None
else:
data = response
if isinstance(data, list) and data: data = data[0]
if not isinstance(data, dict): return None
if data.get("raw_value") == "null": data["raw_value"] = None
return data
except Exception as e:
logger.error(f"LLM Extraction Parse Error: {e}")
return None
def _is_metric_plausible(self, metric_name: str, value: Optional[float]) -> bool:
if value is None: return False
try: return float(value) > 0
except: return False
def _parse_standardization_logic(self, formula: str, raw_value: float) -> Optional[float]:
if not formula or raw_value is None: return None
# Clean formula: remove anything in parentheses first (often units or comments)
clean_formula = re.sub(r'\(.*?\)', '', formula.lower())
# Replace 'wert' with the actual value
expression = clean_formula.replace("wert", str(raw_value))
# Remove any non-math characters
expression = re.sub(r'[^0-9\.\+\-\*\/]', '', expression)
try:
return safe_eval_math(expression)
except Exception as e:
logger.error(f"Failed to parse logic '{formula}' with value {raw_value}: {e}")
return None
def _get_best_metric_result(self, results_list: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
if not results_list: return None
source_priority = {"wikipedia": 0, "website": 1, "serpapi": 2}
valid_results = [r for r in results_list if r.get("calculated_metric_value") is not None]
if not valid_results: return None
valid_results.sort(key=lambda r: source_priority.get(r.get("metric_source"), 99))
return valid_results[0]
def _get_website_content_and_url(self, db: Session, company: Company) -> Tuple[Optional[str], Optional[str]]:
enrichment = db.query(EnrichmentData).filter_by(company_id=company.id, source_type="website_scrape").order_by(EnrichmentData.created_at.desc()).first()
if enrichment and enrichment.content and "raw_text" in enrichment.content:
return enrichment.content["raw_text"], company.website
content = scrape_website_content(company.website)
return content, company.website
def _get_wikipedia_content_and_url(self, db: Session, company_id: int) -> Tuple[Optional[str], Optional[str]]:
wiki_data = self._get_wikipedia_content(db, company_id)
return (wiki_data.get('full_text'), wiki_data.get('url')) if wiki_data else (None, None)
def _get_serpapi_content_and_url(self, company: Company, search_term: str) -> Tuple[Optional[str], Optional[str]]:
serp_results = run_serp_search(f"{company.name} {company.city or ''} {search_term}")
if not serp_results: return None, None
content = " ".join([res.get("snippet", "") for res in serp_results.get("organic_results", [])])
url = serp_results.get("organic_results", [{}])[0].get("link") if serp_results.get("organic_results") else None
return content, url
def _extract_and_calculate_metric_cascade(self, db: Session, company: Company, industry_name: str, search_term: str, standardization_logic: Optional[str], standardized_unit: Optional[str]) -> Dict[str, Any]:
final_result = {"calculated_metric_name": search_term, "calculated_metric_value": None, "calculated_metric_unit": None, "standardized_metric_value": None, "standardized_metric_unit": standardized_unit, "metric_source": None, "proof_text": None, "metric_source_url": None}
sources = [
("website", lambda: self._get_website_content_and_url(db, company)),
("wikipedia", lambda: self._get_wikipedia_content_and_url(db, company.id)),
("serpapi", lambda: self._get_serpapi_content_and_url(company, search_term))
]
all_source_results = []
parser = MetricParser()
for source_name, content_loader in sources:
logger.info(f" -> Checking source: [{source_name.upper()}] for '{search_term}'")
try:
content_text, current_source_url = content_loader()
if not content_text or len(content_text) < 100: continue
llm_result = self._run_llm_metric_extraction_prompt(content_text, search_term, industry_name)
if llm_result and llm_result.get("proof_text"):
# Use the robust parser on the LLM's proof text or raw_value
hint = llm_result.get("raw_value") or llm_result.get("proof_text")
parsed_value = parser.extract_numeric_value(text=content_text, expected_value=str(hint))
if parsed_value is not None:
llm_result.update({"calculated_metric_value": parsed_value, "calculated_metric_unit": llm_result.get('raw_unit'), "metric_source": source_name, "metric_source_url": current_source_url})
all_source_results.append(llm_result)
except Exception as e: logger.error(f" -> Error in {source_name} stage: {e}")
best_result = self._get_best_metric_result(all_source_results)
if not best_result: return final_result
final_result.update(best_result)
if self._is_metric_plausible(search_term, final_result['calculated_metric_value']):
final_result['standardized_metric_value'] = self._parse_standardization_logic(standardization_logic, final_result['calculated_metric_value'])
return final_result
def _find_direct_area(self, db: Session, company: Company, industry_name: str) -> Optional[Dict[str, Any]]:
logger.info(" -> (Helper) Running specific search for 'Fläche'...")
area_metrics = self._extract_and_calculate_metric_cascade(db, company, industry_name, search_term="Fläche", standardization_logic=None, standardized_unit="")
if area_metrics and area_metrics.get("calculated_metric_value") is not None:
unit = (area_metrics.get("calculated_metric_unit") or "").lower()
if any(u in unit for u in ["", "qm", "quadratmeter"]):
logger.info(" ✅ SUCCESS: Found direct area value.")
area_metrics['standardized_metric_value'] = area_metrics['calculated_metric_value']
return area_metrics
return None
def _generate_marketing_opener(self, company: Company, industry: Industry, website_text: str, focus_mode: str = "primary") -> Optional[str]:
if not industry: return None
# 1. Determine Context & Pains/Gains
product_context = industry.primary_category.name if industry.primary_category else "Robotik-Lösungen"
raw_pains = industry.pains or ""
# Split pains/gains based on markers
def extract_segment(text, marker):
if not text: return ""
segments = re.split(r'\[(.*?)\]', text)
for i in range(1, len(segments), 2):
if marker.lower() in segments[i].lower():
return segments[i+1].strip()
return text # Fallback to full text if no markers found
relevant_pains = extract_segment(raw_pains, "Primary Product")
if focus_mode == "secondary" and industry.ops_focus_secondary and industry.secondary_category:
product_context = industry.secondary_category.name
relevant_pains = extract_segment(raw_pains, "Secondary Product")
prompt = f"""
Du bist ein exzellenter B2B-Stratege und Texter. Formuliere einen hochpersonalisierten Einleitungssatz (1-2 Sätze).
Unternehmen: {company.name}
Branche: {industry.name}
Fokus: {focus_mode.upper()}
Herausforderungen: {relevant_pains}
Kontext: {website_text[:2500]}
REGEL: Nenne NICHT das Produkt "{product_context}". Fokussiere dich NUR auf die Herausforderung.
AUSGABE: NUR den fertigen Satz.
"""
try:
response = call_gemini_flash(prompt)
return response.strip().strip('"') if response else None
except Exception as e:
logger.error(f"Opener Error: {e}")
return None
def _sync_company_address_data(self, db: Session, company: Company):
"""Extracts address and VAT data from website scrape if available."""
from ..database import EnrichmentData
enrichment = db.query(EnrichmentData).filter_by(
company_id=company.id, source_type="website_scrape"
).order_by(EnrichmentData.created_at.desc()).first()
if enrichment and enrichment.content and "impressum" in enrichment.content:
imp = enrichment.content["impressum"]
if imp and isinstance(imp, dict):
changed = False
# City
if imp.get("city") and not company.city:
company.city = imp.get("city")
changed = True
# Street
if imp.get("street") and not company.street:
company.street = imp.get("street")
changed = True
# Zip / PLZ
zip_val = imp.get("zip") or imp.get("plz")
if zip_val and not company.zip_code:
company.zip_code = zip_val
changed = True
# Country
if imp.get("country_code") and (not company.country or company.country == "DE"):
company.country = imp.get("country_code")
changed = True
# VAT ID
if imp.get("vat_id") and not company.crm_vat:
company.crm_vat = imp.get("vat_id")
changed = True
if changed:
db.commit()
logger.info(f"Updated Address/VAT from Impressum for {company.name}: City={company.city}, VAT={company.crm_vat}")
def classify_company_potential(self, company: Company, db: Session) -> Company:
logger.info(f"--- Starting FULL Analysis v3.0 for {company.name} ---")
# Ensure metadata is synced from scrape
self._sync_company_address_data(db, company)
industries = self._load_industry_definitions(db)
website_content, _ = self._get_website_content_and_url(db, company)
if not website_content or len(website_content) < 100:
company.status = "ENRICH_FAILED"
db.commit()
return company
industry_defs = [{"name": i.name, "description": i.description} for i in industries]
suggested_industry_name = self._run_llm_classification_prompt(website_content, company.name, industry_defs)
matched_industry = next((i for i in industries if i.name == suggested_industry_name), None)
if not matched_industry:
company.industry_ai = "Others"
db.commit()
return company
company.industry_ai = matched_industry.name
logger.info(f"✅ Industry: {matched_industry.name}")
metrics = self._find_direct_area(db, company, matched_industry.name)
if not metrics:
logger.info(" -> No direct area. Trying proxy...")
if matched_industry.scraper_search_term:
metrics = self._extract_and_calculate_metric_cascade(db, company, matched_industry.name, search_term=matched_industry.scraper_search_term, standardization_logic=matched_industry.standardization_logic, standardized_unit="")
if metrics and metrics.get("calculated_metric_value"):
logger.info(f" ✅ SUCCESS: {metrics.get('calculated_metric_value')} {metrics.get('calculated_metric_unit')}")
company.calculated_metric_name = metrics.get("calculated_metric_name", matched_industry.scraper_search_term or "Fläche")
company.calculated_metric_value = metrics.get("calculated_metric_value")
company.calculated_metric_unit = metrics.get("calculated_metric_unit")
company.standardized_metric_value = metrics.get("standardized_metric_value")
company.standardized_metric_unit = metrics.get("standardized_metric_unit")
company.metric_source = metrics.get("metric_source")
company.metric_proof_text = metrics.get("proof_text")
company.metric_source_url = metrics.get("metric_source_url")
company.metric_confidence = 0.8
company.metric_confidence_reason = "Metric processed."
company.ai_opener = self._generate_marketing_opener(company, matched_industry, website_content, "primary")
company.ai_opener_secondary = self._generate_marketing_opener(company, matched_industry, website_content, "secondary")
company.last_classification_at = datetime.utcnow()
company.status = "ENRICHED"
db.commit()
logger.info(f"--- ✅ Analysis Finished for {company.name} ---")
return company