# Source file: Brancheneinstufung2/lead-engine/app.py
# (249 lines, 9.9 KiB, Python)

import streamlit as st
import pandas as pd
from db import get_leads, init_db, reset_lead
import json
import re
import os
from enrich import run_sync, refresh_ce_data, sync_single_lead
from generate_reply import generate_email_draft
def clean_html_to_text(html_content):
    """Convert an HTML email body to readable plain text.

    Args:
        html_content: HTML markup as a string. ``None``, an empty string or
            any non-string value (e.g. a NaN placeholder) yields ``""``.

    Returns:
        The text with ``<head>``, ``<style>`` and ``<script>`` sections
        removed, ``<br>`` and ``</p>`` turned into newlines, all remaining
        tags stripped, character references decoded once, non-breaking
        spaces turned into regular spaces, and runs of blank lines collapsed
        to a single blank line.
    """
    import html  # function-scope import: not part of the file-level imports

    if not html_content or not isinstance(html_content, str):
        return ""

    # Drop sections whose content is never meant to be read as text.
    # ``\b`` keeps ``<head`` from also matching ``<header ...>``.
    clean = html_content
    for tag in ("head", "style", "script"):
        clean = re.sub(
            rf'<{tag}\b[^>]*>.*?</{tag}\s*>',
            '',
            clean,
            flags=re.DOTALL | re.IGNORECASE,
        )

    # Replace <br> and </p> with newlines
    clean = re.sub(r'<br\s*/?>', '\n', clean, flags=re.IGNORECASE)
    clean = re.sub(r'</p\s*>', '\n', clean, flags=re.IGNORECASE)

    # Remove all other tags. ``[^>]*`` (unlike ``.*?`` without DOTALL) also
    # matches tags whose attributes are wrapped over several lines.
    clean = re.sub(r'<[^>]*>', '', clean)

    # Decode entities in a single pass *after* tag stripping, so that
    # ``&lt;b&gt;`` ends up as literal text and ``&amp;quot;`` is not decoded
    # twice. ``&nbsp;`` decodes to U+00A0, which is mapped to a plain space.
    clean = html.unescape(clean).replace('\xa0', ' ')

    # Cleanup multiple newlines
    clean = re.sub(r'\n\s*\n+', '\n\n', clean).strip()
    return clean
# Page setup: browser-tab title and wide layout are configured ahead of every
# other st.* call in this script, then the visible page heading is drawn.
st.set_page_config(
    page_title="TradingTwins Lead Engine",
    layout="wide",
)
st.title("🚀 Lead Engine: TradingTwins")
# Sidebar Actions
st.sidebar.header("Actions")

# Action 1: make sure the database exists, then load the mock leads into it.
if st.sidebar.button("1. Ingest Emails (Mock)"):
    try:
        # Imported only when the button is pressed.
        from ingest import ingest_mock_leads

        init_db()
        count = ingest_mock_leads()
        # No st.rerun() here: it would restart the script immediately and
        # discard the message below. The lead table is loaded further down
        # in this same run, so it already includes the new rows.
        st.sidebar.success(f"Ingested {count} new leads.")
    except Exception as e:
        # Same failure reporting as the Graph API ingest action.
        st.sidebar.error(f"Ingest failed: {e}")
# Action 2: pull leads from the mailbox via trading_twins_ingest.process_leads.
if st.sidebar.button("2. Ingest Real Emails (Graph API)"):
    try:
        # Imported only when the button is pressed.
        from trading_twins_ingest import process_leads

        with st.spinner("Fetching emails from Microsoft Graph..."):
            count = process_leads()

        # No st.rerun() afterwards: it would restart the script immediately
        # and discard the status message. The lead table is loaded further
        # down in this same run, so it already includes the new rows.
        if count > 0:
            st.sidebar.success(f"Successfully ingested {count} new leads from inbox!")
        else:
            st.sidebar.info("No new leads found in inbox.")
    except Exception as e:
        st.sidebar.error(f"Ingest failed: {e}")
