Überarbeite den Prompt für die Generierung der Atomic Opener. Die Opener werden nun auf genau zwei Sätze begrenzt, um Prägnanz zu gewährleisten. Der Ton ist faktenbasierter und leitet direkter zu den operativen Herausforderungen über, anstatt generische Lobhudelei zu verwenden.
373 lines
20 KiB
Python
373 lines
20 KiB
Python
from typing import Tuple
|
|
import json
|
|
import logging
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
from sqlalchemy.orm import Session, joinedload
|
|
|
|
from backend.database import Company, Industry, RoboticsCategory, EnrichmentData
|
|
from backend.lib.core_utils import call_gemini_flash, safe_eval_math, run_serp_search
|
|
from backend.services.scraping import scrape_website_content
|
|
from backend.lib.metric_parser import MetricParser
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ClassificationService:
|
|
def __init__(self):
|
|
pass
|
|
|
|
def _load_industry_definitions(self, db: Session) -> List[Industry]:
    """Fetch all Industry rows with both category relationships eager-loaded.

    A warning is logged when the query yields nothing; the (possibly empty)
    query result is returned in either case.
    """
    eager_categories = (
        joinedload(Industry.primary_category),
        joinedload(Industry.secondary_category),
    )
    loaded = db.query(Industry).options(*eager_categories).all()
    if not loaded:
        logger.warning("No industry definitions found in DB.")
    return loaded
|
|
|
|
def _get_wikipedia_content(self, db: Session, company_id: int) -> Optional[Dict[str, Any]]:
    """Return the content of the newest "wikipedia" enrichment row, if any.

    Returns None when no such row exists for ``company_id`` or when its
    content is empty.
    """
    newest_first = EnrichmentData.created_at.desc()
    wiki_rows = db.query(EnrichmentData).filter(
        EnrichmentData.company_id == company_id,
        EnrichmentData.source_type == "wikipedia",
    )
    record = wiki_rows.order_by(newest_first).first()
    if not record:
        return None
    if not record.content:
        return None
    return record.content
|
|
|
|
def _run_llm_classification_prompt(self, website_text: str, company_name: str, industry_definitions: List[Dict[str, str]]) -> Optional[str]:
    """Ask the LLM to choose exactly one industry for a company.

    Args:
        website_text: Text describing the company; only the first 3000
            characters go into the prompt. ``None`` is treated as empty.
        company_name: Name inserted into the prompt.
        industry_definitions: Dicts that must contain a ``name`` key; the
            whole list is serialised into the prompt as JSON.

    Returns:
        One of the supplied industry names, or ``"Others"`` when the reply is
        empty, matches no known name, or anything in the call/parse step
        raises.
    """
    # ensure_ascii=False keeps umlauts readable in the prompt instead of
    # turning them into \uXXXX escapes.
    prompt = f"""
    Act as a strict B2B Industry Classifier.
    Company: {company_name}
    Context: {(website_text or "")[:3000]}

    Available Industries:
    {json.dumps(industry_definitions, indent=2, ensure_ascii=False)}

    Task: Select the ONE industry that best matches the company.
    If the company is a Hospital/Klinik, select 'Healthcare - Hospital'.
    If none match well, select 'Others'.

    Return ONLY the exact name of the industry.
    """

    def strip_quotes(text: str) -> str:
        return text.replace('"', '').replace("'", "")

    try:
        response = call_gemini_flash(prompt)
        if not response:
            return "Others"
        cleaned = strip_quotes(response.strip())
        valid_names = [i['name'] for i in industry_definitions] + ["Others"]

        # Compare quote-stripped names against the quote-stripped reply so a
        # name that itself contains quotes can still be recognised; the
        # original spelling is what gets returned.
        by_normalized: Dict[str, str] = {}
        for name in valid_names:
            if not name:
                continue  # an empty name would be "contained" in every reply
            normalized = strip_quotes(name)
            if normalized:
                by_normalized.setdefault(normalized, name)

        if cleaned in by_normalized:
            return by_normalized[cleaned]

        # Fallback for replies with extra words/punctuation: test the longest
        # names first so "Healthcare - Hospital" is not shadowed by a shorter
        # name such as "Healthcare". sorted() is stable, so names of equal
        # length keep their listing order.
        for normalized in sorted(by_normalized, key=len, reverse=True):
            if normalized in cleaned:
                return by_normalized[normalized]
        return "Others"
    except Exception as e:
        logger.error(f"Classification Prompt Error: {e}")
        return "Others"
