feat(connector): [31e88f42] Implement robust de-duplication at ingress
This commit addresses the issue of duplicate jobs being created by the SuperOffice connector. The root cause was identified as a race condition where SuperOffice would send multiple webhooks in quick succession for the same entity, leading to multiple identical jobs in the queue. The solution involves several layers of improvement: 1. **Ingress De-duplication:** The webhook ingress handler now checks for existing jobs for the same entity *before* adding a new job to the queue. This is the primary fix and prevents duplicates at the source. 2. **DB Schema Enhancement:** The job-queue table schema was extended with an entity-identifier column to allow for reliable and efficient checking of duplicate entities. 3. **Improved Logging:** The log messages emitted on job retries (e.g., when waiting for the Company Explorer) have been made more descriptive to avoid confusion and false alarms.
This commit is contained in:
@@ -243,10 +243,16 @@ def process_job(job, so_client: SuperOfficeClient, queue: JobQueue):
```diff
     try:
         auth_tuple = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
         resp = requests.post(ce_url, json=ce_req, auth=auth_tuple)
-        if resp.status_code == 404: return ("RETRY", "CE 404")
+        if resp.status_code == 404:
+            msg = f"Company Explorer API returned 404 for '{crm_name}'"
+            logger.warning(msg)
+            return ("RETRY", msg)
         resp.raise_for_status()
         provisioning_data = resp.json()
-        if provisioning_data.get("status") == "processing": return ("RETRY", "CE processing")
+        if provisioning_data.get("status") == "processing":
+            msg = f"Company Explorer is still processing '{crm_name}'. This is expected for new companies. Will retry."
+            logger.info(msg)
+            return ("RETRY", msg)
     except Exception as e:
         raise Exception(f"Company Explorer API failed: {e}")
```
Reference in New Issue
Block a user