Files
Floke 57081bf102 [30388f42] Infrastructure Hardening & Final Touches: Stabilized Lead Engine (Nginx routing, manager.py, Dockerfile fixes), restored known-good Nginx configs, and ensured all recent fixes are committed. System is ready for migration.
- Fixed Nginx proxy for /feedback/ and /lead/ routes.
- Restored manager.py to use persistent SQLite DB and corrected test lead triggers.
- Refined Dockerfile for lead-engine to ensure clean dependency installs.
- Applied latest API configs (.env) to lead-engine and duckdns services.
- Updated documentation (GEMINI.md, readme.md, RELOCATION.md, lead-engine/README.md) to reflect final state and lessons learned.
- Committed all pending changes to main branch.
2026-03-07 20:01:48 +00:00

97 lines
4.2 KiB
Python

import time
import json
import logging
import os
import sys
import time
import json
import logging
import os
import sys
import threading
import uvicorn
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("lead-monitor")
# Ensure the lead-engine root is in path for imports
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
if BASE_DIR not in sys.path:
sys.path.append(BASE_DIR)
from db import get_leads
from enrich import refresh_ce_data
# Import the core logic from manager
try:
from trading_twins.manager import process_lead as start_trading_twins_workflow
logger.info("✅ Trading Twins modules imported successfully.")
except ImportError as e:
logger.error(f"❌ Failed to import trading_twins: {e}")
start_trading_twins_workflow = None
def run_monitor():
logger.info("Starting Lead Monitor (Polling CE for updates)...")
while True:
try:
leads = get_leads()
# Filter leads that are synced but missing analysis data
pending_leads = []
for lead in leads:
if lead['status'] == 'synced':
enrichment = json.loads(lead['enrichment_data']) if lead['enrichment_data'] else {}
ce_data = enrichment.get('ce_data', {})
ce_id = enrichment.get('ce_id')
# If we have a CE ID but no vertical/summary yet, it's pending
vertical = ce_data.get('industry_ai') or ce_data.get('vertical')
if ce_id and (not vertical or vertical == 'None'):
pending_leads.append(lead)
if pending_leads:
logger.info(f"Checking {len(pending_leads)} pending leads for analysis updates...")
for lead in pending_leads:
enrichment = json.loads(lead['enrichment_data'])
ce_id = enrichment['ce_id']
logger.info(f" -> Refreshing Lead {lead['id']} ({lead['company_name']})...")
new_data = refresh_ce_data(lead['id'], ce_id)
new_vertical = new_data.get('industry_ai') or new_data.get('vertical')
if new_vertical and new_vertical != 'None':
logger.info(f" [SUCCESS] Analysis finished for {lead['company_name']}: {new_vertical}")
# Trigger Trading Twins Process
if start_trading_twins_workflow:
logger.info(f" [ACTION] Triggering Trading Twins Process for {lead['company_name']}...")
try:
# Extract details for the manager.process_lead function
email = lead.get('email')
name = lead.get('contact_name', 'Interessent')
company = lead.get('company_name', 'Ihre Firma')
opener = new_data.get('ai_opener') or "Vielen Dank für Ihre Anfrage."
request_id = f"lead_{lead['id']}_{int(time.time())}"
if email:
# Calling the function from manager.py
# Signature: process_lead(request_id, company, opener, receiver)
start_trading_twins_workflow(request_id, company, opener, email)
else:
logger.warning(f" [SKIP] No email address found for lead {lead['id']}")
except Exception as e:
logger.error(f" [ERROR] Failed to start workflow: {e}")
else:
logger.warning(" [SKIP] Workflow Logic not available (Import Error)")
except Exception as e:
logger.error(f"Monitor error: {e}")
# Wait before next check
time.sleep(30) # Poll every 30 seconds
if __name__ == "__main__":
run_monitor()