[2ff88f42] feat(GTM-Engine): Implement Dual Opener Strategy & Harden Analysis

Completed the GTM engine setup:\n\n- Implemented 'Dual Opener' generation (Primary/Secondary) in ClassificationService.\n- Migrated DB to support two opener fields.\n- Updated API and Frontend to handle and display both openers.\n- Fixed bug creating duplicate website_scrape entries.\n- Hardened metric extraction by improving the LLM prompt and adding content length checks.
This commit is contained in:
2026-02-20 15:38:06 +00:00
parent e4d738990a
commit 23d0c695d6
12 changed files with 434 additions and 19 deletions

View File

@@ -97,6 +97,8 @@ class ProvisioningResponse(BaseModel):
website: Optional[str] = None
vertical_name: Optional[str] = None
role_name: Optional[str] = None
opener: Optional[str] = None # Primary opener (Infrastructure/Cleaning)
opener_secondary: Optional[str] = None # Secondary opener (Service/Logistics)
texts: Dict[str, Optional[str]] = {}
# --- Events ---
@@ -243,6 +245,8 @@ def provision_superoffice_contact(
website=company.website,
vertical_name=vertical_name,
role_name=role_name,
opener=company.ai_opener,
opener_secondary=company.ai_opener_secondary,
texts=texts
)
@@ -797,21 +801,29 @@ def run_analysis_task(company_id: int):
logger.info(f"Running Analysis Task for {company.name}")
# 1. Scrape Website (if not locked)
# --- 1. Scrape Website (if not locked) ---
# Check for existing scrape data first
existing_scrape = db.query(EnrichmentData).filter(
EnrichmentData.company_id == company.id,
EnrichmentData.source_type == "website_scrape"
).first()
# If it doesn't exist or is not locked, we perform a scrape
if not existing_scrape or not existing_scrape.is_locked:
from .services.scraping import ScraperService
scrape_res = ScraperService().scrape_url(company.website)
logger.info(f"Scraping website for {company.name}...")
scrape_res = scraper.scrape_url(company.website) # Use singleton
# Now, either create new or update existing
if not existing_scrape:
db.add(EnrichmentData(company_id=company.id, source_type="website_scrape", content=scrape_res))
logger.info("Created new website_scrape entry.")
else:
existing_scrape.content = scrape_res
existing_scrape.updated_at = datetime.utcnow()
logger.info("Updated existing website_scrape entry.")
db.commit()
else:
logger.info("Website scrape is locked. Skipping.")
# 2. Classify Industry & Metrics
# IMPORTANT: Using the new method name and passing db session

View File

@@ -150,7 +150,7 @@ class Industry(Base):
created_at = Column(DateTime, default=datetime.utcnow)
class JobRoleMapping(BaseModel):
class JobRoleMapping(Base):
"""
Maps job title patterns (regex or simple string) to Roles.
"""
@@ -162,7 +162,7 @@ class JobRoleMapping(BaseModel):
created_at = Column(DateTime, default=datetime.utcnow)
class RawJobTitle(BaseModel):
class RawJobTitle(Base):
"""
Stores raw unique job titles imported from CRM to assist in pattern mining.
Tracks frequency to prioritize high-impact patterns.
@@ -180,7 +180,7 @@ class RawJobTitle(BaseModel):
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Persona(BaseModel):
class Persona(Base):
"""
Represents a generalized persona/role (e.g. 'Geschäftsführer', 'IT-Leiter')
independent of the specific job title pattern.

View File

