From b9536927f1af243a2b6ff5a36ce1a3fcf7c95a81 Mon Sep 17 00:00:00 2001 From: Floke Date: Fri, 6 Mar 2026 14:55:05 +0000 Subject: [PATCH] fix: [30388f42] Maximiere DB-Performance und behebe Locking-Endlosschleife MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Aktiviert den SQLite WAL-Modus für echtes Concurrent Reading/Writing. - Optimiert get_next_job, um unnötige EXCLUSIVE-Locks zu vermeiden. - Dies stellt sicher, dass Jobs nach der Verarbeitung korrekt als COMPLETED markiert werden und der Worker nicht in einer Wiederholungsschleife gefangen bleibt. --- connector-superoffice/queue_manager.py | 52 +++++++++++++------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/connector-superoffice/queue_manager.py b/connector-superoffice/queue_manager.py index b255f4a4..5addaa5b 100644 --- a/connector-superoffice/queue_manager.py +++ b/connector-superoffice/queue_manager.py @@ -11,6 +11,8 @@ class JobQueue: def _init_db(self): with sqlite3.connect(DB_PATH, timeout=30) as conn: + # Enable WAL mode for better concurrency + conn.execute("PRAGMA journal_mode=WAL") conn.execute(""" CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -66,35 +68,31 @@ class JobQueue: conn.row_factory = sqlite3.Row cursor = conn.cursor() - # Lock the job - cursor.execute("BEGIN EXCLUSIVE") - try: - cursor.execute(""" - SELECT id, event_type, payload, created_at - FROM jobs - WHERE status = 'PENDING' - AND (next_try_at IS NULL OR next_try_at <= datetime('now')) - ORDER BY created_at ASC - LIMIT 1 - """) - row = cursor.fetchone() - - if row: - job = dict(row) - # Mark as processing - cursor.execute( - "UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?", - (job['id'],) - ) - conn.commit() - else: - conn.rollback() # No job found - except Exception: - conn.rollback() - raise + cursor.execute(""" + SELECT id, event_type, payload, created_at + FROM jobs + WHERE status = 'PENDING' + AND (next_try_at IS NULL OR next_try_at <= datetime('now')) + ORDER BY created_at ASC + LIMIT 1 + """) + row = cursor.fetchone() + + if row: + job = dict(row) + # Mark as processing + cursor.execute( + "UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?", + (job['id'],) + ) + conn.commit() if job: - job['payload'] = json.loads(job['payload']) + try: + job['payload'] = json.loads(job['payload']) + except Exception as e: + logger.error(f"Failed to parse payload for job {job['id']}: {e}") + return None return job