diff --git a/connector-superoffice/queue_manager.py b/connector-superoffice/queue_manager.py index b76836ad..4b0f2a1f 100644 --- a/connector-superoffice/queue_manager.py +++ b/connector-superoffice/queue_manager.py @@ -130,6 +130,16 @@ class JobQueue: (str(reason), job_id) ) + def mark_as_deleted(self, job_id, reason): + """ + Marks a job as DELETED (entity no longer exists in source system). + """ + with sqlite3.connect(DB_PATH) as conn: + conn.execute( + "UPDATE jobs SET status = 'DELETED', error_msg = ?, updated_at = datetime('now') WHERE id = ?", + (str(reason), job_id) + ) + def fail_job(self, job_id, error_msg): with sqlite3.connect(DB_PATH) as conn: conn.execute( diff --git a/connector-superoffice/webhook_app.py b/connector-superoffice/webhook_app.py index 1684e4d7..a031fa4a 100644 --- a/connector-superoffice/webhook_app.py +++ b/connector-superoffice/webhook_app.py @@ -102,6 +102,7 @@ def dashboard(): .status-COMPLETED { background: #064e3b; color: #a7f3d0; } .status-FAILED { background: #7f1d1d; color: #fecaca; } .status-SKIPPED { background: #475569; color: #cbd5e1; } + .status-DELETED { background: #7c2d12; color: #fdba74; } .phases { display: flex; gap: 4px; align-items: center; } .phase { width: 12px; height: 12px; border-radius: 50%; background: #334155; border: 2px solid #1e293b; box-shadow: 0 0 0 1px #334155; } diff --git a/connector-superoffice/worker.py b/connector-superoffice/worker.py index 0d953c72..a184a6b3 100644 --- a/connector-superoffice/worker.py +++ b/connector-superoffice/worker.py @@ -37,9 +37,9 @@ def process_job(job, so_client: SuperOfficeClient, queue: JobQueue): """ Core logic for processing a single job. Returns: (STATUS, MESSAGE) - STATUS: 'SUCCESS', 'SKIPPED', 'RETRY', 'FAILED' + STATUS: 'SUCCESS', 'SKIPPED', 'DELETED', 'RETRY', 'FAILED' """ - logger.info(f"--- [WORKER v1.9.2] Processing Job {job['id']} ({job['event_type']}) ---") + logger.info(f"--- [WORKER v1.9.3] Processing Job {job['id']} ({job['event_type']}) ---") payload = job['payload'] event_low = job['event_type'].lower() @@ -102,7 +102,7 @@ def process_job(job, so_client: SuperOfficeClient, queue: JobQueue): except ContactNotFoundException: msg = f"Skipping job because Person ID {person_id} was not found in SuperOffice (likely deleted)." logger.warning(msg) - return ("SKIPPED", msg) + return ("DELETED", msg) except Exception as e: logger.warning(f"Failed to fetch person details for {person_id}: {e}") @@ -155,7 +155,7 @@ def process_job(job, so_client: SuperOfficeClient, queue: JobQueue): pass if not is_robo: - msg = f"Skipped, Wackler. Contact {contact_id} ('{crm_name}'): Owner '{aname}' is not in Roboplanet whitelist." + msg = f"WACKLER FILTER: Contact {contact_id} ('{crm_name}') belongs to Associate '{aname}' (not in whitelist). Skipping." logger.info(f"⏭️ {msg}") return ("SKIPPED", msg) @@ -210,11 +210,12 @@ def process_job(job, so_client: SuperOfficeClient, queue: JobQueue): except ContactNotFoundException: msg = f"Skipping job because Contact ID {contact_id} was not found in SuperOffice (likely deleted)." logger.warning(msg) - return ("SKIPPED", msg) + return ("DELETED", msg) except Exception as e: logger.error(f"Failed to fetch contact details for {contact_id}: {e}") raise Exception(f"SuperOffice API Failure: {e}") + # --- 3. Company Explorer Provisioning --- ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact" ce_req = { @@ -391,6 +392,8 @@ def run_worker(): queue.fail_job(job['id'], msg or "Job failed status") elif status == "SKIPPED": queue.skip_job(job['id'], msg or "Skipped") + elif status == "DELETED": + queue.mark_as_deleted(job['id'], msg or "Deleted in SuperOffice") else: queue.complete_job(job['id'])