import sqlite3
import json
from datetime import datetime, timedelta
import os

# SQLite file backing the queue; overridable via the environment for tests/deploys.
DB_PATH = os.getenv("DB_PATH", "connector_queue.db")


class JobQueue:
    """A minimal persistent job queue backed by a single SQLite table.

    Jobs move through PENDING -> PROCESSING -> COMPLETED/FAILED, with an
    optional delayed retry (back to PENDING with a future ``next_try_at``).
    All timestamps are UTC strings produced by SQLite's ``datetime('now')``,
    so ordering/readiness checks are plain lexicographic string compares —
    which is why every timestamp must be written in that same format.
    """

    def __init__(self):
        self._init_db()

    def _init_db(self):
        """Create the jobs table if missing and apply in-place migrations."""
        with sqlite3.connect(DB_PATH) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_type TEXT,
                    payload TEXT,
                    status TEXT DEFAULT 'PENDING',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    error_msg TEXT,
                    next_try_at TIMESTAMP
                )
            """)
            # Migration for DBs created before next_try_at existed. On fresh
            # DBs the column is already present and ALTER raises
            # OperationalError ("duplicate column name"), which we ignore.
            try:
                conn.execute("ALTER TABLE jobs ADD COLUMN next_try_at TIMESTAMP")
            except sqlite3.OperationalError:
                pass

    def add_job(self, event_type: str, payload: dict):
        """Enqueue a new PENDING job; the payload is stored as JSON text."""
        with sqlite3.connect(DB_PATH) as conn:
            conn.execute(
                "INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)",
                (event_type, json.dumps(payload), 'PENDING')
            )

    def get_next_job(self):
        """Atomically claim the oldest runnable PENDING job.

        Returns a dict with id/event_type/payload/created_at (payload already
        JSON-decoded), or None when no job is due. The SELECT + UPDATE pair
        runs inside an EXCLUSIVE transaction so concurrent workers cannot
        claim the same row; the claimed job is flipped to PROCESSING.
        """
        job = None
        with sqlite3.connect(DB_PATH) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            # EXCLUSIVE lock makes select-then-update atomic across workers.
            cursor.execute("BEGIN EXCLUSIVE")
            try:
                cursor.execute("""
                    SELECT id, event_type, payload, created_at
                    FROM jobs
                    WHERE status = 'PENDING'
                      AND (next_try_at IS NULL OR next_try_at <= datetime('now'))
                    ORDER BY created_at ASC
                    LIMIT 1
                """)
                row = cursor.fetchone()
                if row:
                    job = dict(row)
                    # Mark as processing so other workers skip this row.
                    cursor.execute(
                        "UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?",
                        (job['id'],)
                    )
                    conn.commit()
                else:
                    conn.rollback()  # No job due; release the lock promptly.
            except Exception:
                conn.rollback()
                raise
        if job:
            job['payload'] = json.loads(job['payload'])
        return job

    def retry_job_later(self, job_id, delay_seconds=60):
        """Return a job to PENDING, runnable ``delay_seconds`` from now.

        The retry time is computed inside SQLite via
        ``datetime('now', '+N seconds')`` so its string format exactly
        matches the ``datetime('now')`` value it is later compared against
        in ``get_next_job``. (The previous implementation passed a Python
        ``datetime.utcnow()`` object, which relied on sqlite3's deprecated
        implicit datetime adapter and produced a microsecond-suffixed string
        in a different format from the SQL-side timestamps.)
        """
        with sqlite3.connect(DB_PATH) as conn:
            conn.execute(
                "UPDATE jobs SET status = 'PENDING', "
                "next_try_at = datetime('now', ?), "
                "updated_at = datetime('now') WHERE id = ?",
                (f"+{int(delay_seconds)} seconds", job_id)
            )

    def complete_job(self, job_id):
        """Mark a job COMPLETED."""
        with sqlite3.connect(DB_PATH) as conn:
            conn.execute(
                "UPDATE jobs SET status = 'COMPLETED', updated_at = datetime('now') WHERE id = ?",
                (job_id,)
            )

    def fail_job(self, job_id, error_msg):
        """Mark a job FAILED and record the error message (stringified)."""
        with sqlite3.connect(DB_PATH) as conn:
            conn.execute(
                "UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
                (str(error_msg), job_id)
            )

    def get_stats(self):
        """Return a ``{status: count}`` summary across all jobs."""
        with sqlite3.connect(DB_PATH) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
            return dict(cursor.fetchall())

    def get_recent_jobs(self, limit=50):
        """Return up to ``limit`` most recently touched jobs, newest first.

        Payloads are JSON-decoded where possible; malformed or NULL payloads
        are returned as stored rather than crashing the listing.
        """
        with sqlite3.connect(DB_PATH) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, event_type, status, created_at, updated_at, error_msg, payload
                FROM jobs
                ORDER BY updated_at DESC, created_at DESC
                LIMIT ?
            """, (limit,))
            rows = cursor.fetchall()
        results = []
        for row in rows:
            r = dict(row)
            try:
                r['payload'] = json.loads(r['payload'])
            except (TypeError, json.JSONDecodeError):
                # NULL payload (TypeError) or non-JSON text: keep the raw
                # value instead of the old bare `except:` that hid everything.
                pass
            results.append(r)
        return results