"""SQLite-backed persistent job queue for connector events."""
import json
import os
import sqlite3
from datetime import datetime, timedelta

# Path of the SQLite database file backing the queue; overridable via the
# DB_PATH environment variable (defaults to a file in the working directory).
DB_PATH = os.getenv("DB_PATH", "connector_queue.db")

|
class JobQueue:
    """SQLite-backed persistent job queue.

    Jobs move through statuses PENDING -> PROCESSING -> COMPLETED/FAILED,
    and can be re-queued as PENDING with a ``next_try_at`` delay for retries.
    All timestamps are stored in SQLite's UTC text format
    (``YYYY-MM-DD HH:MM:SS``) so they compare correctly against
    ``datetime('now')``.
    """

    def __init__(self):
        self._init_db()

    def _init_db(self):
        """Create the jobs table if missing and migrate older databases."""
        with sqlite3.connect(DB_PATH) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_type TEXT,
                    payload TEXT,
                    status TEXT DEFAULT 'PENDING',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    error_msg TEXT,
                    next_try_at TIMESTAMP
                )
            """)
            # Migration for DBs created before next_try_at existed; the ALTER
            # fails harmlessly (OperationalError) when the column is present.
            try:
                conn.execute("ALTER TABLE jobs ADD COLUMN next_try_at TIMESTAMP")
            except sqlite3.OperationalError:
                pass

    def add_job(self, event_type: str, payload: dict):
        """Enqueue a new PENDING job with a JSON-serialized payload."""
        with sqlite3.connect(DB_PATH) as conn:
            conn.execute(
                "INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)",
                (event_type, json.dumps(payload), 'PENDING')
            )

    def get_next_job(self):
        """Atomically claim the oldest due PENDING job.

        Returns the claimed job as a dict (payload already JSON-decoded)
        with its status flipped to PROCESSING, or None when no job is due.
        """
        job = None
        with sqlite3.connect(DB_PATH) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            # EXCLUSIVE transaction so concurrent workers cannot claim the
            # same job between the SELECT and the UPDATE below.
            cursor.execute("BEGIN EXCLUSIVE")
            try:
                cursor.execute("""
                    SELECT id, event_type, payload, created_at
                    FROM jobs
                    WHERE status = 'PENDING'
                      AND (next_try_at IS NULL OR next_try_at <= datetime('now'))
                    ORDER BY created_at ASC
                    LIMIT 1
                """)
                row = cursor.fetchone()

                if row:
                    job = dict(row)
                    # Mark as processing inside the same transaction.
                    cursor.execute(
                        "UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?",
                        (job['id'],)
                    )
                    conn.commit()
                else:
                    conn.rollback()  # No job found; release the lock.
            except Exception:
                conn.rollback()
                raise

        if job:
            job['payload'] = json.loads(job['payload'])

        return job

    def retry_job_later(self, job_id, delay_seconds=60, error_msg=None):
        """Re-queue a job as PENDING, eligible again after ``delay_seconds``.

        The retry time is computed by SQLite itself via
        ``datetime('now', '+N seconds')`` so it is stored in exactly the UTC
        text format that get_next_job compares against, and so we avoid both
        the deprecated ``datetime.utcnow()`` and the deprecated sqlite3
        default datetime adapter (removed behavior as of Python 3.12).
        """
        delay_modifier = f"+{delay_seconds} seconds"
        with sqlite3.connect(DB_PATH) as conn:
            if error_msg:
                conn.execute(
                    "UPDATE jobs SET status = 'PENDING', next_try_at = datetime('now', ?), updated_at = datetime('now'), error_msg = ? WHERE id = ?",
                    (delay_modifier, str(error_msg), job_id)
                )
            else:
                conn.execute(
                    "UPDATE jobs SET status = 'PENDING', next_try_at = datetime('now', ?), updated_at = datetime('now') WHERE id = ?",
                    (delay_modifier, job_id)
                )

    def complete_job(self, job_id):
        """Mark a job COMPLETED."""
        with sqlite3.connect(DB_PATH) as conn:
            conn.execute(
                "UPDATE jobs SET status = 'COMPLETED', updated_at = datetime('now') WHERE id = ?",
                (job_id,)
            )

    def fail_job(self, job_id, error_msg):
        """Mark a job permanently FAILED, recording the error message."""
        with sqlite3.connect(DB_PATH) as conn:
            conn.execute(
                "UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
                (str(error_msg), job_id)
            )

    def get_stats(self):
        """Return a ``{status: count}`` mapping across all jobs."""
        with sqlite3.connect(DB_PATH) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
            return dict(cursor.fetchall())

    def get_recent_jobs(self, limit=50):
        """Return up to ``limit`` most recently touched jobs, newest first.

        Payloads are JSON-decoded best-effort: a payload that is NULL or not
        valid JSON is returned as-is rather than raising.
        """
        with sqlite3.connect(DB_PATH) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, event_type, status, created_at, updated_at, error_msg, payload
                FROM jobs
                ORDER BY updated_at DESC, created_at DESC
                LIMIT ?
            """, (limit,))
            rows = cursor.fetchall()
            results = []
            for row in rows:
                r = dict(row)
                # TypeError for NULL payloads, ValueError (incl.
                # JSONDecodeError) for malformed JSON; keep raw text then.
                try:
                    r['payload'] = json.loads(r['payload'])
                except (TypeError, ValueError):
                    pass
                results.append(r)
            return results

    def get_account_summary(self, limit=1000):
        """Group recent jobs by ContactId/PersonId and summarize per entity.

        Relies on get_recent_jobs returning jobs newest-first: the FIRST job
        seen for a key drives the summary status/phases, while each later
        (older) job pushes created_at back toward the group's true start.
        """
        jobs = self.get_recent_jobs(limit=limit)
        accounts = {}

        for job in jobs:
            payload = job.get('payload', {})
            # Try to find IDs directly on the payload.
            c_id = payload.get('ContactId')
            p_id = payload.get('PersonId')

            # Fallback for cascaded jobs that only carry a PrimaryKey; the
            # event type tells us which kind of entity it identifies.
            if not c_id and payload.get('PrimaryKey') and 'contact' in job['event_type'].lower():
                c_id = payload.get('PrimaryKey')
            if not p_id and payload.get('PrimaryKey') and 'person' in job['event_type'].lower():
                p_id = payload.get('PrimaryKey')

            if not c_id and not p_id:
                continue

            # Unique key for the entity; PersonId wins when both exist.
            key = f"P{p_id}" if p_id else f"C{c_id}"

            if key not in accounts:
                accounts[key] = {
                    "id": key,
                    "contact_id": c_id,
                    "person_id": p_id,
                    "name": "Unknown",
                    "last_event": job['event_type'],
                    "status": job['status'],
                    "created_at": job['created_at'],  # Oldest job in group (since we sort by DESC)
                    "updated_at": job['updated_at'],  # Most recent job
                    "error_msg": job['error_msg'],
                    "job_count": 0,
                    "duration": "0s",
                    "phases": {
                        "received": "completed",
                        "enriching": "pending",
                        "syncing": "pending",
                        "completed": "pending"
                    }
                }

            acc = accounts[key]
            acc["job_count"] += 1

            # Update duration from absolute start (oldest created_at).
            try:
                # Since jobs are DESC, the last one we iterate through for a
                # key is the oldest, so keep overwriting created_at.
                acc["created_at"] = job["created_at"]

                start = datetime.strptime(acc["created_at"], "%Y-%m-%d %H:%M:%S")
                end = datetime.strptime(acc["updated_at"], "%Y-%m-%d %H:%M:%S")
                diff = end - start
                seconds = int(diff.total_seconds())
                if seconds < 60:
                    acc["duration"] = f"{seconds}s"
                else:
                    acc["duration"] = f"{seconds // 60}m {seconds % 60}s"
            except Exception:
                pass  # Non-standard timestamp format; keep previous duration.

            # Try to resolve 'Unknown' name from any job in the group.
            if acc["name"] == "Unknown":
                name = payload.get('Name') or payload.get('crm_name') or payload.get('FullName') or payload.get('ContactName')
                if not name and payload.get('Firstname'):
                    name = f"{payload.get('Firstname')} {payload.get('Lastname', '')}".strip()
                if name:
                    acc["name"] = name

            # Overall status comes from the most recent job only, i.e. the
            # first one seen for this key (jobs sorted by updated_at DESC).
            if acc["job_count"] == 1:
                acc["status"] = job["status"]
                acc["updated_at"] = job["updated_at"]
                acc["error_msg"] = job["error_msg"]

                # Derive pipeline phases from that most recent status.
                if job["status"] == "COMPLETED":
                    acc["phases"] = {
                        "received": "completed",
                        "enriching": "completed",
                        "syncing": "completed",
                        "completed": "completed"
                    }
                elif job["status"] == "FAILED":
                    acc["phases"]["received"] = "completed"
                    acc["phases"]["enriching"] = "failed"
                elif job["status"] == "PROCESSING":
                    acc["phases"]["received"] = "completed"
                    acc["phases"]["enriching"] = "processing"
                elif job["status"] == "PENDING":
                    acc["phases"]["received"] = "completed"
                    # If it carries an error msg like 'processing', treat the
                    # enriching phase as in flight rather than pending.
                    if job["error_msg"] and "processing" in job["error_msg"].lower():
                        acc["phases"]["enriching"] = "processing"
                else:
                    acc["phases"]["received"] = "processing"

        # Final cleanup: give nameless entities a stable placeholder label.
        for acc in accounts.values():
            if acc["name"] == "Unknown":
                acc["name"] = f"Entity {acc['id']}"

        return list(accounts.values())