feat(connector): [31188f42] Finalize production optimizations, filtering, and dashboard enhancements
This commit is contained in:
@@ -16,6 +16,7 @@ class JobQueue:
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
event_type TEXT,
|
||||
payload TEXT,
|
||||
entity_name TEXT,
|
||||
status TEXT DEFAULT 'PENDING',
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
@@ -26,8 +27,15 @@ class JobQueue:
|
||||
# Migration for existing DBs
|
||||
try:
|
||||
conn.execute("ALTER TABLE jobs ADD COLUMN next_try_at TIMESTAMP")
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
except sqlite3.OperationalError: pass
|
||||
|
||||
try:
|
||||
conn.execute("ALTER TABLE jobs ADD COLUMN entity_name TEXT")
|
||||
except sqlite3.OperationalError: pass
|
||||
|
||||
try:
|
||||
conn.execute("ALTER TABLE jobs ADD COLUMN associate_name TEXT")
|
||||
except sqlite3.OperationalError: pass
|
||||
|
||||
def add_job(self, event_type: str, payload: dict):
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
@@ -36,6 +44,19 @@ class JobQueue:
|
||||
(event_type, json.dumps(payload), 'PENDING')
|
||||
)
|
||||
|
||||
def update_entity_name(self, job_id, name, associate_name=None):
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
if associate_name:
|
||||
conn.execute(
|
||||
"UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?",
|
||||
(str(name), str(associate_name), job_id)
|
||||
)
|
||||
else:
|
||||
conn.execute(
|
||||
"UPDATE jobs SET entity_name = ?, updated_at = datetime('now') WHERE id = ?",
|
||||
(str(name), job_id)
|
||||
)
|
||||
|
||||
def get_next_job(self):
|
||||
"""
|
||||
Atomically fetches the next pending job where next_try_at is reached.
|
||||
@@ -127,7 +148,7 @@ class JobQueue:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
SELECT id, event_type, status, created_at, updated_at, error_msg, payload
|
||||
SELECT id, event_type, status, created_at, updated_at, error_msg, payload, entity_name, associate_name
|
||||
FROM jobs
|
||||
ORDER BY updated_at DESC, created_at DESC
|
||||
LIMIT ?
|
||||
@@ -189,7 +210,8 @@ class JobQueue:
|
||||
"entity_id": entity_id,
|
||||
"contact_id": c_id,
|
||||
"person_id": p_id,
|
||||
"name": "Unknown",
|
||||
"name": job.get('entity_name') or "Unknown",
|
||||
"associate": job.get('associate_name') or "",
|
||||
"last_event": job['event_type'],
|
||||
"status": job['status'],
|
||||
"created_at": job['created_at'],
|
||||
@@ -224,19 +246,26 @@ class JobQueue:
|
||||
target_run["duration"] = f"{seconds}s" if seconds < 60 else f"{seconds // 60}m {seconds % 60}s"
|
||||
except: pass
|
||||
|
||||
# Resolve Name
|
||||
# Resolve Name & Associate (if not already set from a newer job in this cluster)
|
||||
if target_run["name"] == "Unknown":
|
||||
name = payload.get('Name') or payload.get('crm_name') or payload.get('FullName') or payload.get('ContactName')
|
||||
name = job.get('entity_name') or payload.get('Name') or payload.get('crm_name') or payload.get('FullName') or payload.get('ContactName')
|
||||
if not name and payload.get('Firstname'):
|
||||
name = f"{payload.get('Firstname')} {payload.get('Lastname', '')}".strip()
|
||||
if name: target_run["name"] = name
|
||||
|
||||
if not target_run["associate"] and job.get('associate_name'):
|
||||
target_run["associate"] = job['associate_name']
|
||||
|
||||
# Update Status based on the jobs in the run
|
||||
|
||||
# Update Status based on the jobs in the run
|
||||
# Priority: FAILED > PROCESSING > COMPLETED > SKIPPED > PENDING
|
||||
status_priority = {"FAILED": 4, "PROCESSING": 3, "COMPLETED": 2, "SKIPPED": 1, "PENDING": 0}
|
||||
current_prio = status_priority.get(target_run["status"], -1)
|
||||
new_prio = status_priority.get(job["status"], -1)
|
||||
|
||||
# CRITICAL: We only update the status if the new job has a HIGHER priority
|
||||
# Example: If current is COMPLETED (2) and new is SKIPPED (1), we keep COMPLETED.
|
||||
if new_prio > current_prio:
|
||||
target_run["status"] = job["status"]
|
||||
target_run["error_msg"] = job["error_msg"]
|
||||
@@ -244,12 +273,16 @@ class JobQueue:
|
||||
# Set visual phases based on status
|
||||
if job["status"] == "COMPLETED":
|
||||
target_run["phases"] = {"received": "completed", "enriching": "completed", "syncing": "completed", "completed": "completed"}
|
||||
elif job["status"] == "SKIPPED" and current_prio < 2: # Don't downgrade from COMPLETED
|
||||
target_run["phases"] = {"received": "completed", "enriching": "completed", "syncing": "completed", "completed": "completed"}
|
||||
elif job["status"] == "FAILED":
|
||||
target_run["phases"] = {"received": "completed", "enriching": "failed", "syncing": "pending", "completed": "pending"}
|
||||
elif job["status"] == "PROCESSING":
|
||||
target_run["phases"] = {"received": "completed", "enriching": "processing", "syncing": "pending", "completed": "pending"}
|
||||
# Note: SKIPPED (1) and PENDING (0) will use the target_run's initial phases or keep previous ones.
|
||||
|
||||
# SPECIAL CASE: If we already have COMPLETED but a new job is SKIPPED, we might want to keep the error_msg empty
|
||||
# to avoid showing "Skipped Echo" on a successful row.
|
||||
if target_run["status"] == "COMPLETED" and job["status"] == "SKIPPED":
|
||||
pass # Keep everything from the successful run
|
||||
|
||||
# Final cleanup
|
||||
for r in runs:
|
||||
|
||||
Reference in New Issue
Block a user