feat(connector): [31e88f42] Implement robust de-duplication at ingress
This commit addresses the issue of duplicate jobs being created by the SuperOffice connector. The root cause was identified as a race condition where SuperOffice would send multiple webhooks in quick succession for the same entity, leading to multiple identical jobs in the queue. The solution involves several layers of improvement: 1. **Ingress De-duplication:** The now checks for existing jobs for the same entity *before* adding a new job to the queue. This is the primary fix and prevents duplicates at the source. 2. **DB Schema Enhancement:** The table schema in was extended with an column to allow for reliable and efficient checking of duplicate entities. 3. **Improved Logging:** The log messages in for job retries (e.g., when waiting for the Company Explorer) have been made more descriptive to avoid confusion and false alarms.
This commit is contained in:
@@ -44,6 +44,9 @@ class JobQueue:
|
||||
try: conn.execute("ALTER TABLE jobs ADD COLUMN associate_name TEXT")
|
||||
except sqlite3.OperationalError: pass
|
||||
|
||||
try: conn.execute("ALTER TABLE jobs ADD COLUMN entity_id INTEGER")
|
||||
except sqlite3.OperationalError: pass
|
||||
|
||||
try: conn.execute("ALTER TABLE jobs ADD COLUMN retry_count INTEGER DEFAULT 0")
|
||||
except sqlite3.OperationalError: pass
|
||||
conn.commit()
|
||||
@@ -53,22 +56,57 @@ class JobQueue:
|
||||
raise
|
||||
|
||||
def add_job(self, event_type: str, payload: dict):
|
||||
entity_id = payload.get("PrimaryKey")
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
try:
|
||||
conn.execute(
|
||||
"INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)",
|
||||
(event_type, json.dumps(payload), 'PENDING')
|
||||
"INSERT INTO jobs (event_type, payload, status, entity_id) VALUES (?, ?, ?, ?)",
|
||||
(event_type, json.dumps(payload), 'PENDING', entity_id)
|
||||
)
|
||||
conn.commit()
|
||||
logger.debug(f"Job added: {event_type}")
|
||||
logger.debug(f"Job added: {event_type} for entity {entity_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to add job: {e}", exc_info=True)
|
||||
conn.rollback()
|
||||
raise
|
||||
def update_entity_name(self, job_id, name, associate_name=None):
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
try:
|
||||
if associate_name:
|
||||
logger.error(f"❌ Failed to add job: {e}", exc_info=True)
|
||||
conn.rollback()
|
||||
raise
|
||||
|
||||
def is_duplicate_pending(self, event_type: str, payload: dict) -> bool:
|
||||
"""
|
||||
Checks if a job with the same event_type and entity_id is already PENDING.
|
||||
This is to prevent adding duplicate jobs from webhooks that are sent multiple times.
|
||||
"""
|
||||
# We only want to de-duplicate creation events, as change events can be validly stacked.
|
||||
if "created" not in event_type.lower():
|
||||
return False
|
||||
|
||||
entity_id = payload.get("PrimaryKey")
|
||||
if not entity_id:
|
||||
return False # Cannot de-duplicate without a key
|
||||
|
||||
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
try:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
SELECT id FROM jobs
|
||||
WHERE event_type = ?
|
||||
AND entity_id = ?
|
||||
AND status = 'PENDING'
|
||||
AND created_at >= ?
|
||||
""", (event_type, entity_id, five_minutes_ago))
|
||||
|
||||
if existing_job := cursor.fetchone():
|
||||
logger.warning(f"Found PENDING duplicate of job {existing_job[0]} for event '{event_type}' and entity '{entity_id}'. Ignoring new webhook.")
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to check for PENDING duplicate jobs for entity '{entity_id}': {e}", exc_info=True)
|
||||
return False # Fail safe: better to have a duplicate than to miss an event.
|
||||
|
||||
def update_entity_name(self, job_id, name, associate_name=None):
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
try:
|
||||
if associate_name:
|
||||
conn.execute(
|
||||
"UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?",
|
||||
(str(name), str(associate_name), job_id)
|
||||
|
||||
Reference in New Issue
Block a user