From 65aaff3936e5d7e60b2e89ff9c0dce99a6ac4a38 Mon Sep 17 00:00:00 2001 From: Floke Date: Mon, 9 Mar 2026 12:50:32 +0000 Subject: [PATCH] feat(connector): [31e88f42] Implement robust de-duplication at ingress This commit addresses the issue of duplicate jobs being created by the SuperOffice connector. The root cause was identified as a race condition where SuperOffice would send multiple webhooks in quick succession for the same entity, leading to multiple identical jobs in the queue. The solution involves several layers of improvement: 1. **Ingress De-duplication:** The webhook handler in `webhook_app.py` now checks for existing PENDING jobs for the same entity *before* adding a new job to the queue. This is the primary fix and prevents duplicates at the source. 2. **DB Schema Enhancement:** The `jobs` table schema in `queue_manager.py` was extended with an `entity_id` column to allow for reliable and efficient checking of duplicate entities. 3. **Improved Logging:** The log messages in `worker.py` for job retries (e.g., when waiting for the Company Explorer) have been made more descriptive to avoid confusion and false alarms. 
--- connector-superoffice/queue_manager.py | 58 +++++++++++++++++++++----- connector-superoffice/webhook_app.py | 5 +++ connector-superoffice/worker.py | 10 ++++- 3 files changed, 61 insertions(+), 12 deletions(-) diff --git a/connector-superoffice/queue_manager.py b/connector-superoffice/queue_manager.py index 269822cf..175fe004 100644 --- a/connector-superoffice/queue_manager.py +++ b/connector-superoffice/queue_manager.py @@ -44,6 +44,9 @@ class JobQueue: try: conn.execute("ALTER TABLE jobs ADD COLUMN associate_name TEXT") except sqlite3.OperationalError: pass + try: conn.execute("ALTER TABLE jobs ADD COLUMN entity_id INTEGER") + except sqlite3.OperationalError: pass + try: conn.execute("ALTER TABLE jobs ADD COLUMN retry_count INTEGER DEFAULT 0") except sqlite3.OperationalError: pass conn.commit() @@ -53,22 +56,57 @@ class JobQueue: raise def add_job(self, event_type: str, payload: dict): + entity_id = payload.get("PrimaryKey") with sqlite3.connect(DB_PATH, timeout=30) as conn: try: conn.execute( - "INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)", - (event_type, json.dumps(payload), 'PENDING') + "INSERT INTO jobs (event_type, payload, status, entity_id) VALUES (?, ?, ?, ?)", + (event_type, json.dumps(payload), 'PENDING', entity_id) ) conn.commit() - logger.debug(f"Job added: {event_type}") + logger.debug(f"Job added: {event_type} for entity {entity_id}") except Exception as e: - logger.error(f"❌ Failed to add job: {e}", exc_info=True) - conn.rollback() - raise - def update_entity_name(self, job_id, name, associate_name=None): - with sqlite3.connect(DB_PATH, timeout=30) as conn: - try: - if associate_name: + logger.error(f"❌ Failed to add job: {e}", exc_info=True) + conn.rollback() + raise + + def is_duplicate_pending(self, event_type: str, payload: dict) -> bool: + """ + Checks if a job with the same event_type and entity_id is already PENDING. + This is to prevent adding duplicate jobs from webhooks that are sent multiple times. 
+ """ + # We only want to de-duplicate creation events, as change events can be validly stacked. + if "created" not in event_type.lower(): + return False + + entity_id = payload.get("PrimaryKey") + if not entity_id: + return False # Cannot de-duplicate without a key + + five_minutes_ago = datetime.utcnow() - timedelta(minutes=5) + with sqlite3.connect(DB_PATH, timeout=30) as conn: + try: + cursor = conn.cursor() + cursor.execute(""" + SELECT id FROM jobs + WHERE event_type = ? + AND entity_id = ? + AND status = 'PENDING' + AND created_at >= ? + """, (event_type, entity_id, five_minutes_ago)) + + if existing_job := cursor.fetchone(): + logger.warning(f"Found PENDING duplicate of job {existing_job[0]} for event '{event_type}' and entity '{entity_id}'. Ignoring new webhook.") + return True + return False + except Exception as e: + logger.error(f"❌ Failed to check for PENDING duplicate jobs for entity '{entity_id}': {e}", exc_info=True) + return False # Fail safe: better to have a duplicate than to miss an event. + + def update_entity_name(self, job_id, name, associate_name=None): + with sqlite3.connect(DB_PATH, timeout=30) as conn: + try: + if associate_name: conn.execute( "UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?", (str(name), str(associate_name), job_id) diff --git a/connector-superoffice/webhook_app.py b/connector-superoffice/webhook_app.py index 25df72a7..91f6d469 100644 --- a/connector-superoffice/webhook_app.py +++ b/connector-superoffice/webhook_app.py @@ -34,6 +34,11 @@ async def receive_webhook(request: Request, background_tasks: BackgroundTasks): event_type = payload.get("Event", "unknown") + # --- DEDUPLICATION AT INGRESS (Added March 2026) --- + # Before adding a job, check if an identical one is already pending. 
+ if queue.is_duplicate_pending(event_type, payload): + return {"status": "skipped_duplicate"} + # Add to local Queue queue.add_job(event_type, payload) diff --git a/connector-superoffice/worker.py b/connector-superoffice/worker.py index f9fe2523..181bafbc 100644 --- a/connector-superoffice/worker.py +++ b/connector-superoffice/worker.py @@ -243,10 +243,16 @@ def process_job(job, so_client: SuperOfficeClient, queue: JobQueue): try: auth_tuple = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini")) resp = requests.post(ce_url, json=ce_req, auth=auth_tuple) - if resp.status_code == 404: return ("RETRY", "CE 404") + if resp.status_code == 404: + msg = f"Company Explorer API returned 404 for '{crm_name}'" + logger.warning(msg) + return ("RETRY", msg) resp.raise_for_status() provisioning_data = resp.json() - if provisioning_data.get("status") == "processing": return ("RETRY", "CE processing") + if provisioning_data.get("status") == "processing": + msg = f"Company Explorer is still processing '{crm_name}'. This is expected for new companies. Will retry." + logger.info(msg) + return ("RETRY", msg) except Exception as e: raise Exception(f"Company Explorer API failed: {e}")