This commit integrates the Roboplanet website contact form submissions into the Lead Engine, allowing them to be processed alongside TradingTwins leads.
Key changes:
- **Database Schema Update (db.py):** Added a new `source` column to the `leads` table for tracking lead origin (TradingTwins or Website-Formular). Includes a migration check to safely add the column.
- **Improved HTML Parsing (ingest.py):** Refined the `parse_roboplanet_form` function to accurately extract data from the specific HTML structure of Roboplanet contact form emails.
- **Enhanced Ingestion Logic (trading_twins_ingest.py):**
- Renamed `fetch_tradingtwins_emails` to `fetch_new_leads_emails` and updated it to fetch emails from both lead sources.
- Modified `process_leads` to dynamically select the correct parser based on email subject.
- Ensured `source` field is correctly populated and `is_low_quality` checks are applied for both lead types.
- **UI Enhancement (app.py):** Updated the Streamlit UI to visually distinguish lead types with icons and improved the "Low Quality Lead" warning message.
This feature enables a unified processing pipeline for different lead sources and provides better visibility in the Lead Engine dashboard.
257 lines
10 KiB
Python
257 lines
10 KiB
Python
import streamlit as st
|
|
import pandas as pd
|
|
from db import get_leads, init_db, reset_lead
|
|
import json
|
|
import re
|
|
import os
|
|
from enrich import run_sync, refresh_ce_data, sync_single_lead
|
|
from generate_reply import generate_email_draft
|
|
|
|
def clean_html_to_text(html_content):
    """Reduce a lead e-mail's HTML body to compact, readable plain text.

    The function keeps only the data section of the mail (from the first
    ``Datum:`` label up to the "Kennen Sie schon Ihr persönliches Konto"
    footer, when those markers are present), flattens table rows so that a
    label and its value end up on the same line, strips all remaining tags
    and decodes HTML character references.

    Args:
        html_content: Raw HTML of the e-mail body. ``None`` or an empty
            string is accepted.

    Returns:
        The cleaned text, one non-empty, stripped line per row, joined with
        ``\\n``. An empty string when there is no input.
    """
    # Local import keeps this block self-contained; the module's import
    # header does not need to change.
    import html

    if not html_content:
        return ""

    # 1. Strip head and style
    clean = re.sub(r'<head.*?>.*?</head>', '', html_content, flags=re.DOTALL | re.IGNORECASE)
    clean = re.sub(r'<style.*?>.*?</style>', '', clean, flags=re.DOTALL | re.IGNORECASE)

    # 2. Extract the core data block (from 'Datum:' until the matchmaking plug).
    #    Without a 'Datum:' label the whole document is kept.
    start_match = re.search(r'Datum:', clean, re.IGNORECASE)
    end_match = re.search(r'Kennen Sie schon Ihr persönliches Konto', clean, re.IGNORECASE)

    if start_match:
        start_pos = start_match.start()
        end_pos = end_match.start() if end_match else len(clean)
        clean = clean[start_pos:end_pos]

    # 3. Format table structure: </td><td> becomes a space, </tr> a newline.
    #    This prevents the "label on one line, value on the next" issue.
    clean = re.sub(r'</td>\s*<td.*?>', ' ', clean, flags=re.IGNORECASE)
    clean = re.sub(r'</tr>', '\n', clean, flags=re.IGNORECASE)

    # 4. Standard cleanup: line-breaking tags become newlines, the rest is dropped.
    clean = re.sub(r'<br\s*/?>', '\n', clean, flags=re.IGNORECASE)
    clean = re.sub(r'</p>', '\n', clean, flags=re.IGNORECASE)
    clean = re.sub(r'<.*?>', '', clean)

    # 5. Entity decoding. html.unescape handles every named and numeric
    #    character reference (umlauts, quotes, angle brackets, ...) instead of
    #    a short hand-maintained list. Non-breaking spaces are mapped to plain
    #    spaces so that the output contains ordinary whitespace only. This runs
    #    after tag stripping so decoded angle brackets are kept as text.
    clean = html.unescape(clean).replace('\xa0', ' ')

    # 6. Final polish: remove empty lines and leading/trailing whitespace
    lines = [line.strip() for line in clean.split('\n') if line.strip()]
    return '\n'.join(lines)
|
|
|
|
# Page chrome. set_page_config is the first Streamlit call this script issues;
# it selects the wide layout and the browser tab title.
st.set_page_config(page_title="TradingTwins Lead Engine", layout="wide")

# Main heading of the dashboard.
st.title("🚀 Lead Engine: TradingTwins")

# Sidebar Actions
st.sidebar.header("Actions")
|
|
|
|
# Action 1: load mock leads into the database.
if st.sidebar.button("1. Ingest Emails (Mock)"):
    # Imported only when the button is pressed.
    from ingest import ingest_mock_leads
    # Make sure the schema exists before inserting.
    init_db()
    count = ingest_mock_leads()
    st.sidebar.success(f"Ingested {count} new leads.")
    # Re-run the script so the main view picks up the freshly ingested leads.
    st.rerun()
