"""SQLite-backed job queue used to buffer connector events for processing."""

import json
import logging
import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone

DB_PATH = os.getenv("DB_PATH", "connector_queue.db")

logger = logging.getLogger(__name__)

# Layout produced by SQLite's datetime('now') and CURRENT_TIMESTAMP (UTC).
_TS_FORMAT = "%Y-%m-%d %H:%M:%S"

# Columns added after the first schema version: (name, declared type).
_MIGRATION_COLUMNS = (
    ("next_try_at", "TIMESTAMP"),
    ("entity_name", "TEXT"),
    ("associate_name", "TEXT"),
)

# Higher number wins when several jobs of one run disagree on the status.
_STATUS_PRIORITY = {
    "FAILED": 4,
    "PROCESSING": 3,
    "COMPLETED": 2,
    "SKIPPED": 1,
    "PENDING": 0,
}

_PHASE_NAMES = ("received", "enriching", "syncing", "completed")
_PHASES_BY_STATUS = {
    "COMPLETED": ("completed", "completed", "completed", "completed"),
    "FAILED": ("completed", "failed", "pending", "pending"),
    "PROCESSING": ("completed", "processing", "pending", "pending"),
}


def _parse_timestamp(value):
    """Parse a SQLite timestamp string; return None if it is missing or malformed."""
    try:
        return datetime.strptime(value, _TS_FORMAT)
    except (TypeError, ValueError):
        return None


def _phases_for_status(status):
    """Return a fresh phases dict for *status*, or None if the status defines none."""
    states = _PHASES_BY_STATUS.get(status)
    if states is None:
        return None
    return dict(zip(_PHASE_NAMES, states))


