# Connector worker: consumes queued SuperOffice webhook jobs and syncs
# company/person data with the Company Explorer service.
import time
|
|
import logging
|
|
import os
|
|
import requests
|
|
import json
|
|
from queue_manager import JobQueue
|
|
from superoffice_client import SuperOfficeClient
|
|
|
|
# Setup Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("connector-worker")

# Config
# Base URL of the Company Explorer provisioning service.
COMPANY_EXPLORER_URL = os.getenv("COMPANY_EXPLORER_URL", "http://company-explorer:8000")
# Queue poll interval in seconds (env-overridable, matching the URL above).
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "5"))

# UDF Mapping (DEV) - Should be moved to config later
# Maps Company Explorer text keys to SuperOffice user-defined field ids.
UDF_MAPPING = {
    "subject": "SuperOffice:5",
    "intro": "SuperOffice:6",
    "social_proof": "SuperOffice:7"
}
|
|
|
|
def process_job(job, so_client: SuperOfficeClient):
    """
    Process a single queued webhook job.

    Steps:
      1. Extract PersonId/ContactId from the webhook payload, with a
         deep lookup via the person record as a fallback.
      2. For company-level events (no person id), cascade by enqueueing
         one "person.changed" job per person of that contact.
      3. Call the Company Explorer provisioning API, sync the resulting
         vertical back to the SuperOffice contact, and write generated
         texts into the person's user-defined fields.

    Args:
        job: Queue row with 'id', 'event_type' and 'payload' keys.
        so_client: Authenticated SuperOffice API client.

    Returns:
        "SUCCESS" when done, "RETRY" when the job should be re-queued
        (Company Explorer not reachable yet or still processing).

    Raises:
        ValueError: when no ContactId can be determined from the payload.
        Exception: when the Company Explorer call or the UDF write-back fails.
    """
    logger.info(f"Processing Job {job['id']} ({job['event_type']})")
    payload = job['payload']
    event_low = job['event_type'].lower()

    # 1. Extract IDs from Webhook Payload
    person_id = None
    contact_id = None

    if "PersonId" in payload:
        person_id = int(payload["PersonId"])
    elif "PrimaryKey" in payload and "person" in event_low:
        person_id = int(payload["PrimaryKey"])

    if "ContactId" in payload:
        contact_id = int(payload["ContactId"])
    elif "PrimaryKey" in payload and "contact" in event_low:
        contact_id = int(payload["PrimaryKey"])

    # Fallback/Deep Lookup: resolve the contact through the person record.
    if not contact_id and person_id:
        person_data = so_client.get_person(person_id)
        if person_data and "Contact" in person_data:
            contact_id = person_data["Contact"].get("ContactId")

    if not contact_id:
        raise ValueError(f"Could not identify ContactId in payload: {payload}")

    logger.info(f"Target: Person {person_id}, Contact {contact_id}")

    # --- Cascading Logic ---
    # If a company changes, we want to update all its persons eventually.
    # We do this by adding "person.changed" jobs for each person to the queue.
    if "contact" in event_low and not person_id:
        logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.")
        try:
            persons = so_client.search(f"Person?$filter=contact/contactId eq {contact_id}")
            if persons:
                q = JobQueue()
                for p in persons:
                    p_id = p.get("PersonId")
                    if p_id:
                        logger.info(f"Cascading: Enqueueing job for Person {p_id}")
                        q.add_job("person.changed", {"PersonId": p_id, "ContactId": contact_id, "Source": "Cascade"})
        except Exception as e:
            # Best effort: a failed cascade must not fail the original job.
            logger.warning(f"Failed to cascade to persons for contact {contact_id}: {e}")

    # 1b. Fetch full contact details for 'Double Truth' check (Master Data Sync)
    crm_name = None
    crm_website = None
    try:
        contact_details = so_client.get_contact(contact_id)
        if contact_details:
            crm_name = contact_details.get("Name")
            crm_website = contact_details.get("UrlAddress")
            # Fall back to the first entry of the Urls collection.
            if not crm_website and "Urls" in contact_details and contact_details["Urls"]:
                crm_website = contact_details["Urls"][0].get("Value")
    except Exception as e:
        logger.warning(f"Failed to fetch contact details for {contact_id}: {e}")

    # 2. Call Company Explorer Provisioning API
    ce_url = f"{COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
    ce_req = {
        "so_contact_id": contact_id,
        "so_person_id": person_id,
        "job_title": payload.get("JobTitle"),
        "crm_name": crm_name,
        "crm_website": crm_website
    }

    ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))

    try:
        # Explicit timeout so a hung Company Explorer cannot block the worker forever.
        resp = requests.post(ce_url, json=ce_req, auth=ce_auth, timeout=30)
        if resp.status_code == 404:
            logger.warning("Company Explorer returned 404. Retrying later.")
            return "RETRY"

        resp.raise_for_status()
        provisioning_data = resp.json()

        # Company Explorer has accepted the request but is still working:
        # re-queue and check again later. (A duplicated copy of this check
        # was removed here.)
        if provisioning_data.get("status") == "processing":
            logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.")
            return "RETRY"

    except requests.exceptions.RequestException as e:
        # Chain the original exception so the worker log shows the root cause.
        raise Exception(f"Company Explorer API failed: {e}") from e

    logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")  # DEBUG

    # 2b. Sync Vertical to SuperOffice (Company Level)
    vertical_name = provisioning_data.get("vertical_name")

    if vertical_name:
        # Mappings from README
        VERTICAL_MAP = {
            "Logistics - Warehouse": 23,
            "Healthcare - Hospital": 24,
            "Infrastructure - Transport": 25,
            "Leisure - Indoor Active": 26
        }

        vertical_id = VERTICAL_MAP.get(vertical_name)

        if vertical_id:
            logger.info(f"Identified Vertical '{vertical_name}' -> ID {vertical_id}")
            try:
                # Check current value to avoid loops
                # NOTE(review): "SuperOffice:5" is a *contact*-level field here,
                # while UDF_MAPPING uses the same id for the person-level
                # "subject" text — confirm both ids are intentional.
                current_contact = so_client.get_contact(contact_id)
                current_udfs = current_contact.get("UserDefinedFields", {})
                current_val = current_udfs.get("SuperOffice:5", "")

                # Normalize SO list ID format (e.g., "[I:26]" -> "26")
                if current_val and current_val.startswith("[I:"):
                    current_val = current_val.split(":")[1].strip("]")

                if str(current_val) != str(vertical_id):
                    logger.info(f"Updating Contact {contact_id} Vertical: {current_val} -> {vertical_id}")
                    so_client.update_entity_udfs(contact_id, "Contact", {"SuperOffice:5": str(vertical_id)})
                else:
                    logger.info(f"Vertical for Contact {contact_id} already in sync ({vertical_id}).")
            except Exception as e:
                logger.error(f"Failed to sync vertical for Contact {contact_id}: {e}")
        else:
            logger.warning(f"Vertical '{vertical_name}' not found in internal mapping.")

    # 2c. Sync Website (Company Level)
    # TEMPORARILY DISABLED TO PREVENT LOOP (SO API Read-after-Write latency or field mapping issue)
    """
    website = provisioning_data.get("website")
    if website and website != "k.A.":
        try:
            # Re-fetch contact to ensure we work on latest version (Optimistic Concurrency)
            contact_data = so_client.get_contact(contact_id)
            current_url = contact_data.get("UrlAddress", "")

            # Normalize for comparison
            def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").strip("/") if u else ""

            if norm(current_url) != norm(website):
                logger.info(f"Updating Website for Contact {contact_id}: {current_url} -> {website}")

                # Update Urls collection (Rank 1)
                new_urls = []
                if "Urls" in contact_data:
                    found = False
                    for u in contact_data["Urls"]:
                        if u.get("Rank") == 1:
                            u["Value"] = website
                            found = True
                        new_urls.append(u)
                    if not found:
                        new_urls.append({"Value": website, "Rank": 1, "Description": "Website"})
                    contact_data["Urls"] = new_urls
                else:
                    contact_data["Urls"] = [{"Value": website, "Rank": 1, "Description": "Website"}]

                # Also set main field if empty
                if not current_url:
                    contact_data["UrlAddress"] = website

                # Write back full object
                so_client._put(f"Contact/{contact_id}", contact_data)
            else:
                logger.info(f"Website for Contact {contact_id} already in sync.")

        except Exception as e:
            logger.error(f"Failed to sync website for Contact {contact_id}: {e}")
    """

    # 3. Update SuperOffice (Only if person_id is present)
    if not person_id:
        logger.info("Sync complete (Company only). No texts to write back.")
        return "SUCCESS"

    texts = provisioning_data.get("texts", {})
    if not any(texts.values()):
        logger.info("No texts returned from Matrix (yet). Skipping write-back.")
        return "SUCCESS"

    udf_update = {}
    if texts.get("subject"): udf_update[UDF_MAPPING["subject"]] = texts["subject"]
    if texts.get("intro"): udf_update[UDF_MAPPING["intro"]] = texts["intro"]
    if texts.get("social_proof"): udf_update[UDF_MAPPING["social_proof"]] = texts["social_proof"]

    if udf_update:
        # Loop Prevention: only write when at least one value actually differs,
        # so our own update does not re-trigger the webhook indefinitely.
        try:
            current_person = so_client.get_person(person_id)
            current_udfs = current_person.get("UserDefinedFields", {})
            needs_update = any(
                current_udfs.get(key, "") != new_val
                for key, new_val in udf_update.items()
            )

            if needs_update:
                logger.info(f"Applying update to Person {person_id} (Changes detected).")
                success = so_client.update_entity_udfs(person_id, "Person", udf_update)
                if not success:
                    raise Exception("Failed to update SuperOffice UDFs")
            else:
                logger.info(f"Skipping update for Person {person_id}: Values match (Loop Prevention).")

        except Exception as e:
            logger.error(f"Error during pre-update check: {e}")
            raise

    logger.info("Job successfully processed.")
    return "SUCCESS"
