Finalize SuperOffice production migration and multi-campaign architecture (v1.8)

This commit is contained in:
2026-02-27 15:09:52 +00:00
parent 89fe284554
commit 41e5696c57
18 changed files with 460 additions and 311 deletions

View File

@@ -201,18 +201,18 @@ Um die Zertifizierung für den SuperOffice App Store zu erhalten, mussten kritis
* **Lösung:** Umstellung auf **`PATCH`**. Wir senden nun nur noch die *tatsächlich geänderten Felder* (Delta).
* **Implementierung:** Der Worker baut nun ein `patch_payload` (z.B. `{'Position': {'Id': 123}}`) und nutzt den dedizierten PATCH-Endpunkt. Dies wurde explizit von SuperOffice für die Zertifizierung gefordert.
### 11. Production Environment (Live Feb 26, 2026)
### 11. Production Environment (Live Feb 27, 2026)
Nach erfolgreicher Zertifizierung durch SuperOffice wurde der Connector auf die Produktionsumgebung umgestellt.
* **Tenant:** `Cust26720`
* **Environment:** `online3` (zuvor `sod`)
* **Endpoint:** `https://online3.superoffice.com/Cust26720/api/v1`
* **Authentication:** Umstellung auf Produktions-Client-ID und -Secret.
* **Authentication:** Umstellung auf Produktions-Client-ID und -Secret erfolgreich verifiziert (Health Check OK).
**Wichtig:** SuperOffice nutzt Load-Balancing. Die Subdomain (`online3`) kann sich theoretisch ändern. Die Anwendung prüft dies dynamisch, aber die Basis-Konfiguration sollte den aktuellen Tenant-Status widerspiegeln.
### 12. Lessons Learned: Production Migration (Feb 26, 2026)
### 12. Lessons Learned: Production Migration (Feb 27, 2026)
Der Wechsel von der Staging-Umgebung (`sod`) zur Produktion (`onlineX`) brachte spezifische technische Hürden mit sich:
@@ -224,24 +224,59 @@ Der Wechsel von der Staging-Umgebung (`sod`) zur Produktion (`onlineX`) brachte
* **Problem:** In der Staging-Umgebung lautet der API-Host meist `app-sod.superoffice.com`. In der Produktion wird das `app-` Präfix oft nicht verwendet oder führt zu Zertifikatsfehlern.
* **Lösung:** Der `SuperOfficeClient` wurde so flexibilisiert, dass er in der Produktion direkt auf `{env}.superoffice.com` zugreift.
3. **Refresh Token Lebenszyklus:**
* Ein Refresh Token ist an die **Client ID** gebunden. Beim Wechsel der App-Umgebung (Staging -> Produktion) muss zwingend ein neuer Refresh Token über den Auth-Flow generiert werden.
3. **Environment Variables Persistence:**
* **Problem:** Docker-Container behalten Umgebungsvariablen oft im Cache ("Shadow Configuration"), selbst wenn die `.env`-Datei geändert wurde.
* **Lösung:** Zwingendes `docker-compose up -d --force-recreate` nach Credentials-Änderungen.
### 13. Post-Migration To-Dos (Manual & Discovery)
### 13. Post-Migration Configuration (Cust26720)
Nach dem technischen Switch müssen folgende Schritte in der SuperOffice-Produktionsumgebung (`Cust26720`) manuell durchgeführt werden:
Die Konfiguration in der `.env` Datei wurde für die Produktion wie folgt finalisiert:
| Funktion | UDF / ID | Entity |
| :--- | :--- | :--- |
| **Subject** | `SuperOffice:19` | Person |
| **Intro Text** | `SuperOffice:20` | Person |
| **Social Proof** | `SuperOffice:21` | Person |
| **Unsubscribe** | `SuperOffice:22` | Person |
| **Campaign Tag** | `SuperOffice:23` | Person |
| **Opener Primary** | `SuperOffice:86` | Contact |
| **Opener Sec.** | `SuperOffice:87` | Contact |
| **Vertical** | `SuperOffice:83` | Contact |
| **Summary** | `SuperOffice:84` | Contact |
| **Last Update** | `SuperOffice:85` | Contact |
### 14. Kampagnen-Steuerung (Usage)
Das System unterstützt mehrere Outreach-Varianten über das Feld **`MA_Campaign`** (Person).
1. **Standard:** Bleibt das Feld leer, werden die Standard-Texte ("standard") für Kaltakquise geladen.
2. **Spezifisch:** Wird ein Wert gewählt (z.B. "Messe 2026"), sucht der Connector gezielt nach Matrix-Einträgen mit diesem Tag.
3. **Fallback:** Existiert für die gewählte Kampagne kein spezifischer Text für das Vertical/Persona, wird automatisch auf "standard" zurückgegriffen.
### 15. Advanced Implementation Details (v1.8)
Mit der Version 1.8 des Workers wurden kritische Optimierungen für den produktiven Betrieb (online3) implementiert, um API-Stabilität und Datenintegrität zu gewährleisten.
#### A. Atomic PATCH Strategy
Um "Race Conditions" und unnötigen API-Traffic zu vermeiden, bündelt der Worker alle Änderungen an einem Kontakt-Objekt in einem einzigen **Atomic PATCH**.
* **Betroffene Felder:** `Address` (Postal & Street), `OrgNr` (VAT), `Urls` (Website) und alle `UserDefinedFields`.
* **Vorteil:** Entweder alle Daten werden konsistent übernommen, oder der Call schlägt kontrolliert fehl. Dies verhindert, dass Teil-Updates (z.B. nur die Adresse) von nachfolgenden UDF-Updates überschrieben werden.
#### B. REST Website-Sync (The `Urls` Array)
SuperOffice REST akzeptiert kein direktes Update auf `UrlAddress` via PATCH. Stattdessen muss das `Urls` Array manipuliert werden.
* **Logik:** Der Worker prüft, ob die KI-entdeckte Website bereits im Array vorhanden ist. Wenn nicht, wird sie als neues Objekt mit der Beschreibung `"AI Discovered"` an den Anfang der Liste gestellt.
* **Format:** `"Urls": [{"Value": "https://...", "Description": "AI Discovered"}]`.
#### C. Kampagnen-Auflösung via `:DisplayText`
Um den Klarnamen einer Kampagne (z.B. "Messe 2026") statt der internen ID (z.B. `[I:123]`) zu erhalten, nutzt der Worker eine OData-Optimierung.
* **Technik:** Im `$select` Parameter wird das Feld `SuperOffice:23:DisplayText` angefordert.
* **Ergebnis:** Der Worker erhält direkt den sauberen String, der zur Steuerung der Textvarianten im Company Explorer dient. Zusätzliche API-Abfragen zur Listenauflösung entfallen.
#### D. Feldlängen & Truncation
Standard-UDF-Textfelder in SuperOffice sind oft auf **254 Zeichen** begrenzt. Da das AI-Dossier (Summary) deutlich länger sein kann, kürzt der Worker den Text hart auf **132 Zeichen** (+ "..."). Dies stellt sicher, dass der gesamte `PATCH` Request nicht aufgrund eines "Field Overflow" von der SuperOffice-Validierung abgelehnt wird.
---
1. **UDFs anlegen (Admin):**
* Erstellen der benutzerdefinierten Felder an **Contact** (Opener, Industry, Summary, Status) und **Person** (Subject, Bridge, Proof).
* Anlegen der Liste für **Verticals** (Branchen) in den Einstellungen.
2. **Discovery (IDs ermitteln):**
* Ausführen von `python3 connector-superoffice/discover_fields.py`, um die neuen `ProgIDs` (z.B. `SuperOffice:12`) und Listen-IDs zu ermitteln.
3. **Konfiguration aktualisieren:**
* Eintragen der neuen IDs in die `.env` Datei (`UDF_SUBJECT`, `VERTICAL_MAP_JSON` etc.).
4. **Webhook registrieren:**
* Ausführen von `python3 connector-superoffice/register_webhook.py`, um den Live-Webhook auf `https://floke-ai.duckdns.org/...` zu schalten.
5. **E-Mail Templates:**
* Templates in SuperOffice so anpassen, dass sie die neuen UDF-Variablen (z.B. `{udf:SuperOffice:X}`) nutzen.
## Appendix: The "First Sentence" Prompt
This is the core logic used to generate the company-specific opener.

