[2ff88f42] Finalize SuperOffice Integration: Enhanced Persona model with Influencer role, switched Matrix Generator to Gemini, implemented Noise Reduction for Webhooks, and added E2E test scenarios.

This commit is contained in:
2026-02-20 10:55:57 +00:00
parent 9b712d261f
commit c58707cc1e
11 changed files with 609 additions and 107 deletions

View File

@@ -89,6 +89,7 @@ class ProvisioningRequest(BaseModel):
so_person_id: Optional[int] = None
crm_name: Optional[str] = None
crm_website: Optional[str] = None
job_title: Optional[str] = None
class ProvisioningResponse(BaseModel):
status: str

View File

@@ -1,17 +1,18 @@
import sys
import os
import json
import argparse
from typing import List
import google.generativeai as genai
# Setup Environment
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
from backend.database import SessionLocal, Industry, Persona, MarketingMatrix
from backend.config import settings
# --- Configuration ---
MODEL = "gpt-4o"
MODEL_NAME = "gemini-1.5-pro-latest" # High quality copy
def generate_prompt(industry: Industry, persona: Persona) -> str:
"""
@@ -54,6 +55,8 @@ Tonalität: Professionell, lösungsorientiert, auf den Punkt. Keine Marketing-Fl
3. "social_proof": Ein Satz, der Vertrauen aufbaut. Nenne generische Erfolge (z.B. "Unternehmen in der {industry.name} senken so ihre Kosten um 15%"), da wir noch keine spezifischen Logos nennen dürfen.
--- FORMAT ---
Respond ONLY with a valid JSON object. Do not add markdown formatting like ```json ... ```.
Format:
{{
"subject": "...",
"intro": "...",
@@ -62,7 +65,7 @@ Tonalität: Professionell, lösungsorientiert, auf den Punkt. Keine Marketing-Fl
"""
return prompt
def mock_openai_call(prompt: str):
def mock_call(prompt: str):
"""Simulates an API call for dry runs."""
print(f"\n--- [MOCK] GENERATING PROMPT ---\n{prompt[:300]}...\n--------------------------------")
return {
@@ -71,23 +74,40 @@ def mock_openai_call(prompt: str):
"social_proof": "[MOCK] Ähnliche Betriebe sparten 20% Kosten."
}
def real_openai_call(prompt: str):
# This would link to the actual OpenAI client
# For now, we keep it simple or import from a lib
import openai
from backend.config import settings
if not settings.OPENAI_API_KEY:
raise ValueError("OPENAI_API_KEY not set")
def real_gemini_call(prompt: str):
if not settings.GEMINI_API_KEY:
raise ValueError("GEMINI_API_KEY not set in config/env")
client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)
response = client.chat.completions.create(
model=MODEL,
response_format={"type": "json_object"},
messages=[{"role": "user", "content": prompt}],
temperature=0.7
genai.configure(api_key=settings.GEMINI_API_KEY)
# Configure Model
generation_config = {
"temperature": 0.7,
"top_p": 0.95,
"top_k": 64,
"max_output_tokens": 1024,
"response_mime_type": "application/json",
}
model = genai.GenerativeModel(
model_name=MODEL_NAME,
generation_config=generation_config,
)
return json.loads(response.choices[0].message.content)
response = model.generate_content(prompt)
try:
# Clean response if necessary (Gemini usually returns clean JSON with mime_type set, but safety first)
text = response.text.strip()
if text.startswith("```json"):
text = text[7:-3].strip()
elif text.startswith("```"):
text = text[3:-3].strip()
return json.loads(text)
except Exception as e:
print(f"JSON Parse Error: {e}. Raw Response: {response.text}")
raise
def run_matrix_generation(dry_run: bool = True, force: bool = False):
db = SessionLocal()
@@ -96,7 +116,7 @@ def run_matrix_generation(dry_run: bool = True, force: bool = False):
personas = db.query(Persona).all()
print(f"Found {len(industries)} Industries and {len(personas)} Personas.")
print(f"Mode: {'DRY RUN (No API calls, no DB writes)' if dry_run else 'LIVE'}")
print(f"Mode: {'DRY RUN (No API calls, no DB writes)' if dry_run else 'LIVE - GEMINI GENERATION'}")
total_combinations = len(industries) * len(personas)
processed = 0
@@ -120,10 +140,15 @@ def run_matrix_generation(dry_run: bool = True, force: bool = False):
prompt = generate_prompt(ind, pers)
if dry_run:
result = mock_openai_call(prompt)
result = mock_call(prompt)
else:
try:
result = real_openai_call(prompt)
result = real_gemini_call(prompt)
# Basic Validation
if not result.get("subject") or not result.get("intro"):
print(" -> Invalid result structure. Skipping.")
continue
except Exception as e:
print(f" -> API ERROR: {e}")
continue
@@ -155,8 +180,8 @@ def run_matrix_generation(dry_run: bool = True, force: bool = False):
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--live", action="store_true", help="Actually call OpenAI and write to DB")
parser.add_argument("--live", action="store_true", help="Actually call Gemini and write to DB")
parser.add_argument("--force", action="store_true", help="Overwrite existing matrix entries")
args = parser.parse_args()
run_matrix_generation(dry_run=not args.live, force=args.force)
run_matrix_generation(dry_run=not args.live, force=args.force)

View File

@@ -260,7 +260,7 @@ export function RoboticsSettings({ isOpen, onClose, apiBase }: RoboticsSettingsP
{jobRoles.map(role => (
<tr key={role.id} className="group">
<td className="p-2"><input className="w-full bg-transparent border border-transparent hover:border-slate-300 dark:hover:border-slate-700 rounded px-2 py-1 text-slate-900 dark:text-slate-200 outline-none focus:border-blue-500" defaultValue={role.pattern} /></td>
<td className="p-2"><select className="w-full bg-transparent border border-transparent hover:border-slate-300 dark:hover:border-slate-700 rounded px-2 py-1 text-slate-900 dark:text-slate-200 outline-none focus:border-blue-500" defaultValue={role.role}><option>Operativer Entscheider</option><option>Infrastruktur-Verantwortlicher</option><option>Wirtschaftlicher Entscheider</option><option>Innovations-Treiber</option></select></td>
<td className="p-2"><select className="w-full bg-transparent border border-transparent hover:border-slate-300 dark:hover:border-slate-700 rounded px-2 py-1 text-slate-900 dark:text-slate-200 outline-none focus:border-blue-500" defaultValue={role.role}><option>Operativer Entscheider</option><option>Infrastruktur-Verantwortlicher</option><option>Wirtschaftlicher Entscheider</option><option>Innovations-Treiber</option><option>Influencer</option></select></td>
<td className="p-2 text-center"><button onClick={() => handleDeleteJobRole(role.id)} className="text-slate-400 hover:text-red-500 opacity-0 group-hover:opacity-100 transition-opacity"><Trash2 className="h-4 w-4" /></button></td>
</tr>
))}

View File

@@ -1,100 +1,75 @@
# SuperOffice Connector ("The Muscle") - GTM Engine
# SuperOffice Connector ("The Muscle") - GTM Engine v2.0
Dies ist der "dumme" Microservice zur Anbindung von **SuperOffice CRM** an die **Company Explorer Intelligence**.
Der Connector agiert als reiner Bote ("Muscle"): Er nimmt Webhook-Events entgegen, fragt das "Gehirn" (Company Explorer) nach Instruktionen und führt diese im CRM aus.
Dies ist der Microservice zur bidirektionalen Anbindung von **SuperOffice CRM** an die **Company Explorer Intelligence**.
Der Connector agiert als intelligenter Bote ("Muscle"): Er nimmt Webhook-Events entgegen, filtert Rauschen heraus, fragt das "Gehirn" (Company Explorer) nach Instruktionen und schreibt Ergebnisse (Marketing-Texte, Branchen-Verticals, Rollen) ins CRM zurück.
## 1. Architektur: "The Intelligent Hub & The Loyal Messenger"
## 1. Architektur: "Noise-Reduced Event Pipeline"
Wir haben uns für eine **Event-gesteuerte Architektur** entschieden, um Skalierbarkeit und Echtzeit-Verarbeitung zu gewährleisten.
Wir nutzen eine **Event-gesteuerte Architektur** mit integrierter Rauschunterdrückung, um die CRM-Last zu minimieren und Endlosschleifen zu verhindern.
**Der Datenfluss:**
1. **Auslöser:** User ändert in SuperOffice einen Kontakt (z.B. Status -> `Init`).
2. **Transport:** SuperOffice sendet ein `POST` Event an unseren Webhook-Endpunkt (`:8003/webhook`).
3. **Queueing:** Der `Webhook Receiver` validiert das Event und legt es sofort in eine lokale `SQLite`-Queue (`connector_queue.db`).
4. **Verarbeitung:** Ein separater `Worker`-Prozess holt den Job ab.
5. **Provisioning:** Der Worker fragt den **Company Explorer** (`POST /api/provision/superoffice-contact`): "Was soll ich mit Person ID 123 tun?".
6. **Write-Back:** Der Company Explorer liefert das fertige Text-Paket (Subject, Intro, Proof) zurück. Der Worker schreibt dies via REST API in die UDF-Felder von SuperOffice.
1. **Auslöser:** Ein User ändert Stammdaten in SuperOffice.
2. **Filterung (Noise Reduction):** Der Webhook-Receiver ignoriert sofort:
* Irrelevante Entitäten (Sales, Projects, Appointments, Documents).
* Irrelevante Felder (Telefon, E-Mail, Fax, interne Systemfelder).
* *Nur strategische Änderungen (Name, Website, Job-Titel, Position) triggern die Pipeline.*
3. **Queueing:** Valide Events landen in der lokalen `SQLite`-Queue (`connector_queue.db`).
4. **Provisioning:** Der Worker fragt den **Company Explorer** (:8000): "Was ist die KI-Wahrheit für diesen Kontakt?".
5. **Write-Back:** Der Connector schreibt die Ergebnisse (Vertical-ID, Persona-ID, E-Mail-Snippets) via REST API zurück in die SuperOffice UDF-Felder.
## 2. Kern-Komponenten
## 2. 🚀 Go-Live Checkliste (User Tasks)
* **`webhook_app.py` (FastAPI):**
* Lauscht auf Port `8000` (Extern: `8003`).
* Nimmt Events entgegen, prüft Token (`WEBHOOK_SECRET`).
* Schreibt Jobs in die Queue.
* Endpunkt: `POST /webhook`.
Um das System auf der Produktivumgebung ("Live") in Betrieb zu nehmen, müssen folgende Schritte durchgeführt werden:
* **`queue_manager.py` (SQLite):**
* Verwaltet die lokale Job-Queue.
* Status: `PENDING` -> `PROCESSING` -> `COMPLETED` / `FAILED`.
* Persistiert Jobs auch bei Container-Neustart.
### Schritt A: SuperOffice Registrierung (IT / Admin)
Da wir eine **Private App** nutzen, ist keine Zertifizierung nötig.
1. Loggen Sie sich ins [SuperOffice Developer Portal](https://dev.superoffice.com/) ein.
2. Registrieren Sie eine neue App ("Custom Application").
* **Redirect URI:** `http://localhost`
* **Scopes:** `Contact:Read/Write`, `Person:Read/Write`, `List:Read`, `Appointment:Write`.
3. Notieren Sie sich **Client ID**, **Client Secret** und den **Token** (falls System User genutzt wird).
* **`worker.py`:**
* Läuft als Hintergrundprozess.
* Pollt die Queue alle 5 Sekunden.
* Kommuniziert mit Company Explorer (Intern: `http://company-explorer:8000`) und SuperOffice API.
* Behandelt Fehler und Retries.
### Schritt B: Konfiguration & Mapping
1. **Credentials:** Tragen Sie die Daten aus Schritt A in die `.env` Datei auf dem Server ein (`SO_CLIENT_ID`, etc.).
2. **Discovery:** Starten Sie den Container und führen Sie einmalig das Discovery-Tool aus, um die IDs der Felder in der Live-Umgebung zu finden:
```bash
python3 connector-superoffice/discover_fields.py
```
3. **Mapping Update:** Tragen Sie die ermittelten IDs in die `.env` ein:
* `VERTICAL_MAP_JSON`: Mappen Sie die CE-Branchen auf die SuperOffice "Business"-IDs.
* `PERSONA_MAP_JSON`: Mappen Sie die Rollen (z.B. "Influencer", "Wirtschaftlicher Entscheider") auf die SuperOffice "Position"-IDs.
* **`superoffice_client.py`:**
* Kapselt die SuperOffice REST API (Auth, GET, PUT).
* Verwaltet Refresh Tokens.
### Schritt C: Webhook Einrichtung (SuperOffice Admin)
Gehen Sie in SuperOffice zu **Einstellungen & Verwaltung -> Webhooks** und legen Sie einen neuen Hook an:
* **Target URL:** `http://<IHRE-SERVER-IP>:8003/webhook?token=<SECRET_AUS_ENV>`
* **Events:** `contact.created`, `contact.changed`, `person.created`, `person.changed`.
## 3. Setup & Konfiguration
### Schritt D: Feiertags-Import
Damit der Versand an Feiertagen pausiert:
1. Kopieren Sie den Inhalt von `connector-superoffice/import_holidays_CRMSCRIPT.txt`.
2. Führen Sie ihn in SuperOffice unter **CRMScript -> Execute** aus.
### Docker Service
Der Service läuft im Container `connector-superoffice`.
Startet via `start.sh` sowohl den Webserver als auch den Worker.
## 3. Business Logik & Features
### Konfiguration (`.env`)
Der Connector benötigt folgende Variablen (in `docker-compose.yml` gesetzt):
### 4.1. Persona Mapping ("Golden Record")
Das Feld `Position` (Rolle) in SuperOffice wird als Ziel-Feld für die CE-Archetypen genutzt.
* **Logik:** Der CE analysiert den Jobtitel (z.B. "Einkaufsleiter") -> Mappt auf "Influencer".
* **Sync:** Der Connector setzt das Feld `Position` in SuperOffice auf den entsprechenden Wert (sofern in der Config gemappt).
```yaml
environment:
API_USER: "admin"
API_PASSWORD: "..."
COMPANY_EXPLORER_URL: "http://company-explorer:8000" # Interne Docker-Adresse
WEBHOOK_SECRET: "changeme" # Muss mit SO-Webhook Config übereinstimmen
# Plus die SuperOffice Credentials (Client ID, Secret, Refresh Token)
### 4.2. Vertical Mapping
KI-Verticals (z.B. "Healthcare - Hospital") werden auf die SuperOffice-Branchenliste gemappt. Manuelle Änderungen durch User im CRM werden aktuell beim nächsten Update überschrieben (Master: CE).
## 4. Testing & Simulation
Verwenden Sie `test_full_roundtrip.py`, um die Kette zu testen, ohne E-Mails zu versenden. Das Skript erstellt stattdessen **Termine** in SuperOffice als Beweis.
```bash
# Startet Simulation für Person ID 2
python3 connector-superoffice/tests/test_full_roundtrip.py
```
## 4. API-Schnittstelle (Intern)
## 5. Roadmap (v2.1)
Der Connector ruft den Company Explorer auf und liefert dabei **Live-Daten** aus dem CRM für den "Double Truth" Abgleich:
**Request:** `POST /api/provision/superoffice-contact`
```json
{
"so_contact_id": 12345,
"so_person_id": 67890,
"crm_name": "RoboPlanet GmbH",
"crm_website": "www.roboplanet.de",
"job_title": "Geschäftsführer"
}
```
**Response:**
```json
{
"status": "success",
"texts": {
"subject": "Optimierung Ihrer Logistik...",
"intro": "Als Logistikleiter kennen Sie...",
"social_proof": "Wir helfen bereits Firma X..."
}
}
```
## 5. Offene To-Dos (Roadmap für Produktionsfreigabe)
Um den Connector für den stabilen Betrieb in der Produktivumgebung freizugeben, sind folgende Härtungsmaßnahmen erforderlich:
* [ ] **Konfigurationshärtung (UDFs & Endpunkte):**
* Alle umgebungsspezifischen Werte (SuperOffice Base URL, Customer ID, **alle UDF ProgIDs** für Vertical, Subject, Intro, Social Proof, etc.) müssen aus dem Code entfernt und über Umgebungsvariablen (`.env`) konfigurierbar gemacht werden. Dies stellt sicher, dass derselbe Container ohne Code-Änderung in DEV und PROD läuft.
* [ ] **Werkzeug zur UDF-ID-Findung:**
* Erstellung eines Python-Skripts (`discover_fields.py`), das die SuperOffice API abfragt und alle verfügbaren UDFs mit ihren `ProgId`s auflistet. Dies vereinfacht die Erstkonfiguration in neuen Umgebungen.
* [ ] **Feiertags-Logik (Autarkie SuperOffice):**
* Erstellung einer dedizierten SuperOffice Y-Tabelle (`y_holidays`) zur Speicherung von Feiertagen.
* Erstellung eines Python-Skripts (`import_holidays_to_so.py`) zur einmaligen und periodischen Befüllung dieser Tabelle.
* Anpassung des SuperOffice CRMScripts, um diese Tabelle vor dem Versand zu prüfen.
* [ ] **Webinterface (Settings -> Job Role Mapping):** Erweiterung des UI zur Darstellung und Verwaltung der neuen Persona-Archetypen und ihrer Mappings. Dies beinhaltet auch eine Überarbeitung der bestehenden Job-Titel-Mappungsansicht, um die Zuordnung zu den Archetypen zu verdeutlichen und ggf. zu editieren.
* [ ] **Skalierung (Optional/Zukunft):**
* Bei sehr hoher Last (>100 Events/Sekunde) sollte die interne SQLite-Queue durch eine performantere Lösung wie Redis ersetzt werden.
* [ ] **Manual Override Protection:** Schutz manueller Änderungen (Vertical/Rolle) durch den User vor Überschreiben durch die KI.
* [ ] **Notion Dashboard:** KPI-Reporting.
* [ ] **Lead-Attribution:** Automatisches Setzen der `Sale.Source` auf "Marketing Automation".

View File

@@ -0,0 +1,90 @@
import os
import requests
import json
import logging
from superoffice_client import SuperOfficeClient
from config import settings
# Setup Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("simulation-e2e")
def simulate_sendout(contact_id: int, person_id: int):
print(f"🚀 Starting E2E Sendout Simulation for Contact {contact_id}, Person {person_id}...")
# 1. Initialize SuperOffice Client
so_client = SuperOfficeClient()
if not so_client.access_token:
print("❌ Auth failed. Check .env")
return
# 2. Get Data from Company Explorer
# We simulate what the worker would do
print(f"📡 Requesting provisioning from Company Explorer...")
ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
ce_req = {
"so_contact_id": contact_id,
"so_person_id": person_id,
"crm_name": "RoboPlanet GmbH",
"crm_website": "www.roboplanet.de",
"job_title": "Geschäftsführer" # Explicit job title for persona mapping
}
ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
try:
resp = requests.post(ce_url, json=ce_req, auth=ce_auth)
resp.raise_for_status()
provisioning_data = resp.json()
except Exception as e:
print(f"❌ CE API failed: {e}")
return
print(f"✅ Received Data: {json.dumps(provisioning_data, indent=2)}")
if provisioning_data.get("status") == "processing":
print("⏳ CE is still processing. Please wait 1-2 minutes and try again.")
return
texts = provisioning_data.get("texts", {})
if not texts.get("subject"):
print("⚠️ No marketing texts found for this combination (Vertical x Persona).")
return
# 3. Write Texts to SuperOffice UDFs
print("✍️ Writing marketing texts to SuperOffice UDFs...")
udf_payload = {
settings.UDF_SUBJECT: texts["subject"],
settings.UDF_INTRO: texts["intro"],
settings.UDF_SOCIAL_PROOF: texts["social_proof"]
}
success = so_client.update_entity_udfs(person_id, "Person", udf_payload)
if success:
print("✅ UDFs updated successfully.")
else:
print("❌ Failed to update UDFs.")
return
# 4. Create Appointment (The "Sendout Proof")
print("📅 Creating Appointment as sendout proof...")
app_subject = f"[SIMULATION] Mail Sent: {texts['subject']}"
app_desc = f"Content Simulation:\n\n{texts['intro']}\n\n{texts['social_proof']}"
appointment = so_client.create_appointment(
subject=app_subject,
description=app_desc,
contact_id=contact_id,
person_id=person_id
)
if appointment:
print(f"✅ Simulation Complete! Appointment ID: {appointment.get('AppointmentId')}")
print(f"🔗 Check SuperOffice for Contact {contact_id} and look at the activities.")
else:
print("❌ Failed to create appointment.")
if __name__ == "__main__":
# Using the IDs we know exist from previous tests/status
TEST_CONTACT_ID = 2
TEST_PERSON_ID = 2 # Usually same or linked
simulate_sendout(TEST_CONTACT_ID, TEST_PERSON_ID)

View File

@@ -130,7 +130,42 @@ class SuperOfficeClient:
return None
return all_results
def create_project(self, name: str, contact_id: int, person_id: int = None):
"""Creates a new project linked to a contact, and optionally adds a person."""
payload = {
"Name": name,
"Contact": {"ContactId": contact_id}
}
if person_id:
# Adding a person to a project requires a ProjectMember object
payload["ProjectMembers"] = [
{
"Person": {"PersonId": person_id},
"Role": "Member" # Default role, can be configured if needed
}
]
print(f"Creating new project: {name}...")
return self._post("Project", payload)
def create_appointment(self, subject: str, description: str, contact_id: int, person_id: int = None):
"""Creates a new appointment (to simulate a sent activity)."""
import datetime
now = datetime.datetime.utcnow().isoformat() + "Z"
payload = {
"Description": f"{subject}\n\n{description}",
"Contact": {"ContactId": contact_id},
"StartDate": now,
"EndDate": now,
"Task": {"Id": 1} # Usually 'Follow-up' or similar, depending on SO config
}
if person_id:
payload["Person"] = {"PersonId": person_id}
print(f"Creating new appointment: {subject}...")
return self._post("Appointment", payload)
def update_entity_udfs(self, entity_id: int, entity_type: str, udf_data: dict):
"""
Updates UDFs for a given entity (Contact or Person).

View File

@@ -0,0 +1,100 @@
import os
import requests
import json
import logging
import sys
# Configure to run from root context
sys.path.append(os.path.join(os.getcwd(), "connector-superoffice"))
# Mock Config if needed, or use real one
try:
from config import settings
except ImportError:
print("Could not import settings. Ensure you are in project root.")
sys.exit(1)
# FORCE CE URL for internal Docker comms if running inside container
# If running outside, this might need localhost.
# settings.COMPANY_EXPLORER_URL is used.
API_USER = os.getenv("API_USER", "admin")
API_PASS = os.getenv("API_PASSWORD", "gemini")
def test_dynamic_role_change():
print("🧪 STARTING TEST: Dynamic Role Change & Content Generation\n")
# Define Scenarios
scenarios = [
{
"name": "Scenario A (CEO)",
"job_title": "Geschäftsführer",
"expect_keywords": ["Kostenreduktion", "Effizienz", "Amortisation"]
},
{
"name": "Scenario B (Warehouse Mgr)",
"job_title": "Lagerleiter",
"expect_keywords": ["Stress", "Sauberkeit", "Entlastung"]
}
]
results = {}
for s in scenarios:
print(f"--- Running {s['name']} ---")
print(f"Role Trigger: '{s['job_title']}'")
payload = {
"so_contact_id": 2, # RoboPlanet Test
"so_person_id": 2,
"crm_name": "RoboPlanet GmbH-SOD",
"crm_website": "www.roboplanet.de", # Ensure we match the industry (Logistics)
"job_title": s['job_title']
}
try:
url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
print(f"POST {url}")
resp = requests.post(url, json=payload, auth=(API_USER, API_PASS))
resp.raise_for_status()
data = resp.json()
# Validation
texts = data.get("texts", {})
subject = texts.get("subject", "")
intro = texts.get("intro", "")
print(f"Received Role: {data.get('role_name')}")
print(f"Received Subject: {subject}")
# Check Keywords
full_text = (subject + " " + intro).lower()
matches = [k for k in s['expect_keywords'] if k.lower() in full_text]
if len(matches) > 0:
print(f"✅ Content Match! Found keywords: {matches}")
results[s['name']] = "PASS"
else:
print(f"❌ Content Mismatch. Expected {s['expect_keywords']}, got text: {subject}...")
results[s['name']] = "FAIL"
results[f"{s['name']}_Subject"] = subject # Store for comparison later
except Exception as e:
print(f"❌ API Error: {e}")
results[s['name']] = "ERROR"
print("")
# Final Comparison
print("--- Final Result Analysis ---")
if results["Scenario A (CEO)"] == "PASS" and results["Scenario B (Warehouse Mgr)"] == "PASS":
if results["Scenario A (CEO)_Subject"] != results["Scenario B (Warehouse Mgr)_Subject"]:
print("✅ SUCCESS: Different roles generated different, targeted content.")
else:
print("⚠️ WARNING: Content matched keywords but Subjects are identical! Check Matrix.")
else:
print("❌ TEST FAILED. See individual steps.")
if __name__ == "__main__":
test_dynamic_role_change()

View File

@@ -0,0 +1,111 @@
import os
import requests
import json
import logging
import sys
import time
# Configure path to import modules from parent directory
sys.path.append(os.path.join(os.getcwd(), "connector-superoffice"))
try:
from config import settings
from superoffice_client import SuperOfficeClient
except ImportError:
print("❌ Import Error. Ensure you are running from the project root.")
sys.exit(1)
# Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("e2e-roundtrip")
# Config
API_USER = os.getenv("API_USER", "admin")
API_PASS = os.getenv("API_PASSWORD", "gemini")
TEST_PERSON_ID = 2
TEST_CONTACT_ID = 2
def run_roundtrip():
print("🚀 STARTING FULL E2E ROUNDTRIP TEST (API -> SO Write)\n")
so_client = SuperOfficeClient()
if not so_client.access_token:
print("❌ SuperOffice Auth failed. Check .env")
return
scenarios = [
{
"name": "Scenario A",
"role_label": "Geschäftsführer",
"expect_keyword": "Kosten"
},
{
"name": "Scenario B",
"role_label": "Lagerleiter",
"expect_keyword": "Sauberkeit"
}
]
for s in scenarios:
print(f"--- Running {s['name']}: {s['role_label']} ---")
# 1. Provisioning (Company Explorer)
print(f"1. 🧠 Asking Company Explorer (Trigger: {s['role_label']})...")
ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
payload = {
"so_contact_id": TEST_CONTACT_ID,
"so_person_id": TEST_PERSON_ID,
"crm_name": "RoboPlanet GmbH-SOD",
"crm_website": "www.roboplanet.de",
"job_title": s['role_label'] # <-- THE TRIGGER
}
try:
resp = requests.post(ce_url, json=payload, auth=(API_USER, API_PASS))
resp.raise_for_status()
data = resp.json()
texts = data.get("texts", {})
subject = texts.get("subject", "N/A")
intro = texts.get("intro", "N/A")
print(f" -> Received Subject: '{subject}'")
if s['expect_keyword'].lower() not in (subject + intro).lower():
print(f" ⚠️ WARNING: Expected keyword '{s['expect_keyword']}' not found!")
except Exception as e:
print(f" ❌ CE API Failed: {e}")
continue
# 2. Write to SuperOffice (UDFs)
print(f"2. ✍️ Writing Texts to SuperOffice UDFs...")
udf_payload = {
settings.UDF_SUBJECT: subject,
settings.UDF_INTRO: intro,
settings.UDF_SOCIAL_PROOF: texts.get("social_proof", "")
}
if so_client.update_entity_udfs(TEST_PERSON_ID, "Person", udf_payload):
print(" -> UDFs Updated.")
else:
print(" -> ❌ UDF Update Failed.")
# 3. Create Appointment (Proof)
print(f"3. 📅 Creating Appointment in SuperOffice...")
appt_subject = f"[E2E TEST] {s['role_label']}: {subject}"
appt_desc = f"GENERATED CONTENT:\n\n{intro}\n\n{texts.get('social_proof')}"
appt = so_client.create_appointment(appt_subject, appt_desc, TEST_CONTACT_ID, TEST_PERSON_ID)
if appt:
print(f" -> ✅ Appointment Created (ID: {appt.get('AppointmentId')})")
else:
print(" -> ❌ Appointment Creation Failed.")
print("")
time.sleep(1) # Brief pause
print("🏁 Test Run Complete.")
if __name__ == "__main__":
run_roundtrip()

70
fix_industry_units.py Normal file
View File

@@ -0,0 +1,70 @@
import sqlite3
DB_PATH = "companies_v3_fixed_2.db"
UNIT_MAPPING = {
"Logistics - Warehouse": "",
"Healthcare - Hospital": "Betten",
"Infrastructure - Transport": "Passagiere",
"Leisure - Indoor Active": "",
"Retail - Food": "",
"Retail - Shopping Center": "",
"Hospitality - Gastronomy": "Sitzplätze",
"Leisure - Outdoor Park": "Besucher",
"Leisure - Wet & Spa": "Besucher",
"Infrastructure - Public": "Kapazität",
"Retail - Non-Food": "",
"Hospitality - Hotel": "Zimmer",
"Leisure - Entertainment": "Besucher",
"Healthcare - Care Home": "Plätze",
"Industry - Manufacturing": "Mitarbeiter",
"Energy - Grid & Utilities": "Kunden",
"Leisure - Fitness": "Mitglieder",
"Corporate - Campus": "Mitarbeiter",
"Energy - Solar/Wind": "MWp",
"Tech - Data Center": "Racks",
"Automotive - Dealer": "Fahrzeuge",
"Infrastructure Parking": "Stellplätze",
"Reinigungsdienstleister": "Mitarbeiter",
"Infrastructure - Communities": "Einwohner"
}
def fix_units():
print(f"Connecting to {DB_PATH}...")
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
cursor.execute("SELECT id, name, scraper_search_term, metric_type FROM industries")
rows = cursor.fetchall()
updated_count = 0
for row in rows:
ind_id, name, current_term, m_type = row
new_term = UNIT_MAPPING.get(name)
# Fallback Logic
if not new_term:
if m_type in ["AREA_IN", "AREA_OUT"]:
new_term = ""
else:
new_term = "Anzahl" # Generic fallback
if current_term != new_term:
print(f"Updating '{name}': '{current_term}' -> '{new_term}'")
cursor.execute("UPDATE industries SET scraper_search_term = ? WHERE id = ?", (new_term, ind_id))
updated_count += 1
conn.commit()
print(f"\n✅ Updated {updated_count} industries with correct units.")
except Exception as e:
print(f"❌ Error: {e}")
conn.rollback()
finally:
conn.close()
if __name__ == "__main__":
fix_units()

View File

@@ -21,6 +21,37 @@ gitea: none
---
# Projekt: Automatisierte Unternehmensbewertung & Lead-Generierung v2.2.1
## Current Status (Feb 20, 2026) - SuperOffice Integration Ready (v2.0)
### 1. SuperOffice Connector v2.0 ("The Muscle")
* **Event-Driven Architecture:** Hochperformante Webhook-Verarbeitung mit intelligenter "Noise Reduction". Ignoriert irrelevante Änderungen (z.B. Telefonnummern) und verhindert Kaskaden-Effekte.
* **Persona Mapping Engine:** Automatische Zuweisung von SuperOffice-Rollen ("Position") basierend auf Jobtiteln. Neue Rolle **"Influencer"** für Einkäufer/Techniker integriert.
* **Robustheit:** Konfiguration vollständig in `.env` ausgelagert. End-to-End Tests mit Termin-Simulation (statt E-Mail) verifiziert.
### 2. Marketing Matrix Engine ("The Brain")
* **Gemini 1.5 Pro Integration:** Der Matrix-Generator erstellt nun vollautomatisch hyper-personalisierte E-Mail-Texte für alle 125 Kombinationen (25 Branchen x 5 Personas).
* **Intelligente Prompts:** Kombiniert Branchen-Pains (z.B. "Logistik-Druck") mit Rollen-Pains (z.B. "Effizienz-Zwang GF").
### 3. UI/UX & Data Quality
* **Unit Fix:** Korrektur der Einheiten-Anzeige im Frontend (m², Betten, etc.).
* **Influencer Role:** Im Frontend nun als Mapping-Option verfügbar.
---
## 🚀 Next Steps for User (Immediate Actions)
1. **Content Generierung (Matrix füllen):**
Lassen Sie den Generator einmal laufen, um die Texte für alle Branchen zu erstellen (Dauer: ca. 10 Min).
```bash
export PYTHONPATH=$PYTHONPATH:/app/company-explorer
python3 company-explorer/backend/scripts/generate_matrix.py --live
```
2. **Produktions-Deployment:**
Folgen Sie der Anleitung in `connector-superoffice/README.md`, um die App im Developer Portal zu registrieren und den Webhook anzulegen.
---
## 1. Projektübersicht & Architektur
Dieses Projekt ist eine modulare "Lead Enrichment Factory", die darauf ausgelegt ist, Unternehmensdaten aus einem D365-CRM-System automatisiert anzureichern, zu analysieren und für Marketing- & Vertriebszwecke aufzubereiten.

64
seed_test_matrix.py Normal file
View File

@@ -0,0 +1,64 @@
import sqlite3
import datetime
DB_PATH = "companies_v3_fixed_2.db"
def seed_matrix():
print(f"Connecting to {DB_PATH}...")
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Configuration of Test Scenarios
scenarios = [
{
"ind_id": 1, # Logistics
"pers_id": 3, # Wirtschaftlicher Entscheider (GF)
"subject": "Kostenreduktion in Ihrer Intralogistik durch autonome Reinigung",
"intro": "als Geschäftsführer wissen Sie: Effizienz ist der Schlüssel. Unsere Roboter senken Ihre Reinigungskosten um bis zu 30% und amortisieren sich in unter 12 Monaten.",
"proof": "Referenzkunden wie DB Schenker und DHL setzen bereits auf unsere Flotte und konnten ihre Prozesskosten signifikant senken."
},
{
"ind_id": 1, # Logistics
"pers_id": 1, # Operativer Entscheider (Lagerleiter maps here!)
"subject": "Weniger Stress mit der Sauberkeit in Ihren Hallen",
"intro": "kennen Sie das Problem: Die Reinigungskräfte fallen aus und der Staub legt sich auf die Ware. Unsere autonomen Systeme reinigen nachts, zuverlässig und ohne, dass Sie sich darum kümmern müssen.",
"proof": "Lagerleiter bei Fiege berichten von einer deutlichen Entlastung des Teams und saubereren Böden ohne Mehraufwand."
}
]
try:
now = datetime.datetime.utcnow().isoformat()
for s in scenarios:
# Check existance
cursor.execute(
"SELECT id FROM marketing_matrix WHERE industry_id = ? AND persona_id = ?",
(s['ind_id'], s['pers_id'])
)
existing = cursor.fetchone()
if existing:
print(f"Updating Matrix for Ind {s['ind_id']} / Pers {s['pers_id']}...")
cursor.execute("""
UPDATE marketing_matrix
SET subject = ?, intro = ?, social_proof = ?, updated_at = ?
WHERE id = ?
""", (s['subject'], s['intro'], s['proof'], now, existing[0]))
else:
print(f"Inserting Matrix for Ind {s['ind_id']} / Pers {s['pers_id']}...")
cursor.execute("""
INSERT INTO marketing_matrix (industry_id, persona_id, subject, intro, social_proof, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (s['ind_id'], s['pers_id'], s['subject'], s['intro'], s['proof'], now))
conn.commit()
print("✅ Matrix updated with realistic test data.")
except Exception as e:
print(f"❌ Error: {e}")
conn.rollback()
finally:
conn.close()
if __name__ == "__main__":
seed_matrix()