feat(connector): [31188f42] Finalize production optimizations, filtering, and dashboard enhancements

This commit is contained in:
2026-03-05 09:48:34 +00:00
parent 63a5d84791
commit f36e9902e8
29 changed files with 1178 additions and 89 deletions

View File

@@ -33,54 +33,26 @@ def safe_get_udfs(entity_data):
logger.error(f"Error reading UDFs: {e}")
return {}
def process_job(job, so_client: SuperOfficeClient):
def process_job(job, so_client: SuperOfficeClient, queue: JobQueue):
"""
Core logic for processing a single job.
Returns: (STATUS, MESSAGE)
STATUS: 'SUCCESS', 'SKIPPED', 'RETRY', 'FAILED'
"""
logger.info(f"--- [WORKER v1.8] Processing Job {job['id']} ({job['event_type']}) ---")
logger.info(f"--- [WORKER v1.9.1] Processing Job {job['id']} ({job['event_type']}) ---")
payload = job['payload']
event_low = job['event_type'].lower()
# --- CIRCUIT BREAKER: STOP INFINITE LOOPS ---
# Ignore webhooks triggered by our own API user (Associate 528)
changed_by = payload.get("ChangedByAssociateId")
if changed_by == 528:
msg = f"Skipping Echo: Event was triggered by our own API user (Associate 528)."
logger.info(f"⏭️ {msg}")
return ("SKIPPED", msg)
# --------------------------------------------
# 0. Noise Reduction: Filter irrelevant field changes
if job['event_type'] == 'contact.changed':
changes = payload.get('Changes', [])
changes_lower = [str(c).lower() for c in changes]
# Fields that trigger a re-analysis
relevant_fields = [
'name', 'urladdress', 'urls', 'orgnr', 'userdef_id', 'country_id'
]
# Identify which relevant field triggered the event
hit_fields = [f for f in relevant_fields if f in changes_lower]
if not hit_fields:
msg = f"Skipping 'contact.changed': No relevant fields affected. (Changes: {changes})"
logger.info(f"⏭️ {msg}")
return ("SKIPPED", msg)
else:
logger.info(f"🎯 Relevant change detected in fields: {hit_fields}")
if job['event_type'] == 'person.changed':
changes = payload.get('Changes', [])
changes_lower = [str(c).lower() for c in changes]
relevant_person_fields = [
'jobtitle', 'title', 'position_id', 'userdef_id'
]
hit_fields = [f for f in relevant_person_fields if f in changes_lower]
if not hit_fields:
msg = f"Skipping 'person.changed': No relevant fields affected. (Changes: {changes})"
logger.info(f"⏭️ {msg}")
return ("SKIPPED", msg)
else:
logger.info(f"🎯 Relevant change detected in fields: {hit_fields}")
# 1. Extract IDs Early
person_id = None
contact_id = None
job_title = payload.get("JobTitle")
@@ -143,14 +115,47 @@ def process_job(job, so_client: SuperOfficeClient):
campaign_tag = None
try:
# Request Associate details explicitly
contact_details = so_client.get_contact(
contact_id,
select=["Name", "UrlAddress", "Urls", "UserDefinedFields", "Address", "OrgNr"]
select=["Name", "UrlAddress", "Urls", "UserDefinedFields", "Address", "OrgNr", "Associate"]
)
if not contact_details:
raise ValueError(f"Contact {contact_id} not found (API returned None)")
# ABSOLUTE SAFETY CHECK
if contact_details is None:
raise ValueError(f"SuperOffice API returned None for Contact {contact_id}. Possible timeout or record locked.")
crm_name = contact_details.get("Name")
crm_name = contact_details.get("Name", "Unknown")
# Safely get Associate object
assoc = contact_details.get("Associate") or {}
aid = assoc.get("AssociateId")
aname = assoc.get("Name", "").upper().strip() if assoc.get("Name") else ""
# PERSIST DETAILS TO DASHBOARD early
queue.update_entity_name(job['id'], crm_name, associate_name=aname)
# --- ROBOPLANET FILTER LOGIC ---
# Check both numerical ID and shortname
is_robo = False
if aname in settings.ROBOPLANET_WHITELIST:
is_robo = True
else:
try:
if aid and int(aid) in settings.ROBOPLANET_WHITELIST:
is_robo = True
except (ValueError, TypeError):
pass
if not is_robo:
msg = f"Skipped, Wackler. Contact {contact_id} ('{crm_name}'): Owner '{aname}' is not in Roboplanet whitelist."
logger.info(f"⏭️ {msg}")
return ("SKIPPED", msg)
logger.info(f"✅ Filter Passed: Contact '{crm_name}' belongs to Roboplanet Associate '{aname}'.")
# -------------------------------
crm_website = contact_details.get("UrlAddress")
# --- Fetch Person UDFs for Campaign Tag ---
@@ -361,8 +366,8 @@ def run_worker():
job = queue.get_next_job()
if job:
try:
# process_job now returns a tuple (STATUS, MESSAGE)
status, msg = process_job(job, so_client)
# process_job now takes (job, client, queue)
status, msg = process_job(job, so_client, queue)
if status == "RETRY":
queue.retry_job_later(job['id'], delay_seconds=120, error_msg=msg)