Add lead-engine source code to main repo

This commit is contained in:
Jarvis
2026-01-31 17:25:01 +00:00
parent 8347d5c7ae
commit 8cbac74b2f
8 changed files with 736 additions and 0 deletions


@@ -0,0 +1,80 @@
# 🤖 Automation Workflow Design (n8n + Python)
This document outlines the architecture for the **TradingTwins Lead Engine** automation, combining **n8n** (Trigger/Flow Control) and **Python** (Business Logic/Sync).
## 1. High-Level Architecture
We use a **Hybrid Approach**:
* **n8n:** Handles connectivity (IMAP, Slack, CRM) and triggers.
* **Python Lead Engine:** Handles data processing, parsing, and syncing with Company Explorer.
```mermaid
sequenceDiagram
participant Mail as 📧 Email Server (IMAP)
participant n8n as ⚡ n8n Workflow
participant API as 🐍 Lead Engine API
participant DB as 🗄️ SQLite (leads.db)
participant CE as 🏢 Company Explorer
Mail->>n8n: New Email (Subject: "Lead...")
n8n->>n8n: Extract Body & Subject
n8n->>API: POST /api/ingest { "body": "...", "subject": "..." }
rect rgb(240, 248, 255)
Note right of API: Python Async Process
API->>DB: Save Lead (Status: 'new')
API-->>n8n: 200 OK (Lead Queued)
API->>CE: Check Existence / Create
CE-->>API: Company ID
API->>CE: Trigger Discovery
API->>DB: Update Status: 'synced'
end
API->>n8n: (Optional) Webhook "Sync Complete"
n8n->>Slack: Notify Sales "New Lead Synced!"
```
## 2. n8n Workflow Specification
**Trigger:** `IMAP Read` Node
* **Filter:** Subject contains "TradingTwins" OR Sender is "notification@tradingtwins.com"
* **Interval:** Every 5 minutes (or 1 min).
**Action:** `HTTP Request` Node
* **Method:** POST
* **URL:** `http://lead-engine:8000/api/ingest` (Internal Docker Network)
* **Body:** JSON
```json
{
  "source": "tradingtwins",
  "subject": "New Lead: Musterfirma GmbH",
  "raw_body": "..."
}
```
## 3. Python Lead Engine Changes
To support this, the current Streamlit-only application needs a **FastAPI backend** (or similar) running alongside, or instead of, the Streamlit loop to expose the API.
### Requirements:
1. **Split `app.py`:** Separate UI (Streamlit) from Logic/API.
2. **New `api.py`:** FastAPI instance exposing `POST /api/ingest`.
3. **Refactor `enrich.py`:** Ensure `run_sync` can be called programmatically for a specific lead ID.
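As a sketch of the ingest contract (the route itself is framework plumbing), the core of the new `POST /api/ingest` handler could look like the following. `handle_ingest` and the subject pattern are illustrative assumptions based on the example payload above, not existing code:

```python
# Hypothetical core of the /api/ingest handler; api.py would wrap this
# in a FastAPI route. Field names follow the JSON payload shown above.
import re

def handle_ingest(payload: dict) -> dict:
    """Validate an n8n payload and derive the company name from the subject."""
    missing = [f for f in ("source", "subject", "raw_body") if f not in payload]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    # Subjects follow the pattern "New Lead: <Company>" (see example payload)
    match = re.match(r"New Lead:\s*(.+)", payload["subject"])
    company = match.group(1).strip() if match else payload["subject"].strip()
    return {"status": "queued", "company_name": company}
```

Keeping this logic framework-free also makes it trivial to unit-test without spinning up a server.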
### Proposed File Structure:
```
lead-engine/
├── api.py # FastAPI (New)
├── ui.py # Streamlit (Renamed from app.py)
├── core/
│ ├── db.py # Shared DB access
│ ├── enrich.py # Sync Logic
│ └── parser.py # Parsing Logic
└── Dockerfile # Needs to run both (e.g. via supervisord or entrypoint script)
```
## 4. Why this approach?
* **Reliability:** n8n is better at keeping IMAP connections alive than a custom Python loop.
* **Scalability:** We can easily add other lead sources (e.g., Typeform, LinkedIn) just by adding another n8n trigger pointing to the same Python API.
* **Focus:** Python code focuses purely on "What to do with the data", not "How to get the data".

lead-engine/Dockerfile Normal file

@@ -0,0 +1,10 @@
FROM python:3.9-slim
WORKDIR /app
COPY . .
# requests is needed by enrich.py for the Company Explorer API calls
RUN pip install streamlit pandas requests
ENV PYTHONUNBUFFERED=1
CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0"]


@@ -0,0 +1,105 @@
# 🚀 TradingTwins Lead Engine - Documentation
## 1. Project Overview
The **TradingTwins Lead Engine** is a specialized automation tool for processing inbound B2B leads (specifically from the "tradingtwins" platform).
**Goal:** Minimize time-to-response and maximize lead quality through automatic enrichment and scoring.
The tool acts as **middleware** between the email inbox and the central intelligence hub (**Company Explorer**).
---
## 2. Architecture & Workflow
### System Components
1. **Lead Engine (Python/Streamlit):**
    * Holds the state of each lead (New, Synced, Drafted).
    * Provides a dashboard for monitoring.
    * Executes the business logic.
2. **Company Explorer (CE):**
    * Single source of truth for company data.
    * Performs the actual web research (discovery) and scoring.
3. **SQLite database (`data/leads.db`):**
    * Local store for lead status and history.
### Process Flow (Sync Logic)
```mermaid
graph TD
    A[Incoming Email] -->|Parser| B(Lead Engine DB)
    B --> C{Check: Does the company exist in CE?}
    C -- YES --> D[Fetch ID & Status]
    C -- NO --> E[Create company via API]
    E --> F[Trigger: Discovery & Enrichment]
    D --> G[Update lead status: 'Synced']
    F --> G
    G --> H[Dashboard: Ready for Response]
```
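The branching in the diagram boils down to a small, framework-free function. This is a sketch only; the callable parameters (`find`, `create`, `discover`, `update`) stand in for the real CE/DB helpers and their names are illustrative:

```python
def sync_lead(lead: dict, find, create, discover, update) -> int:
    """Walk one lead through the flow above; CE/DB access is injected."""
    company = find(lead["company_name"])        # Check: does the company exist in CE?
    if company is None:
        company = create(lead["company_name"])  # NO -> create via API...
        discover(company["id"])                 # ...and trigger discovery & enrichment
    update(lead["id"], "synced")                # Either way: mark lead as synced
    return company["id"]
```

Injecting the side effects keeps the orchestration testable with plain lambdas, without a database or a running CE instance.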
---
## 3. Installation & Deployment
The system runs as a Docker container.
### Prerequisites
* Access to the `lead-engine-mvp` Git repository.
* A running **Company Explorer** on the same network (or reachable via IP).
### Starting (Docker)
```bash
# 1. Build the image
docker build -t lead-engine .
# 2. Start the container
# IMPORTANT: map the ports & mind the network
docker run -d \
  -p 8501:8501 \
  --name lead-engine \
  -e CE_API_URL="http://192.168.178.6:8090/ce/api" \
  -e CE_API_USER="admin" \
  -e CE_API_PASS="gemini" \
  lead-engine
```
### Environment Variables
| Variable | Default | Description |
| :--- | :--- | :--- |
| `CE_API_URL` | `http://192.168.178.6:8090/ce/api` | Base URL of the Company Explorer API (via Nginx proxy) |
| `CE_API_USER` | `admin` | User for Basic Auth |
| `CE_API_PASS` | `gemini` | Password for Basic Auth |
| `PYTHONUNBUFFERED` | `1` | Important for Docker logs |
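In code, these variables resolve with the defaults from the table. A minimal sketch (`ce_config` is an illustrative helper; the shipped code reads each variable individually via `os.getenv`):

```python
import os

def ce_config() -> dict:
    """Resolve CE connection settings, falling back to the documented defaults."""
    return {
        "url": os.getenv("CE_API_URL", "http://192.168.178.6:8090/ce/api"),
        "user": os.getenv("CE_API_USER", "admin"),
        "password": os.getenv("CE_API_PASS", "gemini"),
    }
```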
---
## 4. Technical Details
### Data Model (SQLite)
The local database `leads.db` stores:
* `source_id`: ID from the email (TradingTwins lead ID).
* `raw_body`: the original email text.
* `enrichment_data`: JSON blob with data from the Company Explorer (score, vertical, IDs).
* `status`: `new` -> `synced` -> `drafted` -> `sent`.
### API Integration (Company Explorer)
The tool uses the following CE endpoints:
1. **Search:** `GET /companies?search={Name}`
    * Checks for duplicates.
2. **Creation:** `POST /companies`
    * Payload: `{"name": "...", "city": "..."}`
3. **Discovery:** `POST /enrich/discover`
    * Triggers the crawler to find the website and industry.
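For a quick offline check, the authenticated calls against these endpoints can be assembled with the standard library alone (building the request without sending it; actually sending requires a reachable CE instance, and the helper name is illustrative):

```python
import base64
import json
import urllib.request

def ce_request(path, method="GET", payload=None):
    """Build (but do not send) a Basic-Auth request for a CE endpoint."""
    token = base64.b64encode(b"admin:gemini").decode()
    return urllib.request.Request(
        f"http://192.168.178.6:8090/ce/api{path}",
        data=json.dumps(payload).encode() if payload is not None else None,
        headers={"Authorization": f"Basic {token}", "Content-Type": "application/json"},
        method=method,
    )

# Example (needs a running CE):
# urllib.request.urlopen(ce_request("/enrich/discover", "POST", {"company_id": 42}))
```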
---
## 5. Roadmap / Offene Punkte
* **[ ] IMAP Live-Anbindung:** Ersetzen des aktuellen Mock-Ingests durch echten IMAP-Listener (Skript `check_mail.py` existiert bereits als Vorlage).
* **[ ] Response Generation:** Abruf eines "Sales Pitches" vom Company Explorer basierend auf den angereicherten Daten (Pain Points, Vertical).
* **[ ] CRM Push:** Optionale Weiterleitung an SuperOffice/Odoo nach erfolgreicher Qualifizierung.

lead-engine/app.py Normal file

@@ -0,0 +1,75 @@
import streamlit as st
import pandas as pd
from db import get_leads, init_db
import json
from enrich import run_sync  # Import our sync function

st.set_page_config(page_title="TradingTwins Lead Engine", layout="wide")
st.title("🚀 Lead Engine: TradingTwins")

# Sidebar Actions
st.sidebar.header("Actions")

if st.sidebar.button("1. Ingest Emails (Mock)"):
    from ingest import ingest_mock_leads
    init_db()
    count = ingest_mock_leads()
    st.sidebar.success(f"Ingested {count} new leads.")
    st.rerun()

if st.sidebar.button("2. Sync to Company Explorer"):
    with st.spinner("Syncing with Company Explorer API..."):
        # Capture output for debugging
        try:
            # We redirect stdout to capture prints
            import io
            from contextlib import redirect_stdout
            f = io.StringIO()
            with redirect_stdout(f):
                run_sync()
            output = f.getvalue()
            st.success("Sync finished!")
            with st.expander("See Process Log", expanded=True):
                st.code(output)
        except Exception as e:
            st.error(f"Sync Failed: {e}")

# Main View
leads = get_leads()
df = pd.DataFrame(leads)

if not df.empty:
    col1, col2, col3 = st.columns(3)
    col1.metric("Total Leads", len(df))
    col2.metric("New / Unsynced", len(df[df['status'] == 'new']))
    col3.metric("Synced to CE", len(df[df['status'] == 'synced']))

    st.subheader("Lead Pipeline")
    for index, row in df.iterrows():
        with st.expander(f"{row['company_name']} ({row['status']})"):
            c1, c2 = st.columns(2)
            c1.write(f"**Contact:** {row['contact_name']}")
            c1.write(f"**Email:** {row['email']}")
            c1.text(row['raw_body'][:200] + "...")
            enrichment = json.loads(row['enrichment_data']) if row['enrichment_data'] else {}
            if enrichment:
                c2.write("--- Integration Status ---")
                if enrichment.get('ce_id'):
                    c2.success(f"✅ Linked to Company Explorer (ID: {enrichment['ce_id']})")
                    c2.write(f"**CE Status:** {enrichment.get('ce_status')}")
                else:
                    c2.warning("⚠️ Not yet synced or failed")
                    c2.info(f"Log: {enrichment.get('message')}")
                if enrichment.get('ce_data'):
                    c2.json(enrichment['ce_data'])
else:
    st.info("No leads found. Click 'Ingest Emails' in the sidebar.")

lead-engine/db.py Normal file

@@ -0,0 +1,87 @@
import sqlite3
import json
import os
from datetime import datetime

# Robust path handling:
# 1. Get the directory where this script (db.py) lives
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 2. Define data directory and db path relative to it
DATA_DIR = os.path.join(BASE_DIR, 'data')
DB_PATH = os.path.join(DATA_DIR, 'leads.db')

def init_db():
    # Ensure data directory exists
    os.makedirs(DATA_DIR, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''
        CREATE TABLE IF NOT EXISTS leads (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source_id TEXT UNIQUE,
            received_at TIMESTAMP,
            company_name TEXT,
            contact_name TEXT,
            email TEXT,
            phone TEXT,
            raw_body TEXT,
            enrichment_data TEXT,
            status TEXT DEFAULT 'new',
            response_draft TEXT,
            sent_at TIMESTAMP
        )
    ''')
    conn.commit()
    conn.close()

def insert_lead(lead_data):
    # Ensure DB exists before inserting (if ingest runs before init)
    if not os.path.exists(DB_PATH):
        init_db()
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    try:
        c.execute('''
            INSERT INTO leads (source_id, received_at, company_name, contact_name, email, phone, raw_body, status)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            lead_data.get('id'),
            datetime.now(),
            lead_data.get('company'),
            lead_data.get('contact'),
            lead_data.get('email'),
            lead_data.get('phone'),
            lead_data.get('raw_body'),
            'new'
        ))
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        # Duplicate source_id -> lead was already ingested
        return False
    finally:
        conn.close()

def get_leads():
    if not os.path.exists(DB_PATH):
        init_db()
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    c.execute('SELECT * FROM leads ORDER BY received_at DESC')
    rows = c.fetchall()
    conn.close()
    return [dict(row) for row in rows]

def update_lead_status(lead_id, status, response_draft=None):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    if response_draft:
        c.execute('UPDATE leads SET status = ?, response_draft = ? WHERE id = ?', (status, response_draft, lead_id))
    else:
        c.execute('UPDATE leads SET status = ? WHERE id = ?', (status, lead_id))
    conn.commit()
    conn.close()

lead-engine/enrich.py Normal file

@@ -0,0 +1,235 @@
import json
import requests
import os
import time
from requests.auth import HTTPBasicAuth
from db import update_lead_status, get_leads, DB_PATH
import sqlite3

# --- Configuration ---
CE_API_URL = os.getenv("CE_API_URL", "http://192.168.178.6:8090/ce/api")
CE_API_USER = os.getenv("CE_API_USER", "admin")
CE_API_PASS = os.getenv("CE_API_PASS", "gemini")

def get_auth():
    return HTTPBasicAuth(CE_API_USER, CE_API_PASS)

# 1. CORE LOGIC: Interaction with Company Explorer

def find_company(name):
    """Checks whether the company already exists in CE."""
    try:
        res = requests.get(
            f"{CE_API_URL}/companies",
            params={"search": name, "limit": 1},
            auth=get_auth(),
            timeout=5
        )
        if res.status_code == 200:
            data = res.json()
            if data.get("items"):
                return data["items"][0]  # Return first match
    except Exception as e:
        print(f"❌ CE API Error (Find '{name}'): {e}")
    return None

def create_company(lead_data):
    """Creates the company in CE."""
    payload = {
        "name": lead_data['company_name'],
        "city": lead_data.get('city', ''),
        "country": "DE",
        "website": None
    }
    try:
        res = requests.post(
            f"{CE_API_URL}/companies",
            json=payload,
            auth=get_auth(),
            timeout=5
        )
        if res.status_code == 200:
            return res.json()
        else:
            print(f"❌ Create Failed: {res.status_code} {res.text}")
    except Exception as e:
        print(f"❌ CE API Error (Create '{lead_data['company_name']}'): {e}")
    return None

def trigger_discovery(company_id):
    """Starts the automatic website discovery in CE."""
    try:
        res = requests.post(
            f"{CE_API_URL}/enrich/discover",
            json={"company_id": company_id},
            auth=get_auth(),
            timeout=5
        )
        return res.status_code == 200
    except Exception as e:
        print(f"❌ Trigger Discovery Failed (ID {company_id}): {e}")
        return False

def get_ce_status(company_id):
    """Fetches the current status of a company from CE."""
    try:
        res = requests.get(
            f"{CE_API_URL}/companies/{company_id}",
            auth=get_auth(),
            timeout=5
        )
        if res.status_code == 200:
            return res.json()
    except Exception as e:
        print(f"❌ Get Status Failed (ID {company_id}): {e}")
    return None

def get_pitch(company_id):
    """
    Tries to fetch a generated pitch.
    (Simulated locally for now; switch to a CE field/endpoint once available.)
    """
    # TODO: Implement real endpoint once CE has it (e.g. /companies/{id}/analysis/pitch)
    # For now, we fetch the company and look for 'industry_ai' or similar fields to construct a simple pitch locally
    company = get_ce_status(company_id)
    if company and company.get('industry_ai'):
        industry = company['industry_ai']
        # Simple German template based on the industry (leads are German-speaking)
        return f"Wir haben gesehen, dass Sie im Bereich {industry} tätig sind. Für {industry} haben wir spezielle Roboter-Lösungen..."
    return None

# 2. ORCHESTRATION: The Sync Process

def process_new_lead(lead):
    """Phase 1: Identification & creation."""
    print(f"\n⚙️ [Phase 1] Syncing Lead: {lead['company_name']} ({lead['id']})")
    ce_company = find_company(lead['company_name'])
    enrichment_info = {
        "ce_status": "unknown",
        "ce_id": None,
        "message": "",
        "last_check": time.time()
    }
    if ce_company:
        print(f"✅ Company exists in CE (ID: {ce_company['id']})")
        enrichment_info["ce_status"] = "linked"  # Linked but maybe not fully enriched/fetched
        enrichment_info["ce_id"] = ce_company['id']
        enrichment_info["ce_data"] = ce_company
        enrichment_info["message"] = f"Matched existing account (Status: {ce_company.get('status')})."
        # If the existing company is not enriched yet, trigger discovery too
        if ce_company.get('status') == 'NEW':
            trigger_discovery(ce_company['id'])
            enrichment_info["message"] += " Triggered Discovery for existing raw account."
    else:
        print("✨ New Company. Creating in CE...")
        city = ""
        # Simple extraction of the city ("Stadt:") from the raw email body
        if lead['raw_body'] and "Stadt:" in lead['raw_body']:
            try:
                for line in lead['raw_body'].split('\n'):
                    if "Stadt:" in line:
                        city = line.split("Stadt:")[1].strip()
                        break
            except Exception:
                pass
        new_company = create_company({
            "company_name": lead['company_name'],
            "city": city
        })
        if new_company:
            print(f"✅ Created (ID: {new_company['id']}).")
            enrichment_info["ce_status"] = "created"
            enrichment_info["ce_id"] = new_company['id']
            if trigger_discovery(new_company['id']):
                enrichment_info["message"] = "Created & Discovery started."
                print("🚀 Discovery queued.")
            else:
                enrichment_info["message"] = "Created, but Discovery trigger failed."
        else:
            enrichment_info["message"] = "Failed to create in CE."
    # Save state
    update_lead_enrichment(lead['id'], enrichment_info, status='synced')

def process_synced_lead(lead):
    """Phase 2: Closing the loop (fetching the results)."""
    enrichment = json.loads(lead['enrichment_data'])
    ce_id = enrichment.get('ce_id')
    if not ce_id:
        return  # Should not happen if status is synced
    print(f"\n🔄 [Phase 2] Checking Enrichment for: {lead['company_name']} (CE ID: {ce_id})")
    company = get_ce_status(ce_id)
    if not company:
        return
    # Check if CE is done
    ce_status = company.get('status')  # NEW, DISCOVERING, ENRICHED...
    if ce_status == 'ENRICHED':
        print("✅ Analysis Complete! Fetching Pitch...")
        # Get pitch (mocked or real)
        pitch = get_pitch(ce_id)
        if pitch:
            enrichment['pitch'] = pitch
            enrichment['ce_data'] = company  # Update with latest data
            enrichment['message'] = "Enrichment complete. Pitch ready."
            # Save final state; 'drafted' means ready for human review
            update_lead_enrichment(lead['id'], enrichment, status='drafted')
            print("🎉 Lead is ready for response!")
        else:
            print("⚠️ Enriched, but no pitch generated yet.")
    else:
        print(f"⏳ Still processing in CE (Status: {ce_status})...")

def update_lead_enrichment(lead_id, data, status=None):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    if status:
        c.execute('UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?',
                  (json.dumps(data), status, lead_id))
    else:
        c.execute('UPDATE leads SET enrichment_data = ? WHERE id = ?',
                  (json.dumps(data), lead_id))
    conn.commit()
    conn.close()

def run_sync():
    leads = get_leads()
    print(f"Scanning {len(leads)} leads...")
    count = 0
    for lead in leads:
        status = lead['status']
        # Fallback for old/mock leads or explicit 'new' status
        is_mock = False
        if lead['enrichment_data']:
            try:
                enrichment = json.loads(lead['enrichment_data'])
                is_mock = enrichment.get('recommendation') == 'Manual Review (Mock Data)'
            except Exception:
                pass
        if status == 'new' or not status or is_mock:
            process_new_lead(lead)
            count += 1
        elif status == 'synced':
            process_synced_lead(lead)
            count += 1
    print(f"Sync cycle finished. Processed {count} leads.")

if __name__ == "__main__":
    run_sync()

lead-engine/ingest.py Normal file

@@ -0,0 +1,75 @@
import re
from db import insert_lead

def parse_tradingtwins_email(body):
    data = {}
    # Simple regex extraction based on the email format
    patterns = {
        'id': r'Lead-ID:\s*(\d+)',
        'company': r'Firma:\s*(.+)',
        'contact_first': r'Vorname:\s*(.+)',
        'contact_last': r'Nachname:\s*(.+)',
        'email': r'E-Mail:\s*([^\s<]+)',
        'phone': r'Rufnummer:\s*([^\n]+)',
        'area': r'Reinigungs-Fläche:\s*([^\n]+)',
        'purpose': r'Einsatzzweck:\s*([^\n]+)'
    }
    for key, pattern in patterns.items():
        match = re.search(pattern, body)
        if match:
            data[key] = match.group(1).strip()
    # Combine names
    if 'contact_first' in data and 'contact_last' in data:
        data['contact'] = f"{data['contact_first']} {data['contact_last']}"
    data['raw_body'] = body
    return data

def ingest_mock_leads():
    # Mock data from the session context
    leads = [
        {
            'id': '2397256',
            'company': 'pronorm Einbauküchen GmbH',
            'contact': 'Jakob Funk',
            'email': 'jakob.funk@pronorm.de',
            'phone': '+49 5733 979175',
            'raw_body': """
Lead-ID: 2397256
Firma: pronorm Einbauküchen GmbH
Vorname: Jakob
Nachname: Funk
Reinigungs-Fläche: 1.001 - 10.000 qm
Einsatzzweck: Reinigung von Böden
"""
        },
        {
            'id': '2414364',
            'company': 'Quad & Rollershop Schwabmünchen GmbH',
            'contact': 'Manfred Bihler',
            'email': 'Rollershop.Schwabmuenchen@web.de',
            'phone': '+49 8232 905246',
            'raw_body': """
Lead-ID: 2414364
Firma: Quad & Rollershop Schwabmünchen GmbH
Vorname: Manfred
Nachname: Bihler
Reinigungs-Fläche: 301 - 1.000 qm
Einsatzzweck: Reinigung von Böden
"""
        }
    ]
    count = 0
    for lead in leads:
        if insert_lead(lead):
            count += 1
    return count

if __name__ == "__main__":
    from db import init_db
    init_db()
    print(f"Ingested {ingest_mock_leads()} new leads.")


@@ -0,0 +1,69 @@
import json
import urllib.request
import base64
import os

# Config
API_URL = "http://192.168.178.6:8090/ce/api"
USER = "admin"
PASS = "gemini"

def get_auth_header():
    auth_str = f"{USER}:{PASS}"
    b64_auth = base64.b64encode(auth_str.encode()).decode()
    return {"Authorization": f"Basic {b64_auth}", "Content-Type": "application/json"}

def test_create():
    print(f"Testing Create against {API_URL}...")
    # Random name to ensure a new company is created
    import random
    company_name = f"Test Company {random.randint(1000, 9999)}"
    payload = {
        "name": company_name,
        "city": "Berlin",
        "country": "DE"
    }
    req = urllib.request.Request(
        f"{API_URL}/companies",
        data=json.dumps(payload).encode('utf-8'),
        headers=get_auth_header(),
        method="POST"
    )
    try:
        with urllib.request.urlopen(req) as response:
            if response.status == 200:
                data = json.loads(response.read().decode())
                print(f"✅ SUCCESS: Created Company '{company_name}'")
                print(f"   ID: {data.get('id')}")
                print(f"   Name: {data.get('name')}")
                return data.get('id')
            else:
                print(f"❌ FAILED: Status {response.status}")
                print(response.read().decode())
    except Exception as e:
        print(f"❌ EXCEPTION: {e}")
    return None

def test_find(company_id):
    if not company_id:
        return
    print(f"\nVerifying creation via GET /companies/{company_id}...")
    req = urllib.request.Request(
        f"{API_URL}/companies/{company_id}",
        headers=get_auth_header(),
        method="GET"
    )
    try:
        with urllib.request.urlopen(req) as response:
            data = json.loads(response.read().decode())
            print("✅ VERIFIED: Found company in DB.")
    except Exception as e:
        print(f"❌ VERIFICATION FAILED: {e}")

if __name__ == "__main__":
    cid = test_create()
    test_find(cid)