Add lead-engine source code to main repo
80
lead-engine/AUTOMATION_DESIGN.md
Normal file
@@ -0,0 +1,80 @@
# 🤖 Automation Workflow Design (n8n + Python)

This document outlines the architecture for the **TradingTwins Lead Engine** automation, combining **n8n** (Trigger/Flow Control) and **Python** (Business Logic/Sync).

## 1. High-Level Architecture

We use a **Hybrid Approach**:

* **n8n:** Handles connectivity (IMAP, Slack, CRM) and triggers.
* **Python Lead Engine:** Handles data processing, parsing, and syncing with Company Explorer.

```mermaid
sequenceDiagram
    participant Mail as 📧 Email Server (IMAP)
    participant n8n as ⚡ n8n Workflow
    participant API as 🐍 Lead Engine API
    participant DB as 🗄️ SQLite (leads.db)
    participant CE as 🏢 Company Explorer

    Mail->>n8n: New Email (Subject: "Lead...")
    n8n->>n8n: Extract Body & Subject
    n8n->>API: POST /api/ingest { "body": "...", "subject": "..." }

    rect rgb(240, 248, 255)
    Note right of API: Python Async Process
    API->>DB: Save Lead (Status: 'new')
    API-->>n8n: 200 OK (Lead Queued)

    API->>CE: Check Existence / Create
    CE-->>API: Company ID
    API->>CE: Trigger Discovery
    API->>DB: Update Status: 'synced'
    end

    API->>n8n: (Optional) Webhook "Sync Complete"
    n8n->>Slack: Notify Sales "New Lead Synced!"
```

## 2. n8n Workflow Specification

**Trigger:** `IMAP Read` Node

* **Filter:** Subject contains "TradingTwins" OR sender is "notification@tradingtwins.com"
* **Interval:** Every 5 minutes (or 1 min).

**Action:** `HTTP Request` Node

* **Method:** POST
* **URL:** `http://lead-engine:8000/api/ingest` (internal Docker network)
* **Body:** JSON

```json
{
  "source": "tradingtwins",
  "subject": "New Lead: Musterfirma GmbH",
  "raw_body": "..."
}
```

## 3. Python Lead Engine Changes

To support this, the current Streamlit-only application needs a **FastAPI backend** (or similar) running alongside, or instead of, the Streamlit loop for API access.

### Requirements:

1. **Split `app.py`:** Separate UI (Streamlit) from Logic/API.
2. **New `api.py`:** FastAPI instance exposing `POST /api/ingest`.
3. **Refactor `enrich.py`:** Ensure `run_sync` can be called programmatically for a specific lead ID.
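A minimal sketch of what the core of the proposed `POST /api/ingest` handler could do, with the FastAPI route wiring omitted (the `ingest_handler` name, its return shape, and the slim table are illustrative assumptions, not the final API):

```python
import sqlite3

def ingest_handler(payload: dict, db_path: str = "data/leads.db") -> dict:
    """Core of the proposed POST /api/ingest endpoint: validate the JSON
    body sent by n8n and queue the lead with status 'new'.
    (Illustrative sketch -- in api.py this would sit inside a FastAPI route;
    the real schema lives in db.py, this minimal table is for the sketch only.)"""
    missing = [k for k in ("source", "subject", "raw_body") if k not in payload]
    if missing:
        return {"ok": False, "error": f"missing fields: {missing}"}
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS leads "
        "(id INTEGER PRIMARY KEY, subject TEXT, raw_body TEXT, status TEXT)"
    )
    cur = conn.execute(
        "INSERT INTO leads (subject, raw_body, status) VALUES (?, ?, 'new')",
        (payload["subject"], payload["raw_body"]),
    )
    conn.commit()
    lead_id = cur.lastrowid
    conn.close()
    return {"ok": True, "lead_id": lead_id, "status": "queued"}
```

Answering `200 OK (Lead Queued)` immediately and running the CE sync afterwards, as in the sequence diagram above, keeps n8n's HTTP Request node from blocking on the enrichment.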
### Proposed File Structure:

```
lead-engine/
├── api.py          # FastAPI (new)
├── ui.py           # Streamlit (renamed from app.py)
├── core/
│   ├── db.py       # Shared DB access
│   ├── enrich.py   # Sync logic
│   └── parser.py   # Parsing logic
├── Dockerfile      # Needs to run both (e.g. via supervisord or an entrypoint script)
```

## 4. Why this approach?

* **Reliability:** n8n is better at keeping IMAP connections alive than a custom Python loop.
* **Scalability:** We can easily add other lead sources (e.g., Typeform, LinkedIn) just by adding another n8n trigger pointing to the same Python API.
* **Focus:** The Python code focuses purely on what to do with the data, not on how to get the data.
10
lead-engine/Dockerfile
Normal file
@@ -0,0 +1,10 @@
FROM python:3.9-slim

WORKDIR /app

COPY . .

# requests is needed by enrich.py (Company Explorer API calls)
RUN pip install streamlit pandas requests

ENV PYTHONUNBUFFERED=1
CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0"]
105
lead-engine/TRADING_TWINS_DOCUMENTATION.md
Normal file
@@ -0,0 +1,105 @@
# 🚀 TradingTwins Lead Engine - Documentation

## 1. Project Overview

The **TradingTwins Lead Engine** is a specialized automation tool for processing incoming B2B leads (specifically from the "tradingtwins" platform).

**Goal:** Minimize time-to-response and maximize lead quality through automatic enrichment and scoring.

The tool acts as **middleware** between the email inbox and the central intelligence hub (**Company Explorer**).

---

## 2. Architecture & Workflow

### System Components

1. **Lead Engine (Python/Streamlit):**
   * Holds the state of the leads (new, synced, drafted).
   * Provides a dashboard for monitoring.
   * Executes the logic.
2. **Company Explorer (CE):**
   * Single source of truth for company data.
   * Performs the actual web research (discovery) and scoring.
3. **SQLite database (`data/leads.db`):**
   * Local storage for lead status and history.

### The Process Flow (Sync Logic)

```mermaid
graph TD
    A[Incoming Email] -->|Parser| B(Lead Engine DB)
    B --> C{Check: Does the company exist in CE?}

    C -- YES --> D[Fetch ID & Status]
    C -- NO --> E[Create company via API]

    E --> F[Trigger: Discovery & Enrichment]
    D --> G[Update lead status: 'Synced']
    F --> G

    G --> H[Dashboard: Ready for Response]
```

---

## 3. Installation & Deployment

The system runs as a Docker container.

### Prerequisites

* Access to the `lead-engine-mvp` Git repository.
* A running **Company Explorer** on the same network (or reachable via IP).

### Starting (Docker)

```bash
# 1. Build the image
docker build -t lead-engine .

# 2. Start the container
# IMPORTANT: map the ports & mind the network
docker run -d \
  -p 8501:8501 \
  --name lead-engine \
  -e CE_API_URL="http://192.168.178.6:8090/ce/api" \
  -e CE_API_USER="admin" \
  -e CE_API_PASS="gemini" \
  lead-engine
```

### Environment Variables

| Variable | Default | Description |
| :--- | :--- | :--- |
| `CE_API_URL` | `http://192.168.178.6:8090/ce/api` | Base URL of the Company Explorer API (via Nginx proxy) |
| `CE_API_USER` | `admin` | User for Basic Auth |
| `CE_API_PASS` | `gemini` | Password for Basic Auth |
| `PYTHONUNBUFFERED` | `1` | Important for Docker logs |

---

## 4. Technical Details

### Data Model (SQLite)

The local database `leads.db` stores:

* `source_id`: ID from the email (TradingTwins lead ID).
* `raw_body`: The original email text.
* `enrichment_data`: JSON blob with data from the Company Explorer (score, vertical, IDs).
* `status`: `new` -> `synced` -> `drafted` -> `sent`.

### API Integration (Company Explorer)

The tool uses the following CE endpoints:

1. **Search:** `GET /companies?search={Name}`
   * Checks for duplicates.
2. **Creation:** `POST /companies`
   * Payload: `{"name": "...", "city": "..."}`
3. **Discovery:** `POST /enrich/discover`
   * Triggers the crawler to find the website and industry.
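The duplicate check (endpoint 1) can be exercised with nothing but the standard library, in the same style as `test_create_company.py`. A sketch that builds the request without sending it; the function name is illustrative, and the URL and Basic Auth credentials are the defaults from the environment variable table above:

```python
import base64
import urllib.parse
import urllib.request

API_URL = "http://192.168.178.6:8090/ce/api"  # default from the env var table

def build_search_request(name, user="admin", password="gemini"):
    """Build (but do not send) the GET /companies?search={Name} duplicate
    check as a urllib Request, including the Basic Auth header.
    (Illustrative helper, not part of the shipped code.)"""
    query = urllib.parse.urlencode({"search": name, "limit": 1})
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        f"{API_URL}/companies?{query}",
        headers={"Authorization": f"Basic {token}"},
        method="GET",
    )
```

Sending it is then `urllib.request.urlopen(build_search_request("Musterfirma GmbH"))`; a non-empty `items` list in the JSON response means the company already exists in CE.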
---

## 5. Roadmap / Open Items

* **[ ] IMAP live connection:** Replace the current mock ingest with a real IMAP listener (the script `check_mail.py` already exists as a template).
* **[ ] Response generation:** Fetch a "sales pitch" from the Company Explorer based on the enriched data (pain points, vertical).
* **[ ] CRM push:** Optionally forward to SuperOffice/Odoo after successful qualification.
75
lead-engine/app.py
Normal file
@@ -0,0 +1,75 @@
import streamlit as st
import pandas as pd
from db import get_leads, init_db
import json
from enrich import run_sync  # Import our sync function

st.set_page_config(page_title="TradingTwins Lead Engine", layout="wide")

st.title("🚀 Lead Engine: TradingTwins")

# Sidebar Actions
st.sidebar.header("Actions")

if st.sidebar.button("1. Ingest Emails (Mock)"):
    from ingest import ingest_mock_leads
    init_db()
    count = ingest_mock_leads()
    st.sidebar.success(f"Ingested {count} new leads.")
    st.rerun()

if st.sidebar.button("2. Sync to Company Explorer"):
    with st.spinner("Syncing with Company Explorer API..."):
        # Capture output for debugging
        try:
            # We redirect stdout to capture prints
            import io
            from contextlib import redirect_stdout
            f = io.StringIO()
            with redirect_stdout(f):
                run_sync()
            output = f.getvalue()

            st.success("Sync finished!")
            with st.expander("See Process Log", expanded=True):
                st.code(output)

        except Exception as e:
            st.error(f"Sync Failed: {e}")

# Main View
leads = get_leads()
df = pd.DataFrame(leads)

if not df.empty:
    col1, col2, col3 = st.columns(3)
    col1.metric("Total Leads", len(df))
    col2.metric("New / Unsynced", len(df[df['status'] == 'new']))
    col3.metric("Synced to CE", len(df[df['status'] == 'synced']))

    st.subheader("Lead Pipeline")

    for index, row in df.iterrows():
        with st.expander(f"{row['company_name']} ({row['status']})"):
            c1, c2 = st.columns(2)
            c1.write(f"**Contact:** {row['contact_name']}")
            c1.write(f"**Email:** {row['email']}")
            c1.text(row['raw_body'][:200] + "...")

            enrichment = json.loads(row['enrichment_data']) if row['enrichment_data'] else {}

            if enrichment:
                c2.write("--- Integration Status ---")
                if enrichment.get('ce_id'):
                    c2.success(f"✅ Linked to Company Explorer (ID: {enrichment['ce_id']})")
                    c2.write(f"**CE Status:** {enrichment.get('ce_status')}")
                else:
                    c2.warning("⚠️ Not yet synced or failed")

                c2.info(f"Log: {enrichment.get('message')}")

                if enrichment.get('ce_data'):
                    c2.json(enrichment['ce_data'])

else:
    st.info("No leads found. Click 'Ingest Emails' in the sidebar.")
87
lead-engine/db.py
Normal file
@@ -0,0 +1,87 @@
import sqlite3
import json
import os
from datetime import datetime

# Robust path handling:
# 1. Get the directory where this script (db.py) lives
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 2. Define data directory and db path relative to it
DATA_DIR = os.path.join(BASE_DIR, 'data')
DB_PATH = os.path.join(DATA_DIR, 'leads.db')

def init_db():
    # Ensure data directory exists
    if not os.path.exists(DATA_DIR):
        os.makedirs(DATA_DIR)

    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''
        CREATE TABLE IF NOT EXISTS leads (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source_id TEXT UNIQUE,
            received_at TIMESTAMP,
            company_name TEXT,
            contact_name TEXT,
            email TEXT,
            phone TEXT,
            raw_body TEXT,
            enrichment_data TEXT,
            status TEXT DEFAULT 'new',
            response_draft TEXT,
            sent_at TIMESTAMP
        )
    ''')
    conn.commit()
    conn.close()

def insert_lead(lead_data):
    # Ensure DB exists before inserting (if ingest runs before init)
    if not os.path.exists(DB_PATH):
        init_db()

    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    try:
        c.execute('''
            INSERT INTO leads (source_id, received_at, company_name, contact_name, email, phone, raw_body, status)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            lead_data.get('id'),
            datetime.now(),
            lead_data.get('company'),
            lead_data.get('contact'),
            lead_data.get('email'),
            lead_data.get('phone'),
            lead_data.get('raw_body'),
            'new'
        ))
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        # Duplicate source_id -> lead was already ingested
        return False
    finally:
        conn.close()

def get_leads():
    if not os.path.exists(DB_PATH):
        init_db()

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    c.execute('SELECT * FROM leads ORDER BY received_at DESC')
    rows = c.fetchall()
    conn.close()
    return [dict(row) for row in rows]

def update_lead_status(lead_id, status, response_draft=None):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    if response_draft:
        c.execute('UPDATE leads SET status = ?, response_draft = ? WHERE id = ?', (status, response_draft, lead_id))
    else:
        c.execute('UPDATE leads SET status = ? WHERE id = ?', (status, lead_id))
    conn.commit()
    conn.close()
235
lead-engine/enrich.py
Normal file
@@ -0,0 +1,235 @@
import json
import requests
import os
import time
from requests.auth import HTTPBasicAuth
from db import update_lead_status, get_leads, DB_PATH
import sqlite3

# --- Configuration ---
CE_API_URL = os.getenv("CE_API_URL", "http://192.168.178.6:8090/ce/api")
CE_API_USER = os.getenv("CE_API_USER", "admin")
CE_API_PASS = os.getenv("CE_API_PASS", "gemini")

def get_auth():
    return HTTPBasicAuth(CE_API_USER, CE_API_PASS)

# 1. CORE LOGIC: Interaction with Company Explorer

def find_company(name):
    """Checks whether the company exists in CE."""
    try:
        res = requests.get(
            f"{CE_API_URL}/companies",
            params={"search": name, "limit": 1},
            auth=get_auth(),
            timeout=5
        )
        if res.status_code == 200:
            data = res.json()
            if data.get("items"):
                return data["items"][0]  # Return first match
    except Exception as e:
        print(f"❌ CE API Error (Find '{name}'): {e}")
    return None

def create_company(lead_data):
    """Creates the company in CE."""
    payload = {
        "name": lead_data['company_name'],
        "city": lead_data.get('city', ''),
        "country": "DE",
        "website": None
    }
    try:
        res = requests.post(
            f"{CE_API_URL}/companies",
            json=payload,
            auth=get_auth(),
            timeout=5
        )
        if res.status_code == 200:
            return res.json()
        else:
            print(f"❌ Create Failed: {res.status_code} {res.text}")
    except Exception as e:
        print(f"❌ CE API Error (Create '{lead_data['company_name']}'): {e}")
    return None

def trigger_discovery(company_id):
    """Starts the automatic website discovery in CE."""
    try:
        res = requests.post(
            f"{CE_API_URL}/enrich/discover",
            json={"company_id": company_id},
            auth=get_auth(),
            timeout=5
        )
        return res.status_code == 200
    except Exception as e:
        print(f"❌ Trigger Discovery Failed (ID {company_id}): {e}")
        return False

def get_ce_status(company_id):
    """Fetches the current status of a company from CE."""
    try:
        res = requests.get(
            f"{CE_API_URL}/companies/{company_id}",
            auth=get_auth(),
            timeout=5
        )
        if res.status_code == 200:
            return res.json()
    except Exception as e:
        print(f"❌ Get Status Failed (ID {company_id}): {e}")
    return None

def get_pitch(company_id):
    """
    Tries to fetch a generated pitch.
    (For now this is simulated, or uses a field from the company record if CE already provides one.)
    """
    # TODO: Implement real endpoint once CE has it (e.g. /companies/{id}/analysis/pitch)
    # For now, we fetch the company and look for 'industry_ai' or similar fields to construct a simple pitch locally
    company = get_ce_status(company_id)
    if company and company.get('industry_ai'):
        industry = company['industry_ai']
        # Simple template based on industry (German, since the leads are German-speaking)
        return f"Wir haben gesehen, dass Sie im Bereich {industry} tätig sind. Für {industry} haben wir spezielle Roboter-Lösungen..."
    return None

# 2. ORCHESTRATION: The Sync Process

def process_new_lead(lead):
    """Phase 1: Identification & creation"""
    print(f"\n⚙️ [Phase 1] Syncing Lead: {lead['company_name']} ({lead['id']})")

    ce_company = find_company(lead['company_name'])

    enrichment_info = {
        "ce_status": "unknown",
        "ce_id": None,
        "message": "",
        "last_check": time.time()
    }

    if ce_company:
        print(f"✅ Company exists in CE (ID: {ce_company['id']})")
        enrichment_info["ce_status"] = "linked"  # Linked but maybe not fully enriched/fetched
        enrichment_info["ce_id"] = ce_company['id']
        enrichment_info["ce_data"] = ce_company
        enrichment_info["message"] = f"Matched existing account (Status: {ce_company.get('status')})."

        # If the existing company is not enriched yet, trigger discovery too
        if ce_company.get('status') == 'NEW':
            trigger_discovery(ce_company['id'])
            enrichment_info["message"] += " Triggered Discovery for existing raw account."

    else:
        print("✨ New Company. Creating in CE...")
        city = ""
        # Simple extraction of the city from the raw email body
        if lead['raw_body'] and "Stadt:" in lead['raw_body']:
            try:
                for line in lead['raw_body'].split('\n'):
                    if "Stadt:" in line:
                        city = line.split("Stadt:")[1].strip()
                        break
            except Exception:
                pass

        new_company = create_company({
            "company_name": lead['company_name'],
            "city": city
        })

        if new_company:
            print(f"✅ Created (ID: {new_company['id']}).")
            enrichment_info["ce_status"] = "created"
            enrichment_info["ce_id"] = new_company['id']

            if trigger_discovery(new_company['id']):
                enrichment_info["message"] = "Created & Discovery started."
                print("🚀 Discovery queued.")
            else:
                enrichment_info["message"] = "Created, but Discovery trigger failed."
        else:
            enrichment_info["message"] = "Failed to create in CE."

    # Save state
    update_lead_enrichment(lead['id'], enrichment_info, status='synced')

def process_synced_lead(lead):
    """Phase 2: Closing the loop (fetching the results)"""
    enrichment = json.loads(lead['enrichment_data'])
    ce_id = enrichment.get('ce_id')

    if not ce_id:
        return  # Should not happen if status is synced

    print(f"\n🔄 [Phase 2] Checking Enrichment for: {lead['company_name']} (CE ID: {ce_id})")

    company = get_ce_status(ce_id)
    if not company:
        return

    # Check if CE is done
    ce_status = company.get('status')  # NEW, DISCOVERING, ENRICHED...

    if ce_status == 'ENRICHED':
        print("✅ Analysis Complete! Fetching Pitch...")

        # Get pitch (mocked or real)
        pitch = get_pitch(ce_id)

        if pitch:
            enrichment['pitch'] = pitch
            enrichment['ce_data'] = company  # Update with latest data
            enrichment['message'] = "Enrichment complete. Pitch ready."

            # Save final state
            update_lead_enrichment(lead['id'], enrichment, status='drafted')  # 'drafted' means ready for human review
            print("🎉 Lead is ready for response!")
        else:
            print("⚠️ Enriched, but no pitch generated yet.")
    else:
        print(f"⏳ Still processing in CE (Status: {ce_status})...")

def update_lead_enrichment(lead_id, data, status=None):
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    if status:
        c.execute('UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?',
                  (json.dumps(data), status, lead_id))
    else:
        c.execute('UPDATE leads SET enrichment_data = ? WHERE id = ?',
                  (json.dumps(data), lead_id))
    conn.commit()
    conn.close()

def run_sync():
    leads = get_leads()
    print(f"Scanning {len(leads)} leads...")

    count = 0
    for lead in leads:
        status = lead['status']

        # Fallback for old/mock leads or explicit 'new' status
        is_mock = False
        if lead['enrichment_data']:
            try:
                enrichment = json.loads(lead['enrichment_data'])
                is_mock = enrichment.get('recommendation') == 'Manual Review (Mock Data)'
            except Exception:
                pass

        if status == 'new' or not status or is_mock:
            process_new_lead(lead)
            count += 1
        elif status == 'synced':
            process_synced_lead(lead)
            count += 1

    print(f"Sync cycle finished. Processed {count} leads.")

if __name__ == "__main__":
    run_sync()
75
lead-engine/ingest.py
Normal file
@@ -0,0 +1,75 @@
import re
from db import insert_lead

def parse_tradingtwins_email(body):
    data = {}

    # Simple regex extraction based on the email format
    # (the field labels are German, as sent by TradingTwins)
    patterns = {
        'id': r'Lead-ID:\s*(\d+)',
        'company': r'Firma:\s*(.+)',
        'contact_first': r'Vorname:\s*(.+)',
        'contact_last': r'Nachname:\s*(.+)',
        'email': r'E-Mail:\s*([^\s<]+)',
        'phone': r'Rufnummer:\s*([^\n]+)',
        'area': r'Reinigungs-Fläche:\s*([^\n]+)',
        'purpose': r'Einsatzzweck:\s*([^\n]+)'
    }

    for key, pattern in patterns.items():
        match = re.search(pattern, body)
        if match:
            data[key] = match.group(1).strip()

    # Combine names
    if 'contact_first' in data and 'contact_last' in data:
        data['contact'] = f"{data['contact_first']} {data['contact_last']}"

    data['raw_body'] = body
    return data

def ingest_mock_leads():
    # Mock data from the session context
    leads = [
        {
            'id': '2397256',
            'company': 'pronorm Einbauküchen GmbH',
            'contact': 'Jakob Funk',
            'email': 'jakob.funk@pronorm.de',
            'phone': '+49 5733 979175',
            'raw_body': """
Lead-ID: 2397256
Firma: pronorm Einbauküchen GmbH
Vorname: Jakob
Nachname: Funk
Reinigungs-Fläche: 1.001 - 10.000 qm
Einsatzzweck: Reinigung von Böden
"""
        },
        {
            'id': '2414364',
            'company': 'Quad & Rollershop Schwabmünchen GmbH',
            'contact': 'Manfred Bihler',
            'email': 'Rollershop.Schwabmuenchen@web.de',
            'phone': '+49 8232 905246',
            'raw_body': """
Lead-ID: 2414364
Firma: Quad & Rollershop Schwabmünchen GmbH
Vorname: Manfred
Nachname: Bihler
Reinigungs-Fläche: 301 - 1.000 qm
Einsatzzweck: Reinigung von Böden
"""
        }
    ]

    count = 0
    for lead in leads:
        if insert_lead(lead):
            count += 1
    return count

if __name__ == "__main__":
    from db import init_db
    init_db()
    print(f"Ingested {ingest_mock_leads()} new leads.")
69
lead-engine/test_create_company.py
Normal file
@@ -0,0 +1,69 @@
import json
import urllib.request
import base64
import os

# Config
API_URL = "http://192.168.178.6:8090/ce/api"
USER = "admin"
PASS = "gemini"

def get_auth_header():
    auth_str = f"{USER}:{PASS}"
    b64_auth = base64.b64encode(auth_str.encode()).decode()
    return {"Authorization": f"Basic {b64_auth}", "Content-Type": "application/json"}

def test_create():
    print(f"Testing Create against {API_URL}...")

    # 1. Random name to ensure a new creation
    import random
    company_name = f"Test Company {random.randint(1000, 9999)}"

    payload = {
        "name": company_name,
        "city": "Berlin",
        "country": "DE"
    }

    req = urllib.request.Request(
        f"{API_URL}/companies",
        data=json.dumps(payload).encode('utf-8'),
        headers=get_auth_header(),
        method="POST"
    )

    try:
        with urllib.request.urlopen(req) as response:
            if response.status == 200:
                data = json.loads(response.read().decode())
                print(f"✅ SUCCESS: Created Company '{company_name}'")
                print(f"   ID: {data.get('id')}")
                print(f"   Name: {data.get('name')}")
                return data.get('id')
            else:
                print(f"❌ FAILED: Status {response.status}")
                print(response.read().decode())
    except Exception as e:
        print(f"❌ EXCEPTION: {e}")
    return None

def test_find(company_id):
    if not company_id:
        return

    print(f"\nVerifying creation via GET /companies/{company_id}...")
    req = urllib.request.Request(
        f"{API_URL}/companies/{company_id}",
        headers=get_auth_header(),
        method="GET"
    )
    try:
        with urllib.request.urlopen(req) as response:
            data = json.loads(response.read().decode())
            print("✅ VERIFIED: Found company in DB.")
    except Exception as e:
        print(f"❌ VERIFICATION FAILED: {e}")

if __name__ == "__main__":
    cid = test_create()
    test_find(cid)