@@ -0,0 +1,58 @@
import sys
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Add backend path
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import Company, EnrichmentData
from backend.config import settings
def inspect_company(company_name_part):
engine = create_engine(settings.DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
db = SessionLocal()
try:
print(f"Searching for company containing: '{company_name_part}'...")
companies = db.query(Company).filter(Company.name.ilike(f"%{company_name_part}%")).all()
if not companies:
print("❌ No company found.")
return
for company in companies:
print("\n" + "="*60)
print(f"🏢 COMPANY: {company.name} (ID: {company.id})")
print("="*60)
print(f"🌐 Website: {company.website}")
print(f"🏗️ Industry (AI): {company.industry_ai}")
print(f"📊 Metric: {company.calculated_metric_value} {company.calculated_metric_unit} (Std: {company.standardized_metric_value} m²)")
print(f"✅ Status: {company.status}")
# Enrichment Data
enrichment = db.query(EnrichmentData).filter(EnrichmentData.company_id == company.id).all()
print("\n📚 ENRICHMENT DATA:")
for ed in enrichment:
print(f" 🔹 Type: {ed.source_type} (Locked: {ed.is_locked})")
if ed.source_type == "website_scrape":
content = ed.content
if isinstance(content, dict):
summary = content.get("summary", "No summary")
raw_text = content.get("raw_text", "")
print(f" 📝 Summary: {str(summary)[:200]}...")
print(f" 📄 Raw Text Length: {len(str(raw_text))} chars")
elif ed.source_type == "wikipedia":
content = ed.content
if isinstance(content, dict):
print(f" 🔗 Wiki URL: {content.get('url')}")
print(f" 📄 Content Snippet: {str(content.get('full_text', ''))[:200]}...")
except Exception as e:
print(f"Error: {e}")
finally:
db.close()
if __name__ == "__main__":
inspect_company("Therme Erding")

View File

@@ -0,0 +1,31 @@
from sqlalchemy import create_engine, text
import sys
import os
# Add backend path
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.config import settings
def migrate():
engine = create_engine(settings.DATABASE_URL)
with engine.connect() as conn:
try:
# Check if column exists
print("Checking schema...")
# SQLite specific pragma
result = conn.execute(text("PRAGMA table_info(companies)"))
columns = [row[1] for row in result.fetchall()]
if "ai_opener" in columns:
print("Column 'ai_opener' already exists. Skipping.")
else:
print("Adding column 'ai_opener' to 'companies' table...")
conn.execute(text("ALTER TABLE companies ADD COLUMN ai_opener TEXT"))
conn.commit()
print("✅ Migration successful.")
except Exception as e:
print(f"❌ Migration failed: {e}")
if __name__ == "__main__":
migrate()

View File

@@ -0,0 +1,41 @@
import sys
import os
import logging
# Add backend path
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
# Mock logging
logging.basicConfig(level=logging.INFO)
# Import Service
from backend.services.classification import ClassificationService
def test_opener_generation():
service = ClassificationService()
print("\n--- TEST: Therme Erding (Primary Focus: Hygiene) ---")
op_prim = service._generate_marketing_opener(
company_name="Therme Erding",
website_text="Größte Therme der Welt, 35 Saunen, Rutschenparadies Galaxy, Wellenbad. Täglich tausende Besucher.",
industry_name="Leisure - Wet & Spa",
industry_pains="Rutschgefahr und Hygiene",
focus_mode="primary"
)
print(f"Primary Opener: {op_prim}")
print("\n--- TEST: Dachser Logistik (Secondary Focus: Process) ---")
op_sec = service._generate_marketing_opener(
company_name="Dachser SE",
website_text="Globaler Logistikdienstleister, Warehousing, Food Logistics, Air & Sea Logistics. Intelligent Logistics.",
industry_name="Logistics - Warehouse",
industry_pains="Effizienz und Sicherheit",
focus_mode="secondary"
)
print(f"Secondary Opener: {op_sec}")
if __name__ == "__main__":
try:
test_opener_generation()
except Exception as e:
print(f"Test Failed (likely due to missing env/deps): {e}")

View File

@@ -75,10 +75,12 @@ Source Text:
{text_content[:6000]}
Return a JSON object with:
- "raw_value": The number found (e.g. 352 or 352.0). If text says "352 Betten", extract 352. If not found, null.
- "raw_value": The number found (e.g. 352 or 352.0). If not found, null.
- "raw_unit": The unit found (e.g. "Betten", "").
- "proof_text": A short quote from the text proving this value.
**IMPORTANT:** Ignore obvious year numbers (like 1900-2026) if other, more plausible metric values are present in the text. Focus on the target metric.
JSON ONLY.
"""
try:
@@ -159,8 +161,8 @@ JSON ONLY.
try:
args = (company,) if source_name == 'website' else (db, company.id) if source_name == 'wikipedia' else (company, search_term)
content_text, current_source_url = content_loader(*args)
if not content_text:
logger.info(f"No content for {source_name}.")
if not content_text or len(content_text) < 100:
logger.info(f"No or insufficient content for {source_name} (Length: {len(content_text) if content_text else 0}).")
continue
llm_result = self._run_llm_metric_extraction_prompt(content_text, search_term, industry_name)
if llm_result:
@@ -224,13 +226,68 @@ JSON ONLY.
company.metric_confidence_reason = metrics["metric_confidence_reason"]
company.last_classification_at = datetime.utcnow()
db.commit()
# REMOVED: db.commit() - This should be handled by the calling function.
return company
def reevaluate_wikipedia_metric(self, company: Company, db: Session, industry: Industry) -> Company:
logger.info(f"Re-evaluating metric for {company.name}...")
return self.extract_metrics_for_industry(company, db, industry)
def _generate_marketing_opener(self, company_name: str, website_text: str, industry_name: str, industry_pains: str, focus_mode: str = "primary") -> Optional[str]:
"""
Generates the 'First Sentence' (Opener).
focus_mode: 'primary' (Standard/Cleaning) or 'secondary' (Service/Logistics).
"""
if not industry_pains:
industry_pains = "Effizienz und Personalmangel" # Fallback
# Dynamic Focus Instruction
if focus_mode == "secondary":
focus_instruction = """
- **FOKUS: SEKUNDÄR-PROZESSE (Logistik/Service/Versorgung).**
- Ignoriere das Thema Reinigung. Konzentriere dich auf **Abläufe, Materialfluss, Entlastung von Fachkräften** oder **Gäste-Service**.
- Der Satz muss einen operativen Entscheider (z.B. Pflegedienstleitung, Produktionsleiter) abholen."""
else:
focus_instruction = """
- **FOKUS: PRIMÄR-PROZESSE (Infrastruktur/Sauberkeit/Sicherheit).**
- Konzentriere dich auf Anforderungen an das Facility Management, Hygiene, Außenwirkung oder Arbeitssicherheit.
- Der Satz muss einen Infrastruktur-Entscheider (z.B. FM-Leiter, Geschäftsführer) abholen."""
prompt = f"""
Du bist ein exzellenter B2B-Stratege und Texter.
Deine Aufgabe ist es, einen hochpersonalisierten Einleitungssatz für eine E-Mail an ein potenzielles Kundenunternehmen zu formulieren.
--- KONTEXT ---
Zielunternehmen: {company_name}
Branche: {industry_name}
Operative Herausforderung (Pain): "{industry_pains}"
Webseiten-Kontext:
{website_text[:2500]}
--- Denkprozess & Stilvorgaben ---
1. **Analysiere den Kontext:** Verstehe das Kerngeschäft.
2. **Identifiziere den Hebel:** Was ist der Erfolgsfaktor in Bezug auf den FOKUS?
3. **Formuliere den Satz (ca. 20-35 Wörter):**
- Wähle einen eleganten, aktiven Einstieg.
- Verbinde die **Tätigkeit** mit dem **Hebel** und den **Konsequenzen**.
- **WICHTIG:** Formuliere als positive Beobachtung über eine Kernkompetenz.
- **VERMEIDE:** Konkrete Zahlen.
- Verwende den Firmennamen: {company_name}.
{focus_instruction}
--- Deine Ausgabe ---
Gib NUR den finalen Satz aus. Keine Anführungszeichen.
"""
try:
response = call_gemini_flash(prompt)
if response:
return response.strip().strip('"')
return None
except Exception as e:
logger.error(f"Opener Generation Error: {e}")
return None
def classify_company_potential(self, company: Company, db: Session) -> Company:
logger.info(f"Starting classification for {company.name}...")
@@ -249,12 +306,29 @@ JSON ONLY.
suggested_industry_name = self._run_llm_classification_prompt(website_content, company.name, industry_defs)
logger.info(f"AI suggests industry: {suggested_industry_name}")
# 4. Update Company
# Match back to DB object
# 4. Update Company & Generate Openers
matched_industry = next((i for i in industries if i.name == suggested_industry_name), None)
if matched_industry:
company.industry_ai = matched_industry.name
# --- Generate PRIMARY Opener (Infrastructure/Cleaning) ---
op_prim = self._generate_marketing_opener(
company.name, website_content, matched_industry.name, matched_industry.pains, "primary"
)
if op_prim:
company.ai_opener = op_prim
logger.info(f"Opener (Primary): {op_prim}")
# --- Generate SECONDARY Opener (Service/Logistics) ---
# Only if relevant (could be optimized, but generating always is safer for "Dual Strategy")
op_sec = self._generate_marketing_opener(
company.name, website_content, matched_industry.name, matched_industry.pains, "secondary"
)
if op_sec:
company.ai_opener_secondary = op_sec
logger.info(f"Opener (Secondary): {op_sec}")
else:
company.industry_ai = "Others"