|
|
|
|
def _run_llm_metric_extraction_prompt(self, text_content: str, search_term: str, industry_name: str) -> Optional[Dict[str, Any]]:
    """Ask the LLM to extract one metric from a text and return it as a dict.

    Args:
        text_content: Source text; only the first 6000 characters are sent.
        search_term: Name of the metric to look for.
        industry_name: Industry label included in the prompt for context.

    Returns:
        The decoded JSON object (the prompt requests the keys ``raw_value``,
        ``raw_unit`` and ``proof_text``), or None when the reply is empty,
        is not valid JSON, is not an object, or the call fails. A list reply
        is reduced to its first element.
    """
    prompt = f"""
    Extract the following metric for the company in industry '{industry_name}':
    Target Metric: "{search_term}"

    Source Text:
    {text_content[:6000]}

    Return a JSON object with:
    - "raw_value": The number found (e.g. 352 or 352.0). If not found, null.
    - "raw_unit": The unit found (e.g. "Betten", "m²").
    - "proof_text": A short quote from the text proving this value.

    JSON ONLY.
    """
    try:
        response = call_gemini_flash(prompt, json_mode=True)
        if not response:
            return None
        if isinstance(response, str):
            # The model sometimes wraps JSON in a Markdown code fence.
            payload = response.replace("```json", "").replace("```", "").strip()
            try:
                data = json.loads(payload)
            except ValueError as parse_error:  # json.JSONDecodeError is a ValueError
                logger.warning(f"LLM Extraction returned invalid JSON: {parse_error}")
                return None
        else:
            data = response
        if isinstance(data, list) and data:
            data = data[0]
        if not isinstance(data, dict):
            return None
        # Treat textual stand-ins for "no value" like a real JSON null.
        raw_value = data.get("raw_value")
        if isinstance(raw_value, str) and raw_value.strip().lower() in ("null", "none", ""):
            data["raw_value"] = None
        return data
    except Exception as e:
        logger.error(f"LLM Extraction Parse Error: {e}")
        return None
