[31388f42] Implement Expert Response Generator with Gemini & CE integration

This commit is contained in:
2026-03-02 08:46:22 +00:00
parent 128b117d08
commit 900114bf77
5 changed files with 293 additions and 14 deletions

View File

@@ -2,7 +2,8 @@ import streamlit as st
import pandas as pd
from db import get_leads, init_db
import json
from enrich import run_sync # Import our sync function
from enrich import run_sync, refresh_ce_data
from generate_reply import generate_email_draft
st.set_page_config(page_title="TradingTwins Lead Engine", layout="wide")
@@ -18,7 +19,20 @@ if st.sidebar.button("1. Ingest Emails (Mock)"):
st.sidebar.success(f"Ingested {count} new leads.")
st.rerun()
if st.sidebar.button("2. Sync to Company Explorer"):
if st.sidebar.button("2. Ingest Real Emails (Graph API)"):
try:
from trading_twins_ingest import process_leads
with st.spinner("Fetching emails from Microsoft Graph..."):
count = process_leads()
if count > 0:
st.sidebar.success(f"Successfully ingested {count} new leads form inbox!")
else:
st.sidebar.info("No new leads found in inbox.")
st.rerun()
except Exception as e:
st.sidebar.error(f"Ingest failed: {e}")
if st.sidebar.button("3. Sync to Company Explorer"):
with st.spinner("Syncing with Company Explorer API..."):
# Capture output for debugging
try:
@@ -52,24 +66,76 @@ if not df.empty:
for index, row in df.iterrows():
with st.expander(f"{row['company_name']} ({row['status']})"):
c1, c2 = st.columns(2)
# --- Left Column: Lead Data ---
c1.write(f"**Contact:** {row['contact_name']}")
c1.write(f"**Email:** {row['email']}")
c1.text(row['raw_body'][:200] + "...")
# Metadata Display
meta = {}
if row.get('lead_metadata'):
try:
meta = json.loads(row['lead_metadata'])
except:
pass
if meta:
c1.write("---")
c1.write(f"**Area:** {meta.get('area', '-')}")
c1.write(f"**Purpose:** {meta.get('purpose', '-')}")
c1.write(f"**Location:** {meta.get('zip', '')} {meta.get('city', '')}")
with c1.expander("Show Original Email Body"):
st.code(row['raw_body'], language="html")
# --- Right Column: Enrichment & Response ---
enrichment = json.loads(row['enrichment_data']) if row['enrichment_data'] else {}
if enrichment:
c2.write("--- Integration Status ---")
if enrichment.get('ce_id'):
c2.success(f"✅ Linked to Company Explorer (ID: {enrichment['ce_id']})")
c2.write(f"**CE Status:** {enrichment.get('ce_status')}")
ce_id = enrichment.get('ce_id')
if ce_id:
c2.success(f"✅ Linked to Company Explorer (ID: {ce_id})")
# CE Data Display
ce_data = enrichment.get('ce_data', {})
vertical = ce_data.get('vertical')
summary = ce_data.get('summary')
if vertical:
c2.info(f"**Industry:** {vertical}")
else:
c2.warning("Industry Analysis pending...")
if summary:
with c2.expander("Show Analysis Summary"):
st.write(summary)
# Refresh Button
if c2.button("🔄 Refresh Analysis Data", key=f"refresh_{row['id']}"):
with st.spinner("Fetching latest data from Company Explorer..."):
new_data = refresh_ce_data(row['id'], ce_id)
st.toast("Data refreshed!")
st.rerun()
# Generate Reply Button (only if we have data)
c2.write("--- Response Generation ---")
if c2.button("✨ Generate Expert Reply", key=f"gen_{row['id']}"):
with st.spinner("Generating draft with Gemini..."):
# Prepare lead data dict from row
lead_dict = row.to_dict()
draft = generate_email_draft(lead_dict, ce_data)
# Store draft in session state to persist
st.session_state[f"draft_{row['id']}"] = draft
# Display Draft
if f"draft_{row['id']}" in st.session_state:
c2.text_area("Draft Email", value=st.session_state[f"draft_{row['id']}"], height=300)
else:
c2.warning("⚠️ Not yet synced or failed")
c2.info(f"Log: {enrichment.get('message')}")
if enrichment.get('ce_data'):
c2.json(enrichment['ce_data'])
c2.info(f"Log: {enrichment.get('message')}")
else:
st.info("No leads found. Click 'Ingest Emails' in the sidebar.")

