From 07ab0b47b6d30ec6e2f66a5de95def7d25e7774f Mon Sep 17 00:00:00 2001 From: Floke Date: Mon, 2 Mar 2026 10:20:06 +0000 Subject: [PATCH] [31388f42] Fix field naming: Use industry_ai and research_dossier from CE API --- lead-engine/app.py | 20 ++++++++++--- lead-engine/generate_reply.py | 4 +-- lead-engine/monitor.py | 56 +++++++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 6 deletions(-) create mode 100644 lead-engine/monitor.py diff --git a/lead-engine/app.py b/lead-engine/app.py index b8f80ea5..e2c99fd4 100644 --- a/lead-engine/app.py +++ b/lead-engine/app.py @@ -4,7 +4,7 @@ from db import get_leads, init_db import json import re import os -from enrich import run_sync, refresh_ce_data +from enrich import run_sync, refresh_ce_data, sync_single_lead from generate_reply import generate_email_draft def clean_html_to_text(html_content): @@ -164,6 +164,17 @@ if not df.empty: c1.write(f"**Purpose:** {meta.get('purpose', '-')}") c1.write(f"**Location:** {meta.get('zip', '')} {meta.get('city', '')}") + # Manual Sync Button for individual lead + if row['status'] == 'new': + if c1.button("🚀 Sync to Company Explorer", key=f"sync_single_{row['id']}"): + with st.spinner("Processing lead (Role + CE Sync)..."): + res = sync_single_lead(row['id']) + if res.get('status') != 'error': + st.success("Lead synced successfully!") + st.rerun() + else: + st.error(f"Sync failed: {res.get('message')}") + with c1.expander("Show Original Email Content"): st.text(clean_html_to_text(row['raw_body'])) if st.checkbox("Show Raw HTML", key=f"raw_{row['id']}"): @@ -181,10 +192,11 @@ if not df.empty: # CE Data Display ce_data = enrichment.get('ce_data', {}) - vertical = ce_data.get('vertical') - summary = ce_data.get('summary') + # Support both names (CE uses industry_ai internally) + vertical = ce_data.get('industry_ai') or ce_data.get('vertical') + summary = ce_data.get('research_dossier') or ce_data.get('summary') - if vertical: + if vertical and vertical != 'None': c2.info(f"**Industry:** {vertical}") else: c2.warning("Industry Analysis pending...") diff --git a/lead-engine/generate_reply.py b/lead-engine/generate_reply.py index 8abcdb3a..4656ab78 100644 --- a/lead-engine/generate_reply.py +++ b/lead-engine/generate_reply.py @@ -47,8 +47,8 @@ def generate_email_draft(lead_data, company_data, booking_link="https://outlook. role = meta.get('role', 'Unbekannt') # Data from Company Explorer - ce_summary = company_data.get('summary', 'Keine Details verfügbar.') - ce_vertical = company_data.get('vertical', 'Allgemein') + ce_summary = company_data.get('research_dossier') or company_data.get('summary', 'Keine Details verfügbar.') + ce_vertical = company_data.get('industry_ai') or company_data.get('vertical', 'Allgemein') ce_website = company_data.get('website', '') # Prompt Engineering diff --git a/lead-engine/monitor.py b/lead-engine/monitor.py new file mode 100644 index 00000000..1aadf4bc --- /dev/null +++ b/lead-engine/monitor.py @@ -0,0 +1,56 @@ +import time +import json +import logging +import os +import sys + +# Path setup to import local modules +sys.path.append(os.path.dirname(__file__)) +from db import get_leads +from enrich import refresh_ce_data + +# Setup logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger("lead-monitor") + +def run_monitor(): + logger.info("Starting Lead Monitor (Polling CE for updates)...") + + while True: + try: + leads = get_leads() + # Filter leads that are synced but missing analysis data + pending_leads = [] + for lead in leads: + if lead['status'] == 'synced': + enrichment = json.loads(lead['enrichment_data']) if lead['enrichment_data'] else {} + ce_data = enrichment.get('ce_data', {}) + ce_id = enrichment.get('ce_id') + + # If we have a CE ID but no vertical/summary yet, it's pending + vertical = ce_data.get('industry_ai') or ce_data.get('vertical') + if ce_id and (not vertical or vertical == 'None'): + pending_leads.append(lead) + + if pending_leads: + logger.info(f"Checking {len(pending_leads)} pending leads for analysis updates...") + for lead in pending_leads: + enrichment = json.loads(lead['enrichment_data']) + ce_id = enrichment['ce_id'] + + logger.info(f" -> Refreshing Lead {lead['id']} ({lead['company_name']})...") + new_data = refresh_ce_data(lead['id'], ce_id) + + new_vertical = new_data.get('industry_ai') or new_data.get('vertical') + if new_vertical and new_vertical != 'None': + logger.info(f" [SUCCESS] Analysis finished for {lead['company_name']}: {new_vertical}") + # Optional: Here we could trigger the Auto-Reply generation in the future + + except Exception as e: + logger.error(f"Monitor error: {e}") + + # Wait before next check + time.sleep(30) # Poll every 30 seconds + +if __name__ == "__main__": + run_monitor()