Files
Brancheneinstufung2/connector-superoffice/queue_manager.py
Floke 65aaff3936 feat(connector): [31e88f42] Implement robust de-duplication at ingress
This commit addresses the issue of duplicate jobs being created by the SuperOffice connector.

The root cause was identified as a race condition where SuperOffice would send multiple  webhooks in quick succession for the same entity, leading to multiple identical jobs in the queue.

The solution involves several layers of improvement:
1.  **Ingress De-duplication:** The  now checks for existing  jobs for the same entity *before* adding a new job to the queue. This is the primary fix and prevents duplicates at the source.
2.  **DB Schema Enhancement:** The  table schema in  was extended with an  column to allow for reliable and efficient checking of duplicate entities.
3.  **Improved Logging:** The log messages in  for job retries (e.g., when waiting for the Company Explorer) have been made more descriptive to avoid confusion and false alarms.
2026-03-09 12:50:32 +00:00

414 lines
19 KiB
Python

import sqlite3
import json
from datetime import datetime, timedelta
import os
import logging
logger = logging.getLogger("connector-queue")
# HARDCODED PATH TO FORCE CONSISTENCY
DB_PATH = "/data/connector_queue.db"
class JobQueue:
def __init__(self):
self._init_db()
def _init_db(self):
with sqlite3.connect(DB_PATH, timeout=30) as conn:
try:
# Revert to default journal mode for problematic Synology mounts
conn.execute("PRAGMA journal_mode=DELETE")
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("PRAGMA mmap_size=0")
conn.execute("""
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_type TEXT,
payload TEXT,
entity_name TEXT,
status TEXT DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
error_msg TEXT,
next_try_at TIMESTAMP,
retry_count INTEGER DEFAULT 0
)
""")
# Migration for existing DBs
try: conn.execute("ALTER TABLE jobs ADD COLUMN next_try_at TIMESTAMP")
except sqlite3.OperationalError: pass
try: conn.execute("ALTER TABLE jobs ADD COLUMN entity_name TEXT")
except sqlite3.OperationalError: pass
try: conn.execute("ALTER TABLE jobs ADD COLUMN associate_name TEXT")
except sqlite3.OperationalError: pass
try: conn.execute("ALTER TABLE jobs ADD COLUMN entity_id INTEGER")
except sqlite3.OperationalError: pass
try: conn.execute("ALTER TABLE jobs ADD COLUMN retry_count INTEGER DEFAULT 0")
except sqlite3.OperationalError: pass
conn.commit()
logger.info("Database initialized with PRAGMA settings (DELETE, NORMAL, mmap=0).")
except Exception as e:
logger.critical(f"❌ CRITICAL DB INIT ERROR: {e}", exc_info=True)
raise
def add_job(self, event_type: str, payload: dict):
entity_id = payload.get("PrimaryKey")
with sqlite3.connect(DB_PATH, timeout=30) as conn:
try:
conn.execute(
"INSERT INTO jobs (event_type, payload, status, entity_id) VALUES (?, ?, ?, ?)",
(event_type, json.dumps(payload), 'PENDING', entity_id)
)
conn.commit()
logger.debug(f"Job added: {event_type} for entity {entity_id}")
except Exception as e:
logger.error(f"❌ Failed to add job: {e}", exc_info=True)
conn.rollback()
raise
def is_duplicate_pending(self, event_type: str, payload: dict) -> bool:
"""
Checks if a job with the same event_type and entity_id is already PENDING.
This is to prevent adding duplicate jobs from webhooks that are sent multiple times.
"""
# We only want to de-duplicate creation events, as change events can be validly stacked.
if "created" not in event_type.lower():
return False
entity_id = payload.get("PrimaryKey")
if not entity_id:
return False # Cannot de-duplicate without a key
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
with sqlite3.connect(DB_PATH, timeout=30) as conn:
try:
cursor = conn.cursor()
cursor.execute("""
SELECT id FROM jobs
WHERE event_type = ?
AND entity_id = ?
AND status = 'PENDING'
AND created_at >= ?
""", (event_type, entity_id, five_minutes_ago))
if existing_job := cursor.fetchone():
logger.warning(f"Found PENDING duplicate of job {existing_job[0]} for event '{event_type}' and entity '{entity_id}'. Ignoring new webhook.")
return True
return False
except Exception as e:
logger.error(f"❌ Failed to check for PENDING duplicate jobs for entity '{entity_id}': {e}", exc_info=True)
return False # Fail safe: better to have a duplicate than to miss an event.
def update_entity_name(self, job_id, name, associate_name=None):
with sqlite3.connect(DB_PATH, timeout=30) as conn:
try:
if associate_name:
conn.execute(
"UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?",
(str(name), str(associate_name), job_id)
)
else:
conn.execute(
"UPDATE jobs SET entity_name = ?, updated_at = datetime('now') WHERE id = ?",
(str(name), job_id)
)
conn.commit()
logger.debug(f"Entity name updated for job {job_id}")
except Exception as e:
logger.error(f"❌ Failed to update entity name for job {job_id}: {e}", exc_info=True)
conn.rollback()
raise
def get_next_job(self):
job = None
with sqlite3.connect(DB_PATH, timeout=30) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
try:
cursor.execute("""
SELECT id, event_type, payload, created_at, retry_count
FROM jobs
WHERE status = 'PENDING'
AND (next_try_at IS NULL OR next_try_at <= datetime('now'))
ORDER BY created_at ASC
LIMIT 1
""")
row = cursor.fetchone()
if row:
job = dict(row)
cursor.execute(
"UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?",
(job['id'],)
)
conn.commit()
logger.info(f"Fetched and marked job {job['id']} as PROCESSING.")
except Exception as e:
logger.error(f"❌ Failed to get next job or mark as PROCESSING: {e}", exc_info=True)
conn.rollback()
if job:
try:
job['payload'] = json.loads(job['payload'])
except Exception as e:
logger.error(f"❌ Failed to parse payload for job {job['id']}: {e}")
return None
return job
def retry_job_later(self, job_id, current_retry_count, delay_seconds=60, error_msg=None):
MAX_RETRIES = 5
if current_retry_count >= MAX_RETRIES:
fail_msg = f"Job reached maximum retry limit of {MAX_RETRIES}. Last error: {error_msg}"
logger.error(f"Job {job_id} FAILED permanently: {fail_msg}")
self.fail_job(job_id, fail_msg)
return
next_try = datetime.utcnow() + timedelta(seconds=delay_seconds)
new_retry_count = current_retry_count + 1
with sqlite3.connect(DB_PATH, timeout=30) as conn:
try:
conn.execute(
"UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ?, retry_count = ? WHERE id = ?",
(next_try, str(error_msg), new_retry_count, job_id)
)
conn.commit()
logger.warning(f"Job {job_id} set to RETRY (Attempt {new_retry_count}/{MAX_RETRIES}). Next attempt at {next_try}.")
except Exception as e:
logger.error(f"❌ Failed to set job {job_id} to RETRY: {e}", exc_info=True)
conn.rollback()
def complete_job(self, job_id):
with sqlite3.connect(DB_PATH, timeout=30) as conn:
try:
conn.execute(
"UPDATE jobs SET status = 'COMPLETED', updated_at = datetime('now') WHERE id = ?",
(job_id,)
)
conn.commit()
logger.info(f"Job {job_id} COMPLETED.")
except Exception as e:
logger.error(f"❌ Failed to set job {job_id} to COMPLETED: {e}", exc_info=True)
conn.rollback()
def skip_job(self, job_id, reason):
with sqlite3.connect(DB_PATH, timeout=30) as conn:
try:
conn.execute(
"UPDATE jobs SET status = 'SKIPPED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
(str(reason), job_id)
)
conn.commit()
logger.info(f"Job {job_id} SKIPPED: {reason}")
except Exception as e:
logger.error(f"❌ Failed to set job {job_id} to SKIPPED: {e}", exc_info=True)
conn.rollback()
def mark_as_deleted(self, job_id, reason):
with sqlite3.connect(DB_PATH, timeout=30) as conn:
try:
conn.execute(
"UPDATE jobs SET status = 'DELETED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
(str(reason), job_id)
)
conn.commit()
logger.info(f"Job {job_id} DELETED: {reason}")
except Exception as e:
logger.error(f"❌ Failed to set job {job_id} to DELETED: {e}", exc_info=True)
conn.rollback()
def fail_job(self, job_id, error_msg):
with sqlite3.connect(DB_PATH, timeout=30) as conn:
try:
conn.execute(
"UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
(str(error_msg), job_id)
)
conn.commit()
logger.error(f"Job {job_id} FAILED: {error_msg}")
except Exception as e:
logger.critical(f"❌ CRITICAL: Failed to set job {job_id} to FAILED: {e}", exc_info=True)
conn.rollback()
def check_for_recent_duplicate(self, entity_name: str, current_job_id: int) -> bool:
"""
Checks if another job with the same entity_name has been processed or is processing recently.
"""
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
with sqlite3.connect(DB_PATH, timeout=30) as conn:
try:
cursor = conn.cursor()
cursor.execute("""
SELECT id FROM jobs
WHERE entity_name = ?
AND id != ?
AND status IN ('COMPLETED', 'PROCESSING')
AND created_at >= ?
""", (entity_name, current_job_id, five_minutes_ago))
if cursor.fetchone():
logger.warning(f"Found recent duplicate job for entity '{entity_name}' (related to job {current_job_id}).")
return True
return False
except Exception as e:
logger.error(f"❌ Failed to check for duplicate jobs for entity '{entity_name}': {e}", exc_info=True)
return False # Fail safe
def get_stats(self):
with sqlite3.connect(DB_PATH, timeout=30) as conn:
cursor = conn.cursor()
cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
return dict(cursor.fetchall())
def get_recent_jobs(self, limit=50):
with sqlite3.connect(DB_PATH, timeout=30) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT id, event_type, status, created_at, updated_at, error_msg, payload, entity_name, associate_name
FROM jobs
ORDER BY updated_at DESC, created_at DESC
LIMIT ?
""", (limit,))
rows = cursor.fetchall()
results = []
for row in rows:
r = dict(row)
try:
r['payload'] = json.loads(r['payload'])
except:
pass
results.append(r)
return results
def get_account_summary(self, limit=1000):
"""
Groups recent jobs into logical 'Sync-Runs' using time-gap clustering.
If a job for the same ID is more than 15 mins apart, it's a new run.
"""
jobs = self.get_recent_jobs(limit=limit)
runs = []
# Temporary storage to track the latest run for each ID
# Format: { 'C123': [run_obj1, run_obj2, ...] }
id_to_runs = {}
# Jobs are sorted by updated_at DESC (newest first)
for job in jobs:
payload = job.get('payload', {})
c_id = payload.get('ContactId')
p_id = payload.get('PersonId')
if not c_id and payload.get('PrimaryKey') and 'contact' in job['event_type'].lower():
c_id = payload.get('PrimaryKey')
if not p_id and payload.get('PrimaryKey') and 'person' in job['event_type'].lower():
p_id = payload.get('PrimaryKey')
if not c_id and not p_id:
continue
entity_id = f"P{p_id}" if p_id else f"C{c_id}"
job_time = datetime.strptime(job['updated_at'], "%Y-%m-%d %H:%M:%S")
target_run = None
# Check if we can attach this job to an existing (newer) run cluster
if entity_id in id_to_runs:
for run in id_to_runs[entity_id]:
run_latest_time = datetime.strptime(run['updated_at'], "%Y-%m-%d %H:%M:%S")
if abs((run_latest_time - job_time).total_seconds()) < 900:
target_run = run
break
if not target_run:
# Start a new run cluster
target_run = {
"id": f"{entity_id}_{job['id']}", # Unique ID for this run row
"entity_id": entity_id,
"contact_id": c_id,
"person_id": p_id,
"name": job.get('entity_name') or "Unknown",
"associate": job.get('associate_name') or "",
"last_event": job['event_type'],
"status": job['status'],
"created_at": job['created_at'],
"updated_at": job['updated_at'],
"error_msg": job['error_msg'],
"job_count": 0,
"duration": "0s",
"phases": {
"received": "pending",
"enriching": "pending",
"syncing": "pending",
"completed": "pending"
}
}
runs.append(target_run)
if entity_id not in id_to_runs:
id_to_runs[entity_id] = []
id_to_runs[entity_id].append(target_run)
# Update the run with job info
target_run["job_count"] += 1
# Update oldest start time (since we iterate newest -> oldest)
target_run["created_at"] = job["created_at"]
# Calculate Duration for this run
try:
start = datetime.strptime(target_run["created_at"], "%Y-%m-%d %H:%M:%S")
end = datetime.strptime(target_run["updated_at"], "%Y-%m-%d %H:%M:%S")
diff = end - start
seconds = int(diff.total_seconds())
# Display a minimal duration for skipped runs to avoid confusion
if target_run["status"] == "SKIPPED":
target_run["duration"] = "~1s"
else:
target_run["duration"] = f"{seconds}s" if seconds < 60 else f"{seconds // 60}m {seconds % 60}s"
except: pass
# Resolve Name & Associate (if not already set from a newer job in this cluster)
if target_run["name"] == "Unknown":
name = job.get('entity_name') or payload.get('Name') or payload.get('crm_name') or payload.get('FullName') or payload.get('ContactName')
if not name and payload.get('Firstname'):
name = f"{payload.get('Firstname')} {payload.get('Lastname', '')}".strip()
if name: target_run["name"] = name
if not target_run["associate"] and job.get('associate_name'):
target_run["associate"] = job['associate_name']
# Update Status based on the jobs in the run
# Update Status based on the jobs in the run
# Priority: FAILED > PROCESSING > COMPLETED > SKIPPED > PENDING
status_priority = {"FAILED": 4, "PROCESSING": 3, "COMPLETED": 2, "SKIPPED": 1, "PENDING": 0}
current_prio = status_priority.get(target_run["status"], -1)
new_prio = status_priority.get(job["status"], -1)
# CRITICAL: We only update the status if the new job has a HIGHER priority
# Example: If current is COMPLETED (2) and new is SKIPPED (1), we keep COMPLETED.
if new_prio > current_prio:
target_run["status"] = job["status"]
target_run["error_msg"] = job["error_msg"]
# Set visual phases based on status
if job["status"] == "COMPLETED":
target_run["phases"] = {"received": "completed", "enriching": "completed", "syncing": "completed", "completed": "completed"}
elif job["status"] == "FAILED":
target_run["phases"] = {"received": "completed", "enriching": "failed", "syncing": "pending", "completed": "pending"}
elif job["status"] == "PROCESSING":
target_run["phases"] = {"received": "completed", "enriching": "processing", "syncing": "pending", "completed": "pending"}
# Note: SKIPPED (1) and PENDING (0) will use the target_run's initial phases or keep previous ones.
# SPECIAL CASE: If we already have COMPLETED but a new job is SKIPPED, we might want to keep the error_msg empty
# to avoid showing "Skipped Echo" on a successful row.
if target_run["status"] == "COMPLETED" and job["status"] == "SKIPPED":
pass # Keep everything from the successful run
# Final cleanup
for r in runs:
if r["name"] == "Unknown": r["name"] = f"Entity {r['entity_id']}"
return runs