|
|
|
|
def _is_metric_plausible(self, metric_name: str, value: Optional[float]) -> bool:
|
|
if value is None: return False
|
|
try: return float(value) > 0
|
|
except: return False
|
|
|
|
def _parse_standardization_logic(self, formula: str, raw_value: float) -> Optional[float]:
|
|
if not formula or raw_value is None: return None
|
|
# Clean formula: remove anything in parentheses first (often units or comments)
|
|
clean_formula = re.sub(r'\(.*?\)', '', formula.lower())
|
|
# Replace 'wert' with the actual value
|
|
expression = clean_formula.replace("wert", str(raw_value))
|
|
# Remove any non-math characters
|
|
expression = re.sub(r'[^0-9\.\+\-\*\/]', '', expression)
|
|
try:
|
|
return safe_eval_math(expression)
|
|
except Exception as e:
|
|
logger.error(f"Failed to parse logic '{formula}' with value {raw_value}: {e}")
|
|
return None
|
|
|
|
def _get_best_metric_result(self, results_list: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
|
|
if not results_list: return None
|
|
source_priority = {"wikipedia": 0, "website": 1, "serpapi": 2}
|
|
valid_results = [r for r in results_list if r.get("calculated_metric_value") is not None]
|
|
if not valid_results: return None
|
|
valid_results.sort(key=lambda r: source_priority.get(r.get("metric_source"), 99))
|
|
return valid_results[0]
|
|
|
|
def _get_website_content_and_url(self, db: Session, company: Company) -> Tuple[Optional[str], Optional[str]]:
    """Return ``(website text, company.website)`` for a company.

    The newest "website_scrape" enrichment row is used when its content holds
    a ``raw_text`` entry; otherwise the site is scraped via
    ``scrape_website_content``.
    """
    latest_scrape = (
        db.query(EnrichmentData)
        .filter_by(company_id=company.id, source_type="website_scrape")
        .order_by(EnrichmentData.created_at.desc())
        .first()
    )
    stored = latest_scrape.content if latest_scrape else None
    if stored and "raw_text" in stored:
        return stored["raw_text"], company.website

    # Nothing usable stored: fetch the page now.
    return scrape_website_content(company.website), company.website
|
|
|
|
def _get_wikipedia_content_and_url(self, db: Session, company_id: int) -> Tuple[Optional[str], Optional[str]]:
    """Return ``(full_text, url)`` from the stored Wikipedia enrichment.

    Both elements are None when ``_get_wikipedia_content`` yields nothing;
    a missing key yields None for that element only.
    """
    wiki_data = self._get_wikipedia_content(db, company_id)
    if not wiki_data:
        return None, None
    article_text = wiki_data.get('full_text')
    article_url = wiki_data.get('url')
    return article_text, article_url
|
|
|
|
def _get_serpapi_content_and_url(self, company: Company, search_term: str) -> Tuple[Optional[str], Optional[str]]:
    """Search for the company plus ``search_term`` and collect result snippets.

    Args:
        company: Company whose ``name`` and optional ``city`` form the query.
        search_term: Metric name appended to the query.

    Returns:
        ``(content, url)`` where content is the space-joined snippets of the
        ``organic_results`` entries and url is the ``link`` of the first
        entry (None when there are no entries). ``(None, None)`` when the
        search returns nothing.
    """
    # Skip empty parts so a missing city does not leave a double space.
    query = " ".join(part for part in (company.name, company.city, search_term) if part)
    serp_results = run_serp_search(query)
    if not serp_results:
        return None, None

    # "or []" also covers an explicit null under the key.
    organic = serp_results.get("organic_results") or []
    # A snippet may be absent or None; join only real text so a single bad
    # entry cannot raise TypeError and discard the whole source.
    snippets = [hit.get("snippet") for hit in organic]
    content = " ".join(text for text in snippets if text)
    url = organic[0].get("link") if organic else None
    return content, url
|
|
|
|
def _extract_and_calculate_metric_cascade(self, db: Session, company: Company, industry_name: str, search_term: str, standardization_logic: Optional[str], standardized_unit: Optional[str]) -> Dict[str, Any]:
    """Look up one metric in website, Wikipedia and SerpAPI content.

    For each source the text is loaded, the LLM is asked to extract the
    metric, and ``MetricParser.extract_numeric_value`` confirms a number.
    The best confirmed result (per ``_get_best_metric_result``) is merged
    into the returned dict; a plausible value is additionally converted with
    ``standardization_logic``.

    Args:
        db: Database session used by the content loaders.
        company: Company being analysed.
        industry_name: Industry label passed to the extraction prompt.
        search_term: Metric to look for; also stored as the metric name.
        standardization_logic: Formula for ``_parse_standardization_logic``,
            or None.
        standardized_unit: Unit label stored with the standardized value.

    Returns:
        A dict with the ``calculated_*``, ``standardized_*``,
        ``metric_source``, ``metric_source_url`` and ``proof_text`` keys.
        Values stay None when no source produced a number. On success the
        LLM's own keys (e.g. ``raw_value``, ``raw_unit``) are merged in too.
    """
    final_result = {
        "calculated_metric_name": search_term,
        "calculated_metric_value": None,
        "calculated_metric_unit": None,
        "standardized_metric_value": None,
        "standardized_metric_unit": standardized_unit,
        "metric_source": None,
        "proof_text": None,
        "metric_source_url": None,
    }
    sources = [
        ("website", lambda: self._get_website_content_and_url(db, company)),
        ("wikipedia", lambda: self._get_wikipedia_content_and_url(db, company.id)),
        ("serpapi", lambda: self._get_serpapi_content_and_url(company, search_term)),
    ]
    all_source_results = []
    parser = MetricParser()
    for source_name, content_loader in sources:
        # _get_best_metric_result ranks serpapi below website and wikipedia,
        # so a SerpAPI hit can never be chosen once either has delivered a
        # value; skip the extra search and LLM round-trip in that case.
        if source_name == "serpapi" and all_source_results:
            logger.info(f" -> Skipping source: [{source_name.upper()}] for '{search_term}' (value already found)")
            continue
        logger.info(f" -> Checking source: [{source_name.upper()}] for '{search_term}'")
        try:
            content_text, current_source_url = content_loader()
            if not content_text or len(content_text) < 100:
                continue
            llm_result = self._run_llm_metric_extraction_prompt(content_text, search_term, industry_name)
            if llm_result and llm_result.get("proof_text"):
                # Use the robust parser on the LLM's proof text or raw_value
                hint = llm_result.get("raw_value") or llm_result.get("proof_text")
                parsed_value = parser.extract_numeric_value(text=content_text, expected_value=str(hint))
                if parsed_value is not None:
                    llm_result.update({
                        "calculated_metric_value": parsed_value,
                        "calculated_metric_unit": llm_result.get('raw_unit'),
                        "metric_source": source_name,
                        "metric_source_url": current_source_url,
                    })
                    all_source_results.append(llm_result)
        except Exception as e:
            # One failing source must not abort the remaining ones.
            logger.error(f" -> Error in {source_name} stage: {e}")

    best_result = self._get_best_metric_result(all_source_results)
    if not best_result:
        return final_result
    final_result.update(best_result)
    if self._is_metric_plausible(search_term, final_result['calculated_metric_value']):
        final_result['standardized_metric_value'] = self._parse_standardization_logic(standardization_logic, final_result['calculated_metric_value'])
    return final_result
|
|
|
|
def _find_direct_area(self, db: Session, company: Company, industry_name: str) -> Optional[Dict[str, Any]]:
    """Search specifically for a floor-area figure ("Fläche").

    Runs the metric cascade with the search term "Fläche" and accepts the
    result only when a value was found and its unit text contains "m²", "qm"
    or "quadratmeter" (case-insensitive). On success the calculated value is
    copied into ``standardized_metric_value``; otherwise None is returned.
    """
    logger.info(" -> (Helper) Running specific search for 'Fläche'...")
    found = self._extract_and_calculate_metric_cascade(
        db,
        company,
        industry_name,
        search_term="Fläche",
        standardization_logic=None,
        standardized_unit="m²",
    )
    if not found or found.get("calculated_metric_value") is None:
        return None

    unit_text = (found.get("calculated_metric_unit") or "").lower()
    area_markers = ("m²", "qm", "quadratmeter")
    if not any(marker in unit_text for marker in area_markers):
        return None

    logger.info(" ✅ SUCCESS: Found direct area value.")
    # The value is already an area, so it is used as the standardized value as-is.
    found['standardized_metric_value'] = found['calculated_metric_value']
    return found
|
|
|
|
def _summarize_website_for_opener(self, company_name: str, website_text: str) -> str:
    """Condense website text into a short dossier used as opener context.

    Only the first 5000 characters of ``website_text`` are sent to the LLM.
    Returns the stripped reply, a fixed German placeholder when the reply is
    empty, or a fixed German error text when the call raises.
    """
    prompt = f"""
    **Rolle:** Du bist ein erfahrener B2B-Marktanalyst mit Fokus auf Facility Management und Gebäudereinigung.
    **Aufgabe:** Analysiere den Website-Text des Unternehmens '{company_name}' und erstelle ein prägnantes Dossier.

    **Deine Analyse besteht aus ZWEI TEILEN:**

    **TEIL 1: Geschäftsmodell-Analyse**
    1. Identifiziere die Kernprodukte und/oder Dienstleistungen des Unternehmens.
    2. Fasse in 2-3 prägnanten Sätzen zusammen, was das Unternehmen macht und für welche Kunden.

    **TEIL 2: Reinigungspotenzial & Hygiene-Analyse**
    1. Scanne den Text gezielt nach Hinweisen auf große Bodenflächen, Publikumsverkehr oder hohe Hygieneanforderungen (Schlüsselwörter: Reinigung, Sauberkeit, Hygiene, Bodenpflege, Verkaufsfläche, Logistikhalle, Patientenversorgung, Gästeerlebnis).
    2. Bewerte das Potenzial für automatisierte Reinigungslösungen auf einer Skala (Hoch / Mittel / Niedrig).
    3. Extrahiere die 1-2 wichtigsten Sätze, die diese Anforderungen oder die Größe der Einrichtung belegen.

    **Antworte AUSSCHLIESSLICH im folgenden exakten Format:**
    GESCHÄFTSMODELL: <Deine 2-3 Sätze über das Kerngeschäft des Unternehmens.>
    REINIGUNGSPOTENZIAL: <Hoch / Mittel / Niedrig / Kein Hinweis>
    HYGIENE-BEWEISE: <Die 1-2 aussagekräftigsten Sätze als Bullet Points (* Satz 1...)>

    **Hier ist der Website-Text:**
    {website_text[:5000]}
    """
    try:
        reply = call_gemini_flash(prompt)
        if reply:
            return reply.strip()
        return "Keine Zusammenfassung möglich."
    except Exception as e:
        logger.error(f"Summary Error: {e}")
        return "Fehler bei der Zusammenfassung."
|
|
|
|
def _generate_marketing_opener(self, company: Company, industry: Industry, context_text: str, focus_mode: str = "primary") -> Optional[str]:
    """Generate a two-sentence letter opener for ``company`` via the LLM.

    Args:
        company: Target company; its name is inserted into the prompt.
        industry: Matched industry supplying categories, pains and gains.
            A falsy value makes the method return None immediately.
        context_text: Analysis dossier placed in the prompt as context.
        focus_mode: "secondary" switches to the industry's secondary
            category and "Secondary Product" segments, but only when the
            industry has both ``ops_focus_secondary`` and a secondary
            category; any other case uses the primary ones.

    Returns:
        The reply stripped of whitespace and surrounding double quotes, or
        None when the reply is empty or the call raises.
    """
    if not industry:
        return None

    # Decide once whether the secondary product line applies.
    use_secondary = bool(
        focus_mode == "secondary"
        and industry.ops_focus_secondary
        and industry.secondary_category
    )
    category = industry.secondary_category if use_secondary else industry.primary_category
    segment_marker = "Secondary Product" if use_secondary else "Primary Product"

    if category:
        product_name = category.name
        product_desc = category.description or "Automatisierung von operativen Prozessen"
    else:
        product_name = "Robotik-Lösungen"
        product_desc = "Automatisierung von operativen Prozessen"

    def extract_segment(text, marker):
        """Return the text following the first "[...marker...]" tag, else the whole text."""
        if not text:
            return ""
        # With one capture group, re.split yields: text, tag, text, tag, text, ...
        parts = re.split(r'\[(.*?)\]', text)
        wanted = marker.lower()
        for tag, following in zip(parts[1::2], parts[2::2]):
            if wanted in tag.lower():
                return following.strip()
        return text

    relevant_pains = extract_segment(industry.pains or "", segment_marker)
    relevant_gains = extract_segment(industry.gains or "", segment_marker)

    prompt = f"""
    Du bist ein scharfsinniger Marktbeobachter und Branchenexperte. Formuliere eine prägnante Einleitung (genau 2 Sätze) für ein Anschreiben an das Unternehmen {company.name}.

    DEINE PERSONA:
    Ein direkter Branchenkenner, der eine relevante Beobachtung teilt. Dein Ton ist faktenbasiert, professionell und absolut NICHT verkäuferisch. Dein Ziel ist es, schnell zur operativen Herausforderung überzuleiten.

    STRATEGISCHER HINTERGRUND (Nicht nennen!):
    Dieses Unternehmen wird kontaktiert, weil sein Geschäftsmodell perfekt zu folgendem Bereich passt: "{product_name}" ({product_desc}).
    Ziel des Schreibens ist es, die Branchen-Herausforderungen "{relevant_pains}" zu adressieren und die Mehrwerte "{relevant_gains}" zu ermöglichen.

    DEINE AUFGABE:
    1. Firmenname kürzen: Kürze "{company.name}" sinnvoll (meist erste zwei Worte). Entferne UNBEDINGT Rechtsformen wie GmbH, AG, gGmbH, e.V. etc.
    2. Struktur: Genau 2 flüssige Sätze. NICHT MEHR.
    3. Inhalt:
    - Satz 1: Eine faktenbasierte, relevante Beobachtung zum Geschäftsmodell oder einem aktuellen Fokus des Unternehmens (siehe Analyse-Dossier). Vermeide Lobhudelei und generische Floskeln.
    - Satz 2: Leite direkt und prägnant zu einer spezifischen operativen Herausforderung über, die für das Unternehmen aufgrund seiner Größe oder Branche relevant ist (orientiere dich an "{relevant_pains}").
    4. STRENGES VERBOT: Nenne KEIN Produkt ("{product_name}") und biete KEINE "Lösungen", "Hilfe" oder "Zusammenarbeit" an. Der Text soll eine reine Beobachtung bleiben.
    5. KEINE Anrede (kein "Sehr geehrte Damen und Herren", kein "Hallo").

    KONTEXT (Analyse-Dossier):
    {context_text}

    BEISPIEL-STIL:
    "Das Kreiskrankenhaus Weilburg leistet einen bedeutenden Beitrag zur regionalen Patientenversorgung. Bei der lückenlosen Dokumentation und den strengen Hygienevorgaben im Klinikalltag ist die Aufrechterhaltung höchster Standards jedoch eine enorme operative Herausforderung."

    AUSGABE: Nur der fertige Text.
    """
    try:
        reply = call_gemini_flash(prompt)
        if not reply:
            return None
        return reply.strip().strip('"')
    except Exception as e:
        logger.error(f"Opener Error: {e}")
        return None
|
|
|
|
def _sync_company_address_data(self, db: Session, company: Company):
    """Fill missing address and VAT fields from the stored Impressum data.

    Reads the ``impressum`` dict of the newest "website_scrape" enrichment
    row. City, street, zip code and VAT id are only written when the company
    field is empty. The country is written when the company has none or is
    still "DE", and only if the new code differs. The session is committed
    (and a log line written) only when a field actually changed.

    Args:
        db: Database session used for the lookup and the commit.
        company: Company whose fields are completed in place.
    """
    # EnrichmentData is the module-level import, as in the other methods.
    enrichment = db.query(EnrichmentData).filter_by(
        company_id=company.id, source_type="website_scrape"
    ).order_by(EnrichmentData.created_at.desc()).first()

    if not (enrichment and enrichment.content and "impressum" in enrichment.content):
        return
    imp = enrichment.content["impressum"]
    if not imp or not isinstance(imp, dict):
        return

    changed = False

    # (company attribute, value from Impressum); filled only when still empty.
    fill_if_missing = (
        ("city", imp.get("city")),
        ("street", imp.get("street")),
        ("zip_code", imp.get("zip") or imp.get("plz")),
        ("crm_vat", imp.get("vat_id")),
    )
    for attr_name, new_value in fill_if_missing:
        if new_value and not getattr(company, attr_name):
            setattr(company, attr_name, new_value)
            changed = True

    # Country: an existing "DE" may be replaced. Re-writing an identical code
    # must not count as a change, otherwise every run commits and logs an
    # "update" that never happened.
    country_code = imp.get("country_code")
    if country_code and (not company.country or company.country == "DE"):
        if company.country != country_code:
            company.country = country_code
            changed = True

    if changed:
        db.commit()
        logger.info(f"Updated Address/VAT from Impressum for {company.name}: City={company.city}, VAT={company.crm_vat}")
|
|
|
|
def classify_company_potential(self, company: Company, db: Session) -> Company:
    """Run the full analysis for one company and persist the outcome.

    Steps, in order: sync address data from the stored scrape, load the
    website text, classify the industry via the LLM, look for a direct area
    value (falling back to the industry's proxy metric), build the research
    dossier, and generate the primary and secondary openers.

    Early exits (each commits and returns the company):
      * website text missing or shorter than 100 characters -> status
        "ENRICH_FAILED";
      * suggested industry not among the loaded definitions ->
        ``industry_ai`` set to "Others".

    Otherwise the company ends with status "ENRICHED" and a fresh
    ``last_classification_at`` timestamp.
    """
    logger.info(f"--- Starting FULL Analysis v3.0 for {company.name} ---")

    # Pull address/VAT details out of the stored scrape first.
    self._sync_company_address_data(db, company)

    known_industries = self._load_industry_definitions(db)
    site_text, _unused_url = self._get_website_content_and_url(db, company)
    if not site_text or len(site_text) < 100:
        company.status = "ENRICH_FAILED"
        db.commit()
        return company

    industry_defs = []
    for entry in known_industries:
        industry_defs.append({"name": entry.name, "description": entry.description})
    suggested_name = self._run_llm_classification_prompt(site_text, company.name, industry_defs)

    # First definition whose name equals the suggestion, if any.
    chosen = None
    for candidate in known_industries:
        if candidate.name == suggested_name:
            chosen = candidate
            break
    if not chosen:
        company.industry_ai = "Others"
        db.commit()
        return company

    company.industry_ai = chosen.name
    logger.info(f"✅ Industry: {chosen.name}")

    metrics = self._find_direct_area(db, company, chosen.name)
    if not metrics:
        logger.info(" -> No direct area. Trying proxy...")
        if chosen.scraper_search_term:
            metrics = self._extract_and_calculate_metric_cascade(
                db,
                company,
                chosen.name,
                search_term=chosen.scraper_search_term,
                standardization_logic=chosen.standardization_logic,
                standardized_unit="m²",
            )

    if metrics and metrics.get("calculated_metric_value"):
        logger.info(f" ✅ SUCCESS: {metrics.get('calculated_metric_value')} {metrics.get('calculated_metric_unit')}")
        company.calculated_metric_name = metrics.get("calculated_metric_name", chosen.scraper_search_term or "Fläche")
        # (company column, key in the metrics dict)
        copied_fields = (
            ("calculated_metric_value", "calculated_metric_value"),
            ("calculated_metric_unit", "calculated_metric_unit"),
            ("standardized_metric_value", "standardized_metric_value"),
            ("standardized_metric_unit", "standardized_metric_unit"),
            ("metric_source", "metric_source"),
            ("metric_proof_text", "proof_text"),
            ("metric_source_url", "metric_source_url"),
        )
        for column, key in copied_fields:
            setattr(company, column, metrics.get(key))
        company.metric_confidence = 0.8
        company.metric_confidence_reason = "Metric processed."

    # Two-step approach: summarise the site first, then write the openers
    # from that summary.
    dossier = self._summarize_website_for_opener(company.name, site_text)
    company.research_dossier = dossier

    company.ai_opener = self._generate_marketing_opener(company, chosen, dossier, "primary")
    company.ai_opener_secondary = self._generate_marketing_opener(company, chosen, dossier, "secondary")
    company.last_classification_at = datetime.utcnow()
    company.status = "ENRICHED"
    db.commit()
    logger.info(f"--- ✅ Analysis Finished for {company.name} ---")
    return company