From 900114bf776418bb867ff0ef4046633b752f10cd Mon Sep 17 00:00:00 2001
From: Floke
Date: Mon, 2 Mar 2026 08:46:22 +0000
Subject: [PATCH] [31388f42] Implement Expert Response Generator with Gemini &
CE integration
---
lead-engine/app.py | 88 +++++++++++++++++++++++----
lead-engine/db.py | 30 +++++++++-
lead-engine/enrich.py | 22 ++++++-
lead-engine/generate_reply.py | 108 ++++++++++++++++++++++++++++++++++
lead-engine/repair_leads.py | 59 +++++++++++++++++++
5 files changed, 293 insertions(+), 14 deletions(-)
create mode 100644 lead-engine/generate_reply.py
create mode 100644 lead-engine/repair_leads.py
diff --git a/lead-engine/app.py b/lead-engine/app.py
index 2dbcf780..e0aa6b7c 100644
--- a/lead-engine/app.py
+++ b/lead-engine/app.py
@@ -2,7 +2,8 @@ import streamlit as st
import pandas as pd
from db import get_leads, init_db
import json
-from enrich import run_sync # Import our sync function
+from enrich import run_sync, refresh_ce_data
+from generate_reply import generate_email_draft
st.set_page_config(page_title="TradingTwins Lead Engine", layout="wide")
@@ -18,7 +19,20 @@ if st.sidebar.button("1. Ingest Emails (Mock)"):
st.sidebar.success(f"Ingested {count} new leads.")
st.rerun()
-if st.sidebar.button("2. Sync to Company Explorer"):
+if st.sidebar.button("2. Ingest Real Emails (Graph API)"):
+    # The ingest module is imported inside the try block, so an import
+    # failure is reported in the sidebar by the except branch below.
+    try:
+        from trading_twins_ingest import process_leads
+        with st.spinner("Fetching emails from Microsoft Graph..."):
+            count = process_leads()
+        if count > 0:
+            st.sidebar.success(f"Successfully ingested {count} new leads from inbox!")
+        else:
+            st.sidebar.info("No new leads found in inbox.")
+        st.rerun()
+    except Exception as e:
+        st.sidebar.error(f"Ingest failed: {e}")
+
+if st.sidebar.button("3. Sync to Company Explorer"):
with st.spinner("Syncing with Company Explorer API..."):
# Capture output for debugging
try:
@@ -52,24 +66,76 @@ if not df.empty:
for index, row in df.iterrows():
with st.expander(f"{row['company_name']} ({row['status']})"):
c1, c2 = st.columns(2)
+
+ # --- Left Column: Lead Data ---
c1.write(f"**Contact:** {row['contact_name']}")
c1.write(f"**Email:** {row['email']}")
- c1.text(row['raw_body'][:200] + "...")
+ # Metadata Display
+ # lead_metadata holds a JSON string; fall back to an empty dict when it is
+ # missing, not a string, malformed, or does not decode to a JSON object.
+ meta = {}
+ if row.get('lead_metadata'):
+     try:
+         meta = json.loads(row['lead_metadata'])
+     except (TypeError, ValueError):
+         pass
+     if not isinstance(meta, dict):
+         meta = {}
+
+ if meta:
+ c1.write("---")
+ c1.write(f"**Area:** {meta.get('area', '-')}")
+ c1.write(f"**Purpose:** {meta.get('purpose', '-')}")
+ c1.write(f"**Location:** {meta.get('zip', '')} {meta.get('city', '')}")
+
+ with c1.expander("Show Original Email Body"):
+ st.code(row['raw_body'], language="html")
+
+ # --- Right Column: Enrichment & Response ---
enrichment = json.loads(row['enrichment_data']) if row['enrichment_data'] else {}
if enrichment:
c2.write("--- Integration Status ---")
- if enrichment.get('ce_id'):
- c2.success(f"✅ Linked to Company Explorer (ID: {enrichment['ce_id']})")
- c2.write(f"**CE Status:** {enrichment.get('ce_status')}")
+ ce_id = enrichment.get('ce_id')
+
+ if ce_id:
+ c2.success(f"✅ Linked to Company Explorer (ID: {ce_id})")
+
+ # CE Data Display
+ # `or {}` also covers a stored JSON null, which .get()'s default does not
+ # (the default only applies when the key is absent).
+ ce_data = enrichment.get('ce_data') or {}
+ vertical = ce_data.get('vertical')
+ summary = ce_data.get('summary')
+
+ if vertical:
+ c2.info(f"**Industry:** {vertical}")
+ else:
+ c2.warning("Industry Analysis pending...")
+
+ if summary:
+ with c2.expander("Show Analysis Summary"):
+ st.write(summary)
+
+ # Refresh Button
+ if c2.button("🔄 Refresh Analysis Data", key=f"refresh_{row['id']}"):
+ with st.spinner("Fetching latest data from Company Explorer..."):
+ new_data = refresh_ce_data(row['id'], ce_id)
+ st.toast("Data refreshed!")
+ st.rerun()
+
+ # Generate Reply Button (only if we have data)
+ c2.write("--- Response Generation ---")
+ if c2.button("✨ Generate Expert Reply", key=f"gen_{row['id']}"):
+ with st.spinner("Generating draft with Gemini..."):
+ # Prepare lead data dict from row
+ lead_dict = row.to_dict()
+ draft = generate_email_draft(lead_dict, ce_data)
+ # Store draft in session state to persist
+ st.session_state[f"draft_{row['id']}"] = draft
+
+ # Display Draft
+ if f"draft_{row['id']}" in st.session_state:
+ c2.text_area("Draft Email", value=st.session_state[f"draft_{row['id']}"], height=300)
+
else:
c2.warning("⚠️ Not yet synced or failed")
-
- c2.info(f"Log: {enrichment.get('message')}")
-
- if enrichment.get('ce_data'):
- c2.json(enrichment['ce_data'])
+ c2.info(f"Log: {enrichment.get('message')}")
else:
st.info("No leads found. Click 'Ingest Emails' in the sidebar.")
diff --git a/lead-engine/db.py b/lead-engine/db.py
index f994bd63..a483f02d 100644
--- a/lead-engine/db.py
+++ b/lead-engine/db.py
@@ -27,12 +27,21 @@ def init_db():
email TEXT,
phone TEXT,
raw_body TEXT,
+ lead_metadata TEXT,
enrichment_data TEXT,
status TEXT DEFAULT 'new',
response_draft TEXT,
sent_at TIMESTAMP
)
''')
+
+ # Simple migration check: check if lead_metadata column exists
+ try:
+ c.execute('SELECT lead_metadata FROM leads LIMIT 1')
+ except sqlite3.OperationalError:
+ print("Migrating DB: Adding lead_metadata column...")
+ c.execute('ALTER TABLE leads ADD COLUMN lead_metadata TEXT')
+
conn.commit()
conn.close()
@@ -41,12 +50,20 @@ def insert_lead(lead_data):
if not os.path.exists(DB_PATH):
init_db()
+ # Extract metadata fields
+ meta = {
+ 'area': lead_data.get('area'),
+ 'purpose': lead_data.get('purpose'),
+ 'zip': lead_data.get('zip'),
+ 'city': lead_data.get('city')
+ }
+
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
try:
c.execute('''
- INSERT INTO leads (source_id, received_at, company_name, contact_name, email, phone, raw_body, status)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+ INSERT INTO leads (source_id, received_at, company_name, contact_name, email, phone, raw_body, lead_metadata, status)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
lead_data.get('id'),
datetime.now(),
@@ -55,6 +72,7 @@ def insert_lead(lead_data):
lead_data.get('email'),
lead_data.get('phone'),
lead_data.get('raw_body'),
+ json.dumps(meta),
'new'
))
conn.commit()
@@ -64,6 +82,14 @@ def insert_lead(lead_data):
finally:
conn.close()
+def update_lead_metadata(lead_id, meta_data):
+    """Overwrite the stored metadata of an existing lead (used for repairs).
+
+    Args:
+        lead_id: Value of the ``id`` column of the row to update.
+        meta_data: JSON-serializable object; written to ``lead_metadata``
+            as a JSON string.
+    """
+    conn = sqlite3.connect(DB_PATH)
+    try:
+        conn.execute(
+            'UPDATE leads SET lead_metadata = ? WHERE id = ?',
+            (json.dumps(meta_data), lead_id),
+        )
+        conn.commit()
+    finally:
+        # Close the connection even when encoding or the UPDATE fails.
+        conn.close()
+
def get_leads():
if not os.path.exists(DB_PATH):
init_db()
diff --git a/lead-engine/enrich.py b/lead-engine/enrich.py
index 015af4fc..4728f720 100644
--- a/lead-engine/enrich.py
+++ b/lead-engine/enrich.py
@@ -5,7 +5,7 @@ import sqlite3
# Füge das Hauptverzeichnis zum Python-Pfad hinzu, damit der Connector gefunden wird
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
-from company_explorer_connector import handle_company_workflow
+from company_explorer_connector import handle_company_workflow, get_company_details
from db import get_leads, DB_PATH
def update_lead_enrichment(lead_id, data, status):
@@ -18,6 +18,26 @@ def update_lead_enrichment(lead_id, data, status):
conn.close()
print(f"Lead {lead_id} aktualisiert. Neuer Status: {status}")
+def refresh_ce_data(lead_id, ce_id):
+    """Fetch the latest Company Explorer data and store it on the local lead.
+
+    Args:
+        lead_id: Local lead id passed to ``update_lead_enrichment``.
+        ce_id: Company Explorer id passed to ``get_company_details``.
+
+    Returns:
+        Whatever ``get_company_details`` returned for ``ce_id``.
+    """
+    print(f"Refreshing data for CE ID {ce_id}...")
+    ce_data = get_company_details(ce_id)
+
+    if not ce_data:
+        # Nothing usable came back: keep the enrichment that is already
+        # stored instead of replacing it with an empty ce_data payload.
+        print(f"No data returned for CE ID {ce_id}; keeping existing enrichment.")
+        return ce_data
+
+    # NOTE: the enrichment record is rebuilt from scratch rather than merged,
+    # so any other keys previously stored for this lead are replaced.
+    enrichment_data = {
+        "sync_status": "refreshed",
+        "ce_id": ce_id,
+        "message": "Data refreshed from CE",
+        "ce_data": ce_data
+    }
+
+    update_lead_enrichment(lead_id, enrichment_data, status='synced')
+    return ce_data
+
def run_sync():
"""
Haupt-Synchronisationsprozess.
diff --git a/lead-engine/generate_reply.py b/lead-engine/generate_reply.py
new file mode 100644
index 00000000..5f03c62d
--- /dev/null
+++ b/lead-engine/generate_reply.py
@@ -0,0 +1,108 @@
+import os
+import json
+import requests
+
+# Load API Key
+def get_gemini_key():
+    """Return the Gemini API key, or None when none is configured.
+
+    Reads ``gemini_api_key.txt`` from the parent of this file's directory
+    first and falls back to the ``GEMINI_API_KEY`` environment variable.
+    """
+    key_path = os.path.join(
+        os.path.dirname(os.path.dirname(__file__)), 'gemini_api_key.txt'
+    )
+    try:
+        with open(key_path, 'r', encoding='utf-8') as f:
+            key = f.read().strip()
+    except (OSError, UnicodeDecodeError):
+        # A missing or unreadable key file is expected; use the environment.
+        key = ''
+    # An empty key file must not shadow a key provided via the environment.
+    return key or os.getenv("GEMINI_API_KEY")
+
+def generate_email_draft(lead_data, company_data, booking_link="https://outlook.office365.com/owa/calendar/RoboplanetGmbH@robo-planet.de/bookings/"):
+    """Generate a personalized sales e-mail draft via the Gemini API.
+
+    Args:
+        lead_data: Mapping with ``company_name``, ``contact_name`` and an
+            optional ``lead_metadata`` JSON string (``area``, ``purpose``,
+            ``city``).
+        company_data: Mapping of Company Explorer data (``summary``,
+            ``vertical``); may be None or empty.
+        booking_link: URL inserted into the prompt as the call to action.
+
+    Returns:
+        The generated draft text, or a string starting with ``"Error"``
+        when no API key is configured or the request fails. API and
+        network errors are reported through the return value, not raised.
+    """
+    api_key = get_gemini_key()
+    if not api_key:
+        return "Error: Gemini API Key not found."
+
+    company_data = company_data or {}
+
+    # `or` instead of a .get() default: the keys are usually present but may
+    # hold None/empty values, in which case a .get() default is not applied.
+    company_name = lead_data.get('company_name') or 'Interessent'
+    contact_name = lead_data.get('contact_name') or 'Damen und Herren'
+
+    # Metadata from Lead (stored as a JSON string)
+    meta = {}
+    raw_meta = lead_data.get('lead_metadata')
+    if raw_meta:
+        try:
+            meta = json.loads(raw_meta)
+        except (TypeError, ValueError):
+            meta = {}
+        if not isinstance(meta, dict):
+            meta = {}
+
+    area = meta.get('area') or 'Unbekannte Fläche'
+    purpose = meta.get('purpose') or 'Reinigung'
+    city = meta.get('city') or ''
+
+    # Data from Company Explorer
+    ce_summary = company_data.get('summary') or 'Keine Details verfügbar.'
+    ce_vertical = company_data.get('vertical') or 'Allgemein'
+
+    # Prompt Engineering
+    prompt = f"""
+    Du bist ein erfahrener Vertriebsexperte für Roboter-Reinigungslösungen bei Robo-Planet.
+    Deine Aufgabe ist es, eine Antwort-E-Mail auf eine Lead-Anfrage zu formulieren.
+
+    HINTERGRUND ZUM PROSPEKT (Aus Analyse):
+    - Firma: {company_name}
+    - Standort: {city}
+    - Branche/Vertical: {ce_vertical}
+    - Web-Zusammenfassung: {ce_summary}
+
+    ANFRAGE-DETAILS (Vom Kunden):
+    - Reinigungsfläche: {area}
+    - Einsatzzweck: {purpose}
+    - Kontaktperson: {contact_name}
+
+    DEIN ZIEL:
+    Schreibe eine kurze, prägnante und wertschätzende E-Mail.
+    1. Bedanke dich für die Anfrage.
+    2. Zeige kurz, dass du verstanden hast, was die Firma macht (nutze den Kontext aus 'Web-Zusammenfassung' in einem Satz, z.B. "Als führender Anbieter von xyz...").
+    3. Gehe auf die Fläche ({area}) ein.
+       - Wenn > 1000qm oder Industrie/Halle: Erwähne den "Puma M20" oder "Scrubber 75" als Kraftpaket.
+       - Wenn < 1000qm oder Büro/Praxis/Gastro: Erwähne den "Phantas" oder "Pudu CC1" als wendige Lösung.
+       - Wenn "Unbekannt": Stelle eine offene Frage zur Umgebung.
+    4. Call to Action: Schlage ein kurzes Beratungsgespräch vor.
+    5. Füge diesen Buchungslink ein: {booking_link}
+
+    TONALITÄT:
+    Professionell, hilfreich, auf den Punkt. Keine Marketing-Floskeln.
+
+    FORMAT:
+    Betreff: [Vorschlag für Betreff]
+
+    [E-Mail Text]
+    """
+
+    # Call Gemini API
+    url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent"
+    headers = {
+        'Content-Type': 'application/json',
+        # Sent as a header rather than a ?key= query parameter so the key
+        # cannot leak through request URLs quoted in exception messages,
+        # which are returned to (and displayed by) the caller.
+        'x-goog-api-key': api_key,
+    }
+    payload = {
+        "contents": [{"parts": [{"text": prompt}]}]
+    }
+
+    try:
+        # The timeout keeps a stalled connection from blocking the caller.
+        response = requests.post(url, headers=headers, json=payload, timeout=60)
+        response.raise_for_status()
+        result = response.json()
+        return result['candidates'][0]['content']['parts'][0]['text']
+    except Exception as e:
+        # Boundary handler: callers expect an error string, not an exception.
+        return f"Error generating draft: {str(e)}"
+
+if __name__ == "__main__":
+    # Manual smoke test with mock data; prints whatever the generator returns.
+    sample_metadata = {"area": "5000 qm", "purpose": "Desinfektion und Boden", "city": "Berlin"}
+    sample_lead = {
+        "company_name": "Klinikum Test",
+        "contact_name": "Dr. Müller",
+        "lead_metadata": json.dumps(sample_metadata),
+    }
+    sample_company = {
+        "summary": "Ein großes Klinikum der Maximalversorgung mit Fokus auf Kardiologie.",
+        "vertical": "Healthcare / Krankenhaus",
+    }
+    draft = generate_email_draft(sample_lead, sample_company)
+    print(draft)
diff --git a/lead-engine/repair_leads.py b/lead-engine/repair_leads.py
new file mode 100644
index 00000000..4e4472d8
--- /dev/null
+++ b/lead-engine/repair_leads.py
@@ -0,0 +1,59 @@
+import sqlite3
+import json
+import re
+import os
+import sys
+
+# Add path to import db
+sys.path.append(os.path.dirname(__file__))
+from db import get_leads, update_lead_metadata, init_db
+
+def parse_tradingtwins_html_local(html_body):
+    """Extract lead metadata from the Tradingtwins HTML table structure.
+
+    The parsing logic is duplicated here so this repair script does not
+    depend on the ingest module.
+
+    Args:
+        html_body: Raw HTML of the lead e-mail.
+
+    Returns:
+        dict containing those of the keys ``purpose``, ``area``, ``zip`` and
+        ``city`` whose label was found; nested tags are removed from values.
+    """
+    data = {}
+    field_map = {
+        'Einsatzzweck': 'purpose',
+        'Reinigungs-Fläche': 'area',
+        'PLZ': 'zip',
+        'Stadt': 'city'
+    }
+
+    for label, key in field_map.items():
+        # An element ending in "<label>:" is followed by the element that
+        # holds the value; capture that element's inner HTML.
+        # NOTE(review): the <p> tag names are reconstructed from a garbled
+        # pattern - confirm against the pattern used by the ingest parser.
+        pattern = fr'>\s*{re.escape(label)}:\s*</p>.*?<p[^>]*>(.*?)</p>'
+        match = re.search(pattern, html_body, re.DOTALL | re.IGNORECASE)
+        if match:
+            raw_val = match.group(1).strip()
+            # Drop any inline markup (e.g. <b>, <span>) inside the value.
+            clean_val = re.sub(r'<[^>]+>', '', raw_val).strip()
+            data[key] = clean_val
+    return data
+
+def _metadata_is_empty(raw_meta):
+    """Return True when stored metadata is absent or carries no values."""
+    if not raw_meta:
+        return True
+    try:
+        parsed = json.loads(raw_meta)
+    except (TypeError, ValueError):
+        # Unparseable content is left untouched rather than overwritten.
+        return False
+    if isinstance(parsed, dict):
+        # Also covers '{}' and dicts whose values are all null/empty.
+        return not any(parsed.values())
+    return not parsed
+
+
+def repair_database():
+    """Backfill ``lead_metadata`` for leads whose metadata is missing or empty.
+
+    Runs ``init_db`` first, then re-parses each affected lead's ``raw_body``
+    and stores the extracted fields. Leads without a raw body, or where
+    nothing could be extracted, are reported and left unchanged.
+    """
+    print("Initializing DB (migrating schema if needed)...")
+    init_db()
+
+    leads = get_leads()
+    print(f"Found {len(leads)} leads to check.")
+
+    count = 0
+    for lead in leads:
+        if not _metadata_is_empty(lead.get('lead_metadata')):
+            continue
+
+        print(f"Repairing Lead {lead['id']} ({lead['company_name']})...")
+
+        raw_body = lead.get('raw_body', '')
+        if not raw_body:
+            print(" -> No raw body found.")
+            continue
+
+        extracted = parse_tradingtwins_html_local(raw_body)
+        if not extracted:
+            # Storing an empty dict would leave the lead unrepaired while
+            # still counting it as repaired on every run.
+            print(" -> Nothing could be extracted.")
+            continue
+
+        update_lead_metadata(lead['id'], extracted)
+        print(f" -> Extracted: {extracted}")
+        count += 1
+
+    print(f"Repaired {count} leads.")
+
+if __name__ == "__main__":
+ repair_database()