This commit addresses the issue of duplicate jobs being created by the SuperOffice connector. The root cause was identified as a race condition: SuperOffice would send multiple webhooks in quick succession for the same entity, leading to multiple identical jobs in the queue. The solution involves several layers of improvement: 1. **Ingress De-duplication:** The ingress handler now checks for existing jobs for the same entity *before* adding a new job to the queue. This is the primary fix and prevents duplicates at the source. 2. **DB Schema Enhancement:** The job-queue table schema was extended with an entity-name column to allow for reliable and efficient checking of duplicate entities. 3. **Improved Logging:** The log messages for job retries (e.g., when waiting for the Company Explorer) have been made more descriptive to avoid confusion and false alarms.
359 lines
16 KiB
Python
359 lines
16 KiB
Python
import time
|
|
import logging
|
|
import os
|
|
import requests
|
|
import json
|
|
from datetime import datetime
|
|
from queue_manager import JobQueue
|
|
from superoffice_client import SuperOfficeClient, ContactNotFoundException, SuperOfficeAuthenticationError
|
|
from config import settings
|
|
|
|
# Setup Logging: one root config for the whole process; all module loggers
# inherit the INFO level and the timestamped format below.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("connector-worker")

# Poll Interval: how long the worker sleeps between queue polls when idle.
POLL_INTERVAL = 5  # Seconds
def safe_get_udfs(entity_data):
    """Return the ``UserDefinedFields`` dict of *entity_data*, or ``{}`` on any failure.

    Works around the 'TypeError: unhashable type: dict' bug observed in the
    SuperOffice production API, where the UDF structure can arrive corrupted.
    Falsy input (None, empty dict) also yields an empty dict.
    """
    if entity_data:
        try:
            udfs = entity_data.get("UserDefinedFields", {})
        except TypeError:
            # Known production-API corruption: treat the UDFs as absent.
            logger.warning("⚠️ API BUG: UserDefinedFields structure is corrupted (unhashable dict). Treating as empty.")
        except Exception as e:
            logger.error(f"Error reading UDFs: {e}")
        else:
            return udfs
    return {}
def clean_text_for_so(text, limit=200):
    """Normalize *text* for storage in a SuperOffice UDF.

    Falsy values and the literal string "null" collapse to "".  Anything
    else is stringified, stripped of surrounding whitespace, and truncated
    to at most *limit* characters.
    """
    if not text:
        return ""
    if text == "null":
        return ""
    cleaned = str(text).strip()
    return cleaned[:limit]
def run_worker():
    """Main worker entry point.

    Initializes the SuperOffice client (retrying every 30s until
    authentication succeeds), then polls the job queue forever,
    dispatching each job to process_job() and mapping the returned
    status onto the queue API (retry / fail / skip / delete / complete).
    """
    queue = JobQueue()
    so_client = None
    self_associate_id = None

    # --- Client initialization: retry until we hold a WORKING client ---
    while not so_client:
        try:
            candidate = SuperOfficeClient()
            if not candidate.access_token:
                raise Exception("Auth failed")

            # Dynamic ID Fetch for Echo Prevention: knowing our own
            # AssociateId lets downstream logic ignore changes this
            # worker caused itself.
            try:
                me = candidate._get("Associate/Me")
                if me:
                    self_associate_id = me.get("AssociateId")
                    logger.info(f"✅ Worker Identity Confirmed: Associate ID {self_associate_id}")
            except Exception as e:
                logger.warning(f"Could not fetch own Associate ID: {e}. Echo prevention might be limited to ID 528.")
                self_associate_id = 528  # Fallback

            # Publish the client only once fully initialized.
            so_client = candidate

        except Exception as e:
            # BUGFIX: previously the client was assigned to so_client BEFORE
            # the access_token check, so a failed auth left a truthy-but-broken
            # client bound and the retry loop exited prematurely.  Reset and
            # retry instead, and include the actual error in the log.
            so_client = None
            logger.critical(f"Failed to initialize SO Client: {e}. Retrying in 30s...")
            time.sleep(30)

    logger.info("Worker started. Polling queue...")
    while True:
        try:
            job = queue.get_next_job()
            if job:
                try:
                    # Pass self_id to process_job so it can filter echoes.
                    job['self_associate_id'] = self_associate_id

                    status, msg = process_job(job, so_client, queue)

                    if status == "RETRY":
                        queue.retry_job_later(job['id'], job['retry_count'], delay_seconds=120, error_msg=msg)
                    elif status == "FAILED":
                        queue.fail_job(job['id'], msg or "Job failed status")
                    elif status == "SKIPPED":
                        queue.skip_job(job['id'], msg or "Skipped")
                    elif status == "DELETED":
                        queue.mark_as_deleted(job['id'], msg or "Deleted in SuperOffice")
                    else:
                        # Any other status (normally 'SUCCESS') completes the job.
                        queue.complete_job(job['id'])

                except Exception as e:
                    # Unexpected crash inside job processing: record and move on.
                    logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
                    queue.fail_job(job['id'], str(e))
            else:
                time.sleep(POLL_INTERVAL)
        except Exception as e:
            # Keep the worker alive even if the queue itself misbehaves.
            logger.error(f"Worker loop error: {e}")
            time.sleep(POLL_INTERVAL)
