diff --git a/connector-superoffice/queue_manager.py b/connector-superoffice/queue_manager.py index b255f4a4..5addaa5b 100644 --- a/connector-superoffice/queue_manager.py +++ b/connector-superoffice/queue_manager.py @@ -11,6 +11,8 @@ class JobQueue: def _init_db(self): with sqlite3.connect(DB_PATH, timeout=30) as conn: + # Enable WAL mode for better concurrency + conn.execute("PRAGMA journal_mode=WAL") conn.execute(""" CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -66,35 +68,31 @@ class JobQueue: conn.row_factory = sqlite3.Row cursor = conn.cursor() - # Lock the job - cursor.execute("BEGIN EXCLUSIVE") - try: - cursor.execute(""" - SELECT id, event_type, payload, created_at - FROM jobs - WHERE status = 'PENDING' - AND (next_try_at IS NULL OR next_try_at <= datetime('now')) - ORDER BY created_at ASC - LIMIT 1 - """) - row = cursor.fetchone() - - if row: - job = dict(row) - # Mark as processing - cursor.execute( - "UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?", - (job['id'],) - ) - conn.commit() - else: - conn.rollback() # No job found - except Exception: - conn.rollback() - raise + cursor.execute(""" + SELECT id, event_type, payload, created_at + FROM jobs + WHERE status = 'PENDING' + AND (next_try_at IS NULL OR next_try_at <= datetime('now')) + ORDER BY created_at ASC + LIMIT 1 + """) + row = cursor.fetchone() + + if row: + job = dict(row) + # Mark as processing + cursor.execute( + "UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?", + (job['id'],) + ) + conn.commit() if job: - job['payload'] = json.loads(job['payload']) + try: + job['payload'] = json.loads(job['payload']) + except Exception as e: + logger.error(f"Failed to parse payload for job {job['id']}: {e}") + return None return job