View File

@@ -34,6 +34,12 @@ class Settings:
self.UDF_VERTICAL = os.getenv("UDF_VERTICAL", "SuperOffice:5")
self.UDF_OPENER = os.getenv("UDF_OPENER", "SuperOffice:6")
self.UDF_OPENER_SECONDARY = os.getenv("UDF_OPENER_SECONDARY", "SuperOffice:7")
self.UDF_CAMPAIGN = os.getenv("UDF_CAMPAIGN", "SuperOffice:23") # Default from discovery
self.UDF_UNSUBSCRIBE_LINK = os.getenv("UDF_UNSUBSCRIBE_LINK", "SuperOffice:22")
self.UDF_SUMMARY = os.getenv("UDF_SUMMARY", "SuperOffice:84")
self.UDF_LAST_UPDATE = os.getenv("UDF_LAST_UPDATE", "SuperOffice:85")
self.UDF_LAST_OUTREACH = os.getenv("UDF_LAST_OUTREACH", "SuperOffice:88")
# Global instance
settings = Settings()

View File

@@ -0,0 +1,29 @@
import os
import sys
# Füge das aktuelle Verzeichnis zum Python-Pfad hinzu, damit config.py gefunden wird.
sys.path.append(os.getcwd())
try:
from config import settings
except ImportError:
print("Error: Could not import 'settings' from 'config.py'.")
sys.exit(1)
print("--- SuperOffice Configuration Debug ---")
print(f"Environment: {settings.SO_ENVIRONMENT}")
print(f"Client ID: {settings.SO_CLIENT_ID[:5]}... (Length: {len(settings.SO_CLIENT_ID)})")
# Secret nicht ausgeben, nur ob gesetzt
if settings.SO_CLIENT_SECRET:
print(f"Client Secret Set: Yes (Length: {len(settings.SO_CLIENT_SECRET)})")
else:
print("Client Secret Set: No")
if settings.SO_REFRESH_TOKEN:
print(f"Refresh Token Set: Yes (Length: {len(settings.SO_REFRESH_TOKEN)})")
else:
print("Refresh Token Set: No")
print(f"Context Identifier: {settings.SO_CONTEXT_IDENTIFIER}")
print(f"Redirect URI: {settings.SO_REDIRECT_URI}")
print("---------------------------------------")

