# Lead monitor script (Python): polls leads for pending CE analysis updates.
import time
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
# Path setup to import local modules. The directory is made absolute so the
# entry stays valid even if the working directory changes later.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from db import get_leads
from enrich import refresh_ce_data

# Import our new Trading Twins Orchestrator
try:
    from trading_twins.orchestrator import TradingTwinsOrchestrator
    _orchestrator_import_error = None
except ImportError as exc:
    # Fallback for dev environment or missing dependencies. The cause is kept
    # so it can be reported once logging has been configured below.
    TradingTwinsOrchestrator = None
    _orchestrator_import_error = exc

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("lead-monitor")

if _orchestrator_import_error is not None:
    # Surface the reason for the fallback instead of dropping it silently.
    logger.warning(
        "Trading Twins orchestrator could not be imported: %s",
        _orchestrator_import_error,
    )
|
|
|
|
def _analysis_vertical(ce_data):
    """Return the analysis vertical found in *ce_data*, or None if it is missing.

    'industry_ai' takes precedence over 'vertical'. A missing/empty payload and
    the literal string 'None' both count as "no vertical yet".
    """
    if not ce_data:
        # Covers None (e.g. a JSON null) as well as an empty mapping.
        return None
    vertical = ce_data.get('industry_ai') or ce_data.get('vertical')
    if not vertical or vertical == 'None':
        return None
    return vertical


def _pending_ce_id(lead):
    """Return the CE ID of a synced lead that still lacks a vertical, else None.

    Raises KeyError/ValueError/TypeError/AttributeError when the lead record
    or its 'enrichment_data' JSON is malformed; the caller isolates these.
    """
    if lead['status'] != 'synced':
        return None
    raw = lead['enrichment_data']
    enrichment = json.loads(raw) if raw else {}
    ce_id = enrichment.get('ce_id')
    if not ce_id:
        return None
    if _analysis_vertical(enrichment.get('ce_data')) is not None:
        return None
    return ce_id


def _trigger_orchestrator(orchestrator, lead):
    """Hand a freshly analysed lead to the orchestrator, logging any failure."""
    if not orchestrator:
        logger.warning(" [SKIP] Orchestrator not available (Import Error)")
        return

    logger.info(" [ACTION] Triggering Trading Twins Orchestrator for %s...", lead['company_name'])
    try:
        # Extract contact details safely
        email = lead.get('email')
        name = lead.get('contact_name', 'Interessent')
        company = lead.get('company_name', 'Ihre Firma')

        if email:
            orchestrator.process_lead(email, name, company)
        else:
            logger.warning(" [SKIP] No email address found for lead %s", lead['id'])
    except Exception as exc:
        # Best effort: an orchestrator failure must not stop the monitor.
        logger.exception(" [ERROR] Failed to trigger orchestrator: %s", exc)


def _refresh_lead(lead, ce_id, orchestrator):
    """Refresh one pending lead and trigger the orchestrator once a vertical exists."""
    logger.info(" -> Refreshing Lead %s (%s)...", lead['id'], lead['company_name'])
    new_data = refresh_ce_data(lead['id'], ce_id)

    new_vertical = _analysis_vertical(new_data)
    if new_vertical is None:
        # Analysis not finished yet (or nothing returned); retry next cycle.
        return

    logger.info(" [SUCCESS] Analysis finished for %s: %s", lead['company_name'], new_vertical)
    _trigger_orchestrator(orchestrator, lead)


def _run_cycle(orchestrator):
    """Run one polling pass: collect pending leads, then refresh each of them."""
    # Filter leads that are synced but missing analysis data. The CE ID is kept
    # next to the lead so the enrichment JSON is parsed only once.
    pending = []
    for lead in get_leads():
        try:
            ce_id = _pending_ce_id(lead)
        except (KeyError, TypeError, ValueError, AttributeError) as exc:
            # A single malformed record must not block every other lead.
            lead_id = lead.get('id') if hasattr(lead, 'get') else '?'
            logger.warning("Monitor error: skipping malformed lead %s: %s", lead_id, exc)
            continue
        if ce_id:
            pending.append((lead, ce_id))

    if not pending:
        return

    logger.info("Checking %d pending leads for analysis updates...", len(pending))
    for lead, ce_id in pending:
        try:
            _refresh_lead(lead, ce_id, orchestrator)
        except Exception as exc:
            # Isolate failures per lead so the rest of the batch is still processed.
            logger.exception("Monitor error: %s", exc)


def run_monitor(poll_interval=30):
    """Poll forever for synced leads awaiting analysis and process finished ones.

    Each cycle fetches all leads, refreshes those that have a CE ID but no
    vertical yet, and passes leads whose analysis has arrived to the Trading
    Twins orchestrator (when it could be imported). Errors are logged and the
    loop continues; the function does not return.

    Args:
        poll_interval: Seconds to sleep between polling cycles (default 30).
    """
    logger.info("Starting Lead Monitor (Polling CE for updates)...")

    # Initialize Orchestrator once
    orchestrator = TradingTwinsOrchestrator() if TradingTwinsOrchestrator else None

    while True:
        try:
            _run_cycle(orchestrator)
        except Exception as exc:
            # Top-level boundary: keep the monitor alive, but record the traceback.
            logger.exception("Monitor error: %s", exc)

        # Wait before next check
        time.sleep(poll_interval)
|
|
|
|
# Script entry point: start the polling loop only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    run_monitor()
|