From f859d7450b7b6f59960705f8d108e7ff707b6cc8 Mon Sep 17 00:00:00 2001 From: Floke Date: Mon, 9 Mar 2026 08:39:35 +0000 Subject: [PATCH] fix(connector): [31e88f42] Implement de-duplication for contact.created SuperOffice sends two 'contact.created' webhooks for a single new contact. This caused the connector to process the same entity twice, leading to duplicate entries and logs. This commit introduces a de-duplication shield in the worker: - A new method `check_for_recent_duplicate` is added to check for jobs with the same company name that are either 'PROCESSING' or 'COMPLETED' within the last 5 minutes. - The worker now fetches the company name upon receiving a job, updates the job record with the name, and then calls the new de-duplication method. - If a duplicate event is detected, the job is skipped, preventing redundant processing. --- connector-superoffice/queue_manager.py | 25 +++++++++++++++++++++++++ connector-superoffice/worker.py | 7 +++++++ 2 files changed, 32 insertions(+) diff --git a/connector-superoffice/queue_manager.py b/connector-superoffice/queue_manager.py index 60b30900..8fe393e6 100644 --- a/connector-superoffice/queue_manager.py +++ b/connector-superoffice/queue_manager.py @@ -185,6 +185,31 @@ class JobQueue: except Exception as e: logger.critical(f"❌ CRITICAL: Failed to set job {job_id} to FAILED: {e}", exc_info=True) conn.rollback() + + def check_for_recent_duplicate(self, entity_name: str, current_job_id: int) -> bool: + """ + Checks if another job with the same entity_name has been processed or is processing recently. + """ + five_minutes_ago = datetime.utcnow() - timedelta(minutes=5) + with sqlite3.connect(DB_PATH, timeout=30) as conn: + try: + cursor = conn.cursor() + cursor.execute(""" + SELECT id FROM jobs + WHERE entity_name = ? + AND id != ? + AND status IN ('COMPLETED', 'PROCESSING') + AND created_at >= ? 
+ """, (entity_name, current_job_id, five_minutes_ago)) + + if cursor.fetchone(): + logger.warning(f"Found recent duplicate job for entity '{entity_name}' (related to job {current_job_id}).") + return True + return False + except Exception as e: + logger.error(f"❌ Failed to check for duplicate jobs for entity '{entity_name}': {e}", exc_info=True) + return False # Fail safe + def get_stats(self): with sqlite3.connect(DB_PATH, timeout=30) as conn: cursor = conn.cursor() diff --git a/connector-superoffice/worker.py b/connector-superoffice/worker.py index 223d991c..08b8bb47 100644 --- a/connector-superoffice/worker.py +++ b/connector-superoffice/worker.py @@ -189,6 +189,13 @@ def process_job(job, so_client: SuperOfficeClient, queue: JobQueue): assoc = contact_details.get("Associate") or {} aname = assoc.get("Name", "").upper().strip() queue.update_entity_name(job['id'], crm_name, associate_name=aname) + + # --- DE-DUPLICATION SHIELD (Added March 2026) --- + if "contact.created" in event_low: + if queue.check_for_recent_duplicate(crm_name, job['id']): + msg = f"Duplicate 'contact.created' event for '{crm_name}'. This job will be skipped." + logger.info(f"🛡️ {msg}") + return ("SKIPPED", msg) # ROBOPLANET FILTER is_robo = False