View File

@@ -15,52 +15,32 @@ def discover():
return
# 1. Discover UDFs (User Defined Fields)
print("\n--- 1. User Defined Fields (UDFs) ---")
# Contact UDFs
print("\n--- 1. User Defined Fields (UDFs) Definitions ---")
try:
print("Fetching a sample Contact to inspect UDFs...")
contacts = client.search("Contact?$top=1")
if contacts:
# Inspect keys of first result
first_contact = contacts[0]
# Try to find ID
c_id = first_contact.get('ContactId') or first_contact.get('PrimaryKey')
# Fetch Metadata about UDFs to get Labels
udf_info = client._get("UserDefinedFieldInfo")
if udf_info:
print(f"Found {len(udf_info)} UDF definitions.")
if c_id:
c = client.get_contact(c_id)
udfs = c.get("UserDefinedFields", {})
print(f"Found {len(udfs)} UDFs on Contact {c_id}:")
for k, v in udfs.items():
print(f" - Key (ProgId): {k} | Value: {v}")
else:
print(f"⚠️ Could not find ID in search result: {first_contact.keys()}")
else:
print("⚠️ No contacts found. Cannot inspect Contact UDFs.")
print("\nFetching a sample Person to inspect UDFs...")
persons = client.search("Person?$top=1")
if persons:
first_person = persons[0]
p_id = first_person.get('PersonId') or first_person.get('PrimaryKey')
# Filter for Contact and Person UDFs
contact_udfs = [u for u in udf_info if u['UDTargetEntityName'] == 'Contact']
person_udfs = [u for u in udf_info if u['UDTargetEntityName'] == 'Person']
if p_id:
p = client.get_person(p_id)
udfs = p.get("UserDefinedFields", {})
print(f"Found {len(udfs)} UDFs on Person {p_id}:")
for k, v in udfs.items():
print(f" - Key (ProgId): {k} | Value: {v}")
else:
print(f"⚠️ Could not find ID in search result: {first_person.keys()}")
print(f"\n--- CONTACT UDFs ({len(contact_udfs)}) ---")
for u in contact_udfs:
print(f" - Label: '{u['FieldLabel']}' | ProgId: '{u['ProgId']}' | Type: {u['UDFieldType']}")
print(f"\n--- PERSON UDFs ({len(person_udfs)}) ---")
for u in person_udfs:
print(f" - Label: '{u['FieldLabel']}' | ProgId: '{u['ProgId']}' | Type: {u['UDFieldType']}")
else:
print("⚠️ No persons found. Cannot inspect Person UDFs.")
print("❌ Could not fetch UserDefinedFieldInfo.")
except Exception as e:
print(f"❌ Error inspecting UDFs: {e}")
print(f"❌ Error fetching UDF Info: {e}")
# 2. Discover Lists (MDO Providers)
print("\n--- 2. MDO Lists (Positions, Business/Industry) ---")
print("\n--- 2. Sample Data Inspection ---")
lists_to_check = ["position", "business"]

View File

@@ -3,11 +3,18 @@ from dotenv import load_dotenv
import urllib.parse
def generate_url():
load_dotenv(dotenv_path="/home/node/clawd/.env")
import os
from dotenv import load_dotenv
import urllib.parse
client_id = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD")
redirect_uri = "https://devnet-tools.superoffice.com/openid/callback" # Das muss im Portal so registriert sein
state = "12345"
# Try current and parent dir
load_dotenv()
load_dotenv(dotenv_path="../.env")
client_id = os.getenv("SO_CLIENT_ID")
# MUST match what is registered in the SuperOffice Developer Portal for this Client ID
redirect_uri = os.getenv("SO_REDIRECT_URI", "http://localhost")
state = "roboplanet_prod_init"
if not client_id:
print("Fehler: Keine SO_CLIENT_ID in der .env gefunden!")
@@ -17,19 +24,24 @@ def generate_url():
"client_id": client_id,
"redirect_uri": redirect_uri,
"response_type": "code",
"scope": "openid offline_access", # Wichtig für Refresh Token
"scope": "openid", # Basic scope
"state": state
}
base_url = "https://sod.superoffice.com/login/common/oauth/authorize"
# Use online.superoffice.com for Production
base_url = "https://online.superoffice.com/login/common/oauth/authorize"
auth_url = f"{base_url}?{urllib.parse.urlencode(params)}"
print("\nBitte öffne diese URL im Browser:")
print("\n--- PRODUKTIV-AUTH-LINK ---")
print(f"Mandant: {os.getenv('SO_CONTEXT_IDENTIFIER', 'Cust26720')}")
print(f"Client ID: {client_id[:5]}...")
print("-" * 60)
print(auth_url)
print("-" * 60)
print("\nNach dem Login wirst du auf eine Seite weitergeleitet, die nicht lädt (localhost).")
print("Kopiere die URL aus der Adresszeile und gib mir den Wert nach '?code='.")
print("\n1. Öffne diesen Link im Browser.")
print("2. Logge dich in deinen ECHTEN Mandanten ein (Cust26720).")
print("3. Nach der Bestätigung kopiere die URL aus der Adresszeile.")
print("4. Paste die URL hier in den Chat.")
if __name__ == "__main__":
generate_url()

