# Source: Brancheneinstufung2/connector-superoffice/queue_manager.py
# Snapshot: 2026-02-24 08:40:38 +00:00 (244 lines, 9.4 KiB, Python)

import json
import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
# Location of the SQLite file backing the job queue; the DB_PATH environment
# variable overrides the default file in the current working directory.
DB_PATH = os.environ.get("DB_PATH", "connector_queue.db")
class JobQueue:
    """SQLite-backed queue of connector jobs.

    Each row of the ``jobs`` table stores one event (``event_type`` plus a
    JSON-encoded ``payload``) and a ``status``. ``add_job`` creates PENDING
    rows. ``get_next_job`` claims one as PROCESSING. ``complete_job`` and
    ``fail_job`` finish it, and ``retry_job_later`` puts it back to PENDING,
    gated by ``next_try_at``. Every method opens its own short-lived
    connection and closes it before returning.
    """

    # Same layout as SQLite's datetime('now') / CURRENT_TIMESTAMP (UTC), so
    # values written from Python compare correctly as strings in SQL.
    _TS_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, db_path=None):
        """Create the queue and ensure the schema exists.

        Args:
            db_path: SQLite file to use. ``None`` (default) means the
                module-level ``DB_PATH``, looked up on every connection.
        """
        self.db_path = db_path
        self._init_db()

    @contextmanager
    def _connect(self):
        """Yield a connection that commits/rolls back on exit and is always closed."""
        # ``with sqlite3.Connection`` only ends the transaction; it does not
        # close the connection, so close it explicitly to avoid leaking handles.
        conn = sqlite3.connect(DB_PATH if self.db_path is None else self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        """Create the ``jobs`` table and add columns missing from older databases."""
        with self._connect() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_type TEXT,
                    payload TEXT,
                    status TEXT DEFAULT 'PENDING',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    error_msg TEXT,
                    next_try_at TIMESTAMP
                )
            """)
            # Migration for databases created before ``next_try_at`` existed.
            # Inspecting the schema (instead of swallowing every
            # OperationalError) keeps real failures such as a locked DB visible.
            columns = {row[1] for row in conn.execute("PRAGMA table_info(jobs)")}
            if "next_try_at" not in columns:
                conn.execute("ALTER TABLE jobs ADD COLUMN next_try_at TIMESTAMP")

    def add_job(self, event_type: str, payload: dict):
        """Enqueue a new PENDING job; ``payload`` is stored JSON-encoded."""
        with self._connect() as conn:
            conn.execute(
                "INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)",
                (event_type, json.dumps(payload), 'PENDING'),
            )

    def get_next_job(self):
        """Atomically claim the oldest due PENDING job.

        A job is due when ``next_try_at`` is NULL or not later than now. The
        selected row is switched to PROCESSING within a single EXCLUSIVE
        transaction, so the read and the claim cannot interleave with another
        connection doing the same.

        Returns:
            dict with ``id``, ``event_type``, ``payload`` (decoded JSON) and
            ``created_at``, or ``None`` when no job is due.
        """
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            # Take the write lock before reading. If anything below raises,
            # ``_connect`` rolls the transaction back on exit.
            conn.execute("BEGIN EXCLUSIVE")
            row = conn.execute("""
                SELECT id, event_type, payload, created_at
                FROM jobs
                WHERE status = 'PENDING'
                  AND (next_try_at IS NULL OR next_try_at <= datetime('now'))
                ORDER BY created_at ASC, id ASC
                LIMIT 1
            """).fetchone()
            if row is None:
                conn.rollback()  # nothing to claim; release the lock
                return None
            job = dict(row)
            conn.execute(
                "UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?",
                (job['id'],),
            )
            conn.commit()
        job['payload'] = json.loads(job['payload'])
        return job

    def retry_job_later(self, job_id, delay_seconds=60, error_msg=None):
        """Return a job to PENDING and make it eligible again after a delay.

        Args:
            job_id: Row id of the job.
            delay_seconds: Seconds from now until ``get_next_job`` may pick it.
            error_msg: When truthy, stored (as ``str``) as the job's error
                message; otherwise the existing message is left unchanged.
        """
        # Written as an explicit UTC string in datetime('now') format rather
        # than relying on sqlite3's deprecated default datetime adapter.
        next_try = (
            datetime.now(timezone.utc) + timedelta(seconds=delay_seconds)
        ).strftime(self._TS_FORMAT)
        with self._connect() as conn:
            conn.execute(
                "UPDATE jobs SET status = 'PENDING', next_try_at = ?, "
                "updated_at = datetime('now'), error_msg = COALESCE(?, error_msg) "
                "WHERE id = ?",
                (next_try, str(error_msg) if error_msg else None, job_id),
            )

    def complete_job(self, job_id):
        """Mark a job as COMPLETED."""
        with self._connect() as conn:
            conn.execute(
                "UPDATE jobs SET status = 'COMPLETED', updated_at = datetime('now') WHERE id = ?",
                (job_id,),
            )

    def fail_job(self, job_id, error_msg):
        """Mark a job as FAILED and store ``str(error_msg)``."""
        with self._connect() as conn:
            conn.execute(
                "UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
                (str(error_msg), job_id),
            )

    def get_stats(self):
        """Return a ``{status: job_count}`` mapping over all jobs."""
        with self._connect() as conn:
            return dict(conn.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status").fetchall())

    def get_recent_jobs(self, limit=50):
        """Return up to ``limit`` jobs, most recently updated first.

        ``payload`` is JSON-decoded when possible; a NULL or non-JSON payload
        is returned as the raw column value.
        """
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute("""
                SELECT id, event_type, status, created_at, updated_at, error_msg, payload
                FROM jobs
                ORDER BY updated_at DESC, created_at DESC
                LIMIT ?
            """, (limit,)).fetchall()
        results = []
        for row in rows:
            job = dict(row)
            try:
                job['payload'] = json.loads(job['payload'])
            except (TypeError, ValueError):
                pass  # TypeError: NULL payload; ValueError: invalid JSON
            results.append(job)
        return results

    @staticmethod
    def _ids_from_job(event_type, payload):
        """Return ``(contact_id, person_id)`` from a payload, using PrimaryKey as fallback."""
        c_id = payload.get('ContactId')
        p_id = payload.get('PersonId')
        primary = payload.get('PrimaryKey')
        # event_type is a nullable column; treat NULL as an empty string.
        kind = (event_type or '').lower()
        if not c_id and primary and 'contact' in kind:
            c_id = primary
        if not p_id and primary and 'person' in kind:
            p_id = primary
        return c_id, p_id

    @staticmethod
    def _name_from_payload(payload):
        """Return a display name from the known payload fields, or a falsy value."""
        name = (
            payload.get('Name')
            or payload.get('crm_name')
            or payload.get('FullName')
            or payload.get('ContactName')
        )
        if not name and payload.get('Firstname'):
            # ``or ''`` avoids rendering an explicit null Lastname as "None".
            name = f"{payload.get('Firstname')} {payload.get('Lastname') or ''}".strip()
        return name

    @staticmethod
    def _phases_for_status(status, error_msg):
        """Map a job status (and PENDING error hint) to the UI phase dict."""
        phases = {
            "received": "completed",
            "enriching": "pending",
            "syncing": "pending",
            "completed": "pending",
        }
        if status == "COMPLETED":
            return dict.fromkeys(phases, "completed")
        if status == "FAILED":
            phases["enriching"] = "failed"
        elif status == "PROCESSING":
            phases["enriching"] = "processing"
        elif status == "PENDING":
            # A retry whose error message mentions 'processing' counts as enriching.
            if error_msg and "processing" in error_msg.lower():
                phases["enriching"] = "processing"
        else:
            phases["received"] = "processing"
        return phases

    @classmethod
    def _format_duration(cls, start, end):
        """Format ``end - start`` as ``"Ns"`` or ``"Mm Ss"``; ``"0s"`` if unparseable."""
        try:
            seconds = int(
                (datetime.strptime(end, cls._TS_FORMAT)
                 - datetime.strptime(start, cls._TS_FORMAT)).total_seconds()
            )
        except (TypeError, ValueError):
            return "0s"
        if seconds < 60:
            return f"{seconds}s"
        return f"{seconds // 60}m {seconds % 60}s"

    def get_account_summary(self, limit=1000):
        """Group recent jobs by PersonId/ContactId and summarize each entity.

        Jobs come from ``get_recent_jobs`` (most recently updated first). The
        first job seen for an entity therefore sets its status, error,
        last_event, updated_at and phases. ``created_at`` is the earliest value
        among the entity's jobs, and ``duration`` spans created_at..updated_at.
        Jobs without a usable id, or whose payload is not a JSON object, are
        skipped.
        """
        accounts = {}
        for job in self.get_recent_jobs(limit=limit):
            payload = job.get('payload')
            if not isinstance(payload, dict):
                payload = {}  # NULL / undecodable / non-object payloads carry no ids
            c_id, p_id = self._ids_from_job(job['event_type'], payload)
            if not c_id and not p_id:
                continue
            # Person takes precedence over contact as the entity key.
            key = f"P{p_id}" if p_id else f"C{c_id}"
            acc = accounts.get(key)
            if acc is None:
                acc = {
                    "id": key,
                    "contact_id": c_id,
                    "person_id": p_id,
                    "name": "Unknown",
                    "last_event": job['event_type'],
                    "status": job['status'],
                    "created_at": job['created_at'],
                    "updated_at": job['updated_at'],  # most recent job
                    "error_msg": job['error_msg'],
                    "job_count": 0,
                    "duration": "0s",
                    "phases": self._phases_for_status(job['status'], job['error_msg']),
                }
                accounts[key] = acc
            acc["job_count"] += 1
            # Timestamps are 'YYYY-MM-DD HH:MM:SS' strings, so string order is
            # chronological order.
            created = job['created_at']
            if created and (not acc["created_at"] or created < acc["created_at"]):
                acc["created_at"] = created
            if acc["name"] == "Unknown":
                name = self._name_from_payload(payload)
                if name:
                    acc["name"] = name
        for acc in accounts.values():
            acc["duration"] = self._format_duration(acc["created_at"], acc["updated_at"])
            if acc["name"] == "Unknown":
                acc["name"] = f"Entity {acc['id']}"
        return list(accounts.values())