fix: [30388f42] Connector: Umfassendes DB-Logging & Rollback-Fixes (Queue Manager)

- Implementiert detailliertes Error/Critical-Logging mit exc_info=True in allen DB-Operationen.
- Stellt sicher, dass conn.rollback() bei jedem Fehler aufgerufen wird, um Datenkorruption zu verhindern.
- Erzwingt PRAGMA journal_mode=DELETE, synchronous=NORMAL und mmap_size=0 für Synology-Kompatibilität.
- Dies sollte die Ursache der wiederkehrenden Job-Loops durch Datenbank-Schreibfehler aufdecken.
This commit is contained in:
2026-03-06 18:15:04 +00:00
parent 28d1cf0bf0
commit bad4fcb0a0

View File

@@ -2,6 +2,9 @@ import sqlite3
import json import json
from datetime import datetime, timedelta from datetime import datetime, timedelta
import os import os
import logging
logger = logging.getLogger("connector-queue")
# HARDCODED PATH TO FORCE CONSISTENCY # HARDCODED PATH TO FORCE CONSISTENCY
DB_PATH = "/data/connector_queue.db" DB_PATH = "/data/connector_queue.db"
@@ -12,87 +15,105 @@ class JobQueue:
def _init_db(self):
    """Create the jobs table and bring older databases up to the current schema.

    Forces journal_mode=DELETE, synchronous=NORMAL and mmap_size=0 because
    WAL and mmap are unreliable on the Synology/network mount hosting the DB
    (see commit message). Any failure is logged as critical and re-raised so
    the service does not start with a broken queue store.

    Raises:
        Exception: whatever sqlite3 raised; re-raised after logging.
    """
    with sqlite3.connect(DB_PATH, timeout=30) as conn:
        try:
            # Revert to default journal mode for problematic Synology mounts
            conn.execute("PRAGMA journal_mode=DELETE")
            conn.execute("PRAGMA synchronous=NORMAL")
            conn.execute("PRAGMA mmap_size=0")
            conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_type TEXT,
                    payload TEXT,
                    entity_name TEXT,
                    status TEXT DEFAULT 'PENDING',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    error_msg TEXT,
                    next_try_at TIMESTAMP
                )
            """)
            # Migration for existing DBs: each ALTER raises OperationalError
            # when the column already exists, which is the expected no-op case.
            for ddl in (
                "ALTER TABLE jobs ADD COLUMN next_try_at TIMESTAMP",
                "ALTER TABLE jobs ADD COLUMN entity_name TEXT",
                "ALTER TABLE jobs ADD COLUMN associate_name TEXT",
            ):
                try:
                    conn.execute(ddl)
                except sqlite3.OperationalError:
                    pass  # column already present
            conn.commit()
            logger.info("Database initialized with PRAGMA settings (DELETE, NORMAL, mmap=0).")
        except Exception as e:
            logger.critical(f"❌ CRITICAL DB INIT ERROR: {e}", exc_info=True)
            raise
def add_job(self, event_type: str, payload: dict):
    """Enqueue a new job in PENDING state.

    Args:
        event_type: Free-form event identifier stored in ``event_type``.
        payload: JSON-serializable dict, stored as a JSON string.

    Raises:
        Exception: re-raised after logging and rollback, so callers learn
            that the event was NOT persisted.
    """
    with sqlite3.connect(DB_PATH, timeout=30) as conn:
        try:
            conn.execute(
                "INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)",
                (event_type, json.dumps(payload), 'PENDING')
            )
            conn.commit()
            logger.debug(f"Job added: {event_type}")
        except Exception as e:
            logger.error(f"❌ Failed to add job: {e}", exc_info=True)
            conn.rollback()
            raise
def update_entity_name(self, job_id, name, associate_name=None):
    """Attach a human-readable entity name (and optional associate name) to a job.

    Args:
        job_id: Primary key of the job row to update.
        name: Entity display name; coerced with ``str()``.
        associate_name: Optional second name; only written when truthy.

    Raises:
        Exception: re-raised after logging and rollback.
    """
    with sqlite3.connect(DB_PATH, timeout=30) as conn:
        try:
            if associate_name:
                conn.execute(
                    "UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?",
                    (str(name), str(associate_name), job_id)
                )
            else:
                conn.execute(
                    "UPDATE jobs SET entity_name = ?, updated_at = datetime('now') WHERE id = ?",
                    (str(name), job_id)
                )
            conn.commit()
            logger.debug(f"Entity name updated for job {job_id}")
        except Exception as e:
            logger.error(f"❌ Failed to update entity name for job {job_id}: {e}", exc_info=True)
            conn.rollback()
            raise
def get_next_job(self):
    """Fetch the next due PENDING job and mark it PROCESSING in one connection.

    A job is "due" when ``next_try_at`` is NULL or has passed (compared via
    SQLite ``datetime('now')``, i.e. UTC). Oldest job first.

    Returns:
        dict with id/event_type/payload/created_at and ``payload`` decoded
        from JSON, or None when no job is due, the fetch failed, or the
        payload could not be parsed.
    """
    job = None
    with sqlite3.connect(DB_PATH, timeout=30) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT id, event_type, payload, created_at
                FROM jobs
                WHERE status = 'PENDING'
                AND (next_try_at IS NULL OR next_try_at <= datetime('now'))
                ORDER BY created_at ASC
                LIMIT 1
            """)
            row = cursor.fetchone()
            if row:
                job = dict(row)
                # Mark as processing so no other worker picks it up.
                cursor.execute(
                    "UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?",
                    (job['id'],)
                )
                conn.commit()
                logger.info(f"Fetched and marked job {job['id']} as PROCESSING.")
        except Exception as e:
            logger.error(f"❌ Failed to get next job or mark as PROCESSING: {e}", exc_info=True)
            conn.rollback()
    if job:
        try:
            job['payload'] = json.loads(job['payload'])
        except Exception as e:
            logger.error(f"Failed to parse payload for job {job['id']}: {e}")
            # BUGFIX: previously the job was left in PROCESSING forever when
            # its payload was unparseable, wedging the queue. Fail it
            # explicitly so the worker can move on to the next job.
            self.fail_job(job['id'], f"Unparseable payload: {e}")
            return None
    return job