View File

@@ -28,7 +28,11 @@ class AuthHandler:
return self._refresh_access_token()
def _refresh_access_token(self):
url = f"https://{self.env}.superoffice.com/login/common/oauth/tokens"
# OAuth token endpoint is ALWAYS online.superoffice.com for production,
# or sod.superoffice.com for sandbox.
token_domain = "online.superoffice.com" if "online" in self.env.lower() else "sod.superoffice.com"
url = f"https://{token_domain}/login/common/oauth/tokens"
data = {
"grant_type": "refresh_token",
"client_id": self.client_id,
@@ -38,12 +42,12 @@ class AuthHandler:
}
try:
resp = requests.post(url, data=data)
resp.raise_for_status()
if resp.status_code != 200:
logger.error(f"❌ Token Refresh Failed (Status {resp.status_code}): {resp.text}")
return None
logger.info("Access token refreshed successfully.")
return resp.json().get("access_token")
except requests.exceptions.HTTPError as e:
logger.error(f"❌ Token Refresh Error (Status: {e.response.status_code}): {e.response.text}")
return None
except Exception as e:
logger.error(f"❌ Connection Error during token refresh: {e}")
return None
@@ -53,7 +57,8 @@ class SuperOfficeClient:
self.auth_handler = auth_handler
self.env = os.getenv("SO_ENVIRONMENT", "sod")
self.cust_id = os.getenv("SO_CONTEXT_IDENTIFIER", "Cust55774")
self.base_url = f"https://app-{self.env}.superoffice.com/{self.cust_id}/api/v1"
# API base URL: online3.superoffice.com is valid here
self.base_url = f"https://{self.env}.superoffice.com/{self.cust_id}/api/v1"
self.access_token = self.auth_handler.get_access_token()
if not self.access_token:
raise Exception("Failed to obtain access token during SuperOfficeClient initialization.")

View File