View File

@@ -27,12 +27,21 @@ def init_db():
email TEXT,
phone TEXT,
raw_body TEXT,
lead_metadata TEXT,
enrichment_data TEXT,
status TEXT DEFAULT 'new',
response_draft TEXT,
sent_at TIMESTAMP
)
''')
# Simple migration check: check if lead_metadata column exists
try:
c.execute('SELECT lead_metadata FROM leads LIMIT 1')
except sqlite3.OperationalError:
print("Migrating DB: Adding lead_metadata column...")
c.execute('ALTER TABLE leads ADD COLUMN lead_metadata TEXT')
conn.commit()
conn.close()
@@ -41,12 +50,20 @@ def insert_lead(lead_data):
if not os.path.exists(DB_PATH):
init_db()
# Extract metadata fields
meta = {
'area': lead_data.get('area'),
'purpose': lead_data.get('purpose'),
'zip': lead_data.get('zip'),
'city': lead_data.get('city')
}
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
try:
c.execute('''
INSERT INTO leads (source_id, received_at, company_name, contact_name, email, phone, raw_body, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO leads (source_id, received_at, company_name, contact_name, email, phone, raw_body, lead_metadata, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
lead_data.get('id'),
datetime.now(),
@@ -55,6 +72,7 @@ def insert_lead(lead_data):
lead_data.get('email'),
lead_data.get('phone'),
lead_data.get('raw_body'),
json.dumps(meta),
'new'
))
conn.commit()
@@ -64,6 +82,14 @@ def insert_lead(lead_data):
finally:
conn.close()
def update_lead_metadata(lead_id, meta_data):
"""Helper to update metadata for existing leads (repair)"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('UPDATE leads SET lead_metadata = ? WHERE id = ?', (json.dumps(meta_data), lead_id))
conn.commit()
conn.close()
def get_leads():
if not os.path.exists(DB_PATH):
init_db()

View File

@@ -5,7 +5,7 @@ import sqlite3
# Füge das Hauptverzeichnis zum Python-Pfad hinzu, damit der Connector gefunden wird
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from company_explorer_connector import handle_company_workflow
from company_explorer_connector import handle_company_workflow, get_company_details
from db import get_leads, DB_PATH
def update_lead_enrichment(lead_id, data, status):
@@ -18,6 +18,26 @@ def update_lead_enrichment(lead_id, data, status):
conn.close()
print(f"Lead {lead_id} aktualisiert. Neuer Status: {status}")
def refresh_ce_data(lead_id, ce_id):
"""
Holt die aktuellsten Daten (inkl. Analyse-Ergebnis) vom Company Explorer
und aktualisiert den lokalen Lead.
"""
print(f"Refreshing data for CE ID {ce_id}...")
ce_data = get_company_details(ce_id)
# Bestehende Enrichment-Daten holen, um nichts zu überschreiben
# (Vereinfachung: Wir bauen das dict neu auf)
enrichment_data = {
"sync_status": "refreshed",
"ce_id": ce_id,
"message": "Data refreshed from CE",
"ce_data": ce_data
}
update_lead_enrichment(lead_id, enrichment_data, status='synced')
return ce_data
def run_sync():
"""
Haupt-Synchronisationsprozess.

View File

