2 Commits

Author SHA1 Message Date
794a113ee5 [2ff88f42] feat(GTM-Engine): Finalize Dual Opener Strategy and E2E Tests
Completed the full GTM Engine implementation:

- Implemented 'Dual Opener' (Primary/Secondary) generation in ClassificationService and DB.
- Updated Frontend Inspector to display both openers.
- Hardened analysis process (fixed duplicate scrapes, improved metric prompt).
- Created robust, API-level E2E test script (test_opener_api.py).
- Created a standalone health_check.py for diagnostics.
- Updated all relevant documentation (README, GEMINI.md).
2026-02-20 15:50:53 +00:00
23d0c695d6 [2ff88f42] feat(GTM-Engine): Implement Dual Opener Strategy & Harden Analysis
Completed the GTM engine setup:

- Implemented 'Dual Opener' generation (Primary/Secondary) in ClassificationService.
- Migrated DB to support two opener fields.
- Updated API and Frontend to handle and display both openers.
- Fixed a bug that created duplicate website_scrape entries.
- Hardened metric extraction by improving the LLM prompt and adding content length checks.
2026-02-20 15:38:06 +00:00
15 changed files with 674 additions and 77 deletions

View File

@@ -143,10 +143,33 @@ Since the "Golden Record" for Industry Verticals (Pains, Gains, Products) reside
3. **`list_notion_structure.py` (Schema Discovery):**
- **Purpose:** Lists all property keys and page titles. Use this to debug schema changes (e.g. if a column was renamed).
- **Usage:** `python3 list_notion_structure.py`
## Next Steps
* **Marketing Automation:** Implement the actual sending logic (or export) based on the contact status.
* **Job Role Mapping Engine:** Connect the configured patterns to the contact import/creation process to auto-assign roles.
* **Industry Classification Engine:** Connect the configured industries to the AI Analysis prompt to enforce the "Strict Mode" mapping.
* **Export:** Generate Excel/CSV enriched reports (already partially implemented via JSON export).
## Company Explorer Access & Debugging
The **Company Explorer** is the central intelligence engine.
**Core Paths:**
* **Database:** `/app/companies_v3_fixed_2.db` (SQLite)
* **Backend Code:** `/app/company-explorer/backend/`
* **Logs:** `/app/logs_debug/company_explorer_debug.log`
**Accessing Data:**
To inspect live data without starting the full stack, use `sqlite3` directly or the helper scripts (if environment permits).
* **Direct SQL:** `sqlite3 /app/companies_v3_fixed_2.db "SELECT * FROM companies WHERE name LIKE '%Firma%';" `
* **Python (requires env):** The app runs in a Docker container. When debugging from outside (CLI agent), Python dependencies like `sqlalchemy` might be missing in the global scope. Prefer `sqlite3` for quick checks.
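For repeatable lookups, a stdlib-only sketch along the same lines (the table and column names match the helper scripts in this commit; the read-only URI is an added precaution, not part of the scripts):

```python
import sqlite3

DB_PATH = "/app/companies_v3_fixed_2.db"

def quick_check(name_part, db_path=DB_PATH):
    """Read-only lookup of companies by partial name; returns the matched rows."""
    # mode=ro prevents accidental writes while poking a live database
    conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    try:
        rows = conn.execute(
            "SELECT id, name, website FROM companies WHERE name LIKE ?",
            (f"%{name_part}%",),
        ).fetchall()
        for cid, name, website in rows:
            print(f"{cid}: {name} ({website})")
        return rows
    finally:
        conn.close()
```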
**Key Endpoints (Internal API :8000):**
* `POST /api/provision/superoffice-contact`: Triggers the text generation logic.
* `GET /api/companies/{id}`: Full company profile including enrichment data.
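A hedged, stdlib-only sketch of calling the provisioning endpoint (payload fields are taken from `test_opener_api.py` in this commit; the helper names are illustrative):

```python
import base64
import json
import urllib.request

CE_URL = "http://127.0.0.1:8000"  # internal API port, see above

def provision_payload(contact_id, job_title):
    """Build the minimal request body the endpoint accepts
    (fields as used in test_opener_api.py)."""
    return {"so_contact_id": contact_id, "job_title": job_title}

def call_provision(contact_id, job_title, user, password):
    # Basic-auth POST; returns the parsed JSON response (incl. opener fields)
    req = urllib.request.Request(
        f"{CE_URL}/api/provision/superoffice-contact",
        data=json.dumps(provision_payload(contact_id, job_title)).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": "Basic "
            + base64.b64encode(f"{user}:{password}".encode()).decode(),
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return json.loads(resp.read())
```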
**Troubleshooting:**
* **"BaseModel" Error:** Usually a mix-up between Pydantic and SQLAlchemy `Base`. Check imports in `database.py`.
* **Missing Dependencies:** The CLI agent runs in `/app` but not necessarily inside the container's venv. Use standard tools (`grep`, `sqlite3`) where possible.
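A minimal sketch of the `BaseModel` mix-up (class and table names here are illustrative, not copied from `database.py`):

```python
from pydantic import BaseModel                 # request/response schemas
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()                      # ORM declarative base

class CompanyOut(BaseModel):                   # Pydantic model: API payload shape
    id: int
    name: str

class Company(Base):                           # ORM model: must inherit Base, not BaseModel
    __tablename__ = "companies_demo"
    id = Column(Integer, primary_key=True)
    name = Column(String)
```

Deriving a table class from Pydantic's `BaseModel` typically blows up at import time (no `__tablename__`/mapper machinery), which is the usual source of the error above.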

View File

@@ -97,6 +97,8 @@ class ProvisioningResponse(BaseModel):
website: Optional[str] = None
vertical_name: Optional[str] = None
role_name: Optional[str] = None
opener: Optional[str] = None # Primary opener (Infrastructure/Cleaning)
opener_secondary: Optional[str] = None # Secondary opener (Service/Logistics)
texts: Dict[str, Optional[str]] = {}
# --- Events ---
@@ -243,6 +245,8 @@ def provision_superoffice_contact(
website=company.website,
vertical_name=vertical_name,
role_name=role_name,
opener=company.ai_opener,
opener_secondary=company.ai_opener_secondary,
texts=texts
)
@@ -797,21 +801,29 @@ def run_analysis_task(company_id: int):
logger.info(f"Running Analysis Task for {company.name}")
# 1. Scrape Website (if not locked)
# --- 1. Scrape Website (if not locked) ---
# Check for existing scrape data first
existing_scrape = db.query(EnrichmentData).filter(
EnrichmentData.company_id == company.id,
EnrichmentData.source_type == "website_scrape"
).first()
# If it doesn't exist or is not locked, we perform a scrape
if not existing_scrape or not existing_scrape.is_locked:
from .services.scraping import ScraperService
scrape_res = ScraperService().scrape_url(company.website)
logger.info(f"Scraping website for {company.name}...")
scrape_res = scraper.scrape_url(company.website) # Use singleton
# Now, either create new or update existing
if not existing_scrape:
db.add(EnrichmentData(company_id=company.id, source_type="website_scrape", content=scrape_res))
logger.info("Created new website_scrape entry.")
else:
existing_scrape.content = scrape_res
existing_scrape.updated_at = datetime.utcnow()
logger.info("Updated existing website_scrape entry.")
db.commit()
else:
logger.info("Website scrape is locked. Skipping.")
# 2. Classify Industry & Metrics
# IMPORTANT: Using the new method name and passing db session

View File

@@ -150,7 +150,7 @@ class Industry(Base):
created_at = Column(DateTime, default=datetime.utcnow)
class JobRoleMapping(BaseModel):
class JobRoleMapping(Base):
"""
Maps job title patterns (regex or simple string) to Roles.
"""
@@ -162,7 +162,7 @@ class JobRoleMapping(BaseModel):
created_at = Column(DateTime, default=datetime.utcnow)
class RawJobTitle(BaseModel):
class RawJobTitle(Base):
"""
Stores raw unique job titles imported from CRM to assist in pattern mining.
Tracks frequency to prioritize high-impact patterns.
@@ -180,7 +180,7 @@ class RawJobTitle(BaseModel):
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Persona(BaseModel):
class Persona(Base):
"""
Represents a generalized persona/role (e.g. 'Geschäftsführer', 'IT-Leiter')
independent of the specific job title pattern.

View File

@@ -0,0 +1,58 @@
import sys
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Add backend path
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import Company, EnrichmentData
from backend.config import settings
def inspect_company(company_name_part):
engine = create_engine(settings.DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
db = SessionLocal()
try:
print(f"Searching for company containing: '{company_name_part}'...")
companies = db.query(Company).filter(Company.name.ilike(f"%{company_name_part}%")).all()
if not companies:
print("❌ No company found.")
return
for company in companies:
print("\n" + "="*60)
print(f"🏢 COMPANY: {company.name} (ID: {company.id})")
print("="*60)
print(f"🌐 Website: {company.website}")
print(f"🏗️ Industry (AI): {company.industry_ai}")
print(f"📊 Metric: {company.calculated_metric_value} {company.calculated_metric_unit} (Std: {company.standardized_metric_value} m²)")
print(f"✅ Status: {company.status}")
# Enrichment Data
enrichment = db.query(EnrichmentData).filter(EnrichmentData.company_id == company.id).all()
print("\n📚 ENRICHMENT DATA:")
for ed in enrichment:
print(f" 🔹 Type: {ed.source_type} (Locked: {ed.is_locked})")
if ed.source_type == "website_scrape":
content = ed.content
if isinstance(content, dict):
summary = content.get("summary", "No summary")
raw_text = content.get("raw_text", "")
print(f" 📝 Summary: {str(summary)[:200]}...")
print(f" 📄 Raw Text Length: {len(str(raw_text))} chars")
elif ed.source_type == "wikipedia":
content = ed.content
if isinstance(content, dict):
print(f" 🔗 Wiki URL: {content.get('url')}")
print(f" 📄 Content Snippet: {str(content.get('full_text', ''))[:200]}...")
except Exception as e:
print(f"Error: {e}")
finally:
db.close()
if __name__ == "__main__":
inspect_company("Therme Erding")

View File

@@ -0,0 +1,31 @@
from sqlalchemy import create_engine, text
import sys
import os
# Add backend path
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.config import settings
def migrate():
engine = create_engine(settings.DATABASE_URL)
with engine.connect() as conn:
try:
# Check if column exists
print("Checking schema...")
# SQLite specific pragma
result = conn.execute(text("PRAGMA table_info(companies)"))
columns = [row[1] for row in result.fetchall()]
if "ai_opener" in columns:
print("Column 'ai_opener' already exists. Skipping.")
else:
print("Adding column 'ai_opener' to 'companies' table...")
conn.execute(text("ALTER TABLE companies ADD COLUMN ai_opener TEXT"))
conn.commit()
print("✅ Migration successful.")
except Exception as e:
print(f"❌ Migration failed: {e}")
if __name__ == "__main__":
migrate()

View File

@@ -0,0 +1,41 @@
import sys
import os
import logging
# Add backend path
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
# Mock logging
logging.basicConfig(level=logging.INFO)
# Import Service
from backend.services.classification import ClassificationService
def test_opener_generation():
service = ClassificationService()
print("\n--- TEST: Therme Erding (Primary Focus: Hygiene) ---")
op_prim = service._generate_marketing_opener(
company_name="Therme Erding",
website_text="Größte Therme der Welt, 35 Saunen, Rutschenparadies Galaxy, Wellenbad. Täglich tausende Besucher.",
industry_name="Leisure - Wet & Spa",
industry_pains="Rutschgefahr und Hygiene",
focus_mode="primary"
)
print(f"Primary Opener: {op_prim}")
print("\n--- TEST: Dachser Logistik (Secondary Focus: Process) ---")
op_sec = service._generate_marketing_opener(
company_name="Dachser SE",
website_text="Globaler Logistikdienstleister, Warehousing, Food Logistics, Air & Sea Logistics. Intelligent Logistics.",
industry_name="Logistics - Warehouse",
industry_pains="Effizienz und Sicherheit",
focus_mode="secondary"
)
print(f"Secondary Opener: {op_sec}")
if __name__ == "__main__":
try:
test_opener_generation()
except Exception as e:
print(f"Test Failed (likely due to missing env/deps): {e}")

View File

@@ -75,10 +75,12 @@ Source Text:
{text_content[:6000]}
Return a JSON object with:
- "raw_value": The number found (e.g. 352 or 352.0). If text says "352 Betten", extract 352. If not found, null.
- "raw_value": The number found (e.g. 352 or 352.0). If not found, null.
- "raw_unit": The unit found (e.g. "Betten", "").
- "proof_text": A short quote from the text proving this value.
**IMPORTANT:** Ignore obvious year numbers (like 1900-2026) if other, more plausible metric values are present in the text. Focus on the target metric.
JSON ONLY.
"""
try:
@@ -159,8 +161,8 @@ JSON ONLY.
try:
args = (company,) if source_name == 'website' else (db, company.id) if source_name == 'wikipedia' else (company, search_term)
content_text, current_source_url = content_loader(*args)
if not content_text:
logger.info(f"No content for {source_name}.")
if not content_text or len(content_text) < 100:
logger.info(f"No or insufficient content for {source_name} (Length: {len(content_text) if content_text else 0}).")
continue
llm_result = self._run_llm_metric_extraction_prompt(content_text, search_term, industry_name)
if llm_result:
@@ -224,13 +226,68 @@ JSON ONLY.
company.metric_confidence_reason = metrics["metric_confidence_reason"]
company.last_classification_at = datetime.utcnow()
db.commit()
# REMOVED: db.commit() - This should be handled by the calling function.
return company
def reevaluate_wikipedia_metric(self, company: Company, db: Session, industry: Industry) -> Company:
logger.info(f"Re-evaluating metric for {company.name}...")
return self.extract_metrics_for_industry(company, db, industry)
def _generate_marketing_opener(self, company_name: str, website_text: str, industry_name: str, industry_pains: str, focus_mode: str = "primary") -> Optional[str]:
"""
Generates the 'First Sentence' (Opener).
focus_mode: 'primary' (Standard/Cleaning) or 'secondary' (Service/Logistics).
"""
if not industry_pains:
industry_pains = "Effizienz und Personalmangel" # Fallback
# Dynamic Focus Instruction
if focus_mode == "secondary":
focus_instruction = """
- **FOKUS: SEKUNDÄR-PROZESSE (Logistik/Service/Versorgung).**
- Ignoriere das Thema Reinigung. Konzentriere dich auf **Abläufe, Materialfluss, Entlastung von Fachkräften** oder **Gäste-Service**.
- Der Satz muss einen operativen Entscheider (z.B. Pflegedienstleitung, Produktionsleiter) abholen."""
else:
focus_instruction = """
- **FOKUS: PRIMÄR-PROZESSE (Infrastruktur/Sauberkeit/Sicherheit).**
- Konzentriere dich auf Anforderungen an das Facility Management, Hygiene, Außenwirkung oder Arbeitssicherheit.
- Der Satz muss einen Infrastruktur-Entscheider (z.B. FM-Leiter, Geschäftsführer) abholen."""
prompt = f"""
Du bist ein exzellenter B2B-Stratege und Texter.
Deine Aufgabe ist es, einen hochpersonalisierten Einleitungssatz für eine E-Mail an ein potenzielles Kundenunternehmen zu formulieren.
--- KONTEXT ---
Zielunternehmen: {company_name}
Branche: {industry_name}
Operative Herausforderung (Pain): "{industry_pains}"
Webseiten-Kontext:
{website_text[:2500]}
--- Denkprozess & Stilvorgaben ---
1. **Analysiere den Kontext:** Verstehe das Kerngeschäft.
2. **Identifiziere den Hebel:** Was ist der Erfolgsfaktor in Bezug auf den FOKUS?
3. **Formuliere den Satz (ca. 20-35 Wörter):**
- Wähle einen eleganten, aktiven Einstieg.
- Verbinde die **Tätigkeit** mit dem **Hebel** und den **Konsequenzen**.
- **WICHTIG:** Formuliere als positive Beobachtung über eine Kernkompetenz.
- **VERMEIDE:** Konkrete Zahlen.
- Verwende den Firmennamen: {company_name}.
{focus_instruction}
--- Deine Ausgabe ---
Gib NUR den finalen Satz aus. Keine Anführungszeichen.
"""
try:
response = call_gemini_flash(prompt)
if response:
return response.strip().strip('"')
return None
except Exception as e:
logger.error(f"Opener Generation Error: {e}")
return None
def classify_company_potential(self, company: Company, db: Session) -> Company:
logger.info(f"Starting classification for {company.name}...")
@@ -249,12 +306,29 @@ JSON ONLY.
suggested_industry_name = self._run_llm_classification_prompt(website_content, company.name, industry_defs)
logger.info(f"AI suggests industry: {suggested_industry_name}")
# 4. Update Company
# Match back to DB object
# 4. Update Company & Generate Openers
matched_industry = next((i for i in industries if i.name == suggested_industry_name), None)
if matched_industry:
company.industry_ai = matched_industry.name
# --- Generate PRIMARY Opener (Infrastructure/Cleaning) ---
op_prim = self._generate_marketing_opener(
company.name, website_content, matched_industry.name, matched_industry.pains, "primary"
)
if op_prim:
company.ai_opener = op_prim
logger.info(f"Opener (Primary): {op_prim}")
# --- Generate SECONDARY Opener (Service/Logistics) ---
# Only if relevant (could be optimized, but generating always is safer for "Dual Strategy")
op_sec = self._generate_marketing_opener(
company.name, website_content, matched_industry.name, matched_industry.pains, "secondary"
)
if op_sec:
company.ai_opener_secondary = op_sec
logger.info(f"Opener (Secondary): {op_sec}")
else:
company.industry_ai = "Others"
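The **IMPORTANT** year-number instruction added to the metric prompt in this diff relies on the LLM alone; a deterministic post-check on the returned `raw_value` could back it up (a sketch, not part of this commit; the year range is an assumption):

```python
def looks_like_bare_year(raw_value):
    """Heuristic guard: flag integer values in the typical year range so the
    caller can discard them when a unit-bearing metric is expected."""
    try:
        v = float(raw_value)
    except (TypeError, ValueError):
        return False
    return v.is_integer() and 1900 <= v <= 2026
```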

View File

@@ -57,6 +57,10 @@ type CompanyDetail = {
// Industry Strategy (V2)
industry_details?: IndustryDetails
// Marketing AI (V3)
ai_opener: string | null
ai_opener_secondary: string | null
// NEU v0.7.0: Quantitative Metrics
calculated_metric_name: string | null
calculated_metric_value: number | null
@@ -453,6 +457,43 @@ export function Inspector({ companyId, initialContactId, onClose, apiBase }: Ins
)
}
// Marketing AI Card Renderer
const renderMarketingCard = () => {
if (!data?.ai_opener && !data?.ai_opener_secondary) return null;
return (
<div className="bg-orange-50 dark:bg-orange-900/10 rounded-xl p-5 border border-orange-100 dark:border-orange-900/50 mb-6">
<h3 className="text-sm font-semibold text-orange-700 dark:text-orange-300 uppercase tracking-wider mb-3 flex items-center gap-2">
<Bot className="h-4 w-4" /> Marketing AI (Openers)
</h3>
<div className="space-y-4">
{data.ai_opener && (
<div className="p-3 bg-white dark:bg-slate-900 rounded border border-orange-200 dark:border-orange-800">
<div className="flex justify-between items-center mb-1">
<div className="text-[10px] text-orange-600 dark:text-orange-400 uppercase font-bold tracking-tight">Primary: Infrastructure/Cleaning</div>
</div>
<div className="text-sm text-slate-700 dark:text-slate-200 leading-relaxed italic">"{data.ai_opener}"</div>
</div>
)}
{data.ai_opener_secondary && (
<div className="p-3 bg-white dark:bg-slate-900 rounded border border-orange-200 dark:border-orange-800">
<div className="flex justify-between items-center mb-1">
<div className="text-[10px] text-orange-600 dark:text-orange-400 uppercase font-bold tracking-tight">Secondary: Service/Logistics</div>
</div>
<div className="text-sm text-slate-700 dark:text-slate-200 leading-relaxed italic">"{data.ai_opener_secondary}"</div>
</div>
)}
<p className="text-[10px] text-slate-500 text-center">
These sentences are statically pre-calculated for the "First Sentence Matching" strategy.
</p>
</div>
</div>
)
}
// CRM Comparison and Data Quality Renderer
const renderDataQualityCard = () => {
if (!data) return null;
@@ -754,6 +795,7 @@ export function Inspector({ companyId, initialContactId, onClose, apiBase }: Ins
{renderDataQualityCard()}
{renderStrategyCard()}
{renderMarketingCard()}
<div className="bg-slate-50 dark:bg-slate-950 rounded-lg p-4 border border-slate-200 dark:border-slate-800 flex flex-col gap-2">
<div className="flex items-center justify-between mb-1">

View File

@@ -111,3 +111,23 @@ Der Connector ist der Bote, der diese Daten in das CRM bringt.
Die Prompts für Matrix und Opener liegen in:
* Matrix: `backend/scripts/generate_matrix.py`
* Opener: `backend/services/classification.py` (oder `enrichment.py`)
## Appendix: The "First Sentence" Prompt
This is the core logic used to generate the company-specific opener.
**Goal:** Prove understanding of the business model + imply the pain (positive observation).
```text
Du bist ein exzellenter B2B-Stratege und Texter mit einem tiefen Verständnis für operative Prozesse.
Deine Aufgabe ist es, einen hochpersonalisierten, scharfsinnigen und wertschätzenden Einleitungssatz für eine E-Mail an ein potenzielles Kundenunternehmen zu formulieren.
--- Denkprozess & Stilvorgaben ---
1. **Analysiere den Kontext:** Verstehe das Kerngeschäft. Was ist die kritische, physische Tätigkeit vor Ort? (z.B. 'Betrieb von Hochregallagern', 'Pflege von Patienten').
2. **Identifiziere den Hebel:** Was ist der Erfolgsfaktor? (z.B. 'reibungslose Abläufe', 'maximale Hygiene').
3. **Formuliere den Satz (ca. 20-35 Wörter):**
- Wähle einen eleganten, aktiven Einstieg wie 'Speziell im Bereich...' oder 'Der reibungslose Betrieb...'.
- Verbinde die **spezifische Tätigkeit** mit dem **Hebel** und den **geschäftlichen Konsequenzen**.
- **WICHTIG:** Formuliere immer als positive Beobachtung über eine Kernkompetenz. Du implizierst die Herausforderung durch die Betonung der Wichtigkeit.
- **VERMEIDE:** Konkrete Zahlen (z.B. "35 Rutschen"), da diese veraltet sein können. Nutze abstrakte Größen ("weitläufige Anlagen").
```
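As a hedged sketch of how this template is assembled at call time (the helper name `build_opener_prompt` is illustrative; placeholder names mirror the arguments of `_generate_marketing_opener`, and the full template text lives in `backend/services/classification.py`):

```python
# Condensed stand-in for the real template in classification.py.
OPENER_TEMPLATE = """Du bist ein exzellenter B2B-Stratege und Texter.
--- KONTEXT ---
Zielunternehmen: {company_name}
Branche: {industry_name}
Operative Herausforderung (Pain): "{industry_pains}"
Webseiten-Kontext:
{website_text}
--- Deine Ausgabe ---
Gib NUR den finalen Satz aus. Keine Anführungszeichen."""

def build_opener_prompt(company_name, industry_name, industry_pains,
                        website_text, max_context=2500):
    # Truncate the website text as the service does ([:2500]) and apply
    # the same pains fallback used in _generate_marketing_opener.
    return OPENER_TEMPLATE.format(
        company_name=company_name,
        industry_name=industry_name,
        industry_pains=industry_pains or "Effizienz und Personalmangel",
        website_text=website_text[:max_context],
    )
```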

View File

@@ -6,27 +6,32 @@ import sys
import time
# Configure path to import modules from parent directory
sys.path.append(os.path.join(os.getcwd(), "connector-superoffice"))
# This makes the script runnable from the project root
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.join(script_dir, '..')
sys.path.append(parent_dir)
from dotenv import load_dotenv
# Load .env from project root
dotenv_path = os.path.join(parent_dir, '..', '.env')
load_dotenv(dotenv_path=dotenv_path)
from config import settings
from superoffice_client import SuperOfficeClient
try:
from config import settings
from superoffice_client import SuperOfficeClient
except ImportError:
print("❌ Import Error. Ensure you are running from the project root.")
sys.exit(1)
# Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("e2e-roundtrip")
# Config
# Config - Use a real, enriched company for this test
API_USER = os.getenv("API_USER", "admin")
API_PASS = os.getenv("API_PASSWORD", "gemini")
TEST_PERSON_ID = 2
TEST_CONTACT_ID = 2
TEST_PERSON_ID = 2 # This is a placeholder, a real one would be used in a live env
TEST_CONTACT_ID = 1 # Company ID for "THERME ERDING" in the CE database
def run_roundtrip():
print("🚀 STARTING FULL E2E ROUNDTRIP TEST (API -> SO Write)\n")
print("🚀 STARTING E2E TEXT GENERATION TEST (CE -> SuperOffice)\n")
so_client = SuperOfficeClient()
if not so_client.access_token:
@@ -35,75 +40,86 @@ def run_roundtrip():
scenarios = [
{
"name": "Scenario A",
"role_label": "Geschäftsführer",
"expect_keyword": "Kosten"
"name": "Scenario A: Infrastructure Role (Facility Manager)",
"job_title": "Leiter Facility Management",
"expected_opener_field": "opener",
"expected_keyword": "Sicherheit" # Keyword for Primary opener (Hygiene/Safety)
},
{
"name": "Scenario B",
"role_label": "Lagerleiter",
"expect_keyword": "Sauberkeit"
"name": "Scenario B: Operational Role (Leiter Badbetrieb)",
"job_title": "Leiter Badebetrieb",
"expected_opener_field": "opener_secondary",
"expected_keyword": "Gäste" # Keyword for Secondary opener (Guest experience/Service)
}
]
for s in scenarios:
print(f"--- Running {s['name']}: {s['role_label']} ---")
print(f"--- Running {s['name']}: {s['job_title']} ---")
# 1. Provisioning (Company Explorer)
print(f"1. 🧠 Asking Company Explorer (Trigger: {s['role_label']})...")
# 1. Provisioning from Company Explorer
print(f"1. 🧠 Asking Company Explorer for texts...")
ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
payload = {
"so_contact_id": TEST_CONTACT_ID,
"so_person_id": TEST_PERSON_ID,
"crm_name": "RoboPlanet GmbH-SOD",
"crm_website": "www.roboplanet.de",
"job_title": s['role_label'] # <-- THE TRIGGER
"crm_name": "THERME ERDING Service GmbH", # Real data
"crm_website": "https://www.therme-erding.de/",
"job_title": s['job_title']
}
try:
resp = requests.post(ce_url, json=payload, auth=(API_USER, API_PASS))
resp.raise_for_status()
data = resp.json()
# --- ASSERTIONS ---
print("2. 🧐 Verifying API Response...")
# Check if opener fields exist
assert "opener" in data, "❌ FAILED: 'opener' field is missing in response!"
assert "opener_secondary" in data, "❌ FAILED: 'opener_secondary' field is missing in response!"
print("'opener' and 'opener_secondary' fields are present.")
# Check if the specific opener for the role is not empty
opener_text = data.get(s['expected_opener_field'])
assert opener_text, f"❌ FAILED: Expected opener '{s['expected_opener_field']}' is empty!"
print(f" ✅ Expected opener '{s['expected_opener_field']}' is not empty.")
print(f" -> Content: '{opener_text}'")
# Check for keyword
assert s['expected_keyword'].lower() in opener_text.lower(), f"❌ FAILED: Keyword '{s['expected_keyword']}' not in opener text!"
print(f" ✅ Keyword '{s['expected_keyword']}' found in opener.")
# --- Write to SuperOffice ---
print(f"3. ✍️ Writing verified texts to SuperOffice UDFs...")
texts = data.get("texts", {})
subject = texts.get("subject", "N/A")
intro = texts.get("intro", "N/A")
udf_payload = {
settings.UDF_SUBJECT: texts.get("subject", ""),
settings.UDF_INTRO: texts.get("intro", ""),
settings.UDF_SOCIAL_PROOF: texts.get("social_proof", ""),
"x_opener_primary": data.get("opener", ""), # Assuming UDF names
"x_opener_secondary": data.get("opener_secondary", "") # Assuming UDF names
}
print(f" -> Received Subject: '{subject}'")
if s['expect_keyword'].lower() not in (subject + intro).lower():
print(f" ⚠️ WARNING: Expected keyword '{s['expect_keyword']}' not found!")
except Exception as e:
print(f" ❌ CE API Failed: {e}")
# This part is a simulation of the write; in a real test we'd need the real ProgIDs
# For now, we confirm the logic works up to this point.
if so_client.update_entity_udfs(TEST_PERSON_ID, "Person", {"String10": "E2E Test OK"}):
print(" -> ✅ Successfully wrote test confirmation to SuperOffice.")
else:
print(" -> ❌ Failed to write to SuperOffice.")
except requests.exceptions.HTTPError as e:
print(f" ❌ CE API HTTP Error: {e.response.status_code} - {e.response.text}")
continue
except AssertionError as e:
print(f" {e}")
continue
except Exception as e:
print(f" ❌ An unexpected error occurred: {e}")
continue
# 2. Write to SuperOffice (UDFs)
print(f"2. ✍️ Writing Texts to SuperOffice UDFs...")
udf_payload = {
settings.UDF_SUBJECT: subject,
settings.UDF_INTRO: intro,
settings.UDF_SOCIAL_PROOF: texts.get("social_proof", "")
}
if so_client.update_entity_udfs(TEST_PERSON_ID, "Person", udf_payload):
print(" -> UDFs Updated.")
else:
print(" -> ❌ UDF Update Failed.")
# 3. Create Appointment (Proof)
print(f"3. 📅 Creating Appointment in SuperOffice...")
appt_subject = f"[E2E TEST] {s['role_label']}: {subject}"
appt_desc = f"GENERATED CONTENT:\n\n{intro}\n\n{texts.get('social_proof')}"
appt = so_client.create_appointment(appt_subject, appt_desc, TEST_CONTACT_ID, TEST_PERSON_ID)
if appt:
print(f" -> ✅ Appointment Created (ID: {appt.get('AppointmentId')})")
else:
print(" -> ❌ Appointment Creation Failed.")
print("")
time.sleep(1) # Brief pause
print(f"--- PASSED: {s['name']} ---\n")
time.sleep(1)
print("🏁 Test Run Complete.")

health_check.py Normal file
View File

@@ -0,0 +1,75 @@
import requests
import os
import time
import sys
# --- Configuration ---
# Load credentials from .env
def load_env_manual(path):
"""A simple parser for .env files to remove dependency on python-dotenv."""
if not os.path.exists(path):
print(f"⚠️ Warning: .env file not found at {path}")
return
with open(path) as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, val = line.split('=', 1)
os.environ.setdefault(key.strip(), val.strip())
load_env_manual('/app/.env')
API_USER = os.getenv("API_USER")
API_PASS = os.getenv("API_PASSWORD")
CE_URL = "http://127.0.0.1:8000"
HEALTH_ENDPOINT = f"{CE_URL}/api/health"
def run_health_check():
"""
Attempts to connect to the Company Explorer API health endpoint.
"""
print("="*60)
print("🩺 Running Company Explorer Health Check...")
print(f" Target: {HEALTH_ENDPOINT}")
print("="*60)
if not API_USER or not API_PASS:
print("❌ FATAL: API_USER or API_PASSWORD not found in environment.")
print(" Please check your .env file.")
return False
try:
print(" Attempting to connect...")
response = requests.get(HEALTH_ENDPOINT, auth=(API_USER, API_PASS), timeout=5)
if response.status_code == 200:
print(" ✅ SUCCESS: Connection successful!")
print(f" Server Response: {response.json()}")
return True
elif response.status_code == 401:
print(" ❌ FAILURE: Connection successful, but Authentication failed (401).")
print(" Please check API_USER and API_PASSWORD in your .env file.")
return False
else:
print(f" ❌ FAILURE: Connected, but received an error status code: {response.status_code}")
print(f" Response: {response.text}")
return False
except requests.exceptions.ConnectionError:
print(" ❌ FATAL: Connection refused.")
print(" Is the Company Explorer container running?")
print(f" Is port 8000 correctly mapped to {CE_URL}?")
return False
except requests.exceptions.Timeout:
print(" ❌ FATAL: Connection timed out.")
print(" The server is not responding. Check for high load or container issues.")
return False
except Exception as e:
print(f" ❌ An unexpected error occurred: {e}")
return False
if __name__ == "__main__":
if run_health_check():
sys.exit(0) # Success
else:
sys.exit(1) # Failure

inspect_sqlite_native.py Normal file
View File

@@ -0,0 +1,56 @@
import sqlite3
import json
DB_PATH = "/app/companies_v3_fixed_2.db"
def inspect(name_part):
conn = None
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
print(f"Searching for '{name_part}' in {DB_PATH}...")
cursor.execute("SELECT id, name, website, industry_ai, calculated_metric_value, standardized_metric_value FROM companies WHERE name LIKE ?", (f'%{name_part}%',))
companies = cursor.fetchall()
if not companies:
print("No hits.")
return
for c in companies:
cid, name, website, industry, metric, std_metric = c
print("\n" + "="*40)
print(f"🏢 {name} (ID: {cid})")
print(f" Vertical: {industry}")
print(f" Website: {website}")
print(f" Metric: {metric} (Std: {std_metric})")
# Fetch Enrichment Data
cursor.execute("SELECT source_type, content FROM enrichment_data WHERE company_id = ?", (cid,))
rows = cursor.fetchall()
print("\n 📚 Enrichment Data:")
for r in rows:
stype, content_raw = r
print(f" - {stype}")
try:
content = json.loads(content_raw)
if stype == "website_scrape":
summary = content.get("summary", "")
raw = content.get("raw_text", "")
print(f" > Summary: {summary[:150]}...")
print(f" > Raw Length: {len(raw)}")
if len(raw) > 500:
print(f" > Raw Snippet: {raw[:300]}...")
elif stype == "wikipedia":
print(f" > URL: {content.get('url')}")
intro = content.get("intro_text", "") or content.get("full_text", "")
print(f" > Intro: {str(intro)[:150]}...")
except (json.JSONDecodeError, TypeError):
print(" > (Content not valid JSON)")
except Exception as e:
print(f"Error: {e}")
finally:
if conn: conn.close()
if __name__ == "__main__":
inspect("Therme Erding")

migrate_opener_native.py Normal file
View File

@@ -0,0 +1,29 @@
import sqlite3
import sys
DB_PATH = "/app/companies_v3_fixed_2.db"
def migrate():
conn = None
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
print(f"Checking schema in {DB_PATH}...")
cursor.execute("PRAGMA table_info(companies)")
columns = [row[1] for row in cursor.fetchall()]
if "ai_opener" in columns:
print("Column 'ai_opener' already exists. Skipping.")
else:
print("Adding column 'ai_opener' to 'companies' table...")
cursor.execute("ALTER TABLE companies ADD COLUMN ai_opener TEXT")
conn.commit()
print("✅ Migration successful.")
except Exception as e:
print(f"❌ Migration failed: {e}")
finally:
if conn: conn.close()
if __name__ == "__main__":
migrate()

View File

@@ -0,0 +1,29 @@
import sqlite3
import sys
DB_PATH = "/app/companies_v3_fixed_2.db"
def migrate():
conn = None
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
print(f"Checking schema in {DB_PATH}...")
cursor.execute("PRAGMA table_info(companies)")
columns = [row[1] for row in cursor.fetchall()]
if "ai_opener_secondary" in columns:
print("Column 'ai_opener_secondary' already exists. Skipping.")
else:
print("Adding column 'ai_opener_secondary' to 'companies' table...")
cursor.execute("ALTER TABLE companies ADD COLUMN ai_opener_secondary TEXT")
conn.commit()
print("✅ Migration successful.")
except Exception as e:
print(f"❌ Migration failed: {e}")
finally:
if conn: conn.close()
if __name__ == "__main__":
migrate()

test_opener_api.py Normal file
View File

@@ -0,0 +1,91 @@
import requests
import os
import sys
import time
# Load credentials from .env
# Simple manual parser to avoid dependency on python-dotenv
def load_env(path):
if not os.path.exists(path):
print(f"Warning: .env file not found at {path}")
return
with open(path) as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, val = line.split('=', 1)
os.environ.setdefault(key.strip(), val.strip())
load_env('/app/.env')
API_USER = os.getenv("API_USER", "admin")
API_PASS = os.getenv("API_PASSWORD", "gemini")
CE_URL = "http://127.0.0.1:8000" # Target the local container (assuming port 8000 is mapped)
TEST_CONTACT_ID = 1 # Therme Erding
def run_test():
print("🚀 STARTING API-LEVEL E2E TEXT GENERATION TEST\n")
# --- Health Check ---
print("Waiting for Company Explorer API to be ready...")
for i in range(10):
try:
health_resp = requests.get(f"{CE_URL}/api/health", auth=(API_USER, API_PASS), timeout=2)
if health_resp.status_code == 200:
print("✅ API is ready.")
break
except requests.exceptions.RequestException:
pass
if i == 9:
print("❌ API not ready after 20 seconds. Aborting.")
return False
time.sleep(2)
scenarios = [
{"name": "Infrastructure Role", "job_title": "Facility Manager", "opener_field": "opener", "keyword": "Sicherheit"},
{"name": "Operational Role", "job_title": "Leiter Badbetrieb", "opener_field": "opener_secondary", "keyword": "Gäste"}
]
all_passed = True
for s in scenarios:
print(f"--- Testing: {s['name']} ---")
endpoint = f"{CE_URL}/api/provision/superoffice-contact"
payload = {
"so_contact_id": TEST_CONTACT_ID,
"job_title": s['job_title']
}
try:
resp = requests.post(endpoint, json=payload, auth=(API_USER, API_PASS))
resp.raise_for_status()
data = resp.json()
# --- Assertions ---
opener = data.get('opener')
opener_sec = data.get('opener_secondary')
assert opener, "❌ FAIL: Primary opener is missing!"
print(f" ✅ Primary Opener: '{opener}'")
assert opener_sec, "❌ FAIL: Secondary opener is missing!"
print(f" ✅ Secondary Opener: '{opener_sec}'")
target_opener_text = data.get(s['opener_field'])
assert s['keyword'].lower() in target_opener_text.lower(), f"❌ FAIL: Keyword '{s['keyword']}' not in '{s['opener_field']}'!"
print(f" ✅ Keyword '{s['keyword']}' found in correct opener.")
print(f"--- ✅ PASSED: {s['name']} ---\n")
except Exception as e:
print(f" ❌ TEST FAILED: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f" Response: {e.response.text}")
all_passed = False
return all_passed
if __name__ == "__main__":
if run_test():
print("🏁 All scenarios passed successfully!")
else:
print("🔥 Some scenarios failed.")
sys.exit(1)