import json
import logging
import re
from datetime import datetime
from typing import Optional, Dict, Any, List

from sqlalchemy.orm import Session

from backend.database import Company, Industry, RoboticsCategory, EnrichmentData
from backend.lib.core_utils import call_gemini_flash, safe_eval_math, run_serp_search
from backend.services.scraping import scrape_website_content
from backend.lib.metric_parser import MetricParser

logger = logging.getLogger(__name__)

# Prompt template for the industry classification call. Placeholders are
# filled with str.format(); the text itself is sent to the LLM verbatim.
_CLASSIFICATION_PROMPT = r"""
Du bist ein präziser Branchen-Klassifizierer für Unternehmen.
Deine Aufgabe ist es, das vorliegende Unternehmen basierend auf seinem Website-Inhalt einer der untenstehenden Branchen zuzuordnen.

--- UNTERNEHMEN ---
Name: {company_name}
Website-Inhalt (Auszug):
{website_text_excerpt}

--- ZU VERWENDENDE BRANCHEN-DEFINITIONEN (STRIKT) ---
Wähle EINE der folgenden Branchen. Jede Branche hat eine Definition.
{industry_definitions_json}

--- AUFGABE ---
Analysiere den Website-Inhalt. Wähle die Branchen-Definition, die am besten zum Unternehmen passt.
Wenn keine der Definitionen zutrifft oder du unsicher bist, wähle "Others".
Gib NUR den Namen der zugeordneten Branche zurück, als reinen String, nichts anderes.

Beispiel Output: Hotellerie
"""

# Prompt template for the metric extraction call (JSON mode).
_METRIC_EXTRACTION_PROMPT = r"""
Du bist ein Datenextraktions-Spezialist für Unternehmens-Kennzahlen.
Analysiere den folgenden Text, um spezifische Werte zu extrahieren.

--- KONTEXT ---
Branche: {industry_name}
Primär gesuchte Metrik: '{search_term}'

--- TEXT ---
{text_content_excerpt}

--- AUFGABE ---
1. Finde den numerischen Wert für die primäre Metrik '{search_term}'.
2. EXTREM WICHTIG: Suche im gesamten Text nach einer Angabe zur Gesamtfläche, Nutzfläche, Grundstücksfläche oder Verkaufsfläche in Quadratmetern (m²). In Branchen wie Freizeitparks, Flughäfen oder Thermen ist dies oft separat im Fließtext versteckt (z.B. "Die Therme verfügt über eine Gesamtfläche von 4.000 m²").
3. Achte auf deutsche Zahlenformate (z.B. 1.005 für tausend-fünf).
4. Regel: Extrahiere IMMER den umgebenden Satz oder die Zeile in 'raw_text_segment'. Rate NIEMALS einen numerischen Wert, ohne den Beweis dafür zu liefern.
5. WICHTIG: Jahreszahlen in Klammern oder direkt dahinter (z.B. "80 (2020)" oder "80 Stand 2021") dürfen NICHT Teil von 'raw_value' sein. "80 (2020)" -> raw_value: 80.
6. WICHTIG: Zitations-Nummern wie "[3]" müssen entfernt werden. "80[3]" -> raw_value: 80.
7. ENTITÄTS-CHECK: Stelle sicher, dass sich die Zahl wirklich auf '{search_term}' für das Unternehmen bezieht und nicht auf einen Wettbewerber.
8. ZEITRAUM-CHECK: Wir suchen JÄHRLICHE Werte. Wenn du "500 Besucher am Tag" und "150.000 im Jahr" findest, nimm IMMER den JÄHRLICHEN Wert. Ignoriere Tages- oder Monatswerte, es sei denn, es gibt gar keine anderen.

Bewerte deine Zuversicht (confidence_score) zwischen 0.0 und 1.0:
- 0.9 - 1.0: Exakter, aktueller Jahreswert aus zuverlässiger Quelle.
- 0.6 - 0.8: Wahrscheinlich korrekt, aber evtl. etwas älter (vor 2022) oder leicht gerundet ("rund 200.000").
- 0.1 - 0.5: Unsicher, ob es sich auf das richtige Unternehmen bezieht, oder nur Tages-/Monatswerte gefunden.

Gib NUR ein JSON-Objekt zurück:
'raw_text_segment': Das Snippet für '{search_term}' (z.B. "ca. 1.500 Besucher (2020)"). MUSS IMMER AUSGEFÜLLT SEIN WENN EIN WERT GEFUNDEN WURDE.
'raw_value': Der numerische Wert für '{search_term}'. null, falls nicht gefunden.
'raw_unit': Die Einheit (z.B. "Besucher", "Passagiere"). null, falls nicht gefunden.
'area_text_segment': Das Snippet, das eine Fläche (m²) erwähnt (z.B. "4.000 m² Gesamtfläche"). null, falls nicht gefunden.
'area_value': Der gefundene Wert der Fläche in m² (als Zahl). null, falls nicht gefunden.
'metric_name': '{search_term}'.
'confidence_score': Float zwischen 0.0 und 1.0.
'confidence_reason': Kurze Begründung (z.B. "Klarer Jahreswert 2023").
"""