@@ -0,0 +1,108 @@
import os
import json
import requests
# Load API Key
def get_gemini_key():
try:
# Try finding key in parent dir
key_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'gemini_api_key.txt')
if os.path.exists(key_path):
with open(key_path, 'r') as f:
return f.read().strip()
except:
pass
return os.getenv("GEMINI_API_KEY")
def generate_email_draft(lead_data, company_data, booking_link="https://outlook.office365.com/owa/calendar/RoboplanetGmbH@robo-planet.de/bookings/"):
"""
Generates a personalized sales email using Gemini API.
"""
api_key = get_gemini_key()
if not api_key:
return "Error: Gemini API Key not found."
# Extract Data
company_name = lead_data.get('company_name', 'Interessent')
contact_name = lead_data.get('contact_name', 'Damen und Herren')
# Metadata from Lead
meta = {}
if lead_data.get('lead_metadata'):
try:
meta = json.loads(lead_data.get('lead_metadata'))
except:
pass
area = meta.get('area', 'Unbekannte Fläche')
purpose = meta.get('purpose', 'Reinigung')
city = meta.get('city', '')
# Data from Company Explorer
ce_summary = company_data.get('summary', 'Keine Details verfügbar.')
ce_vertical = company_data.get('vertical', 'Allgemein')
ce_website = company_data.get('website', '')
# Prompt Engineering
prompt = f"""
Du bist ein erfahrener Vertriebsexperte für Roboter-Reinigungslösungen bei Robo-Planet.
Deine Aufgabe ist es, eine Antwort-E-Mail auf eine Lead-Anfrage zu formulieren.
HINTERGRUND ZUM PROSPEKT (Aus Analyse):
- Firma: {company_name}
- Standort: {city}
- Branche/Vertical: {ce_vertical}
- Web-Zusammenfassung: {ce_summary}
ANFRAGE-DETAILS (Vom Kunden):
- Reinigungsfläche: {area}
- Einsatzzweck: {purpose}
- Kontaktperson: {contact_name}
DEIN ZIEL:
Schreibe eine kurze, prägnante und wertschätzende E-Mail.
1. Bedanke dich für die Anfrage.
2. Zeige kurz, dass du verstanden hast, was die Firma macht (nutze den Kontext aus 'Web-Zusammenfassung' in einem Satz, z.B. "Als führender Anbieter von xyz...").
3. Gehe auf die Fläche ({area}) ein.
- Wenn > 1000qm oder Industrie/Halle: Erwähne den "Puma M20" oder "Scrubber 75" als Kraftpaket.
- Wenn < 1000qm oder Büro/Praxis/Gastro: Erwähne den "Phantas" oder "Pudu CC1" als wendige Lösung.
- Wenn "Unbekannt": Stelle eine offene Frage zur Umgebung.
4. Call to Action: Schlage ein kurzes Beratungsgespräch vor.
5. Füge diesen Buchungslink ein: {booking_link}
TONALITÄT:
Professionell, hilfreich, auf den Punkt. Keine Marketing-Floskeln.
FORMAT:
Betreff: [Vorschlag für Betreff]
[E-Mail Text]
"""
# Call Gemini API
url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={api_key}"
headers = {'Content-Type': 'application/json'}
payload = {
"contents": [{"parts": [{"text": prompt}]}]
}
try:
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
result = response.json()
return result['candidates'][0]['content']['parts'][0]['text']
except Exception as e:
return f"Error generating draft: {str(e)}"
if __name__ == "__main__":
# Test Mock
mock_lead = {
"company_name": "Klinikum Test",
"contact_name": "Dr. Müller",
"lead_metadata": json.dumps({"area": "5000 qm", "purpose": "Desinfektion und Boden", "city": "Berlin"})
}
mock_company = {
"vertical": "Healthcare / Krankenhaus",
"summary": "Ein großes Klinikum der Maximalversorgung mit Fokus auf Kardiologie."
}
print(generate_email_draft(mock_lead, mock_company))

View File

@@ -0,0 +1,59 @@
import sqlite3
import json
import re
import os
import sys
# Add path to import db
sys.path.append(os.path.dirname(__file__))
from db import get_leads, update_lead_metadata, init_db
def parse_tradingtwins_html_local(html_body):
"""
Extracts data from the Tradingtwins HTML table structure.
Copied logic to ensure independence.
"""
data = {}
field_map = {
'Einsatzzweck': 'purpose',
'Reinigungs-Fläche': 'area',
'PLZ': 'zip',
'Stadt': 'city'
}
for label, key in field_map.items():
pattern = fr'>\s*{re.escape(label)}:\s*</p>.*?<p[^>]*>(.*?)</p>'
match = re.search(pattern, html_body, re.DOTALL | re.IGNORECASE)
if match:
raw_val = match.group(1).strip()
clean_val = re.sub(r'<[^>]+>', '', raw_val).strip()
data[key] = clean_val
return data
def repair_database():
print("Initializing DB (migrating schema if needed)...")
init_db()
leads = get_leads()
print(f"Found {len(leads)} leads to check.")
count = 0
for lead in leads:
# Check if metadata is missing or empty
current_meta = lead.get('lead_metadata')
if not current_meta or current_meta == '{}' or current_meta == 'null':
print(f"Repairing Lead {lead['id']} ({lead['company_name']})...")
raw_body = lead.get('raw_body', '')
if raw_body:
extracted = parse_tradingtwins_html_local(raw_body)
update_lead_metadata(lead['id'], extracted)
print(f" -> Extracted: {extracted}")
count += 1
else:
print(" -> No raw body found.")
print(f"Repaired {count} leads.")
if __name__ == "__main__":
repair_database()