From 30c19a546efe1aa16d5688cd5f1f2f3c34526e45 Mon Sep 17 00:00:00 2001 From: Jarvis Date: Sat, 31 Jan 2026 17:25:01 +0000 Subject: [PATCH] Add lead-engine source code to main repo --- lead-engine/AUTOMATION_DESIGN.md | 80 +++++++ lead-engine/Dockerfile | 10 + lead-engine/TRADING_TWINS_DOCUMENTATION.md | 105 +++++++++ lead-engine/app.py | 75 +++++++ lead-engine/db.py | 87 ++++++++ lead-engine/enrich.py | 235 +++++++++++++++++++++ lead-engine/ingest.py | 75 +++++++ lead-engine/test_create_company.py | 69 ++++++ 8 files changed, 736 insertions(+) create mode 100644 lead-engine/AUTOMATION_DESIGN.md create mode 100644 lead-engine/Dockerfile create mode 100644 lead-engine/TRADING_TWINS_DOCUMENTATION.md create mode 100644 lead-engine/app.py create mode 100644 lead-engine/db.py create mode 100644 lead-engine/enrich.py create mode 100644 lead-engine/ingest.py create mode 100644 lead-engine/test_create_company.py diff --git a/lead-engine/AUTOMATION_DESIGN.md b/lead-engine/AUTOMATION_DESIGN.md new file mode 100644 index 00000000..a0118dd4 --- /dev/null +++ b/lead-engine/AUTOMATION_DESIGN.md @@ -0,0 +1,80 @@ +# 🤖 Automation Workflow Design (n8n + Python) + +This document outlines the architecture for the **TradingTwins Lead Engine** automation, combining **n8n** (Trigger/Flow Control) and **Python** (Business Logic/Sync). + +## 1. High-Level Architecture + +We use a **Hybrid Approach**: +* **n8n:** Handles connectivity (IMAP, Slack, CRM) and triggers. +* **Python Lead Engine:** Handles data processing, parsing, and syncing with Company Explorer. + +```mermaid +sequenceDiagram + participant Mail as 📧 Email Server (IMAP) + participant n8n as ⚡ n8n Workflow + participant API as 🐍 Lead Engine API + participant DB as 🗄️ SQLite (leads.db) + participant CE as 🏢 Company Explorer + + Mail->>n8n: New Email (Subject: "Lead...") + n8n->>n8n: Extract Body & Subject + n8n->>API: POST /api/ingest { "body": "...", "subject": "..." } + + rect rgb(240, 248, 255) + Note right of API: Python Async Process + API->>DB: Save Lead (Status: 'new') + API-->>n8n: 200 OK (Lead Queued) + + API->>CE: Check Existence / Create + CE-->>API: Company ID + API->>CE: Trigger Discovery + API->>DB: Update Status: 'synced' + end + + API->>n8n: (Optional) Webhook "Sync Complete" + n8n->>Slack: Notify Sales "New Lead Synced!" +``` + +## 2. n8n Workflow Specification + +**Trigger:** `IMAP Read` Node +* **Filter:** Subject contains "TradingTwins" OR Sender is "notification@tradingtwins.com" +* **Interval:** Every 5 minutes (or 1 min). + +**Action:** `HTTP Request` Node +* **Method:** POST +* **URL:** `http://lead-engine:8000/api/ingest` (Internal Docker Network) +* **Body:** JSON + ```json + { + "source": "tradingtwins", + "subject": "New Lead: Musterfirma GmbH", + "raw_body": "..." + } + ``` + +## 3. Python Lead Engine Changes + +To support this, the current Streamlit-only application needs a **FastAPI backend** (or similar) running alongside or instead of the Streamlit loop for API access. + +### Requirements: +1. **Split `app.py`:** Separate UI (Streamlit) from Logic/API. +2. **New `api.py`:** FastAPI instance exposing `POST /api/ingest`. +3. **Refactor `enrich.py`:** Ensure `run_sync` can be called programmatically for a specific lead ID. + +### Proposed File Structure: +``` +lead-engine/ +├── api.py # FastAPI (New) +├── ui.py # Streamlit (Renamed from app.py) +├── core/ +│ ├── db.py # Shared DB access +│ ├── enrich.py # Sync Logic +│ └── parser.py # Parsing Logic +├── Dockerfile # Needs to run both (e.g. via supervisord or entrypoint script) +``` + +## 4. Why this approach? +* **Reliability:** n8n is better at keeping IMAP connections alive than a custom Python loop. +* **Scalability:** We can easily add other lead sources (e.g., Typeform, LinkedIn) just by adding another n8n trigger pointing to the same Python API. +* **Focus:** Python code focuses purely on "What to do with the data", not "How to get the data". diff --git a/lead-engine/Dockerfile b/lead-engine/Dockerfile new file mode 100644 index 00000000..ed452e66 --- /dev/null +++ b/lead-engine/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.9-slim + +WORKDIR /app + +COPY . . + +RUN pip install streamlit pandas + +ENV PYTHONUNBUFFERED=1 +CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0"] diff --git a/lead-engine/TRADING_TWINS_DOCUMENTATION.md b/lead-engine/TRADING_TWINS_DOCUMENTATION.md new file mode 100644 index 00000000..ed8f4e6a --- /dev/null +++ b/lead-engine/TRADING_TWINS_DOCUMENTATION.md @@ -0,0 +1,105 @@ +# 🚀 TradingTwins Lead Engine - Documentation + +## 1. Projektübersicht +Die **TradingTwins Lead Engine** ist ein spezialisiertes Automatisierungs-Tool zur Verarbeitung eingehender B2B-Leads (speziell von der Plattform "tradingtwins"). + +**Ziel:** Minimierung der "Time-to-Response" und Maximierung der Lead-Qualität durch automatische Anreicherung und Bewertung. + +Das Tool fungiert als **Middleware** zwischen dem E-Mail-Eingang und dem zentralen Intelligence-Hub (**Company Explorer**). + +--- + +## 2. Architektur & Workflow + +### System-Komponenten +1. **Lead Engine (Python/Streamlit):** + * Hält den State der Leads (Neu, Synced, Drafted). + * Bietet ein Dashboard zur Überwachung. + * Führt die Logik aus. +2. **Company Explorer (CE):** + * Single Source of Truth für Firmendaten. + * Führt die eigentliche Web-Recherche (Discovery) und Bewertung (Scoring) durch. +3. **SQLite Datenbank (`data/leads.db`):** + * Lokaler Speicher für Lead-Status und Historie. + +### Der Prozess-Flow (Sync Logic) + +```mermaid +graph TD + A[E-Mail Eingang] -->|Parser| B(Lead Engine DB) + B --> C{Check: Existiert Firma in CE?} + + C -- JA --> D[Hole ID & Status] + C -- NEIN --> E[Erstelle Firma via API] + + E --> F[Trigger: Discovery & Enrichment] + D --> G[Update Lead-Status: 'Synced'] + F --> G + + G --> H[Dashboard: Ready for Response] +``` + +--- + +## 3. Installation & Deployment + +Das System läuft als Docker-Container. + +### Voraussetzungen +* Zugriff auf das `lead-engine-mvp` Git-Repository. +* Laufender **Company Explorer** im selben Netzwerk (oder erreichbar via IP). + +### Starten (Docker) + +```bash +# 1. Image bauen +docker build -t lead-engine . + +# 2. Container starten +# WICHTIG: Ports mappen & Netwerk beachten +docker run -d \ + -p 8501:8501 \ + --name lead-engine \ + -e CE_API_URL="http://192.168.178.6:8090/ce/api" \ + -e CE_API_USER="admin" \ + -e CE_API_PASS="gemini" \ + lead-engine +``` + +### Environment Variablen + +| Variable | Default | Beschreibung | +| :--- | :--- | :--- | +| `CE_API_URL` | `http://192.168.178.6:8090/ce/api` | Basis-URL zur Company Explorer API (via Nginx Proxy) | +| `CE_API_USER` | `admin` | Benutzer für Basic Auth | +| `CE_API_PASS` | `gemini` | Passwort für Basic Auth | +| `PYTHONUNBUFFERED` | `1` | Wichtig für Docker Logs | + +--- + +## 4. Technische Details + +### Datenmodell (SQLite) +Die lokale Datenbank `leads.db` speichert: +* `source_id`: ID aus der E-Mail (TradingTwins Lead-ID). +* `raw_body`: Der originale E-Mail-Text. +* `enrichment_data`: JSON-Blob mit Daten vom Company Explorer (Score, Vertical, IDs). +* `status`: `new` -> `synced` -> `drafted` -> `sent`. + +### API Integration (Company Explorer) +Das Tool nutzt folgende Endpunkte des CE: + +1. **Suche:** `GET /companies?search={Name}` + * Prüft auf Duplikate. +2. **Erstellung:** `POST /companies` + * Payload: `{"name": "...", "city": "..."}` +3. **Discovery:** `POST /enrich/discover` + * Triggert den Crawler, um Webseite und Branche zu finden. + +--- + +## 5. Roadmap / Offene Punkte + +* **[ ] IMAP Live-Anbindung:** Ersetzen des aktuellen Mock-Ingests durch echten IMAP-Listener (Skript `check_mail.py` existiert bereits als Vorlage). +* **[ ] Response Generation:** Abruf eines "Sales Pitches" vom Company Explorer basierend auf den angereicherten Daten (Pain Points, Vertical). +* **[ ] CRM Push:** Optionale Weiterleitung an SuperOffice/Odoo nach erfolgreicher Qualifizierung. diff --git a/lead-engine/app.py b/lead-engine/app.py new file mode 100644 index 00000000..2dbcf780 --- /dev/null +++ b/lead-engine/app.py @@ -0,0 +1,75 @@ +import streamlit as st +import pandas as pd +from db import get_leads, init_db +import json +from enrich import run_sync # Import our sync function + +st.set_page_config(page_title="TradingTwins Lead Engine", layout="wide") + +st.title("🚀 Lead Engine: TradingTwins") + +# Sidebar Actions +st.sidebar.header("Actions") + +if st.sidebar.button("1. Ingest Emails (Mock)"): + from ingest import ingest_mock_leads + init_db() + count = ingest_mock_leads() + st.sidebar.success(f"Ingested {count} new leads.") + st.rerun() + +if st.sidebar.button("2. Sync to Company Explorer"): + with st.spinner("Syncing with Company Explorer API..."): + # Capture output for debugging + try: + # We redirect stdout to capture prints + import io + from contextlib import redirect_stdout + f = io.StringIO() + with redirect_stdout(f): + run_sync() + output = f.getvalue() + + st.success("Sync finished!") + with st.expander("See Process Log", expanded=True): + st.code(output) + + except Exception as e: + st.error(f"Sync Failed: {e}") + +# Main View +leads = get_leads() +df = pd.DataFrame(leads) + +if not df.empty: + col1, col2, col3 = st.columns(3) + col1.metric("Total Leads", len(df)) + col2.metric("New / Unsynced", len(df[df['status'] == 'new'])) + col3.metric("Synced to CE", len(df[df['status'] == 'synced'])) + + st.subheader("Lead Pipeline") + + for index, row in df.iterrows(): + with st.expander(f"{row['company_name']} ({row['status']})"): + c1, c2 = st.columns(2) + c1.write(f"**Contact:** {row['contact_name']}") + c1.write(f"**Email:** {row['email']}") + c1.text(row['raw_body'][:200] + "...") + + enrichment = json.loads(row['enrichment_data']) if row['enrichment_data'] else {} + + if enrichment: + c2.write("--- Integration Status ---") + if enrichment.get('ce_id'): + c2.success(f"✅ Linked to Company Explorer (ID: {enrichment['ce_id']})") + c2.write(f"**CE Status:** {enrichment.get('ce_status')}") + else: + c2.warning("⚠️ Not yet synced or failed") + + c2.info(f"Log: {enrichment.get('message')}") + + if enrichment.get('ce_data'): + c2.json(enrichment['ce_data']) + +else: + st.info("No leads found. Click 'Ingest Emails' in the sidebar.") diff --git a/lead-engine/db.py b/lead-engine/db.py new file mode 100644 index 00000000..f994bd63 --- /dev/null +++ b/lead-engine/db.py @@ -0,0 +1,87 @@ +import sqlite3 +import json +import os +from datetime import datetime + +# Robust path handling: +# 1. Get the directory where this script (db.py) lives +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +# 2. Define data directory and db path relative to it +DATA_DIR = os.path.join(BASE_DIR, 'data') +DB_PATH = os.path.join(DATA_DIR, 'leads.db') + +def init_db(): + # Ensure data directory exists + if not os.path.exists(DATA_DIR): + os.makedirs(DATA_DIR) + + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + c.execute(''' + CREATE TABLE IF NOT EXISTS leads ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + source_id TEXT UNIQUE, + received_at TIMESTAMP, + company_name TEXT, + contact_name TEXT, + email TEXT, + phone TEXT, + raw_body TEXT, + enrichment_data TEXT, + status TEXT DEFAULT 'new', + response_draft TEXT, + sent_at TIMESTAMP + ) + ''') + conn.commit() + conn.close() + +def insert_lead(lead_data): + # Ensure DB exists before inserting (if ingest runs before init) + if not os.path.exists(DB_PATH): + init_db() + + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + try: + c.execute(''' + INSERT INTO leads (source_id, received_at, company_name, contact_name, email, phone, raw_body, status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ''', ( + lead_data.get('id'), + datetime.now(), + lead_data.get('company'), + lead_data.get('contact'), + lead_data.get('email'), + lead_data.get('phone'), + lead_data.get('raw_body'), + 'new' + )) + conn.commit() + return True + except sqlite3.IntegrityError: + return False + finally: + conn.close() + +def get_leads(): + if not os.path.exists(DB_PATH): + init_db() + + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + c = conn.cursor() + c.execute('SELECT * FROM leads ORDER BY received_at DESC') + rows = c.fetchall() + conn.close() + return [dict(row) for row in rows] + +def update_lead_status(lead_id, status, response_draft=None): + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + if response_draft: + c.execute('UPDATE leads SET status = ?, response_draft = ? WHERE id = ?', (status, response_draft, lead_id)) + else: + c.execute('UPDATE leads SET status = ? WHERE id = ?', (status, lead_id)) + conn.commit() + conn.close() diff --git a/lead-engine/enrich.py b/lead-engine/enrich.py new file mode 100644 index 00000000..fff920a0 --- /dev/null +++ b/lead-engine/enrich.py @@ -0,0 +1,235 @@ +import json +import requests +import os +import time +from requests.auth import HTTPBasicAuth +from db import update_lead_status, get_leads, DB_PATH +import sqlite3 + +# --- Configuration --- +CE_API_URL = os.getenv("CE_API_URL", "http://192.168.178.6:8090/ce/api") +CE_API_USER = os.getenv("CE_API_USER", "admin") +CE_API_PASS = os.getenv("CE_API_PASS", "gemini") + +def get_auth(): + return HTTPBasicAuth(CE_API_USER, CE_API_PASS) + +# 1. CORE LOGIC: Interaction with Company Explorer + +def find_company(name): + """Prüft, ob Firma im CE existiert.""" + try: + res = requests.get( + f"{CE_API_URL}/companies", + params={"search": name, "limit": 1}, + auth=get_auth(), + timeout=5 + ) + if res.status_code == 200: + data = res.json() + if data.get("items"): + return data["items"][0] # Return first match + except Exception as e: + print(f"❌ CE API Error (Find '{name}'): {e}") + return None + +def create_company(lead_data): + """Legt Firma im CE an.""" + payload = { + "name": lead_data['company_name'], + "city": lead_data.get('city', ''), + "country": "DE", + "website": None + } + try: + res = requests.post( + f"{CE_API_URL}/companies", + json=payload, + auth=get_auth(), + timeout=5 + ) + if res.status_code == 200: + return res.json() + else: + print(f"❌ Create Failed: {res.status_code} {res.text}") + except Exception as e: + print(f"❌ CE API Error (Create '{lead_data['company_name']}'): {e}") + return None + +def trigger_discovery(company_id): + """Startet die automatische Webseiten-Suche im CE.""" + try: + res = requests.post( + f"{CE_API_URL}/enrich/discover", + json={"company_id": company_id}, + auth=get_auth(), + timeout=5 + ) + return res.status_code == 200 + except Exception as e: + print(f"❌ Trigger Discovery Failed (ID {company_id}): {e}") + return False + +def get_ce_status(company_id): + """Holt den aktuellen Status einer Firma aus dem CE.""" + try: + res = requests.get( + f"{CE_API_URL}/companies/{company_id}", + auth=get_auth(), + timeout=5 + ) + if res.status_code == 200: + return res.json() + except Exception as e: + print(f"❌ Get Status Failed (ID {company_id}): {e}") + return None + +def get_pitch(company_id): + """ + Versucht, einen generierten Pitch abzurufen. + (Hier simulieren wir es erst mal, oder nutzen ein Feld aus der Company, falls CE das schon bietet) + """ + # TODO: Implement real endpoint once CE has it (e.g. /companies/{id}/analysis/pitch) + # For now, we fetch the company and look for 'industry_ai' or similar fields to construct a simple pitch locally + company = get_ce_status(company_id) + if company and company.get('industry_ai'): + industry = company['industry_ai'] + # Simple Template based on industry + return f"Wir haben gesehen, dass Sie im Bereich {industry} tätig sind. Für {industry} haben wir spezielle Roboter-Lösungen..." + return None + +# 2. ORCHESTRATION: The Sync Process + +def process_new_lead(lead): + """Phase 1: Identifikation & Anlage""" + print(f"\n⚙️ [Phase 1] Syncing Lead: {lead['company_name']} ({lead['id']})") + + ce_company = find_company(lead['company_name']) + + enrichment_info = { + "ce_status": "unknown", + "ce_id": None, + "message": "", + "last_check": time.time() + } + + if ce_company: + print(f"✅ Company exists in CE (ID: {ce_company['id']})") + enrichment_info["ce_status"] = "linked" # Linked but maybe not fully enriched/fetched + enrichment_info["ce_id"] = ce_company['id'] + enrichment_info["ce_data"] = ce_company + enrichment_info["message"] = f"Matched existing account (Status: {ce_company.get('status')})." + + # If existing company is not enriched yet, trigger discovery too? + if ce_company.get('status') == 'NEW': + trigger_discovery(ce_company['id']) + enrichment_info["message"] += " Triggered Discovery for existing raw account." + + else: + print(f"✨ New Company. Creating in CE...") + city = "" + # Simple extraction + if lead['raw_body'] and "Stadt:" in lead['raw_body']: + try: + for line in lead['raw_body'].split('\n'): + if "Stadt:" in line: + city = line.split("Stadt:")[1].strip() + break + except: pass + + new_company = create_company({ + "company_name": lead['company_name'], + "city": city + }) + + if new_company: + print(f"✅ Created (ID: {new_company['id']}).") + enrichment_info["ce_status"] = "created" + enrichment_info["ce_id"] = new_company['id'] + + if trigger_discovery(new_company['id']): + enrichment_info["message"] = "Created & Discovery started." + print("🚀 Discovery queued.") + else: + enrichment_info["message"] = "Created, but Discovery trigger failed." + else: + enrichment_info["message"] = "Failed to create in CE." + + # Save State + update_lead_enrichment(lead['id'], enrichment_info, status='synced') + +def process_synced_lead(lead): + """Phase 2: Closing the Loop (Abrufen der Ergebnisse)""" + enrichment = json.loads(lead['enrichment_data']) + ce_id = enrichment.get('ce_id') + + if not ce_id: + return # Should not happen if status is synced + + print(f"\n🔄 [Phase 2] Checking Enrichment for: {lead['company_name']} (CE ID: {ce_id})") + + company = get_ce_status(ce_id) + if not company: + return + + # Check if CE is done + ce_status = company.get('status') # NEW, DISCOVERING, ENRICHED... + + if ce_status == 'ENRICHED': + print(f"✅ Analysis Complete! Fetching Pitch...") + + # Get Pitch (Mocked or Real) + pitch = get_pitch(ce_id) + + if pitch: + enrichment['pitch'] = pitch + enrichment['ce_data'] = company # Update with latest data + enrichment['message'] = "Enrichment complete. Pitch ready." + + # Save final state + update_lead_enrichment(lead['id'], enrichment, status='drafted') # 'drafted' means ready for human review + print(f"🎉 Lead is ready for response!") + else: + print("⚠️ Enriched, but no pitch generated yet.") + else: + print(f"⏳ Still processing in CE (Status: {ce_status})...") + +def update_lead_enrichment(lead_id, data, status=None): + conn = sqlite3.connect(DB_PATH) + c = conn.cursor() + if status: + c.execute('UPDATE leads SET enrichment_data = ?, status = ? WHERE id = ?', + (json.dumps(data), status, lead_id)) + else: + c.execute('UPDATE leads SET enrichment_data = ? WHERE id = ?', + (json.dumps(data), lead_id)) + conn.commit() + conn.close() + +def run_sync(): + leads = get_leads() + print(f"Scanning {len(leads)} leads...") + + count = 0 + for lead in leads: + status = lead['status'] + + # Fallback for old/mock leads or explicit 'new' status + is_mock = False + if lead['enrichment_data']: + try: + enrichment = json.loads(lead['enrichment_data']) + is_mock = enrichment.get('recommendation') == 'Manual Review (Mock Data)' + except: pass + + if status == 'new' or not status or is_mock: + process_new_lead(lead) + count += 1 + elif status == 'synced': + process_synced_lead(lead) + count += 1 + + print(f"Sync cycle finished. Processed {count} leads.") + +if __name__ == "__main__": + run_sync() diff --git a/lead-engine/ingest.py b/lead-engine/ingest.py new file mode 100644 index 00000000..49f09a12 --- /dev/null +++ b/lead-engine/ingest.py @@ -0,0 +1,75 @@ +import re +from db import insert_lead + +def parse_tradingtwins_email(body): + data = {} + + # Simple regex extraction based on the email format + patterns = { + 'id': r'Lead-ID:\s*(\d+)', + 'company': r'Firma:\s*(.+)', + 'contact_first': r'Vorname:\s*(.+)', + 'contact_last': r'Nachname:\s*(.+)', + 'email': r'E-Mail:\s*([^\s<]+)', + 'phone': r'Rufnummer:\s*([^\n]+)', + 'area': r'Reinigungs-Flche:\s*([^\n]+)', + 'purpose': r'Einsatzzweck:\s*([^\n]+)' + } + + for key, pattern in patterns.items(): + match = re.search(pattern, body) + if match: + data[key] = match.group(1).strip() + + # Combine names + if 'contact_first' in data and 'contact_last' in data: + data['contact'] = f"{data['contact_first']} {data['contact_last']}" + + data['raw_body'] = body + return data + +def ingest_mock_leads(): + # Mock data from the session context + leads = [ + { + 'id': '2397256', + 'company': 'pronorm Einbauküchen GmbH', + 'contact': 'Jakob Funk', + 'email': 'jakob.funk@pronorm.de', + 'phone': '+49 5733 979175', + 'raw_body': """ + Lead-ID: 2397256 + Firma: pronorm Einbauküchen GmbH + Vorname: Jakob + Nachname: Funk + Reinigungs-Flche: 1.001 - 10.000 qm + Einsatzzweck: Reinigung von Böden + """ + }, + { + 'id': '2414364', + 'company': 'Quad & Rollershop Schwabmünchen GmbH', + 'contact': 'Manfred Bihler', + 'email': 'Rollershop.Schwabmuenchen@web.de', + 'phone': '+49 8232 905246', + 'raw_body': """ + Lead-ID: 2414364 + Firma: Quad & Rollershop Schwabmünchen GmbH + Vorname: Manfred + Nachname: Bihler + Reinigungs-Flche: 301 - 1.000 qm + Einsatzzweck: Reinigung von Böden + """ + } + ] + + count = 0 + for lead in leads: + if insert_lead(lead): + count += 1 + return count + +if __name__ == "__main__": + from db import init_db + init_db() + print(f"Ingested {ingest_mock_leads()} new leads.") diff --git a/lead-engine/test_create_company.py b/lead-engine/test_create_company.py new file mode 100644 index 00000000..05eaf158 --- /dev/null +++ b/lead-engine/test_create_company.py @@ -0,0 +1,69 @@ +import json +import urllib.request +import base64 +import os + +# Config +API_URL = "http://192.168.178.6:8090/ce/api" +USER = "admin" +PASS = "gemini" + +def get_auth_header(): + auth_str = f"{USER}:{PASS}" + b64_auth = base64.b64encode(auth_str.encode()).decode() + return {"Authorization": f"Basic {b64_auth}", "Content-Type": "application/json"} + +def test_create(): + print(f"Testing Create against {API_URL}...") + + # 1. Random Name to ensure new creation + import random + company_name = f"Test Company {random.randint(1000,9999)}" + + payload = { + "name": company_name, + "city": "Berlin", + "country": "DE" + } + + req = urllib.request.Request( + f"{API_URL}/companies", + data=json.dumps(payload).encode('utf-8'), + headers=get_auth_header(), + method="POST" + ) + + try: + with urllib.request.urlopen(req) as response: + if response.status == 200: + data = json.loads(response.read().decode()) + print(f"✅ SUCCESS: Created Company '{company_name}'") + print(f" ID: {data.get('id')}") + print(f" Name: {data.get('name')}") + return data.get('id') + else: + print(f"❌ FAILED: Status {response.status}") + print(response.read().decode()) + except Exception as e: + print(f"❌ EXCEPTION: {e}") + return None + +def test_find(company_id): + if not company_id: return + + print(f"\nVerifying creation via GET /companies/{company_id}...") + req = urllib.request.Request( + f"{API_URL}/companies/{company_id}", + headers=get_auth_header(), + method="GET" + ) + try: + with urllib.request.urlopen(req) as response: + data = json.loads(response.read().decode()) + print(f"✅ VERIFIED: Found company in DB.") + except Exception as e: + print(f"❌ VERIFICATION FAILED: {e}") + +if __name__ == "__main__": + cid = test_create() + test_find(cid)