class ClassificationService:
    """Classifies companies into industries and extracts their key metric.

    The service is stateless: every method that needs the database receives
    the SQLAlchemy session as an argument.
    """

    def __init__(self):
        # Industries are not loaded here because no DB session is available
        # at construction time; see _load_industry_definitions().
        pass

    # ------------------------------------------------------------------
    # Data loading
    # ------------------------------------------------------------------

    def _load_industry_definitions(self, db: Session) -> List[Industry]:
        """Load all industry definitions from the database.

        Logs a warning (but still returns the empty result) when the table
        holds no rows.
        """
        industries = db.query(Industry).all()
        if not industries:
            logger.warning("No industry definitions found in DB. Classification might be limited.")
        return industries

    def _get_wikipedia_content(self, db: Session, company_id: int) -> Optional[str]:
        """Return the 'full_text' of the newest Wikipedia enrichment row.

        Returns None when the company has no Wikipedia enrichment or the
        row's content is empty.
        """
        enrichment = (
            db.query(EnrichmentData)
            .filter(
                EnrichmentData.company_id == company_id,
                EnrichmentData.source_type == "wikipedia",
            )
            .order_by(EnrichmentData.created_at.desc())
            .first()
        )
        if enrichment and enrichment.content:
            # NOTE(review): assumes `content` is a dict-like JSON column -- confirm.
            return enrichment.content.get('full_text')
        return None

    def _load_serp_snippets(self, company: Company, search_term: str) -> Optional[str]:
        """Run ONE SERP search and join the organic result snippets.

        Returns None when the search yields a falsy response.
        """
        serp_response = run_serp_search(f"{company.name} {company.city or ''} {search_term}")
        if not serp_response:
            return None
        return " ".join(
            res.get("snippet", "") for res in serp_response.get("organic_results", [])
        )

    # ------------------------------------------------------------------
    # LLM calls
    # ------------------------------------------------------------------

    def _run_llm_classification_prompt(
        self,
        website_text: str,
        company_name: str,
        industry_definitions: List[Dict[str, str]],
    ) -> Optional[str]:
        """Ask the LLM to assign the company to one of the given industries.

        Only the first 10000 characters of `website_text` are sent.
        Returns the stripped LLM answer, or None when the call fails.
        """
        prompt = _CLASSIFICATION_PROMPT.format(
            company_name=company_name,
            website_text_excerpt=website_text[:10000],
            industry_definitions_json=json.dumps(industry_definitions, ensure_ascii=False),
        )
        try:
            response = call_gemini_flash(prompt, temperature=0.1, json_mode=False)
            return response.strip()
        except Exception as e:
            logger.error(f"LLM classification failed for {company_name}: {e}")
            return None

    def _run_llm_metric_extraction_prompt(
        self, text_content: str, search_term: str, industry_name: str
    ) -> Optional[Dict[str, Any]]:
        """Ask the LLM to extract the metric (and any area in m²) from text.

        Only the first 15000 characters of `text_content` are sent.
        Returns the decoded JSON answer, or None when the call or the JSON
        decoding fails.
        """
        prompt = _METRIC_EXTRACTION_PROMPT.format(
            industry_name=industry_name,
            search_term=search_term,
            text_content_excerpt=text_content[:15000],
        )
        try:
            response = call_gemini_flash(prompt, temperature=0.05, json_mode=True)
            return json.loads(response)
        except Exception as e:
            logger.error(f"LLM metric extraction failed for '{search_term}': {e}")
            return None

    # ------------------------------------------------------------------
    # Shared helpers for interpreting an LLM extraction result
    # ------------------------------------------------------------------

    @staticmethod
    def _first_candidate(llm_result: Any) -> Any:
        """Collapse a list response (multiple candidates) to its first entry."""
        if isinstance(llm_result, list):
            return llm_result[0] if llm_result else None
        return llm_result

    @staticmethod
    def _standardized_unit(industry: Industry) -> str:
        """Derive the standardized unit from the industry's formula text."""
        return "m²" if "m²" in (industry.standardization_logic or "") else "Einheiten"

    def _resolve_metric_value(self, llm_result: Dict[str, Any], search_term: str) -> Any:
        """Determine the final metric value from an LLM extraction result.

        Order of preference:
          1. MetricParser applied to 'raw_text_segment' (LLM raw_value as hint),
          2. MetricParser applied to the stringified 'raw_value'
             (catches fused values such as "802020"),
          3. the untouched 'raw_value'.
        """
        lowered_term = search_term.lower()
        is_revenue = "umsatz" in lowered_term or "revenue" in lowered_term
        raw_value = llm_result.get("raw_value")
        segment = llm_result.get("raw_text_segment")

        if segment:
            parsed_value = MetricParser.extract_numeric_value(
                segment,
                is_revenue=is_revenue,
                expected_value=str(raw_value) if raw_value else None,
            )
            if parsed_value is not None:
                logger.info(f"Successfully parsed '{segment}' to {parsed_value} using MetricParser.")
                return parsed_value

        if raw_value:
            cleaned_value = MetricParser.extract_numeric_value(str(raw_value), is_revenue=is_revenue)
            if cleaned_value is not None:
                logger.info(f"Successfully cleaned LLM raw_value '{raw_value}' to {cleaned_value}")
                return cleaned_value

        return raw_value

    def _resolve_area_value(self, llm_result: Dict[str, Any]) -> Any:
        """Return the area value, refined from 'area_text_segment' if possible."""
        area_segment = llm_result.get("area_text_segment")
        if area_segment:
            refined_area = MetricParser.extract_numeric_value(area_segment, is_revenue=False)
            if refined_area is not None:
                logger.info(f"Refined area to {refined_area} from segment '{area_segment}'")
                return refined_area
        return llm_result.get("area_value")

    def _parse_standardization_logic(self, formula: str, raw_value: float) -> Optional[float]:
        """Evaluate a standardization formula for the given raw value.

        'wert' / 'Value' in the formula are substituted by `raw_value`;
        area unit tokens (m², m2, qm) are stripped before evaluation.
        Returns None when there is no formula, no value, or evaluation fails.
        """
        if not formula or raw_value is None:
            return None

        # Substitute the placeholder, then strip unit noise that the Notion
        # sync might bring in (e.g. "wert * 25m2" -> "wert * 25").
        formula_cleaned = formula.replace("wert", str(raw_value)).replace("Value", str(raw_value))
        formula_cleaned = re.sub(r'(?i)m[²2]', '', formula_cleaned)
        formula_cleaned = re.sub(r'(?i)qm', '', formula_cleaned)

        # The final safety check is left to safe_eval_math.
        try:
            return safe_eval_math(formula_cleaned)
        except Exception as e:
            logger.error(f"Failed to parse standardization logic '{formula}' with value {raw_value}: {e}")
            return None

    # ------------------------------------------------------------------
    # Metric extraction
    # ------------------------------------------------------------------

    def _extract_and_calculate_metric_cascade(
        self,
        db: Session,
        company: Company,
        industry_name: str,
        search_term: str,
        standardization_logic: Optional[str],
        standardized_unit: Optional[str],
    ) -> Dict[str, Any]:
        """Try website, Wikipedia and SERP snippets in turn for the metric.

        The first source that yields a metric value or an area wins. A source
        that raises is logged and skipped. When no source yields anything the
        default (empty) result dict is returned.
        """
        results = {
            "calculated_metric_name": search_term,
            "calculated_metric_value": None,
            "calculated_metric_unit": None,
            "standardized_metric_value": None,
            "standardized_metric_unit": standardized_unit,
            "metric_source": None,
            "metric_proof_text": None,
            "metric_confidence": 0.0,
            "metric_confidence_reason": None,
        }

        # CASCADE: Website -> Wikipedia -> SerpAPI (loaders are lazy so a
        # later source is only fetched when the earlier ones came up empty).
        sources = [
            ("website", lambda: scrape_website_content(company.website)),
            ("wikipedia", lambda: self._get_wikipedia_content(db, company.id)),
            ("serpapi", lambda: self._load_serp_snippets(company, search_term)),
        ]

        for source_name, content_loader in sources:
            logger.info(f"Checking {source_name} for '{search_term}' for {company.name}")
            try:
                content = content_loader()
                logger.debug(f"Content length for {source_name}: {len(content) if content else 0}")
                if not content:
                    continue

                llm_result = self._first_candidate(
                    self._run_llm_metric_extraction_prompt(content, search_term, industry_name)
                )
                logger.debug(f"LLM Result for {source_name}: {llm_result}")

                # A failed/empty/non-object extraction means this source has
                # nothing usable: move on instead of tripping over None.
                if not llm_result or not isinstance(llm_result, dict):
                    continue

                final_value = self._resolve_metric_value(llm_result, search_term)

                has_finding = (
                    final_value is not None
                    or llm_result.get("area_value") is not None
                    or llm_result.get("area_text_segment")
                )
                if not has_finding:
                    continue

                results["calculated_metric_value"] = final_value
                results["calculated_metric_unit"] = llm_result.get("raw_unit")
                results["metric_source"] = source_name
                results["metric_proof_text"] = llm_result.get("raw_text_segment")
                results["metric_confidence"] = llm_result.get("confidence_score")
                results["metric_confidence_reason"] = llm_result.get("confidence_reason")

                # An explicitly stated area beats the formula-derived estimate.
                area_val = self._resolve_area_value(llm_result)
                if area_val is not None:
                    results["standardized_metric_value"] = area_val
                elif final_value is not None and standardization_logic:
                    results["standardized_metric_value"] = self._parse_standardization_logic(
                        standardization_logic, final_value
                    )
                return results
            except Exception as e:
                logger.error(f"Error in {source_name} stage: {e}")

        return results

    def extract_metrics_for_industry(self, company: Company, db: Session, industry: Industry) -> Company:
        """Extract and calculate metrics for a given industry.

        Split out from classify_company_potential to allow manual overrides.
        Writes the metric fields onto `company`, stamps
        `last_classification_at` and commits. Returns `company` unchanged when
        the industry has no search term configured.
        """
        if not industry or not industry.scraper_search_term:
            logger.warning(f"No metric configuration for industry '{industry.name if industry else 'None'}'")
            return company

        metrics = self._extract_and_calculate_metric_cascade(
            db,
            company,
            industry.name,
            industry.scraper_search_term,
            industry.standardization_logic,
            self._standardized_unit(industry),
        )

        company.calculated_metric_name = metrics["calculated_metric_name"]
        company.calculated_metric_value = metrics["calculated_metric_value"]
        company.calculated_metric_unit = metrics["calculated_metric_unit"]
        company.standardized_metric_value = metrics["standardized_metric_value"]
        company.standardized_metric_unit = metrics["standardized_metric_unit"]
        company.metric_source = metrics["metric_source"]
        company.metric_proof_text = metrics["metric_proof_text"]
        company.metric_confidence = metrics["metric_confidence"]
        company.metric_confidence_reason = metrics["metric_confidence_reason"]

        # Keep track of refinement
        company.last_classification_at = datetime.utcnow()
        db.commit()
        return company

    def reevaluate_wikipedia_metric(self, company: Company, db: Session, industry: Industry) -> Company:
        """Run the metric extraction for ONLY the Wikipedia source.

        Updates and commits the company's metric fields when a value is
        found; otherwise leaves the company untouched. Errors during
        extraction are logged, never raised.
        """
        logger.info(f"Starting Wikipedia re-evaluation for '{company.name}'")

        if not industry or not industry.scraper_search_term:
            # `industry` may be None here, so its name must not be read blindly.
            logger.warning(
                f"Cannot re-evaluate: No metric configuration for industry "
                f"'{industry.name if industry else 'None'}'"
            )
            return company

        search_term = industry.scraper_search_term
        content = self._get_wikipedia_content(db, company.id)
        if not content:
            logger.warning("No Wikipedia content found to re-evaluate.")
            return company

        try:
            llm_result = self._first_candidate(
                self._run_llm_metric_extraction_prompt(content, search_term, industry.name)
            )
            if not llm_result or not isinstance(llm_result, dict):
                raise ValueError("LLM metric extraction returned empty result.")

            # Hybrid extraction logic (shared with the cascade).
            final_value = self._resolve_metric_value(llm_result, search_term)

            if final_value is None:
                logger.warning(f"Re-evaluation for {company.name} did not yield a metric value.")
                return company

            company.calculated_metric_name = search_term
            company.calculated_metric_value = final_value
            company.calculated_metric_unit = llm_result.get("raw_unit")
            company.metric_source = "wikipedia_reevaluated"
            company.metric_proof_text = llm_result.get("raw_text_segment")
            company.metric_confidence = llm_result.get("confidence_score")
            company.metric_confidence_reason = llm_result.get("confidence_reason")

            # Standardization: stated area first, formula second, else None.
            company.standardized_metric_unit = self._standardized_unit(industry)
            area_val = self._resolve_area_value(llm_result)
            if area_val is not None:
                company.standardized_metric_value = area_val
            elif industry.standardization_logic:
                company.standardized_metric_value = self._parse_standardization_logic(
                    industry.standardization_logic, final_value
                )
            else:
                company.standardized_metric_value = None

            company.last_classification_at = datetime.utcnow()
            db.commit()
            logger.info(f"Successfully re-evaluated and updated metrics for {company.name} from Wikipedia.")
        except Exception as e:
            logger.error(f"Error during Wikipedia re-evaluation for {company.name}: {e}")

        return company

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    def classify_company_potential(self, company: Company, db: Session) -> Company:
        """Classify the company's industry and extract its industry metric.

        An existing, valid industry other than "Others" is kept; otherwise the
        website is scraped and classified by the LLM. Metric extraction runs
        for every industry except "Others".
        """
        logger.info(f"Starting complete classification for {company.name}")

        # 1. Load industries
        industries = self._load_industry_definitions(db)
        industry_defs = [{"name": i.name, "description": i.description} for i in industries]
        valid_industry_names = [i.name for i in industries]

        # 2. Industry classification (website-based)
        # STRICT: the AI may only classify when the industry is still
        # "Others", unset, or no longer a known industry.
        has_valid_industry = (
            company.industry_ai
            and company.industry_ai != "Others"
            and company.industry_ai in valid_industry_names
        )
        if has_valid_industry:
            logger.info(f"KEEPING manual/existing industry '{company.industry_ai}' for {company.name}")
        else:
            website_content = scrape_website_content(company.website)
            if website_content:
                industry_name = self._run_llm_classification_prompt(
                    website_content, company.name, industry_defs
                )
                # Anything outside the known industry list collapses to "Others".
                company.industry_ai = industry_name if industry_name in valid_industry_names else "Others"
                logger.info(f"AI CLASSIFIED {company.name} as '{company.industry_ai}'")
            else:
                company.industry_ai = "Others"
                logger.warning(f"No website content for {company.name}, setting industry to Others")

        db.commit()

        # 3. Metric extraction
        if company.industry_ai != "Others":
            industry = next((i for i in industries if i.name == company.industry_ai), None)
            if industry:
                self.extract_metrics_for_industry(company, db, industry)

        return company