def process_job(job, so_client: SuperOfficeClient, queue: JobQueue):
    """
    Core logic for processing a single job.

    Pipeline (order matters): whitelist filter on changed fields ->
    entity-ID extraction from the webhook payload -> contact fetch with
    de-duplication shield and owner whitelist -> Company Explorer
    provisioning call -> diff-based PATCH of contact fields/UDFs ->
    person position + text UDF sync -> email-simulation appointment.

    Returns: (STATUS, MESSAGE)
    STATUS: 'SUCCESS', 'SKIPPED', 'DELETED', 'RETRY', 'FAILED'
    (Note: this function signals hard failures by raising; the caller
    maps exceptions to the FAILED state.)
    """
    logger.info(f"--- [WORKER v2.1.1 - FULL LOGIC RESTORED] Processing Job {job['id']} ({job['event_type']}) ---")
    payload = job['payload']
    event_low = job['event_type'].lower()

    # --- Strict Whitelist Filter ---
    # A job is only processed if the webhook indicates a change to a relevant field.
    # This is the primary defense against noise and echoes.

    changes = [c.lower() for c in payload.get("Changes", [])]

    # Define fields that are significant enough to trigger a full re-assessment.
    trigger_fields = ["name", "urladdress", "urls"]

    # The ProgID for the Vertical UDF. A change to this field MUST trigger a resync.
    # Falls back to "superoffice:83" when no UDF key is configured.
    vertical_udf_key = settings.UDF_VERTICAL.lower() if settings.UDF_VERTICAL else "superoffice:83"

    # Check if any of the trigger fields or the specific vertical UDF is in the changes list.
    has_trigger_field = any(f in changes for f in trigger_fields) or vertical_udf_key in changes

    # For '...created' events, we always process. For all other events, we check the whitelist.
    if "created" not in event_low and not has_trigger_field:
        msg = f"⏭️ Skipping event '{event_low}': No relevant field changes in {changes}."
        logger.info(msg)
        return ("SKIPPED", msg)

    # 0. ID Extraction & Early Exit for irrelevant jobs
    # IDs may arrive in FieldValues, as top-level keys, or as PrimaryKey
    # (disambiguated by the event type); try each source in that order.
    person_id = None
    contact_id = None
    job_title = payload.get("JobTitle")

    field_values = payload.get("FieldValues", {})
    if "person_id" in field_values:
        person_id = int(field_values["person_id"])
    if "contact_id" in field_values:
        contact_id = int(field_values["contact_id"])
    if "title" in field_values and not job_title:
        job_title = field_values["title"]

    if not person_id:
        if "PersonId" in payload:
            person_id = int(payload["PersonId"])
        elif "PrimaryKey" in payload and "person" in event_low:
            person_id = int(payload["PrimaryKey"])

    if not contact_id:
        if "ContactId" in payload:
            contact_id = int(payload["ContactId"])
        elif "PrimaryKey" in payload and "contact" in event_low:
            contact_id = int(payload["PrimaryKey"])

    if not person_id and not contact_id:
        msg = f"Skipping job: No ContactId or PersonId identified."
        logger.warning(msg)
        return ("SKIPPED", msg)

    # Fallback Lookup: resolve missing job title / contact id via the person record.
    if person_id and (not job_title or not contact_id):
        try:
            person_details = so_client.get_person(person_id, select=["JobTitle", "Title", "Contact/ContactId"])
            if person_details:
                if not job_title: job_title = person_details.get("JobTitle") or person_details.get("Title")
                if not contact_id:
                    contact_obj = person_details.get("Contact")
                    if contact_obj and isinstance(contact_obj, dict): contact_id = contact_obj.get("ContactId")
        except ContactNotFoundException:
            return ("DELETED", f"Person {person_id} not found.")
        except Exception as e:
            # Best-effort lookup: a failure here is not fatal for the job.
            logger.warning(f"Failed to fetch person details for {person_id}: {e}")

    # Only contact/person events are relevant; drop everything else.
    if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]):
        return ("SKIPPED", f"Irrelevant event type: {job['event_type']}")

    # Past this point a contact id is mandatory; raising maps to FAILED upstream.
    if not contact_id: raise ValueError(f"No ContactId found.")

    logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}")

    # 1b. Fetch full contact details
    crm_name, crm_website, crm_industry_name, contact_details, campaign_tag = None, None, None, None, None

    try:
        contact_details = so_client.get_contact(contact_id, select=["Name", "UrlAddress", "Urls", "UserDefinedFields", "Address", "OrgNr", "Associate"])
        crm_name = contact_details.get("Name", "Unknown")
        assoc = contact_details.get("Associate") or {}
        aname = assoc.get("Name", "").upper().strip()
        # Persist the resolved entity name on the job row; this is what the
        # de-duplication check below compares against.
        queue.update_entity_name(job['id'], crm_name, associate_name=aname)

        # --- DE-DUPLICATION SHIELD (Added March 2026) ---
        # Guards against SuperOffice firing multiple 'contact.created'
        # webhooks in quick succession for the same company.
        if "contact.created" in event_low:
            if queue.check_for_recent_duplicate(crm_name, job['id']):
                msg = f"Duplicate 'contact.created' event for '{crm_name}'. This job will be skipped."
                logger.info(f"🛡️ {msg}")
                return ("SKIPPED", msg)

        # ROBOPLANET FILTER: only process contacts owned by whitelisted
        # associates (matched by name first, then by numeric AssociateId).
        is_robo = False
        if aname in settings.ROBOPLANET_WHITELIST:
            is_robo = True
        else:
            try:
                aid = assoc.get("AssociateId")
                if aid and int(aid) in settings.ROBOPLANET_WHITELIST:
                    is_robo = True
            except (ValueError, TypeError): pass

        if not is_robo:
            msg = f"Skipped, Wackler. Contact {contact_id} ('{crm_name}'): Owner '{aname}' is not in Roboplanet whitelist."
            logger.info(f"⏭️ {msg}")
            return ("SKIPPED", msg)

        crm_website = contact_details.get("UrlAddress")

        # Campaign Tag: read from the person's UDFs (best-effort; errors ignored).
        if person_id:
            try:
                person_details = so_client.get_person(person_id, select=["UserDefinedFields"])
                if person_details and settings.UDF_CAMPAIGN:
                    udfs = safe_get_udfs(person_details)
                    campaign_tag = udfs.get(f"{settings.UDF_CAMPAIGN}:DisplayText") or udfs.get(settings.UDF_CAMPAIGN)
            except Exception: pass

        # Current Vertical: map the stored "[I:<id>]"-style UDF value back to a
        # vertical name via the (inverted) VERTICAL_MAP_JSON configuration.
        if settings.UDF_VERTICAL:
            udfs = safe_get_udfs(contact_details)
            so_vertical_val = udfs.get(settings.UDF_VERTICAL)
            if so_vertical_val:
                val_str = str(so_vertical_val).replace("[I:" ,"").replace("]","")
                try:
                    v_map = json.loads(settings.VERTICAL_MAP_JSON)
                    crm_industry_name = {str(v): k for k, v in v_map.items()}.get(val_str)
                except Exception: pass

    except ContactNotFoundException:
        return ("DELETED", f"Contact {contact_id} not found.")
    except Exception as e:
        # Wrap any other API error so the caller fails the job with context.
        raise Exception(f"SuperOffice API Failure: {e}")

    # --- 3. Company Explorer Provisioning ---
    # Ask the Company Explorer service to enrich/provision this contact.
    ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
    ce_req = {"so_contact_id": contact_id, "so_person_id": person_id, "job_title": job_title, "crm_name": crm_name, "crm_website": crm_website, "crm_industry_name": crm_industry_name, "campaign_tag": campaign_tag}

    try:
        auth_tuple = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
        resp = requests.post(ce_url, json=ce_req, auth=auth_tuple)
        # 404 means the company is not known yet; retry later rather than fail.
        if resp.status_code == 404:
            msg = f"Company Explorer API returned 404 for '{crm_name}'"
            logger.warning(msg)
            return ("RETRY", msg)
        resp.raise_for_status()
        provisioning_data = resp.json()
        # 'processing' is a normal transient state for new companies: retry.
        if provisioning_data.get("status") == "processing":
            msg = f"Company Explorer is still processing '{crm_name}'. This is expected for new companies. Will retry."
            logger.info(msg)
            return ("RETRY", msg)
    except Exception as e:
        raise Exception(f"Company Explorer API failed: {e}")

    # Fetch fresh Contact for comparison (state may have changed during provisioning).
    contact_data = so_client.get_contact(contact_id)
    if not contact_data: return ("SKIPPED", "Contact deleted post-analysis")
    current_udfs = safe_get_udfs(contact_data)
    # contact_patch accumulates only the fields that actually differ; if it
    # stays empty, no PATCH request is sent at all.
    contact_patch = {}

    # --- A. Vertical Sync ---
    vertical_name = provisioning_data.get("vertical_name")
    if vertical_name:
        v_map = json.loads(settings.VERTICAL_MAP_JSON)
        vertical_id = v_map.get(vertical_name)
        if vertical_id:
            # Strip the "[I:...]" wrapper before comparing with the mapped id.
            current_val = str(current_udfs.get(settings.UDF_VERTICAL, "")).replace("[I:" ,"").replace("]","")
            if current_val != str(vertical_id):
                contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_VERTICAL] = str(vertical_id)

    # --- B. Address & VAT Sync ---
    ce_city, ce_street, ce_zip, ce_vat = provisioning_data.get("address_city"), provisioning_data.get("address_street"), provisioning_data.get("address_zip"), provisioning_data.get("vat_id")
    if ce_city or ce_street or ce_zip:
        # Apply the same address fields to both Postal and Street addresses.
        for type_key in ["Postal", "Street"]:
            cur_addr = (contact_data.get("Address") or {}).get(type_key, {})
            if ce_city and cur_addr.get("City") != ce_city: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["City"] = ce_city
            if ce_street and cur_addr.get("Address1") != ce_street: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Address1"] = ce_street
            if ce_zip and cur_addr.get("Zipcode") != ce_zip: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Zipcode"] = ce_zip

    if ce_vat and contact_data.get("OrgNr") != ce_vat:
        contact_patch["OrgNr"] = ce_vat

    # --- C. AI Openers & Summary Sync ---
    # Truncation limits (200 / 132) match the target UDF field sizes;
    # TODO confirm against the SuperOffice field definitions.
    p_opener = clean_text_for_so(provisioning_data.get("opener"), 200)
    s_opener = clean_text_for_so(provisioning_data.get("opener_secondary"), 200)
    ai_sum = clean_text_for_so(provisioning_data.get("summary"), 132)

    if p_opener and current_udfs.get(settings.UDF_OPENER) != p_opener:
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER] = p_opener

    if s_opener and current_udfs.get(settings.UDF_OPENER_SECONDARY) != s_opener:
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER_SECONDARY] = s_opener

    if ai_sum and current_udfs.get(settings.UDF_SUMMARY) != ai_sum:
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_SUMMARY] = ai_sum

    # --- D. Timestamps ---
    # Only stamp the last-update UDF when something else is actually changing,
    # so a no-op run does not generate a PATCH (and a webhook echo).
    if settings.UDF_LAST_UPDATE and contact_patch:
        now_so = f"[D:{datetime.now().strftime('%m/%d/%Y %H:%M:%S')}]"
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_LAST_UPDATE] = now_so

    # Append the discovered website only if no existing URL entry matches it.
    if ce_website := provisioning_data.get("website"):
        current_urls = contact_data.get("Urls") or []
        if not any(u.get("Value") == ce_website for u in current_urls):
            contact_patch.setdefault("Urls", []).append({"Value": ce_website, "Description": "AI Discovered"})

    if contact_patch:
        logger.info(f"🚀 Pushing combined PATCH for Contact {contact_id}: {list(contact_patch.get('UserDefinedFields', {}).keys())}")
        so_client.patch_contact(contact_id, contact_patch)
    else:
        logger.info(f"✅ No changes detected for Contact {contact_id}.")

    # 2d. Sync Person Position (best-effort; maps role name -> position id).
    role_name = provisioning_data.get("role_name")
    if person_id and role_name:
        try:
            persona_map = json.loads(settings.PERSONA_MAP_JSON)
            position_id = persona_map.get(role_name)
            if position_id:
                so_client.update_person_position(person_id, int(position_id))
        except Exception as e:
            logger.error(f"Error syncing position: {e}")

    # 3. Update SuperOffice Texts (Person)
    if person_id:
        texts = provisioning_data.get("texts", {})
        unsubscribe_link = provisioning_data.get("unsubscribe_link")

        udf_update = {}
        if texts.get("subject"): udf_update[settings.UDF_SUBJECT] = texts["subject"]
        if texts.get("intro"): udf_update[settings.UDF_INTRO] = texts["intro"]
        if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
        if unsubscribe_link and settings.UDF_UNSUBSCRIBE_LINK:
            udf_update[settings.UDF_UNSUBSCRIBE_LINK] = unsubscribe_link

        if udf_update:
            logger.info(f"Applying text update to Person {person_id}.")
            so_client.update_entity_udfs(person_id, "Person", udf_update)

    # --- 4. Create Email Simulation Appointment ---
    # NOTE(review): 'texts' is only bound inside the `if person_id:` block
    # above; when person_id is falsy, `texts.get(...)` raises NameError here,
    # which is silently absorbed by this try/except ("Failed simulation").
    # Looks unintentional — confirm whether the appointment should also be
    # created for contact-only jobs.
    try:
        opener = provisioning_data.get("opener") or ""
        intro = texts.get("intro") or ""
        proof = texts.get("social_proof") or ""
        subject = texts.get("subject", "No Subject")
        email_body = f"Betreff: {subject}\n\n{opener}\n\n{intro}\n\n{proof}\n\n(Generated via Gemini Marketing Engine)"
        so_client.create_appointment(f"KI: {subject}", email_body, contact_id, person_id)
    except Exception as e:
        logger.error(f"Failed simulation: {e}")

    return ("SUCCESS", "Processing complete")
if __name__ == "__main__":
    # Script entry point: start the long-running worker loop.
    run_worker()