diff --git a/connector-superoffice/queue_manager.py b/connector-superoffice/queue_manager.py index 175fe004..1103e389 100644 --- a/connector-superoffice/queue_manager.py +++ b/connector-superoffice/queue_manager.py @@ -66,47 +66,47 @@ class JobQueue: conn.commit() logger.debug(f"Job added: {event_type} for entity {entity_id}") except Exception as e: - logger.error(f"❌ Failed to add job: {e}", exc_info=True) - conn.rollback() - raise + logger.error(f"❌ Failed to add job: {e}", exc_info=True) + conn.rollback() + raise - def is_duplicate_pending(self, event_type: str, payload: dict) -> bool: - """ - Checks if a job with the same event_type and entity_id is already PENDING. - This is to prevent adding duplicate jobs from webhooks that are sent multiple times. - """ - # We only want to de-duplicate creation events, as change events can be validly stacked. - if "created" not in event_type.lower(): - return False + def is_duplicate_pending(self, event_type: str, payload: dict) -> bool: + """ + Checks if a job with the same event_type and entity_id is already PENDING. + This is to prevent adding duplicate jobs from webhooks that are sent multiple times. + """ + # We only want to de-duplicate creation events, as change events can be validly stacked. + if "created" not in event_type.lower(): + return False + + entity_id = payload.get("PrimaryKey") + if not entity_id: + return False # Cannot de-duplicate without a key + + five_minutes_ago = datetime.utcnow() - timedelta(minutes=5) + with sqlite3.connect(DB_PATH, timeout=30) as conn: + try: + cursor = conn.cursor() + cursor.execute(""" + SELECT id FROM jobs + WHERE event_type = ? + AND entity_id = ? + AND status = 'PENDING' + AND created_at >= ? + """, (event_type, entity_id, five_minutes_ago)) - entity_id = payload.get("PrimaryKey") - if not entity_id: - return False # Cannot de-duplicate without a key - - five_minutes_ago = datetime.utcnow() - timedelta(minutes=5) - with sqlite3.connect(DB_PATH, timeout=30) as conn: - try: - cursor = conn.cursor() - cursor.execute(""" - SELECT id FROM jobs - WHERE event_type = ? - AND entity_id = ? - AND status = 'PENDING' - AND created_at >= ? - """, (event_type, entity_id, five_minutes_ago)) - - if existing_job := cursor.fetchone(): - logger.warning(f"Found PENDING duplicate of job {existing_job[0]} for event '{event_type}' and entity '{entity_id}'. Ignoring new webhook.") - return True - return False - except Exception as e: - logger.error(f"❌ Failed to check for PENDING duplicate jobs for entity '{entity_id}': {e}", exc_info=True) - return False # Fail safe: better to have a duplicate than to miss an event. - - def update_entity_name(self, job_id, name, associate_name=None): - with sqlite3.connect(DB_PATH, timeout=30) as conn: - try: - if associate_name: + if existing_job := cursor.fetchone(): + logger.warning(f"Found PENDING duplicate of job {existing_job[0]} for event '{event_type}' and entity '{entity_id}'. Ignoring new webhook.") + return True + return False + except Exception as e: + logger.error(f"❌ Failed to check for PENDING duplicate jobs for entity '{entity_id}': {e}", exc_info=True) + return False # Fail safe: better to have a duplicate than to miss an event. + + def update_entity_name(self, job_id, name, associate_name=None): + with sqlite3.connect(DB_PATH, timeout=30) as conn: + try: + if associate_name: conn.execute( "UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?", (str(name), str(associate_name), job_id)