Files
Brancheneinstufung2/connector-superoffice/queue_manager.py
Floke abdbb84938 [30e88f42] ✦ In dieser Sitzung haben wir den End-to-End-Test der SuperOffice-Schnittstelle erfolgreich von der automatisierten Simulation bis zum produktiven Live-Lauf
✦ In dieser Sitzung haben wir den End-to-End-Test der SuperOffice-Schnittstelle erfolgreich von der automatisierten Simulation bis zum produktiven Live-Lauf
  mit Echtdaten abgeschlossen.
2026-02-22 08:20:28 +00:00

128 lines
4.4 KiB
Python

import sqlite3
import json
from datetime import datetime, timedelta
import os
DB_PATH = os.getenv("DB_PATH", "connector_queue.db")
class JobQueue:
def __init__(self):
self._init_db()
def _init_db(self):
with sqlite3.connect(DB_PATH) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_type TEXT,
payload TEXT,
status TEXT DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
error_msg TEXT,
next_try_at TIMESTAMP
)
""")
# Migration for existing DBs
try:
conn.execute("ALTER TABLE jobs ADD COLUMN next_try_at TIMESTAMP")
except sqlite3.OperationalError:
pass
def add_job(self, event_type: str, payload: dict):
with sqlite3.connect(DB_PATH) as conn:
conn.execute(
"INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)",
(event_type, json.dumps(payload), 'PENDING')
)
def get_next_job(self):
"""
Atomically fetches the next pending job where next_try_at is reached.
"""
job = None
with sqlite3.connect(DB_PATH) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Lock the job
cursor.execute("BEGIN EXCLUSIVE")
try:
cursor.execute("""
SELECT id, event_type, payload, created_at
FROM jobs
WHERE status = 'PENDING'
AND (next_try_at IS NULL OR next_try_at <= datetime('now'))
ORDER BY created_at ASC
LIMIT 1
""")
row = cursor.fetchone()
if row:
job = dict(row)
# Mark as processing
cursor.execute(
"UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?",
(job['id'],)
)
conn.commit()
else:
conn.rollback() # No job found
except Exception:
conn.rollback()
raise
if job:
job['payload'] = json.loads(job['payload'])
return job
def retry_job_later(self, job_id, delay_seconds=60):
next_try = datetime.utcnow() + timedelta(seconds=delay_seconds)
with sqlite3.connect(DB_PATH) as conn:
conn.execute(
"UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now') WHERE id = ?",
(next_try, job_id)
)
def complete_job(self, job_id):
with sqlite3.connect(DB_PATH) as conn:
conn.execute(
"UPDATE jobs SET status = 'COMPLETED', updated_at = datetime('now') WHERE id = ?",
(job_id,)
)
def fail_job(self, job_id, error_msg):
with sqlite3.connect(DB_PATH) as conn:
conn.execute(
"UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
(str(error_msg), job_id)
)
def get_stats(self):
with sqlite3.connect(DB_PATH) as conn:
cursor = conn.cursor()
cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
return dict(cursor.fetchall())
def get_recent_jobs(self, limit=50):
with sqlite3.connect(DB_PATH) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT id, event_type, status, created_at, updated_at, error_msg, payload
FROM jobs
ORDER BY updated_at DESC, created_at DESC
LIMIT ?
""", (limit,))
rows = cursor.fetchall()
results = []
for row in rows:
r = dict(row)
try:
r['payload'] = json.loads(r['payload'])
except:
pass
results.append(r)
return results