From 434524338ddc5d4709b7559e5e11b042a94dde6b Mon Sep 17 00:00:00 2001 From: Floke Date: Wed, 4 Mar 2026 18:21:20 +0000 Subject: [PATCH] Dashboard: Implement status prioritization in Sync-Runs (COMPLETED over SKIPPED) [31188f42] --- connector-superoffice/queue_manager.py | 137 ++++++++++++++----------- 1 file changed, 76 insertions(+), 61 deletions(-) diff --git a/connector-superoffice/queue_manager.py b/connector-superoffice/queue_manager.py index a1974b8a..83703f18 100644 --- a/connector-superoffice/queue_manager.py +++ b/connector-superoffice/queue_manager.py @@ -98,6 +98,17 @@ class JobQueue: (job_id,) ) + def skip_job(self, job_id, reason): + """ + Marks a job as SKIPPED (intentionally not processed). + Reason is stored in error_msg for visibility. + """ + with sqlite3.connect(DB_PATH) as conn: + conn.execute( + "UPDATE jobs SET status = 'SKIPPED', error_msg = ?, updated_at = datetime('now') WHERE id = ?", + (str(reason), job_id) + ) + def fail_job(self, job_id, error_msg): with sqlite3.connect(DB_PATH) as conn: conn.execute( @@ -134,18 +145,21 @@ class JobQueue: def get_account_summary(self, limit=1000): """ - Groups recent jobs by ContactId/PersonId and returns a summary status. + Groups recent jobs into logical 'Sync-Runs' using time-gap clustering. + If a job for the same ID is more than 15 mins apart, it's a new run. """ jobs = self.get_recent_jobs(limit=limit) - accounts = {} + runs = [] + # Temporary storage to track the latest run for each ID + # Format: { 'C123': [run_obj1, run_obj2, ...] } + id_to_runs = {} + # Jobs are sorted by updated_at DESC (newest first) for job in jobs: payload = job.get('payload', {}) - # Try to find IDs c_id = payload.get('ContactId') p_id = payload.get('PersonId') - # Fallback for cascaded jobs or primary keys if not c_id and payload.get('PrimaryKey') and 'contact' in job['event_type'].lower(): c_id = payload.get('PrimaryKey') if not p_id and payload.get('PrimaryKey') and 'person' in job['event_type'].lower(): @@ -154,90 +168,91 @@ class JobQueue: if not c_id and not p_id: continue - # Create a unique key for the entity - key = f"P{p_id}" if p_id else f"C{c_id}" + entity_id = f"P{p_id}" if p_id else f"C{c_id}" + job_time = datetime.strptime(job['updated_at'], "%Y-%m-%d %H:%M:%S") - if key not in accounts: - accounts[key] = { - "id": key, + target_run = None + + # Check if we can attach this job to an existing (newer) run cluster + if entity_id in id_to_runs: + for run in id_to_runs[entity_id]: + run_latest_time = datetime.strptime(run['updated_at'], "%Y-%m-%d %H:%M:%S") + # If this job is within 15 mins of the run's activity + if abs((run_latest_time - job_time).total_seconds()) < 900: + target_run = run + break + + if not target_run: + # Start a new run cluster + target_run = { + "id": f"{entity_id}_{job['id']}", # Unique ID for this run row + "entity_id": entity_id, "contact_id": c_id, "person_id": p_id, "name": "Unknown", "last_event": job['event_type'], "status": job['status'], - "created_at": job['created_at'], # Oldest job in group (since we sort by DESC) - "updated_at": job['updated_at'], # Most recent job + "created_at": job['created_at'], + "updated_at": job['updated_at'], "error_msg": job['error_msg'], "job_count": 0, "duration": "0s", "phases": { - "received": "completed", + "received": "pending", "enriching": "pending", "syncing": "pending", "completed": "pending" } } - - acc = accounts[key] - acc["job_count"] += 1 + runs.append(target_run) + if entity_id not in id_to_runs: + id_to_runs[entity_id] = [] + id_to_runs[entity_id].append(target_run) - # Update duration + # Update the run with job info + target_run["job_count"] += 1 + + # Update oldest start time (since we iterate newest -> oldest) + target_run["created_at"] = job["created_at"] + + # Calculate Duration for this run try: - # We want the absolute start (oldest created_at) - # Since jobs are DESC, the last one we iterate through for a key is the oldest - acc["created_at"] = job["created_at"] - - start = datetime.strptime(acc["created_at"], "%Y-%m-%d %H:%M:%S") - end = datetime.strptime(acc["updated_at"], "%Y-%m-%d %H:%M:%S") + start = datetime.strptime(target_run["created_at"], "%Y-%m-%d %H:%M:%S") + end = datetime.strptime(target_run["updated_at"], "%Y-%m-%d %H:%M:%S") diff = end - start seconds = int(diff.total_seconds()) - if seconds < 60: - acc["duration"] = f"{seconds}s" - else: - acc["duration"] = f"{seconds // 60}m {seconds % 60}s" - except Exception: - pass + target_run["duration"] = f"{seconds}s" if seconds < 60 else f"{seconds // 60}m {seconds % 60}s" + except: pass - # Try to resolve 'Unknown' name from any job in the group - if acc["name"] == "Unknown": + # Resolve Name + if target_run["name"] == "Unknown": name = payload.get('Name') or payload.get('crm_name') or payload.get('FullName') or payload.get('ContactName') if not name and payload.get('Firstname'): name = f"{payload.get('Firstname')} {payload.get('Lastname', '')}".strip() - if name: - acc["name"] = name + if name: target_run["name"] = name - # Update overall status based on most recent job - # (Assuming jobs are sorted by updated_at DESC) - if acc["job_count"] == 1: - acc["status"] = job["status"] - acc["updated_at"] = job["updated_at"] - acc["error_msg"] = job["error_msg"] + # Update Status based on the jobs in the run + # Priority: FAILED > PROCESSING > COMPLETED > SKIPPED > PENDING + status_priority = {"FAILED": 4, "PROCESSING": 3, "COMPLETED": 2, "SKIPPED": 1, "PENDING": 0} + current_prio = status_priority.get(target_run["status"], -1) + new_prio = status_priority.get(job["status"], -1) + + if new_prio > current_prio: + target_run["status"] = job["status"] + target_run["error_msg"] = job["error_msg"] - # Determine Phase + # Set visual phases based on status if job["status"] == "COMPLETED": - acc["phases"] = { - "received": "completed", - "enriching": "completed", - "syncing": "completed", - "completed": "completed" - } + target_run["phases"] = {"received": "completed", "enriching": "completed", "syncing": "completed", "completed": "completed"} + elif job["status"] == "SKIPPED" and current_prio < 2: # Don't downgrade from COMPLETED + target_run["phases"] = {"received": "completed", "enriching": "completed", "syncing": "completed", "completed": "completed"} elif job["status"] == "FAILED": - acc["phases"]["received"] = "completed" - acc["phases"]["enriching"] = "failed" + target_run["phases"] = {"received": "completed", "enriching": "failed", "syncing": "pending", "completed": "pending"} elif job["status"] == "PROCESSING": - acc["phases"]["received"] = "completed" - acc["phases"]["enriching"] = "processing" - elif job["status"] == "PENDING": - acc["phases"]["received"] = "completed" - # If it has an error msg like 'processing', it's in enriching - if job["error_msg"] and "processing" in job["error_msg"].lower(): - acc["phases"]["enriching"] = "processing" - else: - acc["phases"]["received"] = "processing" + target_run["phases"] = {"received": "completed", "enriching": "processing", "syncing": "pending", "completed": "pending"} - # Final cleanup for names - for acc in accounts.values(): - if acc["name"] == "Unknown": - acc["name"] = f"Entity {acc['id']}" + # Final cleanup + for r in runs: + if r["name"] == "Unknown": r["name"] = f"Entity {r['entity_id']}" - return list(accounts.values()) + return runs