fix: [30388f42] Erhöhe SQLite Timeout auf 30s in queue_manager.py
- Setzt für alle -Aufrufe. - Behebt unter hoher Last, wenn Dashboard und Worker gleichzeitig zugreifen.
This commit is contained in:
@@ -10,7 +10,7 @@ class JobQueue:
|
||||
self._init_db()
|
||||
|
||||
def _init_db(self):
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS jobs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
@@ -38,14 +38,14 @@ class JobQueue:
|
||||
except sqlite3.OperationalError: pass
|
||||
|
||||
def add_job(self, event_type: str, payload: dict):
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
conn.execute(
|
||||
"INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)",
|
||||
(event_type, json.dumps(payload), 'PENDING')
|
||||
)
|
||||
|
||||
def update_entity_name(self, job_id, name, associate_name=None):
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
if associate_name:
|
||||
conn.execute(
|
||||
"UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?",
|
||||
@@ -62,7 +62,7 @@ class JobQueue:
|
||||
Atomically fetches the next pending job where next_try_at is reached.
|
||||
"""
|
||||
job = None
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
|
||||
@@ -100,7 +100,7 @@ class JobQueue:
|
||||
|
||||
def retry_job_later(self, job_id, delay_seconds=60, error_msg=None):
|
||||
next_try = datetime.utcnow() + timedelta(seconds=delay_seconds)
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
if error_msg:
|
||||
conn.execute(
|
||||
"UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ? WHERE id = ?",
|
||||
@@ -113,7 +113,7 @@ class JobQueue:
|
||||
)
|
||||
|
||||
def complete_job(self, job_id):
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
conn.execute(
|
||||
"UPDATE jobs SET status = 'COMPLETED', updated_at = datetime('now') WHERE id = ?",
|
||||
(job_id,)
|
||||
@@ -124,7 +124,7 @@ class JobQueue:
|
||||
Marks a job as SKIPPED (intentionally not processed).
|
||||
Reason is stored in error_msg for visibility.
|
||||
"""
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
conn.execute(
|
||||
"UPDATE jobs SET status = 'SKIPPED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
|
||||
(str(reason), job_id)
|
||||
@@ -134,27 +134,27 @@ class JobQueue:
|
||||
"""
|
||||
Marks a job as DELETED (entity no longer exists in source system).
|
||||
"""
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
conn.execute(
|
||||
"UPDATE jobs SET status = 'DELETED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
|
||||
(str(reason), job_id)
|
||||
)
|
||||
|
||||
def fail_job(self, job_id, error_msg):
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
conn.execute(
|
||||
"UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
|
||||
(str(error_msg), job_id)
|
||||
)
|
||||
|
||||
def get_stats(self):
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
|
||||
return dict(cursor.fetchall())
|
||||
|
||||
def get_recent_jobs(self, limit=50):
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
|
||||
Reference in New Issue
Block a user