|
|
|
|
# Action 2: pull real lead e-mails via trading_twins_ingest.process_leads()
# and report how many new leads were stored.
if st.sidebar.button("2. Ingest Real Emails (Graph API)"):
    try:
        # Imported only when the button is pressed.
        from trading_twins_ingest import process_leads
        with st.spinner("Fetching emails from Microsoft Graph..."):
            count = process_leads()
            if count > 0:
                st.sidebar.success(f"Successfully ingested {count} new leads from inbox!")
            else:
                st.sidebar.info("No new leads found in inbox.")
        # Re-run the script so the main view reflects the new database state.
        # NOTE(review): the rerun follows the status message immediately, so
        # the message is presumably visible only briefly — confirm whether it
        # should be persisted (e.g. via st.session_state) across the rerun.
        st.rerun()
    except Exception as e:
        # Any failure while importing or ingesting is shown in the sidebar
        # instead of breaking the page.
        st.sidebar.error(f"Ingest failed: {e}")
|
|
|
|
# Action 3: run the bulk sync against Company Explorer and show its console
# output. run_sync() reports progress via print(), so stdout is captured into
# a buffer and rendered as a process log.
if st.sidebar.button("3. Sync to Company Explorer"):
    with st.spinner("Syncing with Company Explorer API..."):
        import io
        from contextlib import redirect_stdout

        log_buffer = io.StringIO()
        try:
            # Only the sync call itself is guarded; everything it prints ends
            # up in log_buffer.
            with redirect_stdout(log_buffer):
                run_sync()
        except Exception as e:
            st.error(f"Sync Failed: {e}")
            # Show whatever was printed before the failure; the partial log is
            # the most useful diagnostic when the sync breaks.
            partial_log = log_buffer.getvalue()
            if partial_log:
                with st.expander("See Process Log", expanded=True):
                    st.code(partial_log)
        else:
            st.success("Sync finished!")
            with st.expander("See Process Log", expanded=True):
                st.code(log_buffer.getvalue())
|
|
|
|
# Optional diagnostics panel in the sidebar: checks API keys, network
# reachability and offers a live role-lookup test.
if st.sidebar.checkbox("Show System Debug"):
    st.sidebar.subheader("System Diagnostics")

    # 1. API Key Check
    from lookup_role import get_gemini_key
    key = get_gemini_key()
    if key:
        # Only the first five characters of the key are displayed.
        st.sidebar.success(f"Gemini Key found ({key[:5]}...)")
    else:
        st.sidebar.error("Gemini Key NOT found!")

    # 2. SerpAPI Check (read from the SERP_API environment variable)
    serp_key = os.getenv("SERP_API")
    if serp_key:
        # Only the first five characters of the key are displayed.
        st.sidebar.success(f"SerpAPI Key found ({serp_key[:5]}...)")
    else:
        st.sidebar.error("SerpAPI Key NOT found in Env!")

    # 3. Network Check: any HTTP response counts as reachable; the status
    #    code is shown but not evaluated. The request gives up after 2 seconds.
    try:
        import requests
        res = requests.get("https://generativelanguage.googleapis.com", timeout=2)
        st.sidebar.success(f"Gemini API Reachable ({res.status_code})")
    except Exception as e:
        st.sidebar.error(f"Network Error: {e}")

    # 4. Live Lookup Test with a fixed sample person/company.
    if st.sidebar.button("Test Role Lookup (Georg Stahl)"):
        from lookup_role import lookup_person_role
        with st.sidebar.status("Running Lookup..."):
            res = lookup_person_role("Georg Stahl", "Klemm Bohrtechnik GmbH")
            if res:
                st.sidebar.success(f"Result: {res}")
            else:
                st.sidebar.error("Result: None")
|
|
|
|
# Main View
def _load_json_dict(raw):
    """Parse *raw* as a JSON object; return {} for empty, malformed or non-object input."""
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-string cells.
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _format_received_at(value):
    """Format a received-at value as 'DD.MM. HH:MM'; return '' when it is missing or unparseable."""
    if not value:
        return ""
    try:
        return pd.to_datetime(value).strftime("%d.%m. %H:%M")
    except Exception:
        # Best effort: the date is only decoration in the expander title, so a
        # bad value must never break the page.
        return ""


leads = get_leads()
df = pd.DataFrame(leads)