@@ -1,10 +1,11 @@
import sys
import os
from superoffice_client import SuperOfficeClient
from dotenv import load_dotenv
# Configuration
WEBHOOK_NAME = "Gemini Connector Hook"
TARGET_URL = "https://floke-ai.duckdns.org/connector/webhook?token=changeme" # Token match .env
WEBHOOK_NAME = "Gemini Connector Production"
TARGET_URL = f"https://floke-ai.duckdns.org/connector/webhook?token={os.getenv('WEBHOOK_TOKEN', 'changeme')}"
EVENTS = [
"contact.created",
"contact.changed",
@@ -13,6 +14,7 @@ EVENTS = [
]
def register():
load_dotenv("../.env")
print("🚀 Initializing SuperOffice Client...")
try:
client = SuperOfficeClient()

View File

@@ -43,7 +43,11 @@ class SuperOfficeClient:
def _refresh_access_token(self):
"""Refreshes and returns a new access token."""
url = "https://online.superoffice.com/login/common/oauth/tokens"
# OAuth token endpoint is ALWAYS online.superoffice.com for production,
# or sod.superoffice.com for sandbox.
token_domain = "online.superoffice.com" if "online" in self.env.lower() else "sod.superoffice.com"
url = f"https://{token_domain}/login/common/oauth/tokens"
logger.debug(f"DEBUG: Refresh URL: '{url}' (Env: '{self.env}')")
data = {
@@ -56,11 +60,18 @@ class SuperOfficeClient:
try:
resp = requests.post(url, data=data)
# Catch non-JSON responses early
if resp.status_code != 200:
logger.error(f"❌ Token Refresh Failed (Status {resp.status_code})")
logger.error(f"Response Body: {resp.text[:500]}")
return None
resp.raise_for_status()
return resp.json().get("access_token")
except requests.exceptions.HTTPError as e:
logger.error(f"❌ Token Refresh Error (Status: {e.response.status_code}): {e.response.text}")
logger.debug(f"Response Headers: {e.response.headers}")
except requests.exceptions.JSONDecodeError:
logger.error(f"❌ Token Refresh Error: Received non-JSON response from {url}")
logger.debug(f"Raw Response: {resp.text[:500]}")
return None
except Exception as e:
logger.error(f"❌ Connection Error during token refresh: {e}")

View File

@@ -3,6 +3,7 @@ import logging
import os
import requests
import json
from datetime import datetime
from queue_manager import JobQueue
from superoffice_client import SuperOfficeClient
from config import settings
@@ -21,16 +22,15 @@ def process_job(job, so_client: SuperOfficeClient):
"""
Core logic for processing a single job.
"""
logger.info(f"Processing Job {job['id']} ({job['event_type']})")
logger.info(f"--- [WORKER v1.8] Processing Job {job['id']} ({job['event_type']}) ---")
payload = job['payload']
event_low = job['event_type'].lower()
# 1. Extract IDs Early (Crucial for logging and logic)
# 1. Extract IDs Early
person_id = None
contact_id = None
job_title = payload.get("JobTitle")
# Try getting IDs from FieldValues (more reliable for Webhooks)
field_values = payload.get("FieldValues", {})
if "person_id" in field_values:
person_id = int(field_values["person_id"])
@@ -39,7 +39,6 @@ def process_job(job, so_client: SuperOfficeClient):
if "title" in field_values and not job_title:
job_title = field_values["title"]
# Fallback to older payload structure if not found
if not person_id:
if "PersonId" in payload:
person_id = int(payload["PersonId"])
@@ -53,7 +52,6 @@ def process_job(job, so_client: SuperOfficeClient):
contact_id = int(payload["PrimaryKey"])
# Fallback/Deep Lookup & Fetch JobTitle if missing
# Only fetch if we are missing critical info AND have a person_id
if person_id and (not job_title or not contact_id):
try:
person_details = so_client.get_person(
@@ -63,15 +61,12 @@ def process_job(job, so_client: SuperOfficeClient):
if person_details:
if not job_title:
job_title = person_details.get("JobTitle") or person_details.get("Title")
# Robust extraction of ContactId
if not contact_id:
contact_obj = person_details.get("Contact")
if contact_obj and isinstance(contact_obj, dict):
contact_id = contact_obj.get("ContactId")
elif "ContactId" in person_details: # Sometimes flat
elif "ContactId" in person_details:
contact_id = person_details.get("ContactId")
except Exception as e:
logger.warning(f"Failed to fetch person details for {person_id}: {e}")
@@ -80,61 +75,17 @@ def process_job(job, so_client: SuperOfficeClient):
logger.info(f"Skipping irrelevant event type: {job['event_type']}")
return "SUCCESS"
changes = [c.lower() for c in payload.get("Changes", [])]
if changes:
relevant_contact = ["name", "department", "urladdress", "number1", "number2", "userdefinedfields"]
if settings.UDF_VERTICAL:
relevant_contact.append(settings.UDF_VERTICAL.lower())
relevant_person = ["jobtitle", "position", "title", "userdefinedfields", "person_id"]
technical_fields = ["updated", "updated_associate_id", "contact_id", "person_id", "registered", "registered_associate_id"]
actual_changes = [c for c in changes if c not in technical_fields]
is_relevant = False
if "contact" in event_low:
logger.info(f"Checking relevance for Contact {contact_id or 'Unknown'}. Changes: {actual_changes}")
if any(f in actual_changes for f in relevant_contact):
is_relevant = True
elif "urls" in actual_changes:
is_relevant = True
if "person" in event_low:
logger.info(f"Checking relevance for Person {person_id or 'Unknown'}. Changes: {actual_changes}")
if any(f in actual_changes for f in relevant_person):
is_relevant = True
if not is_relevant:
logger.info(f"Skipping technical/irrelevant changes: {changes}")
return "SUCCESS"
else:
logger.info("Change is deemed RELEVANT. Proceeding...")
if not contact_id:
raise ValueError(f"Could not identify ContactId in payload: {payload}")
logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}")
# --- Cascading Logic ---
if "contact" in event_low and not person_id:
logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.")
try:
persons = so_client.search(f"Person?$select=PersonId&$filter=contact/contactId eq {contact_id}")
if persons:
q = JobQueue()
for p in persons:
p_id = p.get("PersonId")
if p_id:
logger.info(f"Cascading: Enqueueing job for Person {p_id}")
q.add_job("person.changed", {"PersonId": p_id, "ContactId": contact_id, "Source": "Cascade"})
except Exception as e:
logger.warning(f"Failed to cascade to persons for contact {contact_id}: {e}")
# 1b. Fetch full contact details for 'Double Truth' check (Master Data Sync)
# 1b. Fetch full contact details for 'Double Truth' check
crm_name = None
crm_website = None
crm_industry_name = None
contact_details = None
campaign_tag = None
try:
contact_details = so_client.get_contact(
@@ -146,31 +97,46 @@ def process_job(job, so_client: SuperOfficeClient):
crm_name = contact_details.get("Name")
crm_website = contact_details.get("UrlAddress")
if not crm_website and "Urls" in contact_details and contact_details["Urls"]:
crm_website = contact_details["Urls"][0].get("Value")
# --- Fetch Person UDFs for Campaign Tag ---
if person_id:
try:
# We fetch the person again specifically for UDFs to ensure we get DisplayTexts
person_details = so_client.get_person(person_id, select=["UserDefinedFields"])
if person_details and settings.UDF_CAMPAIGN:
udfs = person_details.get("UserDefinedFields", {})
# SuperOffice REST returns DisplayText for lists as 'ProgID:DisplayText'
display_key = f"{settings.UDF_CAMPAIGN}:DisplayText"
campaign_tag = udfs.get(display_key)
if not campaign_tag:
# Fallback to manual resolution if DisplayText is missing
raw_tag = udfs.get(settings.UDF_CAMPAIGN, "")
if raw_tag:
campaign_tag = str(raw_tag).strip()
if campaign_tag:
logger.info(f"🎯 CAMPAIGN DETECTED: '{campaign_tag}'")
else:
logger.info(" No Campaign Tag found (Field is empty).")
except Exception as e:
logger.warning(f"Could not fetch campaign tag: {e}")
if settings.UDF_VERTICAL:
udfs = contact_details.get("UserDefinedFields", {})
so_vertical_val = udfs.get(settings.UDF_VERTICAL)
if so_vertical_val:
val_str = str(so_vertical_val)
if val_str.startswith("[I:"):
val_str = val_str.split(":")[1].strip("]")
val_str = str(so_vertical_val).replace("[I:","").replace("]","")
try:
vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
vertical_map_rev = {str(v): k for k, v in vertical_map.items()}
if val_str in vertical_map_rev:
crm_industry_name = vertical_map_rev[val_str]
logger.info(f"Detected CRM Vertical Override: {so_vertical_val} -> {crm_industry_name}")
except Exception as ex:
logger.error(f"Error mapping vertical ID {val_str}: {ex}")
except Exception as e:
logger.error(f"Failed to fetch contact details for {contact_id}: {e}")
# Critical failure: Without contact details, we cannot provision correctly.
# Raising exception triggers a retry.
raise Exception(f"SuperOffice API Failure: {e}")
# --- 3. Company Explorer Provisioning ---
@@ -181,7 +147,8 @@ def process_job(job, so_client: SuperOfficeClient):
"job_title": job_title,
"crm_name": crm_name,
"crm_website": crm_website,
"crm_industry_name": crm_industry_name
"crm_industry_name": crm_industry_name,
"campaign_tag": campaign_tag
}
ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
@@ -189,34 +156,20 @@ def process_job(job, so_client: SuperOfficeClient):
try:
resp = requests.post(ce_url, json=ce_req, auth=ce_auth)
if resp.status_code == 404:
logger.warning(f"Company Explorer returned 404. Retrying later.")
return "RETRY"
resp.raise_for_status()
provisioning_data = resp.json()
if provisioning_data.get("status") == "processing":
logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.")
return "RETRY"
except requests.exceptions.RequestException as e:
raise Exception(f"Company Explorer API failed: {e}")
logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")
# Fetch fresh Contact Data for comparison
try:
contact_data = so_client.get_contact(contact_id)
if not contact_data:
logger.error(f"Contact {contact_id} not found in SuperOffice.")
return "FAILED"
except Exception as e:
logger.error(f"Failed to fetch contact {contact_id}: {e}")
return "RETRY"
contact_data = so_client.get_contact(contact_id)
if not contact_data: return "FAILED"
contact_patch = {}
if "UserDefinedFields" not in contact_data: contact_data["UserDefinedFields"] = {}
# --- A. Vertical Sync ---
vertical_name = provisioning_data.get("vertical_name")
@@ -226,13 +179,11 @@ def process_job(job, so_client: SuperOfficeClient):
vertical_id = vertical_map.get(vertical_name)
if vertical_id:
udf_key = settings.UDF_VERTICAL
current_val = contact_data["UserDefinedFields"].get(udf_key, "")
if current_val and str(current_val).startswith("[I:"):
current_val = str(current_val).split(":")[1].strip("]")
if str(current_val) != str(vertical_id):
logger.info(f"Change detected: Vertical {current_val} -> {vertical_id}")
contact_patch.setdefault("UserDefinedFields", {})[udf_key] = str(vertical_id)
current_val = contact_data.get("UserDefinedFields", {}).get(udf_key, "")
if str(current_val).replace("[I:","").replace("]","") != str(vertical_id):
logger.info(f"Change detected: Vertical -> {vertical_id}")
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
contact_patch["UserDefinedFields"][udf_key] = str(vertical_id)
except Exception as e:
logger.error(f"Vertical sync error: {e}")
@@ -243,154 +194,92 @@ def process_job(job, so_client: SuperOfficeClient):
ce_vat = provisioning_data.get("vat_id")
if ce_city or ce_street or ce_zip:
if "Address" not in contact_data or contact_data["Address"] is None:
contact_data["Address"] = {"Street": {}, "Postal": {}}
addr_obj = contact_data["Address"]
if "Postal" not in addr_obj or addr_obj["Postal"] is None: addr_obj["Postal"] = {}
if "Street" not in addr_obj or addr_obj["Street"] is None: addr_obj["Street"] = {}
for type_key in ["Postal", "Street"]:
cur_addr = (contact_data.get("Address") or {}).get(type_key, {})
if ce_city and cur_addr.get("City") != ce_city: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["City"] = ce_city
if ce_street and cur_addr.get("Address1") != ce_street: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Address1"] = ce_street
if ce_zip and cur_addr.get("Zipcode") != ce_zip: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Zipcode"] = ce_zip
def update_addr_patch(field_name, new_val, log_name):
if new_val:
for type_key in ["Postal", "Street"]:
cur = addr_obj[type_key].get(field_name, "")
if cur != new_val:
logger.info(f"Change detected: {type_key} {log_name} '{cur}' -> '{new_val}'")
contact_patch.setdefault("Address", {}).setdefault(type_key, {})[field_name] = new_val
if ce_vat and contact_data.get("OrgNr") != ce_vat:
contact_patch["OrgNr"] = ce_vat
update_addr_patch("City", ce_city, "City")
update_addr_patch("Address1", ce_street, "Street")
update_addr_patch("Zipcode", ce_zip, "Zip")
if ce_vat:
current_vat = contact_data.get("OrgNr", "")
if current_vat != ce_vat:
logger.info(f"Change detected: VAT '{current_vat}' -> '{ce_vat}'")
contact_patch["OrgNr"] = ce_vat
# --- C. AI Openers Sync ---
# --- C. AI Openers & Summary Sync ---
ce_opener = provisioning_data.get("opener")
ce_opener_secondary = provisioning_data.get("opener_secondary")
ce_summary = provisioning_data.get("summary")
if ce_opener and ce_opener != "null":
current_opener = contact_data["UserDefinedFields"].get(settings.UDF_OPENER, "")
if current_opener != ce_opener:
logger.info("Change detected: Primary Opener")
contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER] = ce_opener
if ce_opener and ce_opener != "null" and contact_data.get("UserDefinedFields", {}).get(settings.UDF_OPENER) != ce_opener:
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
contact_patch["UserDefinedFields"][settings.UDF_OPENER] = ce_opener
if ce_opener_secondary and ce_opener_secondary != "null" and contact_data.get("UserDefinedFields", {}).get(settings.UDF_OPENER_SECONDARY) != ce_opener_secondary:
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
contact_patch["UserDefinedFields"][settings.UDF_OPENER_SECONDARY] = ce_opener_secondary
if ce_opener_secondary and ce_opener_secondary != "null":
current_opener_sec = contact_data["UserDefinedFields"].get(settings.UDF_OPENER_SECONDARY, "")
if current_opener_sec != ce_opener_secondary:
logger.info("Change detected: Secondary Opener")
contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER_SECONDARY] = ce_opener_secondary
if ce_summary and ce_summary != "null":
short_summary = (ce_summary[:132] + "...") if len(ce_summary) > 135 else ce_summary
if contact_data.get("UserDefinedFields", {}).get(settings.UDF_SUMMARY) != short_summary:
logger.info("Change detected: AI Summary")
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
contact_patch["UserDefinedFields"][settings.UDF_SUMMARY] = short_summary
# --- D. Apply Updates (Single PATCH Transaction) ---
# --- D. Timestamps & Website Sync ---
if settings.UDF_LAST_UPDATE:
now_so = f"[D:{datetime.now().strftime('%m/%d/%Y %H:%M:%S')}]"
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
contact_patch["UserDefinedFields"][settings.UDF_LAST_UPDATE] = now_so
ce_website = provisioning_data.get("website")
if ce_website and (not contact_data.get("Urls") or settings.ENABLE_WEBSITE_SYNC):
current_urls = contact_data.get("Urls") or []
if not any(u.get("Value") == ce_website for u in current_urls):
logger.info(f"Syncing Website: {ce_website}")
if "Urls" not in contact_patch: contact_patch["Urls"] = []
contact_patch["Urls"] = [{"Value": ce_website, "Description": "AI Discovered"}] + current_urls
# --- E. Apply Updates (Single PATCH) ---
if contact_patch:
logger.info(f"Pushing combined PATCH updates for Contact {contact_id}...")
try:
so_client.patch_contact(contact_id, contact_patch)
logger.info("✅ Contact Update Successful (PATCH).")
except Exception as e:
logger.error(f"Failed to update Contact {contact_id}: {e}")
raise
else:
logger.info(f"No changes detected for Contact {contact_id}. Skipping update.")
logger.info(f"Pushing combined PATCH for Contact {contact_id}: {list(contact_patch.keys())}")
so_client.patch_contact(contact_id, contact_patch)
logger.info("✅ Contact Update Successful.")
# 2d. Sync Person Position (Role) - if Person exists
# 2d. Sync Person Position
role_name = provisioning_data.get("role_name")
if person_id and role_name:
try:
persona_map = json.loads(settings.PERSONA_MAP_JSON)
position_id = persona_map.get(role_name)
if position_id:
logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}")
so_client.update_person_position(person_id, int(position_id))
except Exception as e:
logger.error(f"Error syncing position for Person {person_id}: {e}")
logger.error(f"Error syncing position: {e}")
# 3. Update SuperOffice Texts (Only if person_id is present)
if not person_id:
logger.info("Sync complete (Company only).")
return "SUCCESS"
texts = provisioning_data.get("texts", {})
if not any(texts.values()):
logger.info("No texts returned from Matrix yet.")
return "SUCCESS"
udf_update = {}
if texts.get("subject"): udf_update[settings.UDF_SUBJECT] = texts["subject"]
if texts.get("intro"): udf_update[settings.UDF_INTRO] = texts["intro"]
if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
if udf_update:
try:
current_person = so_client.get_person(person_id)
current_udfs = current_person.get("UserDefinedFields", {})
needs_update = False
for key, new_val in udf_update.items():
if current_udfs.get(key, "") != new_val:
needs_update = True
break
# 3. Update SuperOffice Texts (Person)
if person_id:
texts = provisioning_data.get("texts", {})
unsubscribe_link = provisioning_data.get("unsubscribe_link")
udf_update = {}
if texts.get("subject"): udf_update[settings.UDF_SUBJECT] = texts["subject"]
if texts.get("intro"): udf_update[settings.UDF_INTRO] = texts["intro"]
if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
if unsubscribe_link and settings.UDF_UNSUBSCRIBE_LINK:
udf_update[settings.UDF_UNSUBSCRIBE_LINK] = unsubscribe_link
if udf_update:
logger.info(f"Applying text update to Person {person_id}.")
so_client.update_entity_udfs(person_id, "Person", udf_update)
# Simulation Trigger: Either texts changed, OR it's a direct manual trigger
if needs_update or (person_id and not "Source" in payload):
if needs_update:
logger.info(f"Applying text update to Person {person_id}.")
so_client.update_entity_udfs(person_id, "Person", udf_update)
else:
logger.info(f"Texts already in sync for Person {person_id}, but triggering simulation.")
# --- 4. Create Email Simulation Appointment ---
try:
opener = provisioning_data.get("opener", "")
intro = texts.get("intro", "")
proof = texts.get("social_proof", "")
subject = texts.get("subject", "No Subject")
salutation = "Hallo"
p_data = so_client.get_person(person_id)
if p_data:
fname = p_data.get("Firstname", "")
lname = p_data.get("Lastname", "")
if fname or lname:
salutation = f"Hallo {fname} {lname}".strip()
# --- 4. Create Email Simulation Appointment ---
try:
opener = provisioning_data.get("opener") or ""
intro = texts.get("intro") or ""
proof = texts.get("social_proof") or ""
subject = texts.get("subject", "No Subject")
email_body = f"Betreff: {subject}\n\n{opener}\n\n{intro}\n\n{proof}\n\n(Generated via Gemini Marketing Engine)"
so_client.create_appointment(f"KI: {subject}", email_body, contact_id, person_id)
except Exception as e:
logger.error(f"Failed simulation: {e}")
cta = (
"H\u00e4tten Sie am kommenden Mittwoch gegen 11 Uhr kurz Zeit, f\u00fcr einen kurzen Austausch hierzu?\n"
"Gerne k\u00f6nnen Sie auch einen alternativen Termin in meinem Kalender buchen. (bookings Link)"
)
email_body = (
f"{salutation},\n\n"
f"{opener.strip()}\n\n"
f"{intro.strip()}\n\n"
f"{cta.strip()}\n\n"
f"{proof.strip()}\n\n"
"(Generated via Gemini Marketing Engine)"
)
from datetime import datetime
now_str = datetime.now().strftime("%H:%M")
appt_title = f"[{now_str}] KI: {subject}"
so_client.create_appointment(
subject=appt_title,
description=email_body,
contact_id=contact_id,
person_id=person_id
)
except Exception as e:
logger.error(f"Failed to create email simulation appointment: {e}")
else:
logger.info(f"Skipping update for Person {person_id}: Values match.")
except Exception as e:
logger.error(f"Error during Person update: {e}")
raise
logger.info("Job successfully processed.")
return "SUCCESS"
def run_worker():
@@ -401,7 +290,7 @@ def run_worker():
so_client = SuperOfficeClient()
if not so_client.access_token: raise Exception("Auth failed")
except Exception as e:
logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...")
logger.critical(f"Failed to initialize SO Client. Retrying in 30s...")
time.sleep(30)
logger.info("Worker started. Polling queue...")
@@ -411,12 +300,9 @@ def run_worker():
if job:
try:
result = process_job(job, so_client)
if result == "RETRY":
queue.retry_job_later(job['id'], delay_seconds=120, error_msg="CE is processing...")
elif result == "FAILED":
queue.fail_job(job['id'], "Job failed with FAILED status")
else:
queue.complete_job(job['id'])
if result == "RETRY": queue.retry_job_later(job['id'], delay_seconds=120)
elif result == "FAILED": queue.fail_job(job['id'], "Job failed status")
else: queue.complete_job(job['id'])
except Exception as e:
logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
queue.fail_job(job['id'], str(e))
@@ -427,4 +313,4 @@ def run_worker():
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
run_worker()
run_worker()