"""Poll stored leads for finished CE analysis and start the Trading Twins workflow.

The monitor repeatedly loads all leads, picks the ones that are synced and
have a CE ID but no analysis result yet, refreshes their CE data, and, once
a vertical is available, hands the lead to the Trading Twins workflow.
"""

import json
import logging
import os
import sys
import threading
import time

import uvicorn

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("lead-monitor")

# Ensure the lead-engine root is in path for imports
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)

# These imports must stay below the sys.path adjustment above.
from db import get_leads
from enrich import refresh_ce_data

# Import the core logic from manager
try:
    from trading_twins.manager import process_lead as start_trading_twins_workflow
    logger.info("✅ Trading Twins modules imported successfully.")
except ImportError as e:
    logger.error(f"❌ Failed to import trading_twins: {e}")
    start_trading_twins_workflow = None

# Default pause between two polling cycles.
POLL_INTERVAL_SECONDS = 30


def _analysis_vertical(ce_data):
    """Return the analysed vertical from a CE data mapping, or None.

    Both a missing value and the literal string 'None' count as
    "analysis not finished yet".
    """
    if not ce_data:
        return None
    vertical = ce_data.get('industry_ai') or ce_data.get('vertical')
    if not vertical or vertical == 'None':
        return None
    return vertical


def _pending_ce_id(lead):
    """Return the CE ID of a lead that still waits for analysis, else None.

    A lead is pending when its status is 'synced', its enrichment data
    carries a CE ID, and no vertical has been stored yet.

    Raises:
        ValueError: if ``enrichment_data`` is not valid JSON.
        AttributeError, TypeError: if the decoded data has an unexpected shape.
    """
    if lead['status'] != 'synced':
        return None
    raw = lead['enrichment_data']
    enrichment = json.loads(raw) if raw else {}
    ce_id = enrichment.get('ce_id')
    # ``ce_data`` may be absent or an explicit JSON null; both mean "no data".
    if ce_id and _analysis_vertical(enrichment.get('ce_data')) is None:
        return ce_id
    return None


def _trigger_workflow(lead, new_data):
    """Start the Trading Twins workflow for a lead whose analysis finished.

    Failures are logged and swallowed so one lead cannot stop the monitor.
    """
    if not start_trading_twins_workflow:
        logger.warning(" [SKIP] Workflow Logic not available (Import Error)")
        return

    logger.info(f" [ACTION] Triggering Trading Twins Process for {lead['company_name']}...")
    try:
        # Extract details for the manager.process_lead function.
        # NOTE(review): ``.get`` assumes leads are dict-like - confirm against db.get_leads.
        email = lead.get('email')
        company = lead.get('company_name', 'Ihre Firma')
        opener = new_data.get('ai_opener') or "Vielen Dank für Ihre Anfrage."
        request_id = f"lead_{lead['id']}_{int(time.time())}"

        if email:
            # Signature: process_lead(request_id, company, opener, receiver)
            start_trading_twins_workflow(request_id, company, opener, email)
        else:
            logger.warning(f" [SKIP] No email address found for lead {lead['id']}")
    except Exception as e:
        logger.error(f" [ERROR] Failed to start workflow: {e}")


def _refresh_lead(lead, ce_id):
    """Refresh one pending lead and trigger the workflow if analysis is done."""
    logger.info(f" -> Refreshing Lead {lead['id']} ({lead['company_name']})...")
    # Treat a missing result like an empty one instead of crashing on ``.get``.
    new_data = refresh_ce_data(lead['id'], ce_id) or {}
    new_vertical = _analysis_vertical(new_data)
    if new_vertical is None:
        return
    logger.info(f" [SUCCESS] Analysis finished for {lead['company_name']}: {new_vertical}")
    _trigger_workflow(lead, new_data)


def _poll_once():
    """Run a single polling cycle over all leads.

    Errors are isolated per lead: a broken lead is logged and skipped so the
    remaining leads of the same cycle are still processed.
    """
    pending = []
    for lead in get_leads():
        try:
            ce_id = _pending_ce_id(lead)
        except (ValueError, AttributeError, TypeError) as e:
            logger.error("Skipping lead %s: unreadable enrichment data: %s", lead['id'], e)
            continue
        if ce_id:
            # Keep the CE ID with the lead so the JSON is parsed only once.
            pending.append((lead, ce_id))

    if not pending:
        return

    logger.info(f"Checking {len(pending)} pending leads for analysis updates...")
    for lead, ce_id in pending:
        try:
            _refresh_lead(lead, ce_id)
        except Exception:
            # Per-lead boundary: log with traceback and continue with the next lead.
            logger.exception("Failed to refresh lead %s", lead['id'])


def run_monitor(poll_interval=POLL_INTERVAL_SECONDS):
    """Poll for analysis updates forever.

    Args:
        poll_interval: Seconds to sleep between two polling cycles.
            Defaults to 30.

    This function never returns; any error in a cycle is logged and the
    loop continues after the usual pause.
    """
    logger.info("Starting Lead Monitor (Polling CE for updates)...")
    while True:
        try:
            _poll_once()
        except Exception as e:
            logger.exception("Monitor error: %s", e)

        # Wait before next check
        time.sleep(poll_interval)


if __name__ == "__main__":
    run_monitor()