|
|
|
|
def run_worker():
    """
    Worker entry point: poll the job queue forever.

    Blocks until a SuperOffice client can be constructed (retrying every
    30 seconds), then loops: fetch the next job, hand it to process_job,
    and complete / re-queue / fail it depending on the outcome. Errors at
    loop level are logged and the worker keeps running.
    """
    job_queue = JobQueue()

    # Initialize SO Client with retry — the worker is useless without it.
    client = None
    while client is None:
        try:
            client = SuperOfficeClient()
        except Exception as e:
            logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...")
            time.sleep(30)

    logger.info("Worker started. Polling queue...")

    while True:
        try:
            job = job_queue.get_next_job()
            if not job:
                # Queue empty: back off before polling again.
                time.sleep(POLL_INTERVAL)
                continue

            try:
                outcome = process_job(job, client)
                if outcome == "RETRY":
                    job_queue.retry_job_later(job['id'], delay_seconds=120)
                else:
                    job_queue.complete_job(job['id'])
            except Exception as e:
                logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
                job_queue.fail_job(job['id'], str(e))

        except Exception as e:
            logger.error(f"Worker loop error: {e}")
            time.sleep(POLL_INTERVAL)
|
|
|
|
# Start the worker loop only when executed as a script, not on import.
if __name__ == "__main__":
    run_worker()