Finalize SuperOffice production migration and multi-campaign architecture (v1.8)
@@ -201,18 +201,18 @@ Um die Zertifizierung für den SuperOffice App Store zu erhalten, mussten kritis
* **Solution:** Switch to **`PATCH`**. We now send only the *fields that actually changed* (the delta).
* **Implementation:** The worker now builds a `patch_payload` (e.g. `{'Position': {'Id': 123}}`) and uses the dedicated PATCH endpoint. SuperOffice explicitly required this for certification.
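The delta approach can be sketched as follows (`build_patch` is an illustrative helper, not the worker's actual function name):

```python
def build_patch(current: dict, desired: dict) -> dict:
    """Return only the fields whose values differ (the delta)."""
    return {k: v for k, v in desired.items() if current.get(k) != v}

patch_payload = build_patch(
    {"Position": {"Id": 99}, "Name": "ACME GmbH"},
    {"Position": {"Id": 123}, "Name": "ACME GmbH"},
)
# Only 'Position' changed, so only 'Position' is sent in the PATCH body.
```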
### 11. Production Environment (Live Feb 27, 2026)

After successful certification by SuperOffice, the connector was switched over to the production environment.

* **Tenant:** `Cust26720`
* **Environment:** `online3` (previously `sod`)
* **Endpoint:** `https://online3.superoffice.com/Cust26720/api/v1`
* **Authentication:** Switched to the production client ID and secret, successfully verified (health check OK).
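A minimal health-check sketch against the endpoint above (the `Contact?$top=1` probe mirrors the query `discover_fields.py` uses; the helper names are assumptions, not the connector's actual code):

```python
import requests

BASE_URL = "https://online3.superoffice.com/Cust26720/api/v1"

def build_health_check(access_token: str):
    """Build a minimal read request; a 200 response means tenant and token are live."""
    url = f"{BASE_URL}/Contact?$top=1"
    headers = {"Authorization": f"Bearer {access_token}", "Accept": "application/json"}
    return url, headers

def health_check(access_token: str) -> bool:
    url, headers = build_health_check(access_token)
    return requests.get(url, headers=headers).status_code == 200
```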
**Important:** SuperOffice uses load balancing, so the subdomain (`online3`) can change in principle. The application checks this dynamically, but the base configuration should reflect the current tenant status.

### 12. Lessons Learned: Production Migration (Feb 27, 2026)

The move from the staging environment (`sod`) to production (`onlineX`) came with specific technical hurdles:
@@ -224,24 +224,59 @@ Der Wechsel von der Staging-Umgebung (`sod`) zur Produktion (`onlineX`) brachte
* **Problem:** In the staging environment the API host is usually `app-sod.superoffice.com`. In production the `app-` prefix is often not used, or leads to certificate errors.
* **Solution:** The `SuperOfficeClient` was made flexible enough to access `{env}.superoffice.com` directly in production.

3. **Refresh token lifecycle:**
   * A refresh token is bound to the **client ID**. When the app environment changes (staging -> production), a new refresh token must be generated via the auth flow.
4. **Environment variables persistence:**
   * **Problem:** Docker containers often keep environment variables cached ("shadow configuration"), even after the `.env` file has been changed.
   * **Solution:** A mandatory `docker-compose up -d --force-recreate` after every credentials change.
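The force-recreate rule can be captured as a short command sequence (the `worker` service name is an assumption):

```shell
# 1. Edit credentials
nano .env

# 2. Recreate the containers so the new environment variables are actually picked up
docker-compose up -d --force-recreate

# 3. Verify inside the container that the new values are live
docker-compose exec worker printenv | grep SO_
```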
### 13. Post-Migration Configuration (Cust26720)

The `.env` configuration was finalized for production as follows:
| Function | UDF / ID | Entity |
| :--- | :--- | :--- |
| **Subject** | `SuperOffice:19` | Person |
| **Intro Text** | `SuperOffice:20` | Person |
| **Social Proof** | `SuperOffice:21` | Person |
| **Unsubscribe** | `SuperOffice:22` | Person |
| **Campaign Tag** | `SuperOffice:23` | Person |
| **Opener Primary** | `SuperOffice:86` | Contact |
| **Opener Sec.** | `SuperOffice:87` | Contact |
| **Vertical** | `SuperOffice:83` | Contact |
| **Summary** | `SuperOffice:84` | Contact |
| **Last Update** | `SuperOffice:85` | Contact |
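Translated into `.env` entries, the table corresponds roughly to the following fragment (variable names are taken from `config.py`; the exact pairing of IDs to variables is an assumption):

```ini
# Person UDFs
UDF_SUBJECT=SuperOffice:19
UDF_UNSUBSCRIBE_LINK=SuperOffice:22
UDF_CAMPAIGN=SuperOffice:23

# Contact UDFs
UDF_OPENER=SuperOffice:86
UDF_OPENER_SECONDARY=SuperOffice:87
UDF_VERTICAL=SuperOffice:83
UDF_SUMMARY=SuperOffice:84
UDF_LAST_UPDATE=SuperOffice:85
```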
### 14. Campaign Control (Usage)

The system supports multiple outreach variants via the **`MA_Campaign`** field (Person).

1. **Standard:** If the field is left empty, the default texts ("standard") for cold outreach are loaded.
2. **Specific:** If a value is selected (e.g. "Messe 2026"), the connector looks specifically for matrix entries with that tag.
3. **Fallback:** If the chosen campaign has no specific text for the vertical/persona, the system automatically falls back to "standard".
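The three-step resolution above can be sketched like this (the matrix layout is an illustrative assumption):

```python
# Hypothetical text matrix keyed by (campaign, vertical); illustration only.
MATRIX = {
    ("standard", "Maschinenbau"): "Standard-Text Maschinenbau",
    ("standard", "Handel"): "Standard-Text Handel",
    ("Messe 2026", "Maschinenbau"): "Messe-Text Maschinenbau",
}

def resolve_text(campaign_tag, vertical):
    campaign = campaign_tag or "standard"          # 1. empty field -> standard
    text = MATRIX.get((campaign, vertical))        # 2. campaign-specific lookup
    if text is None:
        text = MATRIX.get(("standard", vertical))  # 3. fallback to standard
    return text
```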
### 15. Advanced Implementation Details (v1.8)

Version 1.8 of the worker introduces critical optimizations for production operation (online3) to guarantee API stability and data integrity.
#### A. Atomic PATCH Strategy

To avoid race conditions and unnecessary API traffic, the worker bundles all changes to a contact object into a single **atomic PATCH**.

* **Affected fields:** `Address` (Postal & Street), `OrgNr` (VAT), `Urls` (website), and all `UserDefinedFields`.
* **Benefit:** Either all data is applied consistently, or the call fails in a controlled way. This prevents partial updates (e.g. only the address) from being overwritten by subsequent UDF updates.
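A sketch of how such an atomic payload can be assembled (the helper name and call sites are illustrative, not the worker's actual code):

```python
# Assemble one PATCH body from several change sources; sent in a single request.
contact_patch = {}

def merge_change(patch, path, value):
    """Set a nested value, creating intermediate dicts as needed."""
    node = patch
    for key in path[:-1]:
        node = node.setdefault(key, {})
    node[path[-1]] = value

merge_change(contact_patch, ["Address", "Postal", "City"], "Hamburg")
merge_change(contact_patch, ["OrgNr"], "DE123456789")
merge_change(contact_patch, ["UserDefinedFields", "SuperOffice:84"], "AI summary ...")
# One request carries all changes -> applied together or rejected together.
```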
#### B. REST Website Sync (The `Urls` Array)

SuperOffice REST does not accept a direct PATCH update on `UrlAddress`. Instead, the `Urls` array has to be manipulated.

* **Logic:** The worker checks whether the AI-discovered website is already present in the array. If not, it is prepended as a new object with the description `"AI Discovered"`.
* **Format:** `"Urls": [{"Value": "https://...", "Description": "AI Discovered"}]`.
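The prepend-if-missing logic can be sketched as (illustrative helper, not the worker's actual function):

```python
def ensure_website(urls, website):
    """Prepend the AI-discovered website unless it is already present."""
    if any(u.get("Value") == website for u in urls):
        return urls
    return [{"Value": website, "Description": "AI Discovered"}] + urls
```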
#### C. Campaign Resolution via `:DisplayText`

To obtain the clear-text name of a campaign (e.g. "Messe 2026") instead of the internal ID (e.g. `[I:123]`), the worker uses an OData optimization.

* **Technique:** The `$select` parameter requests the field `SuperOffice:23:DisplayText`.
* **Result:** The worker receives the clean string directly, which drives the text-variant selection in the Company Explorer. No additional API calls for list resolution are needed.
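A sketch of building such a query string (the exact URL shape of the client's `get_person` call is an assumption):

```python
import urllib.parse

def build_person_query(person_id, campaign_udf="SuperOffice:23"):
    """Ask OData for the list's display text directly via ':DisplayText'."""
    select = f"{campaign_udf}:DisplayText"
    return f"Person/{person_id}?$select={urllib.parse.quote(select)}"
```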
#### D. Field Lengths & Truncation

Standard UDF text fields in SuperOffice are often limited to **254 characters**. Since the AI dossier (summary) can be considerably longer, the worker hard-truncates the text to **132 characters** (+ "..."). This ensures the whole `PATCH` request is never rejected by SuperOffice validation because of a field overflow.
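The truncation rule as a small helper (mirrors the worker's 132 + "..." logic; the helper name is illustrative):

```python
LIMIT = 132  # hard cut described above; UDF text fields cap at 254 chars

def truncate_summary(text, limit=LIMIT):
    """Hard-truncate with an ellipsis so the PATCH never overflows the field."""
    if len(text) <= limit + len("..."):
        return text
    return text[:limit] + "..."
```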
---
1. **Create UDFs (Admin):**
   * Create the custom fields on **Contact** (Opener, Industry, Summary, Status) and **Person** (Subject, Bridge, Proof).
   * Create the list for **Verticals** (industries) in the settings.
2. **Discovery (determine IDs):**
   * Run `python3 connector-superoffice/discover_fields.py` to determine the new `ProgIDs` (e.g. `SuperOffice:12`) and list IDs.
3. **Update configuration:**
   * Enter the new IDs into the `.env` file (`UDF_SUBJECT`, `VERTICAL_MAP_JSON`, etc.).
4. **Register webhook:**
   * Run `python3 connector-superoffice/register_webhook.py` to switch the live webhook to `https://floke-ai.duckdns.org/...`.
5. **E-mail templates:**
   * Adapt the templates in SuperOffice so that they use the new UDF variables (e.g. `{udf:SuperOffice:X}`).
## Appendix: The "First Sentence" Prompt
This is the core logic used to generate the company-specific opener.
@@ -34,6 +34,12 @@ class Settings:
        self.UDF_VERTICAL = os.getenv("UDF_VERTICAL", "SuperOffice:5")
        self.UDF_OPENER = os.getenv("UDF_OPENER", "SuperOffice:6")
        self.UDF_OPENER_SECONDARY = os.getenv("UDF_OPENER_SECONDARY", "SuperOffice:7")
        self.UDF_CAMPAIGN = os.getenv("UDF_CAMPAIGN", "SuperOffice:23")  # Default from discovery
        self.UDF_UNSUBSCRIBE_LINK = os.getenv("UDF_UNSUBSCRIBE_LINK", "SuperOffice:22")
        self.UDF_SUMMARY = os.getenv("UDF_SUMMARY", "SuperOffice:84")
        self.UDF_LAST_UPDATE = os.getenv("UDF_LAST_UPDATE", "SuperOffice:85")
        self.UDF_LAST_OUTREACH = os.getenv("UDF_LAST_OUTREACH", "SuperOffice:88")


# Global instance
settings = Settings()
connector-superoffice/debug_config_check.py (new file, 29 lines)
@@ -0,0 +1,29 @@
import os
import sys

# Add the current directory to the Python path so that config.py can be found.
sys.path.append(os.getcwd())

try:
    from config import settings
except ImportError:
    print("Error: Could not import 'settings' from 'config.py'.")
    sys.exit(1)

print("--- SuperOffice Configuration Debug ---")
print(f"Environment: {settings.SO_ENVIRONMENT}")
print(f"Client ID: {settings.SO_CLIENT_ID[:5]}... (Length: {len(settings.SO_CLIENT_ID)})")
# Do not print the secret, only whether it is set
if settings.SO_CLIENT_SECRET:
    print(f"Client Secret Set: Yes (Length: {len(settings.SO_CLIENT_SECRET)})")
else:
    print("Client Secret Set: No")

if settings.SO_REFRESH_TOKEN:
    print(f"Refresh Token Set: Yes (Length: {len(settings.SO_REFRESH_TOKEN)})")
else:
    print("Refresh Token Set: No")

print(f"Context Identifier: {settings.SO_CONTEXT_IDENTIFIER}")
print(f"Redirect URI: {settings.SO_REDIRECT_URI}")
print("---------------------------------------")
@@ -15,52 +15,32 @@ def discover():
        return

    # 1. Discover UDFs (User Defined Fields)
    print("\n--- 1. User Defined Fields (UDFs) Definitions ---")
    try:
        # Fetch metadata about UDFs to get labels
        udf_info = client._get("UserDefinedFieldInfo")
        if udf_info:
            print(f"Found {len(udf_info)} UDF definitions.")

            # Filter for Contact and Person UDFs
            contact_udfs = [u for u in udf_info if u['UDTargetEntityName'] == 'Contact']
            person_udfs = [u for u in udf_info if u['UDTargetEntityName'] == 'Person']

            print(f"\n--- CONTACT UDFs ({len(contact_udfs)}) ---")
            for u in contact_udfs:
                print(f" - Label: '{u['FieldLabel']}' | ProgId: '{u['ProgId']}' | Type: {u['UDFieldType']}")

            print(f"\n--- PERSON UDFs ({len(person_udfs)}) ---")
            for u in person_udfs:
                print(f" - Label: '{u['FieldLabel']}' | ProgId: '{u['ProgId']}' | Type: {u['UDFieldType']}")
        else:
            print("❌ Could not fetch UserDefinedFieldInfo.")

    except Exception as e:
        print(f"❌ Error fetching UDF Info: {e}")

    # 2. Sample Data Inspection
    print("\n--- 2. Sample Data Inspection ---")
@@ -3,11 +3,18 @@ from dotenv import load_dotenv
import urllib.parse

def generate_url():
    # Try current and parent dir
    load_dotenv()
    load_dotenv(dotenv_path="../.env")

    client_id = os.getenv("SO_CLIENT_ID")
    # MUST match what is registered in the SuperOffice Developer Portal for this Client ID
    redirect_uri = os.getenv("SO_REDIRECT_URI", "http://localhost")
    state = "roboplanet_prod_init"

    if not client_id:
        print("Error: No SO_CLIENT_ID found in the .env!")
@@ -17,19 +24,24 @@ def generate_url():
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": "openid",  # Basic scope
        "state": state
    }

    # Use online.superoffice.com for Production
    base_url = "https://online.superoffice.com/login/common/oauth/authorize"
    auth_url = f"{base_url}?{urllib.parse.urlencode(params)}"

    print("\n--- PRODUCTION AUTH LINK ---")
    print(f"Tenant: {os.getenv('SO_CONTEXT_IDENTIFIER', 'Cust26720')}")
    print(f"Client ID: {client_id[:5]}...")
    print("-" * 60)
    print(auth_url)
    print("-" * 60)
    print("\n1. Open this link in your browser.")
    print("2. Log in to your REAL tenant (Cust26720).")
    print("3. After confirmation, copy the URL from the address bar.")
    print("4. Paste the URL here into the chat.")

if __name__ == "__main__":
    generate_url()
@@ -28,7 +28,11 @@ class AuthHandler:
        return self._refresh_access_token()

    def _refresh_access_token(self):
        # OAuth token endpoint is ALWAYS online.superoffice.com for production,
        # or sod.superoffice.com for sandbox.
        token_domain = "online.superoffice.com" if "online" in self.env.lower() else "sod.superoffice.com"
        url = f"https://{token_domain}/login/common/oauth/tokens"

        data = {
            "grant_type": "refresh_token",
            "client_id": self.client_id,
@@ -38,12 +42,12 @@ class AuthHandler:
        }
        try:
            resp = requests.post(url, data=data)
            if resp.status_code != 200:
                logger.error(f"❌ Token Refresh Failed (Status {resp.status_code}): {resp.text}")
                return None

            logger.info("Access token refreshed successfully.")
            return resp.json().get("access_token")
        except Exception as e:
            logger.error(f"❌ Connection Error during token refresh: {e}")
            return None
@@ -53,7 +57,8 @@ class SuperOfficeClient:
        self.auth_handler = auth_handler
        self.env = os.getenv("SO_ENVIRONMENT", "sod")
        self.cust_id = os.getenv("SO_CONTEXT_IDENTIFIER", "Cust55774")
        # API base URL: online3.superoffice.com is valid here
        self.base_url = f"https://{self.env}.superoffice.com/{self.cust_id}/api/v1"
        self.access_token = self.auth_handler.get_access_token()
        if not self.access_token:
            raise Exception("Failed to obtain access token during SuperOfficeClient initialization.")
@@ -1,10 +1,11 @@
import sys
import os
from superoffice_client import SuperOfficeClient
from dotenv import load_dotenv

# Configuration
WEBHOOK_NAME = "Gemini Connector Production"
TARGET_URL = f"https://floke-ai.duckdns.org/connector/webhook?token={os.getenv('WEBHOOK_TOKEN', 'changeme')}"
EVENTS = [
    "contact.created",
    "contact.changed",
@@ -13,6 +14,7 @@ EVENTS = [
]

def register():
    load_dotenv("../.env")
    print("🚀 Initializing SuperOffice Client...")
    try:
        client = SuperOfficeClient()
@@ -43,7 +43,11 @@ class SuperOfficeClient:
    def _refresh_access_token(self):
        """Refreshes and returns a new access token."""
        # OAuth token endpoint is ALWAYS online.superoffice.com for production,
        # or sod.superoffice.com for sandbox.
        token_domain = "online.superoffice.com" if "online" in self.env.lower() else "sod.superoffice.com"
        url = f"https://{token_domain}/login/common/oauth/tokens"

        logger.debug(f"DEBUG: Refresh URL: '{url}' (Env: '{self.env}')")

        data = {
@@ -56,11 +60,18 @@ class SuperOfficeClient:
        try:
            resp = requests.post(url, data=data)

            # Catch non-JSON responses early
            if resp.status_code != 200:
                logger.error(f"❌ Token Refresh Failed (Status {resp.status_code})")
                logger.error(f"Response Body: {resp.text[:500]}")
                return None

            resp.raise_for_status()
            return resp.json().get("access_token")
        except requests.exceptions.HTTPError as e:
            logger.error(f"❌ Token Refresh Error (Status: {e.response.status_code}): {e.response.text}")
            logger.debug(f"Response Headers: {e.response.headers}")
            return None
        except requests.exceptions.JSONDecodeError:
            logger.error(f"❌ Token Refresh Error: Received non-JSON response from {url}")
            logger.debug(f"Raw Response: {resp.text[:500]}")
            return None
        except Exception as e:
            logger.error(f"❌ Connection Error during token refresh: {e}")
            return None
@@ -3,6 +3,7 @@ import logging
import os
import requests
import json
from datetime import datetime
from queue_manager import JobQueue
from superoffice_client import SuperOfficeClient
from config import settings
@@ -21,16 +22,15 @@ def process_job(job, so_client: SuperOfficeClient):
    """
    Core logic for processing a single job.
    """
    logger.info(f"--- [WORKER v1.8] Processing Job {job['id']} ({job['event_type']}) ---")
    payload = job['payload']
    event_low = job['event_type'].lower()

    # 1. Extract IDs Early
    person_id = None
    contact_id = None
    job_title = payload.get("JobTitle")

    # Try getting IDs from FieldValues (more reliable for Webhooks)
    field_values = payload.get("FieldValues", {})
    if "person_id" in field_values:
        person_id = int(field_values["person_id"])
@@ -39,7 +39,6 @@ def process_job(job, so_client: SuperOfficeClient):
    if "title" in field_values and not job_title:
        job_title = field_values["title"]

    if not person_id:
        if "PersonId" in payload:
            person_id = int(payload["PersonId"])
@@ -53,7 +52,6 @@ def process_job(job, so_client: SuperOfficeClient):
            contact_id = int(payload["PrimaryKey"])

    # Fallback/Deep Lookup & Fetch JobTitle if missing
    if person_id and (not job_title or not contact_id):
        try:
            person_details = so_client.get_person(
@@ -63,15 +61,12 @@ def process_job(job, so_client: SuperOfficeClient):
            if person_details:
                if not job_title:
                    job_title = person_details.get("JobTitle") or person_details.get("Title")

                if not contact_id:
                    contact_obj = person_details.get("Contact")
                    if contact_obj and isinstance(contact_obj, dict):
                        contact_id = contact_obj.get("ContactId")
                    elif "ContactId" in person_details:
                        contact_id = person_details.get("ContactId")

        except Exception as e:
            logger.warning(f"Failed to fetch person details for {person_id}: {e}")
@@ -80,61 +75,17 @@ def process_job(job, so_client: SuperOfficeClient):
        logger.info(f"Skipping irrelevant event type: {job['event_type']}")
        return "SUCCESS"

    if not contact_id:
        raise ValueError(f"Could not identify ContactId in payload: {payload}")

    logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}")

    # 1b. Fetch full contact details for 'Double Truth' check
    crm_name = None
    crm_website = None
    crm_industry_name = None
    contact_details = None
    campaign_tag = None

    try:
        contact_details = so_client.get_contact(
@@ -146,31 +97,46 @@ def process_job(job, so_client: SuperOfficeClient):
        crm_name = contact_details.get("Name")
        crm_website = contact_details.get("UrlAddress")
        if not crm_website and "Urls" in contact_details and contact_details["Urls"]:
            crm_website = contact_details["Urls"][0].get("Value")

        # --- Fetch Person UDFs for Campaign Tag ---
        if person_id:
            try:
                # We fetch the person again specifically for UDFs to ensure we get DisplayTexts
                person_details = so_client.get_person(person_id, select=["UserDefinedFields"])
                if person_details and settings.UDF_CAMPAIGN:
                    udfs = person_details.get("UserDefinedFields", {})
                    # SuperOffice REST returns DisplayText for lists as 'ProgID:DisplayText'
                    display_key = f"{settings.UDF_CAMPAIGN}:DisplayText"
                    campaign_tag = udfs.get(display_key)

                    if not campaign_tag:
                        # Fallback to manual resolution if DisplayText is missing
                        raw_tag = udfs.get(settings.UDF_CAMPAIGN, "")
                        if raw_tag:
                            campaign_tag = str(raw_tag).strip()

                    if campaign_tag:
                        logger.info(f"🎯 CAMPAIGN DETECTED: '{campaign_tag}'")
                    else:
                        logger.info("ℹ️ No Campaign Tag found (Field is empty).")
            except Exception as e:
                logger.warning(f"Could not fetch campaign tag: {e}")

        if settings.UDF_VERTICAL:
            udfs = contact_details.get("UserDefinedFields", {})
            so_vertical_val = udfs.get(settings.UDF_VERTICAL)

            if so_vertical_val:
                val_str = str(so_vertical_val).replace("[I:", "").replace("]", "")
                try:
                    vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
                    vertical_map_rev = {str(v): k for k, v in vertical_map.items()}
                    if val_str in vertical_map_rev:
                        crm_industry_name = vertical_map_rev[val_str]
                        logger.info(f"Detected CRM Vertical Override: {so_vertical_val} -> {crm_industry_name}")
                except Exception as ex:
                    logger.error(f"Error mapping vertical ID {val_str}: {ex}")

    except Exception as e:
        logger.error(f"Failed to fetch contact details for {contact_id}: {e}")
        # Critical failure: Without contact details, we cannot provision correctly.
        # Raising exception triggers a retry.
        raise Exception(f"SuperOffice API Failure: {e}")

    # --- 3. Company Explorer Provisioning ---
@@ -181,7 +147,8 @@ def process_job(job, so_client: SuperOfficeClient):
        "job_title": job_title,
        "crm_name": crm_name,
        "crm_website": crm_website,
        "crm_industry_name": crm_industry_name,
        "campaign_tag": campaign_tag
    }

    ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
@@ -189,34 +156,20 @@ def process_job(job, so_client: SuperOfficeClient):
    try:
        resp = requests.post(ce_url, json=ce_req, auth=ce_auth)
        resp.raise_for_status()
        provisioning_data = resp.json()

        if provisioning_data.get("status") == "processing":
            logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.")
            return "RETRY"

    except requests.exceptions.RequestException as e:
        raise Exception(f"Company Explorer API failed: {e}")

    logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")

    contact_data = so_client.get_contact(contact_id)
    if not contact_data: return "FAILED"
    contact_patch = {}

    if "UserDefinedFields" not in contact_data: contact_data["UserDefinedFields"] = {}

    # --- A. Vertical Sync ---
    vertical_name = provisioning_data.get("vertical_name")
@@ -226,13 +179,11 @@ def process_job(job, so_client: SuperOfficeClient):
        vertical_id = vertical_map.get(vertical_name)
        if vertical_id:
            udf_key = settings.UDF_VERTICAL
            current_val = contact_data.get("UserDefinedFields", {}).get(udf_key, "")
            if str(current_val).replace("[I:", "").replace("]", "") != str(vertical_id):
                logger.info(f"Change detected: Vertical -> {vertical_id}")
                if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
                contact_patch["UserDefinedFields"][udf_key] = str(vertical_id)
    except Exception as e:
        logger.error(f"Vertical sync error: {e}")
@@ -243,154 +194,92 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
ce_vat = provisioning_data.get("vat_id")
|
||||
|
||||
if ce_city or ce_street or ce_zip:
|
||||
if "Address" not in contact_data or contact_data["Address"] is None:
|
||||
contact_data["Address"] = {"Street": {}, "Postal": {}}
|
||||
|
||||
addr_obj = contact_data["Address"]
|
||||
if "Postal" not in addr_obj or addr_obj["Postal"] is None: addr_obj["Postal"] = {}
|
||||
if "Street" not in addr_obj or addr_obj["Street"] is None: addr_obj["Street"] = {}
|
||||
for type_key in ["Postal", "Street"]:
|
||||
cur_addr = (contact_data.get("Address") or {}).get(type_key, {})
|
||||
if ce_city and cur_addr.get("City") != ce_city: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["City"] = ce_city
|
||||
if ce_street and cur_addr.get("Address1") != ce_street: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Address1"] = ce_street
|
||||
if ce_zip and cur_addr.get("Zipcode") != ce_zip: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Zipcode"] = ce_zip
|
||||
|
||||
def update_addr_patch(field_name, new_val, log_name):
|
||||
if new_val:
|
||||
for type_key in ["Postal", "Street"]:
|
||||
cur = addr_obj[type_key].get(field_name, "")
|
||||
if cur != new_val:
|
||||
logger.info(f"Change detected: {type_key} {log_name} '{cur}' -> '{new_val}'")
|
||||
contact_patch.setdefault("Address", {}).setdefault(type_key, {})[field_name] = new_val
|
||||
if ce_vat and contact_data.get("OrgNr") != ce_vat:
|
||||
contact_patch["OrgNr"] = ce_vat
|
||||
|
||||
update_addr_patch("City", ce_city, "City")
|
||||
update_addr_patch("Address1", ce_street, "Street")
|
||||
update_addr_patch("Zipcode", ce_zip, "Zip")
|
||||
|
||||
if ce_vat:
|
||||
current_vat = contact_data.get("OrgNr", "")
|
||||
if current_vat != ce_vat:
|
||||
logger.info(f"Change detected: VAT '{current_vat}' -> '{ce_vat}'")
|
||||
contact_patch["OrgNr"] = ce_vat
|
||||
|
||||
# --- C. AI Openers Sync ---
# --- C. AI Openers & Summary Sync ---
ce_opener = provisioning_data.get("opener")
ce_opener_secondary = provisioning_data.get("opener_secondary")
ce_summary = provisioning_data.get("summary")

if ce_opener and ce_opener != "null":
    current_opener = contact_data["UserDefinedFields"].get(settings.UDF_OPENER, "")
    if current_opener != ce_opener:
        logger.info("Change detected: Primary Opener")
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER] = ce_opener

if ce_opener and ce_opener != "null" and contact_data.get("UserDefinedFields", {}).get(settings.UDF_OPENER) != ce_opener:
    if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
    contact_patch["UserDefinedFields"][settings.UDF_OPENER] = ce_opener
if ce_opener_secondary and ce_opener_secondary != "null" and contact_data.get("UserDefinedFields", {}).get(settings.UDF_OPENER_SECONDARY) != ce_opener_secondary:
    if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
    contact_patch["UserDefinedFields"][settings.UDF_OPENER_SECONDARY] = ce_opener_secondary

if ce_opener_secondary and ce_opener_secondary != "null":
    current_opener_sec = contact_data["UserDefinedFields"].get(settings.UDF_OPENER_SECONDARY, "")
    if current_opener_sec != ce_opener_secondary:
        logger.info("Change detected: Secondary Opener")
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER_SECONDARY] = ce_opener_secondary
if ce_summary and ce_summary != "null":
    short_summary = (ce_summary[:132] + "...") if len(ce_summary) > 135 else ce_summary
    if contact_data.get("UserDefinedFields", {}).get(settings.UDF_SUMMARY) != short_summary:
        logger.info("Change detected: AI Summary")
        if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
        contact_patch["UserDefinedFields"][settings.UDF_SUMMARY] = short_summary
# --- D. Apply Updates (Single PATCH Transaction) ---
# --- D. Timestamps & Website Sync ---
if settings.UDF_LAST_UPDATE:
    now_so = f"[D:{datetime.now().strftime('%m/%d/%Y %H:%M:%S')}]"
    if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
    contact_patch["UserDefinedFields"][settings.UDF_LAST_UPDATE] = now_so

ce_website = provisioning_data.get("website")
if ce_website and (not contact_data.get("Urls") or settings.ENABLE_WEBSITE_SYNC):
    current_urls = contact_data.get("Urls") or []
    if not any(u.get("Value") == ce_website for u in current_urls):
        logger.info(f"Syncing Website: {ce_website}")
        contact_patch["Urls"] = [{"Value": ce_website, "Description": "AI Discovered"}] + current_urls
# --- E. Apply Updates (Single PATCH) ---
if contact_patch:
    logger.info(f"Pushing combined PATCH updates for Contact {contact_id}...")
    try:
        so_client.patch_contact(contact_id, contact_patch)
        logger.info("✅ Contact Update Successful (PATCH).")
    except Exception as e:
        logger.error(f"Failed to update Contact {contact_id}: {e}")
        raise
else:
    logger.info(f"No changes detected for Contact {contact_id}. Skipping update.")

    logger.info(f"Pushing combined PATCH for Contact {contact_id}: {list(contact_patch.keys())}")
    so_client.patch_contact(contact_id, contact_patch)
    logger.info("✅ Contact Update Successful.")
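The whole section builds one delta payload and sends it in a single PATCH, as required for SuperOffice certification (see section 10 above). The core idea can be sketched in isolation; `build_patch` and the sample field names are illustrative, not the actual `SuperOfficeClient` API:

```python
# Hedged sketch of the delta-PATCH idea: diff current entity state against
# desired values and emit only the fields that actually changed, so the
# PATCH body never overwrites untouched data.
def build_patch(current: dict, desired: dict) -> dict:
    patch = {}
    for key, new_val in desired.items():
        # Mirror the worker's pattern: skip empty values, skip unchanged ones.
        if new_val and current.get(key, "") != new_val:
            patch[key] = new_val
    return patch

current = {"OrgNr": "DE123", "Name": "Acme GmbH"}
desired = {"OrgNr": "DE999", "Name": "Acme GmbH"}
print(build_patch(current, desired))  # → {'OrgNr': 'DE999'}
```

An empty result then maps naturally onto the "No changes detected, skipping update" branch.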
# 2d. Sync Person Position (Role) - if Person exists
# 2d. Sync Person Position
role_name = provisioning_data.get("role_name")
if person_id and role_name:
    try:
        persona_map = json.loads(settings.PERSONA_MAP_JSON)
        position_id = persona_map.get(role_name)
        if position_id:
            logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}")
            so_client.update_person_position(person_id, int(position_id))
    except Exception as e:
        logger.error(f"Error syncing position for Person {person_id}: {e}")
        logger.error(f"Error syncing position: {e}")
# 3. Update SuperOffice Texts (Only if person_id is present)
if not person_id:
    logger.info("Sync complete (Company only).")
    return "SUCCESS"

texts = provisioning_data.get("texts", {})
if not any(texts.values()):
    logger.info("No texts returned from Matrix yet.")
    return "SUCCESS"

udf_update = {}
if texts.get("subject"): udf_update[settings.UDF_SUBJECT] = texts["subject"]
if texts.get("intro"): udf_update[settings.UDF_INTRO] = texts["intro"]
if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]

if udf_update:
    try:
        current_person = so_client.get_person(person_id)
        current_udfs = current_person.get("UserDefinedFields", {})
        needs_update = False
        for key, new_val in udf_update.items():
            if current_udfs.get(key, "") != new_val:
                needs_update = True
                break
# 3. Update SuperOffice Texts (Person)
if person_id:
    texts = provisioning_data.get("texts", {})
    unsubscribe_link = provisioning_data.get("unsubscribe_link")

    udf_update = {}
    if texts.get("subject"): udf_update[settings.UDF_SUBJECT] = texts["subject"]
    if texts.get("intro"): udf_update[settings.UDF_INTRO] = texts["intro"]
    if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
    if unsubscribe_link and settings.UDF_UNSUBSCRIBE_LINK:
        udf_update[settings.UDF_UNSUBSCRIBE_LINK] = unsubscribe_link

    if udf_update:
        logger.info(f"Applying text update to Person {person_id}.")
        so_client.update_entity_udfs(person_id, "Person", udf_update)
# Simulation Trigger: Either texts changed, OR it's a direct manual trigger
if needs_update or (person_id and "Source" not in payload):
    if needs_update:
        logger.info(f"Applying text update to Person {person_id}.")
        so_client.update_entity_udfs(person_id, "Person", udf_update)
    else:
        logger.info(f"Texts already in sync for Person {person_id}, but triggering simulation.")

    # --- 4. Create Email Simulation Appointment ---
    try:
        opener = provisioning_data.get("opener", "")
        intro = texts.get("intro", "")
        proof = texts.get("social_proof", "")
        subject = texts.get("subject", "No Subject")

        salutation = "Hallo"
        p_data = so_client.get_person(person_id)
        if p_data:
            fname = p_data.get("Firstname", "")
            lname = p_data.get("Lastname", "")
            if fname or lname:
                salutation = f"Hallo {fname} {lname}".strip()
# --- 4. Create Email Simulation Appointment ---
try:
    opener = provisioning_data.get("opener") or ""
    intro = texts.get("intro") or ""
    proof = texts.get("social_proof") or ""
    subject = texts.get("subject", "No Subject")
    email_body = f"Betreff: {subject}\n\n{opener}\n\n{intro}\n\n{proof}\n\n(Generated via Gemini Marketing Engine)"
    so_client.create_appointment(f"KI: {subject}", email_body, contact_id, person_id)
except Exception as e:
    logger.error(f"Failed simulation: {e}")
        cta = (
            "Hätten Sie am kommenden Mittwoch gegen 11 Uhr kurz Zeit, für einen kurzen Austausch hierzu?\n"
            "Gerne können Sie auch einen alternativen Termin in meinem Kalender buchen. (bookings Link)"
        )

        email_body = (
            f"{salutation},\n\n"
            f"{opener.strip()}\n\n"
            f"{intro.strip()}\n\n"
            f"{cta.strip()}\n\n"
            f"{proof.strip()}\n\n"
            "(Generated via Gemini Marketing Engine)"
        )

        from datetime import datetime
        now_str = datetime.now().strftime("%H:%M")
        appt_title = f"[{now_str}] KI: {subject}"

        so_client.create_appointment(
            subject=appt_title,
            description=email_body,
            contact_id=contact_id,
            person_id=person_id
        )
    except Exception as e:
        logger.error(f"Failed to create email simulation appointment: {e}")
else:
    logger.info(f"Skipping update for Person {person_id}: Values match.")

except Exception as e:
    logger.error(f"Error during Person update: {e}")
    raise

logger.info("Job successfully processed.")
return "SUCCESS"
def run_worker():

@@ -401,7 +290,7 @@ def run_worker():
        so_client = SuperOfficeClient()
        if not so_client.access_token: raise Exception("Auth failed")
    except Exception as e:
        logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...")
        logger.critical("Failed to initialize SO Client. Retrying in 30s...")
        time.sleep(30)

    logger.info("Worker started. Polling queue...")

@@ -411,12 +300,9 @@ def run_worker():
        if job:
            try:
                result = process_job(job, so_client)
                if result == "RETRY":
                    queue.retry_job_later(job['id'], delay_seconds=120, error_msg="CE is processing...")
                elif result == "FAILED":
                    queue.fail_job(job['id'], "Job failed with FAILED status")
                else:
                    queue.complete_job(job['id'])
                if result == "RETRY": queue.retry_job_later(job['id'], delay_seconds=120)
                elif result == "FAILED": queue.fail_job(job['id'], "Job failed status")
                else: queue.complete_job(job['id'])
            except Exception as e:
                logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
                queue.fail_job(job['id'], str(e))

@@ -427,4 +313,4 @@ def run_worker():
        time.sleep(POLL_INTERVAL)

if __name__ == "__main__":
    run_worker()
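The worker loop above maps each `process_job` result string onto a queue transition (retry with delay, fail, or complete). A hedged sketch of that dispatch logic, testable without the real queue (`FakeQueue` and its method names only mirror the calls visible in the diff, they are not the project's actual queue interface):

```python
# Map a job result to a queue transition, as in the worker loop above.
def dispatch(result, job_id, queue):
    if result == "RETRY":
        queue.retry_job_later(job_id, delay_seconds=120)
    elif result == "FAILED":
        queue.fail_job(job_id, "Job failed status")
    else:
        queue.complete_job(job_id)

# Stand-in queue that just records which transition was taken.
class FakeQueue:
    def __init__(self): self.calls = []
    def retry_job_later(self, job_id, delay_seconds): self.calls.append(("retry", job_id))
    def fail_job(self, job_id, msg): self.calls.append(("fail", job_id))
    def complete_job(self, job_id): self.calls.append(("done", job_id))

q = FakeQueue()
dispatch("RETRY", 1, q)
dispatch("SUCCESS", 2, q)
print(q.calls)  # → [('retry', 1), ('done', 2)]
```

Note that exceptions raised by `process_job` bypass this mapping entirely and go straight to `fail_job`, which is why the PATCH branch re-raises after logging.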