[2ff88f42] Finalize SuperOffice Connector: Centralized Config, Added Position/Role Mapping Logic, and Discovery Tools

This commit is contained in:
2026-02-20 07:20:26 +00:00
parent f65df42f55
commit e1071fc73c
6 changed files with 428 additions and 355 deletions

View File

@@ -5,6 +5,7 @@ import requests
import json
from queue_manager import JobQueue
from superoffice_client import SuperOfficeClient
from config import settings
# Setup Logging
logging.basicConfig(
@@ -13,17 +14,9 @@ logging.basicConfig(
)
logger = logging.getLogger("connector-worker")
# Config
COMPANY_EXPLORER_URL = os.getenv("COMPANY_EXPLORER_URL", "http://company-explorer:8000")
# Poll Interval
POLL_INTERVAL = 5 # Seconds
# UDF Mapping (DEV) - Should be moved to config later
UDF_MAPPING = {
"subject": "SuperOffice:5",
"intro": "SuperOffice:6",
"social_proof": "SuperOffice:7"
}
def process_job(job, so_client: SuperOfficeClient):
"""
Core logic for processing a single job.
@@ -59,7 +52,6 @@ def process_job(job, so_client: SuperOfficeClient):
# --- Cascading Logic ---
# If a company changes, we want to update all its persons eventually.
# We do this by adding "person.changed" jobs for each person to the queue.
if "contact" in event_low and not person_id:
logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.")
try:
@@ -88,7 +80,7 @@ def process_job(job, so_client: SuperOfficeClient):
logger.warning(f"Failed to fetch contact details for {contact_id}: {e}")
# 2. Call Company Explorer Provisioning API
ce_url = f"{COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
ce_req = {
"so_contact_id": contact_id,
"so_person_id": person_id,
@@ -97,6 +89,7 @@ def process_job(job, so_client: SuperOfficeClient):
"crm_website": crm_website
}
# Simple Basic Auth for internal API
ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
try:
@@ -112,28 +105,22 @@ def process_job(job, so_client: SuperOfficeClient):
logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.")
return "RETRY"
if provisioning_data.get("status") == "processing":
logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.")
return "RETRY"
except requests.exceptions.RequestException as e:
raise Exception(f"Company Explorer API failed: {e}")
logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}") # DEBUG
logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")
# 2b. Sync Vertical to SuperOffice (Company Level)
vertical_name = provisioning_data.get("vertical_name")
if vertical_name:
# Mappings from README
VERTICAL_MAP = {
"Logistics - Warehouse": 23,
"Healthcare - Hospital": 24,
"Infrastructure - Transport": 25,
"Leisure - Indoor Active": 26
}
vertical_id = VERTICAL_MAP.get(vertical_name)
try:
vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
except:
vertical_map = {}
logger.error("Failed to parse VERTICAL_MAP_JSON from config.")
vertical_id = vertical_map.get(vertical_name)
if vertical_id:
logger.info(f"Identified Vertical '{vertical_name}' -> ID {vertical_id}")
@@ -141,7 +128,10 @@ def process_job(job, so_client: SuperOfficeClient):
# Check current value to avoid loops
current_contact = so_client.get_contact(contact_id)
current_udfs = current_contact.get("UserDefinedFields", {})
current_val = current_udfs.get("SuperOffice:5", "")
# Use Config UDF key
udf_key = settings.UDF_VERTICAL
current_val = current_udfs.get(udf_key, "")
# Normalize SO list ID format (e.g., "[I:26]" -> "26")
if current_val and current_val.startswith("[I:"):
@@ -149,7 +139,7 @@ def process_job(job, so_client: SuperOfficeClient):
if str(current_val) != str(vertical_id):
logger.info(f"Updating Contact {contact_id} Vertical: {current_val} -> {vertical_id}")
so_client.update_entity_udfs(contact_id, "Contact", {"SuperOffice:5": str(vertical_id)})
so_client.update_entity_udfs(contact_id, "Contact", {udf_key: str(vertical_id)})
else:
logger.info(f"Vertical for Contact {contact_id} already in sync ({vertical_id}).")
except Exception as e:
@@ -159,49 +149,66 @@ def process_job(job, so_client: SuperOfficeClient):
# 2c. Sync Website (Company Level)
# TEMPORARILY DISABLED TO PREVENT LOOP (SO API Read-after-Write latency or field mapping issue)
"""
website = provisioning_data.get("website")
if website and website != "k.A.":
try:
# Re-fetch contact to ensure we work on latest version (Optimistic Concurrency)
contact_data = so_client.get_contact(contact_id)
current_url = contact_data.get("UrlAddress", "")
# Normalize for comparison
def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").strip("/") if u else ""
if norm(current_url) != norm(website):
logger.info(f"Updating Website for Contact {contact_id}: {current_url} -> {website}")
# Re-enable via config if needed
if settings.ENABLE_WEBSITE_SYNC:
website = provisioning_data.get("website")
if website and website != "k.A.":
try:
contact_data = so_client.get_contact(contact_id)
current_url = contact_data.get("UrlAddress", "")
# Update Urls collection (Rank 1)
new_urls = []
if "Urls" in contact_data:
found = False
for u in contact_data["Urls"]:
if u.get("Rank") == 1:
u["Value"] = website
found = True
new_urls.append(u)
if not found:
new_urls.append({"Value": website, "Rank": 1, "Description": "Website"})
contact_data["Urls"] = new_urls
def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").strip("/") if u else ""
if norm(current_url) != norm(website):
logger.info(f"Updating Website for Contact {contact_id}: {current_url} -> {website}")
# Update Urls collection (Rank 1)
new_urls = []
if "Urls" in contact_data:
found = False
for u in contact_data["Urls"]:
if u.get("Rank") == 1:
u["Value"] = website
found = True
new_urls.append(u)
if not found:
new_urls.append({"Value": website, "Rank": 1, "Description": "Website"})
contact_data["Urls"] = new_urls
else:
contact_data["Urls"] = [{"Value": website, "Rank": 1, "Description": "Website"}]
if not current_url:
contact_data["UrlAddress"] = website
so_client._put(f"Contact/{contact_id}", contact_data)
else:
contact_data["Urls"] = [{"Value": website, "Rank": 1, "Description": "Website"}]
# Also set main field if empty
if not current_url:
contact_data["UrlAddress"] = website
logger.info(f"Website for Contact {contact_id} already in sync.")
except Exception as e:
logger.error(f"Failed to sync website for Contact {contact_id}: {e}")
# Write back full object
so_client._put(f"Contact/{contact_id}", contact_data)
else:
logger.info(f"Website for Contact {contact_id} already in sync.")
except Exception as e:
logger.error(f"Failed to sync website for Contact {contact_id}: {e}")
"""
# 2d. Sync Person Position (Role) - if Person exists
role_name = provisioning_data.get("role_name")
if person_id and role_name:
try:
persona_map = json.loads(settings.PERSONA_MAP_JSON)
except:
persona_map = {}
logger.error("Failed to parse PERSONA_MAP_JSON from config.")
position_id = persona_map.get(role_name)
if position_id:
logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}")
try:
success = so_client.update_person_position(person_id, int(position_id))
if not success:
logger.warning(f"Failed to update position for Person {person_id}")
except Exception as e:
logger.error(f"Error syncing position for Person {person_id}: {e}")
else:
logger.info(f"Role '{role_name}' has no mapped Position ID in config. Skipping update.")
# 3. Update SuperOffice (Only if person_id is present)
# 3. Update SuperOffice Texts (Only if person_id is present)
if not person_id:
logger.info("Sync complete (Company only). No texts to write back.")
return "SUCCESS"
@@ -212,9 +219,9 @@ def process_job(job, so_client: SuperOfficeClient):
return "SUCCESS"
udf_update = {}
if texts.get("subject"): udf_update[UDF_MAPPING["subject"]] = texts["subject"]
if texts.get("intro"): udf_update[UDF_MAPPING["intro"]] = texts["intro"]
if texts.get("social_proof"): udf_update[UDF_MAPPING["social_proof"]] = texts["social_proof"]
if texts.get("subject"): udf_update[settings.UDF_SUBJECT] = texts["subject"]
if texts.get("intro"): udf_update[settings.UDF_INTRO] = texts["intro"]
if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
if udf_update:
# Loop Prevention
@@ -250,6 +257,8 @@ def run_worker():
while not so_client:
try:
so_client = SuperOfficeClient()
if not so_client.access_token: # Check if auth worked
raise Exception("Auth failed")
except Exception as e:
logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...")
time.sleep(30)