# Action 3: run the bulk sync and show whatever it printed to stdout.
if st.sidebar.button("3. Sync to Company Explorer"):
    import io
    from contextlib import redirect_stdout

    log_buffer = io.StringIO()
    sync_error = None

    with st.spinner("Syncing with Company Explorer API..."):
        try:
            # We redirect stdout to capture prints made by run_sync().
            with redirect_stdout(log_buffer):
                run_sync()
        except Exception as e:
            # Remember the failure; the log captured so far is still shown.
            sync_error = e

    output = log_buffer.getvalue()

    if sync_error is None:
        st.success("Sync finished!")
    else:
        st.error(f"Sync Failed: {sync_error}")

    # On success the log is always offered. On failure it is offered when
    # something was printed before the error, since that output is what
    # explains the failure.
    if sync_error is None or output:
        with st.expander("See Process Log", expanded=True):
            st.code(output)
# Optional diagnostics panel, rendered in the sidebar while the box is ticked.
if st.sidebar.checkbox("Show System Debug"):
    sidebar = st.sidebar
    sidebar.subheader("System Diagnostics")

    # 1. API Key Check: only the first five characters of the key are shown.
    from lookup_role import get_gemini_key

    gemini_key = get_gemini_key()
    if not gemini_key:
        sidebar.error("Gemini Key NOT found!")
    else:
        sidebar.success(f"Gemini Key found ({gemini_key[:5]}...)")

    # 2. SerpAPI Check: read from the SERP_API environment variable.
    serp_api_key = os.environ.get("SERP_API")
    if not serp_api_key:
        sidebar.error("SerpAPI Key NOT found in Env!")
    else:
        sidebar.success(f"SerpAPI Key found ({serp_api_key[:5]}...)")

    # 3. Network Check: any HTTP status counts as "reachable"; only an
    # exception (import failure, timeout, connection error, ...) is an error.
    try:
        import requests

        response = requests.get(
            "https://generativelanguage.googleapis.com",
            timeout=2,
        )
        sidebar.success(f"Gemini API Reachable ({response.status_code})")
    except Exception as exc:
        sidebar.error(f"Network Error: {exc}")

    # 4. Live Lookup Test with a fixed person/company pair.
    if sidebar.button("Test Role Lookup (Georg Stahl)"):
        from lookup_role import lookup_person_role

        with sidebar.status("Running Lookup..."):
            role_result = lookup_person_role("Georg Stahl", "Klemm Bohrtechnik GmbH")
            if not role_result:
                sidebar.error("Result: None")
            else:
                sidebar.success(f"Result: {role_result}")
# Main View


def _load_json_dict(raw):
    """Return the JSON object stored in *raw*, or ``{}`` if it is unusable.

    Missing values (``None``, NaN, empty text), malformed JSON and JSON that
    does not decode to an object all yield an empty dict, so one bad record
    cannot break rendering of the whole page.
    """
    if isinstance(raw, dict):
        return raw
    if not raw or not isinstance(raw, (str, bytes, bytearray)):
        return {}
    try:
        parsed = json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _format_received_at(value):
    """Format a receive timestamp as ``DD.MM. HH:MM``; ``""`` if unusable."""
    if not value:
        return ""
    try:
        return pd.to_datetime(value).strftime("%d.%m. %H:%M")
    except (ValueError, TypeError, OverflowError, AttributeError):
        # Unparseable or missing (NaT) timestamps simply get no date prefix.
        return ""


