import sqlite3
import json
from contextlib import contextmanager
from datetime import datetime, timedelta
import os

DB_PATH = os.getenv("DB_PATH", "connector_queue.db")

# Text format produced by SQLite's CURRENT_TIMESTAMP and datetime('now').
_SQLITE_TS_FORMAT = "%Y-%m-%d %H:%M:%S"


class JobQueue:
    """SQLite-backed job queue.

    Jobs are created PENDING, claimed as PROCESSING by ``get_next_job`` and
    finished as COMPLETED or FAILED. ``retry_job_later`` puts a job back to
    PENDING with a delayed ``next_try_at``. Every method opens, and closes,
    its own connection.
    """

    def __init__(self, db_path=None):
        """Create the queue and make sure its schema exists.

        Args:
            db_path: SQLite database file. When omitted, the module-level
                ``DB_PATH`` (taken from the ``DB_PATH`` env var) is used.
        """
        self.db_path = db_path
        self._init_db()

    @contextmanager
    def _connect(self, row_factory=None):
        """Yield a connection that commits on success, rolls back on error and is always closed.

        Using a ``sqlite3.Connection`` directly as a context manager only ends
        the transaction; it does not close the connection.
        """
        path = self.db_path if self.db_path is not None else DB_PATH
        conn = sqlite3.connect(path)
        try:
            if row_factory is not None:
                conn.row_factory = row_factory
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        """Create the ``jobs`` table and add columns missing from older databases."""
        with self._connect() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_type TEXT,
                    payload TEXT,
                    status TEXT DEFAULT 'PENDING',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    error_msg TEXT,
                    next_try_at TIMESTAMP
                )
            """)
            # Migration for databases created before next_try_at existed.
            # Checking the schema (instead of catching OperationalError) avoids
            # silently swallowing unrelated errors such as "database is locked".
            columns = {row[1] for row in conn.execute("PRAGMA table_info(jobs)")}
            if "next_try_at" not in columns:
                conn.execute("ALTER TABLE jobs ADD COLUMN next_try_at TIMESTAMP")

    def add_job(self, event_type: str, payload: dict):
        """Enqueue a PENDING job whose payload is stored as JSON text."""
        with self._connect() as conn:
            conn.execute(
                "INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)",
                (event_type, json.dumps(payload), 'PENDING'),
            )

    def get_next_job(self):
        """Atomically claim the oldest due PENDING job and mark it PROCESSING.

        A job is due when ``next_try_at`` is NULL or not in the future.

        Returns:
            A dict with ``id``, ``event_type``, ``payload`` (decoded JSON, or
            ``None`` if the stored payload is NULL) and ``created_at``; or
            ``None`` when no job is due.
        """
        job = None
        with self._connect(row_factory=sqlite3.Row) as conn:
            cursor = conn.cursor()
            # IMMEDIATE takes the write lock before the SELECT, so two workers
            # cannot both read the same row before one of them UPDATEs it.
            cursor.execute("BEGIN IMMEDIATE")
            try:
                cursor.execute("""
                    SELECT id, event_type, payload, created_at
                    FROM jobs
                    WHERE status = 'PENDING'
                      AND (next_try_at IS NULL OR next_try_at <= datetime('now'))
                    ORDER BY created_at ASC, id ASC
                    LIMIT 1
                """)  # id breaks ties: created_at only has one-second resolution
                row = cursor.fetchone()
                if row is not None:
                    job = dict(row)
                    cursor.execute(
                        "UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?",
                        (job['id'],),
                    )
                conn.commit()
            except Exception:
                conn.rollback()
                raise
        # Decode outside the transaction; a NULL payload must not raise here,
        # because the row is already marked PROCESSING.
        if job is not None and job['payload'] is not None:
            job['payload'] = json.loads(job['payload'])
        return job

    def retry_job_later(self, job_id, delay_seconds=60, error_msg=None):
        """Put a job back to PENDING so it becomes due after ``delay_seconds``.

        ``next_try_at`` is computed by SQLite (``datetime('now', modifier)``),
        so it uses the same UTC clock and text format that ``get_next_job``
        compares against. A falsy ``error_msg`` keeps the stored message.
        """
        modifier = f"{float(delay_seconds):+.3f} seconds"
        message = str(error_msg) if error_msg else None
        with self._connect() as conn:
            conn.execute(
                "UPDATE jobs SET status = 'PENDING', next_try_at = datetime('now', ?), "
                "updated_at = datetime('now'), error_msg = COALESCE(?, error_msg) "
                "WHERE id = ?",
                (modifier, message, job_id),
            )

    def complete_job(self, job_id):
        """Mark a job COMPLETED."""
        with self._connect() as conn:
            conn.execute(
                "UPDATE jobs SET status = 'COMPLETED', updated_at = datetime('now') WHERE id = ?",
                (job_id,),
            )

    def fail_job(self, job_id, error_msg):
        """Mark a job FAILED and record ``str(error_msg)``."""
        with self._connect() as conn:
            conn.execute(
                "UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
                (str(error_msg), job_id),
            )

    def get_stats(self):
        """Return ``{status: job_count}`` over all jobs."""
        with self._connect() as conn:
            rows = conn.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status").fetchall()
        return dict(rows)

    def get_recent_jobs(self, limit=50):
        """Return up to ``limit`` jobs as dicts, most recently updated first.

        ``payload`` is JSON-decoded when possible; NULL or non-JSON payloads
        are returned unchanged.
        """
        with self._connect(row_factory=sqlite3.Row) as conn:
            rows = conn.execute("""
                SELECT id, event_type, status, created_at, updated_at, error_msg, payload
                FROM jobs
                ORDER BY updated_at DESC, created_at DESC
                LIMIT ?
            """, (limit,)).fetchall()
        results = []
        for row in rows:
            job = dict(row)
            try:
                job['payload'] = json.loads(job['payload'])
            except (TypeError, ValueError):  # NULL (TypeError) or invalid JSON
                pass
            results.append(job)
        return results

    def get_account_summary(self, limit=1000):
        """Group recent jobs by PersonId/ContactId and summarise each entity.

        Only the ``limit`` most recently updated jobs are considered. An entity
        is keyed ``P<PersonId>`` when a person id is known, otherwise
        ``C<ContactId>``. Jobs with neither id, or whose payload is not a JSON
        object, are skipped.

        Per entity, ``status``, ``error_msg``, ``updated_at``, ``last_event``
        and ``phases`` come from its most recently updated job. ``created_at``
        is the earliest ``created_at`` among its jobs, and ``duration`` is the
        time between the two. ``name`` is the first name found in any payload,
        or ``"Entity <key>"`` if none is found.
        """
        accounts = {}
        for job in self.get_recent_jobs(limit=limit):
            payload = job.get('payload')
            if not isinstance(payload, dict):
                continue
            c_id, p_id = self._extract_entity_ids(job.get('event_type'), payload)
            if not c_id and not p_id:
                continue
            key = f"P{p_id}" if p_id else f"C{c_id}"

            acc = accounts.get(key)
            if acc is None:
                # Jobs arrive ordered by updated_at DESC, so the first job seen
                # for an entity is its most recent one.
                acc = accounts[key] = {
                    "id": key,
                    "contact_id": c_id,
                    "person_id": p_id,
                    "name": "Unknown",
                    "last_event": job['event_type'],
                    "status": job['status'],
                    "created_at": job['created_at'],
                    "updated_at": job['updated_at'],
                    "error_msg": job['error_msg'],
                    "job_count": 0,
                    "duration": "0s",
                    "phases": self._phases_for(job['status'], job['error_msg']),
                }
            acc["job_count"] += 1

            # Rows are ordered by updated_at, not created_at, so track the
            # earliest start explicitly instead of trusting iteration order.
            created = job['created_at']
            if created and (not acc["created_at"] or created < acc["created_at"]):
                acc["created_at"] = created

            if acc["name"] == "Unknown":
                name = self._resolve_name(payload)
                if name:
                    acc["name"] = name

        for acc in accounts.values():
            acc["duration"] = self._format_duration(acc["created_at"], acc["updated_at"], acc["duration"])
            if acc["name"] == "Unknown":
                acc["name"] = f"Entity {acc['id']}"
        return list(accounts.values())

    @staticmethod
    def _extract_entity_ids(event_type, payload):
        """Return ``(contact_id, person_id)``, falling back to ``PrimaryKey`` based on the event type."""
        c_id = payload.get('ContactId')
        p_id = payload.get('PersonId')
        primary_key = payload.get('PrimaryKey')
        kind = (event_type or '').lower()  # event_type column is nullable
        if not c_id and primary_key and 'contact' in kind:
            c_id = primary_key
        if not p_id and primary_key and 'person' in kind:
            p_id = primary_key
        return c_id, p_id

    @staticmethod
    def _resolve_name(payload):
        """Return a display name from the known payload name fields, or a falsy value."""
        name = (payload.get('Name') or payload.get('crm_name')
                or payload.get('FullName') or payload.get('ContactName'))
        if not name and payload.get('Firstname'):
            name = f"{payload.get('Firstname')} {payload.get('Lastname', '')}".strip()
        return name

    @staticmethod
    def _phases_for(status, error_msg):
        """Map a job status (and its error message) to the pipeline phase display dict."""
        phases = {
            "received": "completed",
            "enriching": "pending",
            "syncing": "pending",
            "completed": "pending",
        }
        if status == "COMPLETED":
            return dict.fromkeys(phases, "completed")
        if status == "FAILED":
            phases["enriching"] = "failed"
        elif status == "PROCESSING":
            phases["enriching"] = "processing"
        elif status == "PENDING":
            # A retried job whose error mentions 'processing' is shown as in
            # enriching; otherwise it is shown as still being received.
            if error_msg and "processing" in error_msg.lower():
                phases["enriching"] = "processing"
            else:
                phases["received"] = "processing"
        return phases

    @staticmethod
    def _format_duration(start, end, default="0s"):
        """Format ``end - start`` (SQLite timestamps) as ``"Ns"`` or ``"Mm Ss"``; ``default`` if unparsable."""
        try:
            delta = datetime.strptime(end, _SQLITE_TS_FORMAT) - datetime.strptime(start, _SQLITE_TS_FORMAT)
        except (TypeError, ValueError):
            return default
        seconds = int(delta.total_seconds())
        if seconds < 60:
            return f"{seconds}s"
        return f"{seconds // 60}m {seconds % 60}s"