class JobQueue:
    """Persistent FIFO job queue stored in a single SQLite ``jobs`` table.

    Every operation opens its own short-lived connection, so an instance
    holds no open database handle between calls.
    """

    def __init__(self, db_path=None):
        """Create the queue and make sure the schema exists.

        Args:
            db_path: Path of the SQLite file. Defaults to the module-level
                ``DB_PATH`` (taken from the ``DB_PATH`` environment variable).
        """
        self.db_path = db_path or DB_PATH
        self._init_db()

    @contextmanager
    def _connection(self):
        """Yield a connection that is committed (or rolled back) AND closed.

        ``with sqlite3.connect(...)`` on its own only manages the
        transaction; it never closes the connection, so it is closed
        explicitly here.
        """
        conn = sqlite3.connect(self.db_path, timeout=30)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        """Create the ``jobs`` table and add any columns missing from older DBs."""
        with self._connection() as conn:
            # Enable WAL mode for better concurrency
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_type TEXT,
                    payload TEXT,
                    entity_name TEXT,
                    status TEXT DEFAULT 'PENDING',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    error_msg TEXT,
                    next_try_at TIMESTAMP
                )
            """)

            # Migration for existing DBs: only add what is actually missing so
            # that genuine errors are not silently swallowed.
            existing = {row[1] for row in conn.execute("PRAGMA table_info(jobs)")}
            for column, decl in _MIGRATION_COLUMNS:
                if column in existing:
                    continue
                try:
                    conn.execute(f"ALTER TABLE jobs ADD COLUMN {column} {decl}")
                except sqlite3.OperationalError as exc:
                    # Another process may have added the column in the meantime.
                    if "duplicate column" not in str(exc).lower():
                        raise

    def add_job(self, event_type: str, payload: dict):
        """Insert a new PENDING job with *payload* serialized as JSON."""
        with self._connection() as conn:
            conn.execute(
                "INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)",
                (event_type, json.dumps(payload), 'PENDING')
            )

    def update_entity_name(self, job_id, name, associate_name=None):
        """Store the entity name (and the associate name, if truthy) on a job."""
        with self._connection() as conn:
            if associate_name:
                conn.execute(
                    "UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?",
                    (str(name), str(associate_name), job_id)
                )
            else:
                conn.execute(
                    "UPDATE jobs SET entity_name = ?, updated_at = datetime('now') WHERE id = ?",
                    (str(name), job_id)
                )

    def get_next_job(self):
        """Atomically claim the oldest PENDING job whose ``next_try_at`` is reached.

        The claimed job is switched to PROCESSING. A job whose payload is not
        valid JSON is marked FAILED (so it does not stay claimed forever) and
        None is returned for this call.

        Returns:
            A dict with ``id``, ``event_type``, ``payload`` (decoded JSON) and
            ``created_at``, or None when no job could be claimed.
        """
        with self._connection() as conn:
            conn.row_factory = sqlite3.Row
            # Take the write lock before reading so that two workers cannot
            # select the same row and both mark it PROCESSING.
            conn.execute("BEGIN IMMEDIATE")
            row = conn.execute("""
                SELECT id, event_type, payload, created_at
                FROM jobs
                WHERE status = 'PENDING'
                AND (next_try_at IS NULL OR next_try_at <= datetime('now'))
                ORDER BY created_at ASC, id ASC
                LIMIT 1
            """).fetchone()
            if row is None:
                return None

            job = dict(row)
            try:
                job['payload'] = json.loads(job['payload'])
            except (TypeError, ValueError) as exc:
                logger.error("Failed to parse payload for job %s: %s", job['id'], exc)
                conn.execute(
                    "UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
                    (f"Invalid payload: {exc}", job['id'])
                )
                return None

            # Mark as processing
            conn.execute(
                "UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?",
                (job['id'],)
            )
            return job

    def retry_job_later(self, job_id, delay_seconds=60, error_msg=None):
        """Put a job back to PENDING, eligible again after *delay_seconds*.

        Args:
            job_id: Id of the job to reschedule.
            delay_seconds: Seconds from now (UTC) until the job may be claimed.
            error_msg: Optional message stored in ``error_msg``; when falsy the
                existing message is left untouched.
        """
        # Stored as text in the same layout as datetime('now') so the string
        # comparison in get_next_job is well defined.
        next_try = (
            datetime.now(timezone.utc) + timedelta(seconds=delay_seconds)
        ).strftime(_TS_FORMAT)
        with self._connection() as conn:
            if error_msg:
                conn.execute(
                    "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ? WHERE id = ?",
                    (next_try, str(error_msg), job_id)
                )
            else:
                conn.execute(
                    "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now') WHERE id = ?",
                    (next_try, job_id)
                )

    def complete_job(self, job_id):
        """Mark a job as COMPLETED."""
        with self._connection() as conn:
            conn.execute(
                "UPDATE jobs SET status = 'COMPLETED', updated_at = datetime('now') WHERE id = ?",
                (job_id,)
            )

    def skip_job(self, job_id, reason):
        """
        Marks a job as SKIPPED (intentionally not processed).
        Reason is stored in error_msg for visibility.
        """
        with self._connection() as conn:
            conn.execute(
                "UPDATE jobs SET status = 'SKIPPED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
                (str(reason), job_id)
            )

    def mark_as_deleted(self, job_id, reason):
        """
        Marks a job as DELETED (entity no longer exists in source system).
        Reason is stored in error_msg.
        """
        with self._connection() as conn:
            conn.execute(
                "UPDATE jobs SET status = 'DELETED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
                (str(reason), job_id)
            )

    def fail_job(self, job_id, error_msg):
        """Mark a job as FAILED and store *error_msg*."""
        with self._connection() as conn:
            conn.execute(
                "UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
                (str(error_msg), job_id)
            )

    def get_stats(self):
        """Return a ``{status: job_count}`` dict over all jobs."""
        with self._connection() as conn:
            rows = conn.execute(
                "SELECT status, COUNT(*) FROM jobs GROUP BY status"
            ).fetchall()
        return dict(rows)

    def get_recent_jobs(self, limit=50):
        """Return up to *limit* jobs as dicts, most recently updated first.

        ``payload`` is decoded from JSON where possible; a payload that cannot
        be decoded is returned as stored.
        """
        with self._connection() as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute("""
                SELECT id, event_type, status, created_at, updated_at, error_msg, payload, entity_name, associate_name
                FROM jobs
                ORDER BY updated_at DESC, created_at DESC
                LIMIT ?
            """, (limit,)).fetchall()

        results = []
        for row in rows:
            record = dict(row)
            try:
                record['payload'] = json.loads(record['payload'])
            except (TypeError, ValueError):
                pass  # Best effort: keep the raw value.
            results.append(record)
        return results

    def get_account_summary(self, limit=1000, gap_seconds=900):
        """
        Groups recent jobs into logical 'Sync-Runs' using time-gap clustering.

        Jobs are attributed to an entity via ``PersonId`` / ``ContactId`` in the
        payload (falling back to ``PrimaryKey`` when the event type mentions
        'person' / 'contact'); jobs without such an id, or without a dict
        payload, are ignored. A job joins an existing run of the same entity
        when its ``updated_at`` lies within *gap_seconds* of that run's newest
        ``updated_at``; otherwise it starts a new run.

        Args:
            limit: Number of most recent jobs to consider.
            gap_seconds: Maximum distance (default 15 minutes) between a job
                and a run's newest timestamp for the job to join that run.

        Returns:
            List of run dicts, newest run first.
        """
        jobs = self.get_recent_jobs(limit=limit)
        runs = []
        # entity_id -> list of (run, parsed newest updated_at of that run)
        runs_by_entity = {}

        # Jobs are sorted by updated_at DESC (newest first)
        for job in jobs:
            payload = job.get('payload')
            if not isinstance(payload, dict):
                # NULL or undecodable payload: no ids can be extracted.
                payload = {}
            event_type = (job.get('event_type') or '').lower()
            primary_key = payload.get('PrimaryKey')

            c_id = payload.get('ContactId')
            p_id = payload.get('PersonId')
            if not c_id and primary_key and 'contact' in event_type:
                c_id = primary_key
            if not p_id and primary_key and 'person' in event_type:
                p_id = primary_key
            if not c_id and not p_id:
                continue

            entity_id = f"P{p_id}" if p_id else f"C{c_id}"
            job_time = _parse_timestamp(job.get('updated_at'))

            # Check if we can attach this job to an existing (newer) run cluster
            target_run = None
            if job_time is not None:
                for run, run_time in runs_by_entity.get(entity_id, ()):
                    if run_time is None:
                        continue
                    if abs((run_time - job_time).total_seconds()) < gap_seconds:
                        target_run = run
                        break

            if target_run is None:
                # Start a new run cluster. Phases reflect the newest job's
                # status right away; otherwise a single-job run would keep
                # all-pending phases despite e.g. a COMPLETED status.
                target_run = {
                    "id": f"{entity_id}_{job['id']}",  # Unique ID for this run row
                    "entity_id": entity_id,
                    "contact_id": c_id,
                    "person_id": p_id,
                    "name": job.get('entity_name') or "Unknown",
                    "associate": job.get('associate_name') or "",
                    "last_event": job['event_type'],
                    "status": job['status'],
                    "created_at": job['created_at'],
                    "updated_at": job['updated_at'],
                    "error_msg": job['error_msg'],
                    "job_count": 0,
                    "duration": "0s",
                    "phases": _phases_for_status(job['status'])
                    or dict.fromkeys(_PHASE_NAMES, "pending"),
                }
                runs.append(target_run)
                runs_by_entity.setdefault(entity_id, []).append((target_run, job_time))

            # Update the run with job info
            target_run["job_count"] += 1
            # Update oldest start time (since we iterate newest -> oldest)
            target_run["created_at"] = job["created_at"]

            # Calculate duration for this run; left unchanged if a timestamp
            # cannot be parsed.
            start = _parse_timestamp(target_run["created_at"])
            end = _parse_timestamp(target_run["updated_at"])
            if start is not None and end is not None:
                seconds = int((end - start).total_seconds())
                if seconds < 60:
                    target_run["duration"] = f"{seconds}s"
                else:
                    target_run["duration"] = f"{seconds // 60}m {seconds % 60}s"

            # Resolve Name & Associate (if not already set from a newer job in this cluster)
            if target_run["name"] == "Unknown":
                name = (
                    job.get('entity_name')
                    or payload.get('Name')
                    or payload.get('crm_name')
                    or payload.get('FullName')
                    or payload.get('ContactName')
                )
                if not name and payload.get('Firstname'):
                    name = f"{payload.get('Firstname')} {payload.get('Lastname', '')}".strip()
                if name:
                    target_run["name"] = name
            if not target_run["associate"] and job.get('associate_name'):
                target_run["associate"] = job['associate_name']

            # Update status based on the jobs in the run.
            # Priority: FAILED > PROCESSING > COMPLETED > SKIPPED > PENDING.
            # Only a strictly higher priority replaces the current status, e.g.
            # a SKIPPED job never overrides an already COMPLETED run.
            current_prio = _STATUS_PRIORITY.get(target_run["status"], -1)
            new_prio = _STATUS_PRIORITY.get(job["status"], -1)
            if new_prio > current_prio:
                target_run["status"] = job["status"]
                target_run["error_msg"] = job["error_msg"]
                # SKIPPED and PENDING define no phases: keep the previous ones.
                phases = _phases_for_status(job["status"])
                if phases is not None:
                    target_run["phases"] = phases

        # Final cleanup
        for run in runs:
            if run["name"] == "Unknown":
                run["name"] = f"Entity {run['entity_id']}"

        return runs