if not df.empty:
    # Headline counters, derived from the 'status' column.
    col1, col2, col3 = st.columns(3)
    col1.metric("Total Leads", len(df))
    col2.metric("New / Unsynced", len(df[df['status'] == 'new']))
    col3.metric("Synced to CE", len(df[df['status'] == 'synced']))

    st.subheader("Lead Pipeline")

    # One expander per lead.
    for _, row in df.iterrows():
        # Format date for title
        date_str = _format_received_at(row.get('received_at'))

        # --- DYNAMIC TITLE ---
        # Rows without a usable source value (missing column, NULL) fall back
        # to the generic label instead of rendering "None".
        source = row.get('source')
        source_label = source if isinstance(source, str) and source else 'Lead'
        source_icon = "🌐" if source == 'Website-Formular' else "🤝"
        title = f"{source_icon} {source_label} | {date_str} | {row['company_name']}"

        # NOTE(review): this expander contains further expanders below; some
        # Streamlit releases reject nested expanders — confirm against the
        # deployed version.
        with st.expander(title):
            # Metadata Parsing
            meta = _load_json_dict(row.get('lead_metadata'))

            # --- TOP SECTION: QUALITY WARNING ---
            # Reads the flag straight from the stored lead metadata.
            if meta.get('is_low_quality'):
                st.warning("⚠️ **Low Quality Lead detected** (Free-mail provider or missing company name). Please verify manually.")

            # --- SECTION 1: LEAD INFO & INTELLIGENCE ---
            col_lead, col_intel = st.columns(2)

            with col_lead:
                st.markdown("### 📋 Lead Data")
                st.write(f"**Salutation:** {meta.get('salutation', '-')}")
                st.write(f"**Contact:** {row['contact_name']}")
                st.write(f"**Email:** {row['email']}")
                st.write(f"**Phone:** {meta.get('phone', row.get('phone', '-'))}")

                role = meta.get('role')
                if role:
                    st.info(f"**Role:** {role}")
                else:
                    # No role stored yet: offer an on-demand lookup.
                    if st.button("🔍 Find Role", key=f"role_{row['id']}"):
                        from enrich import enrich_contact_role
                        with st.spinner("Searching..."):
                            found_role = enrich_contact_role(row)
                            if found_role:
                                st.success(f"Found: {found_role}")
                                st.rerun()
                            else:
                                st.error("No role found.")

                st.write(f"**Area:** {meta.get('area', '-')}")
                st.write(f"**Purpose:** {meta.get('purpose', '-')}")
                st.write(f"**Functions:** {meta.get('cleaning_functions', '-')}")
                st.write(f"**Location:** {meta.get('zip', '')} {meta.get('city', '')}")

            with col_intel:
                st.markdown("### 🔍 Intelligence (CE)")
                # Guarded like the metadata above: one malformed row must not
                # take down the whole dashboard.
                enrichment = _load_json_dict(row.get('enrichment_data'))
                ce_id = enrichment.get('ce_id')

                if ce_id:
                    st.success(f"✅ Linked to Company Explorer (ID: {ce_id})")
                    # 'or {}' also covers an explicit null stored under ce_data.
                    ce_data = enrichment.get('ce_data') or {}

                    vertical = ce_data.get('industry_ai') or ce_data.get('vertical')
                    summary = ce_data.get('research_dossier') or ce_data.get('summary')

                    # The literal string 'None' is treated like a missing value.
                    if vertical and vertical != 'None':
                        st.info(f"**Industry:** {vertical}")
                    else:
                        st.warning("Industry Analysis pending...")

                    if summary:
                        with st.expander("Show AI Research Dossier", expanded=True):
                            st.write(summary)

                    if st.button("🔄 Refresh CE Data", key=f"refresh_{row['id']}"):
                        with st.spinner("Fetching..."):
                            refresh_ce_data(row['id'], ce_id)
                        st.rerun()
                else:
                    st.warning("⚠️ Not synced with Company Explorer yet")
                    if st.button("🚀 Sync to Company Explorer", key=f"sync_single_{row['id']}"):
                        with st.spinner("Syncing..."):
                            sync_single_lead(row['id'])
                        st.rerun()

            st.divider()

            # --- SECTION 2: ORIGINAL EMAIL ---
            with st.expander("✉️ View Original Email Content"):
                st.text(clean_html_to_text(row['raw_body']))
                if st.checkbox("Show Raw HTML", key=f"raw_{row['id']}"):
                    st.code(row['raw_body'], language="html")

            st.divider()

            # --- SECTION 3: RESPONSE DRAFT (Full Width) ---
            st.markdown("### 📝 Response Draft")
            if row['status'] != 'new' and ce_id:
                if st.button("✨ Generate Expert Reply", key=f"gen_{row['id']}", type="primary"):
                    with st.spinner("Writing email..."):
                        ce_data = enrichment.get('ce_data') or {}
                        draft = generate_email_draft(row.to_dict(), ce_data)
                        # Keep the draft in session state so it survives reruns.
                        st.session_state[f"draft_{row['id']}"] = draft

                if f"draft_{row['id']}" in st.session_state:
                    st.text_area("Email Entwurf", value=st.session_state[f"draft_{row['id']}"], height=400)
                    st.button("📋 Copy to Clipboard", key=f"copy_{row['id']}", on_click=lambda: st.write("Copy functionality simulated"))
            else:
                st.info("Sync with Company Explorer first to generate a response.")

            if row['status'] != 'new':
                st.markdown("---")
                if st.button("🔄 Reset Lead Status", key=f"reset_{row['id']}", help="Back to 'new' status"):
                    reset_lead(row['id'])
                    st.rerun()

else:
    st.info("No leads found. Click 'Ingest Emails' in the sidebar.")
|