def _render_lead_column(col, row, meta):
    """Render the left column: contact data, role lookup, sync/reset, email."""
    lead_id = row['id']

    col.write(f"**Contact:** {row['contact_name']}")
    col.write(f"**Email:** {row['email']}")

    # Role: show it when known, otherwise offer an on-demand lookup.
    role = meta.get('role')
    if role:
        col.info(f"**Role:** {role}")
    elif col.button("🔍 Find Role (SerpAPI)", key=f"role_{lead_id}"):
        from enrich import enrich_contact_role

        with st.spinner("Searching LinkedIn via Google..."):
            found_role = enrich_contact_role(row)
        if found_role:
            st.success(f"Found: {found_role}")
            # Leads were loaded before this handler ran; rerun to show the role.
            st.rerun()
        else:
            st.error("No role found.")

    if meta:
        col.write("---")
        col.write(f"**Area:** {meta.get('area', '-')}")
        col.write(f"**Purpose:** {meta.get('purpose', '-')}")
        col.write(f"**Location:** {meta.get('zip', '')} {meta.get('city', '')}")

    # Manual sync for leads still marked 'new'; any other status can be reset.
    if row['status'] == 'new':
        if col.button("🚀 Sync to Company Explorer", key=f"sync_single_{lead_id}"):
            with st.spinner("Processing lead (Role + CE Sync)..."):
                res = sync_single_lead(lead_id)
            if res.get('status') != 'error':
                st.success("Lead synced successfully!")
                st.rerun()
            else:
                st.error(f"Sync failed: {res.get('message')}")
    elif col.button("🔄 Reset Status to New", key=f"reset_{lead_id}"):
        reset_lead(lead_id)
        st.toast("Lead status reset.")
        st.rerun()

    # A missing body (None/NaN) is shown as empty text instead of failing.
    raw_body = row.get('raw_body')
    if not isinstance(raw_body, str):
        raw_body = ""
    with col.expander("Show Original Email Content"):
        st.text(clean_html_to_text(raw_body))
        if st.checkbox("Show Raw HTML", key=f"raw_{lead_id}"):
            st.code(raw_body, language="html")


def _render_enrichment_column(col, row, enrichment):
    """Render the right column: CE link status, analysis data, reply draft."""
    if not enrichment:
        return

    lead_id = row['id']
    col.write("--- Integration Status ---")

    ce_id = enrichment.get('ce_id')
    if not ce_id:
        col.warning("⚠️ Not yet synced or failed")
        col.info(f"Log: {enrichment.get('message')}")
        return

    col.success(f"✅ Linked to Company Explorer (ID: {ce_id})")

    # CE Data Display ("or {}" also covers an explicit null in the stored JSON)
    ce_data = enrichment.get('ce_data') or {}
    # Support both names (CE uses industry_ai internally)
    vertical = ce_data.get('industry_ai') or ce_data.get('vertical')
    summary = ce_data.get('research_dossier') or ce_data.get('summary')

    # The literal string 'None' is treated like a missing value.
    if vertical and vertical != 'None':
        col.info(f"**Industry:** {vertical}")
    else:
        col.warning("Industry Analysis pending...")

    if summary:
        with col.expander("Show Analysis Summary"):
            st.write(summary)

    # Refresh Button
    if col.button("🔄 Refresh Analysis Data", key=f"refresh_{lead_id}"):
        with st.spinner("Fetching latest data from Company Explorer..."):
            refresh_ce_data(lead_id, ce_id)
        st.toast("Data refreshed!")
        st.rerun()

    # Generate Reply Button (only if we have data)
    col.write("--- Response Generation ---")
    draft_key = f"draft_{lead_id}"
    if col.button("✨ Generate Expert Reply", key=f"gen_{lead_id}"):
        with st.spinner("Generating draft with Gemini..."):
            draft = generate_email_draft(row.to_dict(), ce_data)
        # Store draft in session state so it persists across reruns.
        st.session_state[draft_key] = draft

    # Display Draft
    if draft_key in st.session_state:
        col.text_area("Draft Email", value=st.session_state[draft_key], height=300)


leads = get_leads()
df = pd.DataFrame(leads)

if df.empty:
    st.info("No leads found. Click 'Ingest Emails' in the sidebar.")
else:
    col1, col2, col3 = st.columns(3)
    col1.metric("Total Leads", len(df))
    col2.metric("New / Unsynced", int((df['status'] == 'new').sum()))
    col3.metric("Synced to CE", int((df['status'] == 'synced').sum()))

    st.subheader("Lead Pipeline")

    for _, row in df.iterrows():
        meta = _load_json_dict(row.get('lead_metadata'))
        enrichment = _load_json_dict(row.get('enrichment_data'))
        date_str = _format_received_at(row.get('received_at'))

        with st.expander(f"{date_str} | {row['company_name']} ({row['status']})"):
            # Quality Warning
            if meta.get('is_low_quality'):
                st.warning("⚠️ Low Quality Lead detected (Free-mail or missing company). Use for reclamation if applicable.")

            c1, c2 = st.columns(2)
            # --- Left Column: Lead Data ---
            _render_lead_column(c1, row, meta)
            # --- Right Column: Enrichment & Response ---
            _render_enrichment_column(c2, row, enrichment)