From 2b9c4b737e5f3bd635334e8c383dfc41c5dfbbde Mon Sep 17 00:00:00 2001 From: Floke Date: Fri, 6 Mar 2026 13:54:09 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20[30388f42]=20Erh=C3=B6he=20SQLite=20Time?= =?UTF-8?q?out=20auf=2030s=20in=20queue=5Fmanager.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Setzt `timeout=30` für alle `sqlite3.connect()`-Aufrufe. - Behebt `database is locked`-Fehler unter hoher Last, wenn Dashboard und Worker gleichzeitig zugreifen. --- connector-superoffice/queue_manager.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/connector-superoffice/queue_manager.py b/connector-superoffice/queue_manager.py index 4b0f2a1f..b255f4a4 100644 --- a/connector-superoffice/queue_manager.py +++ b/connector-superoffice/queue_manager.py @@ -10,7 +10,7 @@ class JobQueue: self._init_db() def _init_db(self): - with sqlite3.connect(DB_PATH) as conn: + with sqlite3.connect(DB_PATH, timeout=30) as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -38,14 +38,14 @@ class JobQueue: except sqlite3.OperationalError: pass def add_job(self, event_type: str, payload: dict): - with sqlite3.connect(DB_PATH) as conn: + with sqlite3.connect(DB_PATH, timeout=30) as conn: conn.execute( "INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)", (event_type, json.dumps(payload), 'PENDING') ) def update_entity_name(self, job_id, name, associate_name=None): - with sqlite3.connect(DB_PATH) as conn: + with sqlite3.connect(DB_PATH, timeout=30) as conn: if associate_name: conn.execute( "UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?", @@ -62,7 +62,7 @@ class JobQueue: Atomically fetches the next pending job where next_try_at is reached. 
""" job = None - with sqlite3.connect(DB_PATH) as conn: + with sqlite3.connect(DB_PATH, timeout=30) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() @@ -100,7 +100,7 @@ class JobQueue: def retry_job_later(self, job_id, delay_seconds=60, error_msg=None): next_try = datetime.utcnow() + timedelta(seconds=delay_seconds) - with sqlite3.connect(DB_PATH) as conn: + with sqlite3.connect(DB_PATH, timeout=30) as conn: if error_msg: conn.execute( "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ? WHERE id = ?", @@ -113,7 +113,7 @@ class JobQueue: ) def complete_job(self, job_id): - with sqlite3.connect(DB_PATH) as conn: + with sqlite3.connect(DB_PATH, timeout=30) as conn: conn.execute( "UPDATE jobs SET status = 'COMPLETED', updated_at = datetime('now') WHERE id = ?", (job_id,) @@ -124,7 +124,7 @@ class JobQueue: Marks a job as SKIPPED (intentionally not processed). Reason is stored in error_msg for visibility. """ - with sqlite3.connect(DB_PATH) as conn: + with sqlite3.connect(DB_PATH, timeout=30) as conn: conn.execute( "UPDATE jobs SET status = 'SKIPPED', error_msg = ?, updated_at = datetime('now') WHERE id = ?", (str(reason), job_id) @@ -134,27 +134,27 @@ class JobQueue: """ Marks a job as DELETED (entity no longer exists in source system). 
""" - with sqlite3.connect(DB_PATH) as conn: + with sqlite3.connect(DB_PATH, timeout=30) as conn: conn.execute( "UPDATE jobs SET status = 'DELETED', error_msg = ?, updated_at = datetime('now') WHERE id = ?", (str(reason), job_id) ) def fail_job(self, job_id, error_msg): - with sqlite3.connect(DB_PATH) as conn: + with sqlite3.connect(DB_PATH, timeout=30) as conn: conn.execute( "UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?", (str(error_msg), job_id) ) def get_stats(self): - with sqlite3.connect(DB_PATH) as conn: + with sqlite3.connect(DB_PATH, timeout=30) as conn: cursor = conn.cursor() cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status") return dict(cursor.fetchall()) def get_recent_jobs(self, limit=50): - with sqlite3.connect(DB_PATH) as conn: + with sqlite3.connect(DB_PATH, timeout=30) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute("""