@@ -100,52 +121,70 @@ class JobQueue:
def retry_job_later(self, job_id, delay_seconds=60, error_msg=None):
    """Reschedule a job: back to PENDING with ``next_try_at`` in the future.

    Args:
        job_id: Job to reschedule.
        delay_seconds: Backoff before the job becomes due again.
        error_msg: Optional error text stored in ``error_msg`` for visibility.

    NOTE(review): uses naive ``datetime.utcnow()``, which matches SQLite's
    UTC ``datetime('now')`` used in get_next_job — keep both in sync.
    DB errors are logged and swallowed (best-effort) rather than re-raised;
    presumably so the worker loop never crashes here — confirm intent.
    """
    next_try = datetime.utcnow() + timedelta(seconds=delay_seconds)
    with sqlite3.connect(DB_PATH, timeout=30) as conn:
        try:
            if error_msg:
                conn.execute(
                    "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ? WHERE id = ?",
                    (next_try, str(error_msg), job_id)
                )
            else:
                conn.execute(
                    "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now') WHERE id = ?",
                    (next_try, job_id)
                )
            conn.commit()
            logger.warning(f"Job {job_id} set to RETRY. Next attempt at {next_try}.")
        except Exception as e:
            logger.error(f"❌ Failed to set job {job_id} to RETRY: {e}", exc_info=True)
            conn.rollback()
def complete_job(self, job_id):
    """Mark a job as COMPLETED.

    DB errors are logged (with traceback) and swallowed after rollback —
    best-effort status update, does not raise to the caller.
    """
    with sqlite3.connect(DB_PATH, timeout=30) as conn:
        try:
            conn.execute(
                "UPDATE jobs SET status = 'COMPLETED', updated_at = datetime('now') WHERE id = ?",
                (job_id,)
            )
            conn.commit()
            logger.info(f"Job {job_id} COMPLETED.")
        except Exception as e:
            logger.error(f"❌ Failed to set job {job_id} to COMPLETED: {e}", exc_info=True)
            conn.rollback()
def skip_job(self, job_id, reason):
    """Mark a job as SKIPPED (intentionally not processed).

    The reason is stored in ``error_msg`` for visibility in the UI/stats.
    DB errors are logged and swallowed after rollback (best-effort).
    """
    with sqlite3.connect(DB_PATH, timeout=30) as conn:
        try:
            conn.execute(
                "UPDATE jobs SET status = 'SKIPPED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
                (str(reason), job_id)
            )
            conn.commit()
            logger.info(f"Job {job_id} SKIPPED: {reason}")
        except Exception as e:
            logger.error(f"❌ Failed to set job {job_id} to SKIPPED: {e}", exc_info=True)
            conn.rollback()
def mark_as_deleted(self, job_id, reason):
    """Mark a job as DELETED (entity no longer exists in the source system).

    The reason is stored in ``error_msg`` for visibility.
    DB errors are logged and swallowed after rollback (best-effort).
    """
    with sqlite3.connect(DB_PATH, timeout=30) as conn:
        try:
            conn.execute(
                "UPDATE jobs SET status = 'DELETED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
                (str(reason), job_id)
            )
            conn.commit()
            logger.info(f"Job {job_id} DELETED: {reason}")
        except Exception as e:
            logger.error(f"❌ Failed to set job {job_id} to DELETED: {e}", exc_info=True)
            conn.rollback()
def fail_job(self, job_id, error_msg):
    """Mark a job as FAILED, recording ``error_msg``.

    If even this status write fails, it is logged at CRITICAL level (the
    job may then loop between PENDING/PROCESSING — the failure mode this
    commit's logging is meant to expose) and swallowed after rollback.
    """
    with sqlite3.connect(DB_PATH, timeout=30) as conn:
        try:
            conn.execute(
                "UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
                (str(error_msg), job_id)
            )
            conn.commit()
            logger.error(f"Job {job_id} FAILED: {error_msg}")
        except Exception as e:
            logger.critical(f"❌ CRITICAL: Failed to set job {job_id} to FAILED: {e}", exc_info=True)
            conn.rollback()
def get_stats(self): def get_stats(self):
with sqlite3.connect(DB_PATH, timeout=30) as conn: with sqlite3.connect(DB_PATH, timeout=30) as conn:
cursor = conn.cursor() cursor = conn.cursor()