Dashboard: Implement status prioritization in Sync-Runs (COMPLETED over SKIPPED) [31188f42]
This commit is contained in:
@@ -98,6 +98,17 @@ class JobQueue:
|
|||||||
(job_id,)
|
(job_id,)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def skip_job(self, job_id, reason):
|
||||||
|
"""
|
||||||
|
Marks a job as SKIPPED (intentionally not processed).
|
||||||
|
Reason is stored in error_msg for visibility.
|
||||||
|
"""
|
||||||
|
with sqlite3.connect(DB_PATH) as conn:
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE jobs SET status = 'SKIPPED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
|
||||||
|
(str(reason), job_id)
|
||||||
|
)
|
||||||
|
|
||||||
def fail_job(self, job_id, error_msg):
|
def fail_job(self, job_id, error_msg):
|
||||||
with sqlite3.connect(DB_PATH) as conn:
|
with sqlite3.connect(DB_PATH) as conn:
|
||||||
conn.execute(
|
conn.execute(
|
||||||
@@ -134,18 +145,21 @@ class JobQueue:
|
|||||||
|
|
||||||
def get_account_summary(self, limit=1000):
|
def get_account_summary(self, limit=1000):
|
||||||
"""
|
"""
|
||||||
Groups recent jobs by ContactId/PersonId and returns a summary status.
|
Groups recent jobs into logical 'Sync-Runs' using time-gap clustering.
|
||||||
|
If a job for the same ID is more than 15 mins apart, it's a new run.
|
||||||
"""
|
"""
|
||||||
jobs = self.get_recent_jobs(limit=limit)
|
jobs = self.get_recent_jobs(limit=limit)
|
||||||
accounts = {}
|
runs = []
|
||||||
|
# Temporary storage to track the latest run for each ID
|
||||||
|
# Format: { 'C123': [run_obj1, run_obj2, ...] }
|
||||||
|
id_to_runs = {}
|
||||||
|
|
||||||
|
# Jobs are sorted by updated_at DESC (newest first)
|
||||||
for job in jobs:
|
for job in jobs:
|
||||||
payload = job.get('payload', {})
|
payload = job.get('payload', {})
|
||||||
# Try to find IDs
|
|
||||||
c_id = payload.get('ContactId')
|
c_id = payload.get('ContactId')
|
||||||
p_id = payload.get('PersonId')
|
p_id = payload.get('PersonId')
|
||||||
|
|
||||||
# Fallback for cascaded jobs or primary keys
|
|
||||||
if not c_id and payload.get('PrimaryKey') and 'contact' in job['event_type'].lower():
|
if not c_id and payload.get('PrimaryKey') and 'contact' in job['event_type'].lower():
|
||||||
c_id = payload.get('PrimaryKey')
|
c_id = payload.get('PrimaryKey')
|
||||||
if not p_id and payload.get('PrimaryKey') and 'person' in job['event_type'].lower():
|
if not p_id and payload.get('PrimaryKey') and 'person' in job['event_type'].lower():
|
||||||
@@ -154,90 +168,91 @@ class JobQueue:
|
|||||||
if not c_id and not p_id:
|
if not c_id and not p_id:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Create a unique key for the entity
|
entity_id = f"P{p_id}" if p_id else f"C{c_id}"
|
||||||
key = f"P{p_id}" if p_id else f"C{c_id}"
|
job_time = datetime.strptime(job['updated_at'], "%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
if key not in accounts:
|
target_run = None
|
||||||
accounts[key] = {
|
|
||||||
"id": key,
|
# Check if we can attach this job to an existing (newer) run cluster
|
||||||
|
if entity_id in id_to_runs:
|
||||||
|
for run in id_to_runs[entity_id]:
|
||||||
|
run_latest_time = datetime.strptime(run['updated_at'], "%Y-%m-%d %H:%M:%S")
|
||||||
|
# If this job is within 15 mins of the run's activity
|
||||||
|
if abs((run_latest_time - job_time).total_seconds()) < 900:
|
||||||
|
target_run = run
|
||||||
|
break
|
||||||
|
|
||||||
|
if not target_run:
|
||||||
|
# Start a new run cluster
|
||||||
|
target_run = {
|
||||||
|
"id": f"{entity_id}_{job['id']}", # Unique ID for this run row
|
||||||
|
"entity_id": entity_id,
|
||||||
"contact_id": c_id,
|
"contact_id": c_id,
|
||||||
"person_id": p_id,
|
"person_id": p_id,
|
||||||
"name": "Unknown",
|
"name": "Unknown",
|
||||||
"last_event": job['event_type'],
|
"last_event": job['event_type'],
|
||||||
"status": job['status'],
|
"status": job['status'],
|
||||||
"created_at": job['created_at'], # Oldest job in group (since we sort by DESC)
|
"created_at": job['created_at'],
|
||||||
"updated_at": job['updated_at'], # Most recent job
|
"updated_at": job['updated_at'],
|
||||||
"error_msg": job['error_msg'],
|
"error_msg": job['error_msg'],
|
||||||
"job_count": 0,
|
"job_count": 0,
|
||||||
"duration": "0s",
|
"duration": "0s",
|
||||||
"phases": {
|
"phases": {
|
||||||
"received": "completed",
|
"received": "pending",
|
||||||
"enriching": "pending",
|
"enriching": "pending",
|
||||||
"syncing": "pending",
|
"syncing": "pending",
|
||||||
"completed": "pending"
|
"completed": "pending"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
runs.append(target_run)
|
||||||
acc = accounts[key]
|
if entity_id not in id_to_runs:
|
||||||
acc["job_count"] += 1
|
id_to_runs[entity_id] = []
|
||||||
|
id_to_runs[entity_id].append(target_run)
|
||||||
|
|
||||||
# Update duration
|
# Update the run with job info
|
||||||
|
target_run["job_count"] += 1
|
||||||
|
|
||||||
|
# Update oldest start time (since we iterate newest -> oldest)
|
||||||
|
target_run["created_at"] = job["created_at"]
|
||||||
|
|
||||||
|
# Calculate Duration for this run
|
||||||
try:
|
try:
|
||||||
# We want the absolute start (oldest created_at)
|
start = datetime.strptime(target_run["created_at"], "%Y-%m-%d %H:%M:%S")
|
||||||
# Since jobs are DESC, the last one we iterate through for a key is the oldest
|
end = datetime.strptime(target_run["updated_at"], "%Y-%m-%d %H:%M:%S")
|
||||||
acc["created_at"] = job["created_at"]
|
|
||||||
|
|
||||||
start = datetime.strptime(acc["created_at"], "%Y-%m-%d %H:%M:%S")
|
|
||||||
end = datetime.strptime(acc["updated_at"], "%Y-%m-%d %H:%M:%S")
|
|
||||||
diff = end - start
|
diff = end - start
|
||||||
seconds = int(diff.total_seconds())
|
seconds = int(diff.total_seconds())
|
||||||
if seconds < 60:
|
target_run["duration"] = f"{seconds}s" if seconds < 60 else f"{seconds // 60}m {seconds % 60}s"
|
||||||
acc["duration"] = f"{seconds}s"
|
except: pass
|
||||||
else:
|
|
||||||
acc["duration"] = f"{seconds // 60}m {seconds % 60}s"
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Try to resolve 'Unknown' name from any job in the group
|
# Resolve Name
|
||||||
if acc["name"] == "Unknown":
|
if target_run["name"] == "Unknown":
|
||||||
name = payload.get('Name') or payload.get('crm_name') or payload.get('FullName') or payload.get('ContactName')
|
name = payload.get('Name') or payload.get('crm_name') or payload.get('FullName') or payload.get('ContactName')
|
||||||
if not name and payload.get('Firstname'):
|
if not name and payload.get('Firstname'):
|
||||||
name = f"{payload.get('Firstname')} {payload.get('Lastname', '')}".strip()
|
name = f"{payload.get('Firstname')} {payload.get('Lastname', '')}".strip()
|
||||||
if name:
|
if name: target_run["name"] = name
|
||||||
acc["name"] = name
|
|
||||||
|
|
||||||
# Update overall status based on most recent job
|
# Update Status based on the jobs in the run
|
||||||
# (Assuming jobs are sorted by updated_at DESC)
|
# Priority: FAILED > PROCESSING > COMPLETED > SKIPPED > PENDING
|
||||||
if acc["job_count"] == 1:
|
status_priority = {"FAILED": 4, "PROCESSING": 3, "COMPLETED": 2, "SKIPPED": 1, "PENDING": 0}
|
||||||
acc["status"] = job["status"]
|
current_prio = status_priority.get(target_run["status"], -1)
|
||||||
acc["updated_at"] = job["updated_at"]
|
new_prio = status_priority.get(job["status"], -1)
|
||||||
acc["error_msg"] = job["error_msg"]
|
|
||||||
|
if new_prio > current_prio:
|
||||||
|
target_run["status"] = job["status"]
|
||||||
|
target_run["error_msg"] = job["error_msg"]
|
||||||
|
|
||||||
# Determine Phase
|
# Set visual phases based on status
|
||||||
if job["status"] == "COMPLETED":
|
if job["status"] == "COMPLETED":
|
||||||
acc["phases"] = {
|
target_run["phases"] = {"received": "completed", "enriching": "completed", "syncing": "completed", "completed": "completed"}
|
||||||
"received": "completed",
|
elif job["status"] == "SKIPPED" and current_prio < 2: # Don't downgrade from COMPLETED
|
||||||
"enriching": "completed",
|
target_run["phases"] = {"received": "completed", "enriching": "completed", "syncing": "completed", "completed": "completed"}
|
||||||
"syncing": "completed",
|
|
||||||
"completed": "completed"
|
|
||||||
}
|
|
||||||
elif job["status"] == "FAILED":
|
elif job["status"] == "FAILED":
|
||||||
acc["phases"]["received"] = "completed"
|
target_run["phases"] = {"received": "completed", "enriching": "failed", "syncing": "pending", "completed": "pending"}
|
||||||
acc["phases"]["enriching"] = "failed"
|
|
||||||
elif job["status"] == "PROCESSING":
|
elif job["status"] == "PROCESSING":
|
||||||
acc["phases"]["received"] = "completed"
|
target_run["phases"] = {"received": "completed", "enriching": "processing", "syncing": "pending", "completed": "pending"}
|
||||||
acc["phases"]["enriching"] = "processing"
|
|
||||||
elif job["status"] == "PENDING":
|
|
||||||
acc["phases"]["received"] = "completed"
|
|
||||||
# If it has an error msg like 'processing', it's in enriching
|
|
||||||
if job["error_msg"] and "processing" in job["error_msg"].lower():
|
|
||||||
acc["phases"]["enriching"] = "processing"
|
|
||||||
else:
|
|
||||||
acc["phases"]["received"] = "processing"
|
|
||||||
|
|
||||||
# Final cleanup for names
|
# Final cleanup
|
||||||
for acc in accounts.values():
|
for r in runs:
|
||||||
if acc["name"] == "Unknown":
|
if r["name"] == "Unknown": r["name"] = f"Entity {r['entity_id']}"
|
||||||
acc["name"] = f"Entity {acc['id']}"
|
|
||||||
|
|
||||||
return list(accounts.values())
|
return runs
|
||||||
|
|||||||
Reference in New Issue
Block a user