From bad4fcb0a0e61c827f9003a06f114545b80286c6 Mon Sep 17 00:00:00 2001
From: Floke
Date: Fri, 6 Mar 2026 18:15:04 +0000
Subject: [PATCH] fix: [30388f42] Connector: Comprehensive DB logging &
 rollback fixes (Queue Manager)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Implements detailed error/critical logging with exc_info=True in all DB operations.
- Ensures conn.rollback() is called on every error to prevent data corruption.
- Enforces PRAGMA journal_mode=DELETE, synchronous=NORMAL and mmap_size=0 for Synology compatibility.
- This should expose the root cause of the recurring job loops caused by database write errors.
---
 connector-superoffice/queue_manager.py | 245 ++++++++++++++-----------
 1 file changed, 142 insertions(+), 103 deletions(-)

diff --git a/connector-superoffice/queue_manager.py b/connector-superoffice/queue_manager.py
index 807ffd36..60b30900 100644
--- a/connector-superoffice/queue_manager.py
+++ b/connector-superoffice/queue_manager.py
@@ -2,6 +2,9 @@ import sqlite3
 import json
 from datetime import datetime, timedelta
 import os
+import logging
+
+logger = logging.getLogger("connector-queue")
 
 # HARDCODED PATH TO FORCE CONSISTENCY
 DB_PATH = "/data/connector_queue.db"
@@ -12,87 +15,105 @@ class JobQueue:
 
     def _init_db(self):
         with sqlite3.connect(DB_PATH, timeout=30) as conn:
-            # Enable WAL mode for better concurrency
-            conn.execute("PRAGMA journal_mode=WAL")
-            conn.execute("""
-                CREATE TABLE IF NOT EXISTS jobs (
-                    id INTEGER PRIMARY KEY AUTOINCREMENT,
-                    event_type TEXT,
-                    payload TEXT,
-                    entity_name TEXT,
-                    status TEXT DEFAULT 'PENDING',
-                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-                    error_msg TEXT,
-                    next_try_at TIMESTAMP
-                )
-            """)
-            # Migration for existing DBs
             try:
-                conn.execute("ALTER TABLE jobs ADD COLUMN next_try_at TIMESTAMP")
-            except sqlite3.OperationalError: pass
-
-            try:
-                conn.execute("ALTER TABLE jobs ADD COLUMN entity_name TEXT")
-            except sqlite3.OperationalError: pass
-
-            try:
-                conn.execute("ALTER TABLE jobs ADD COLUMN associate_name TEXT")
-            except sqlite3.OperationalError: pass
+                # Revert to default journal mode for problematic Synology mounts
+                conn.execute("PRAGMA journal_mode=DELETE")
+                conn.execute("PRAGMA synchronous=NORMAL")
+                conn.execute("PRAGMA mmap_size=0")
+                conn.execute("""
+                    CREATE TABLE IF NOT EXISTS jobs (
+                        id INTEGER PRIMARY KEY AUTOINCREMENT,
+                        event_type TEXT,
+                        payload TEXT,
+                        entity_name TEXT,
+                        status TEXT DEFAULT 'PENDING',
+                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                        error_msg TEXT,
+                        next_try_at TIMESTAMP
+                    )
+                """)
+                # Migration for existing DBs
+                try: conn.execute("ALTER TABLE jobs ADD COLUMN next_try_at TIMESTAMP")
+                except sqlite3.OperationalError: pass
+
+                try: conn.execute("ALTER TABLE jobs ADD COLUMN entity_name TEXT")
+                except sqlite3.OperationalError: pass
+
+                try: conn.execute("ALTER TABLE jobs ADD COLUMN associate_name TEXT")
+                except sqlite3.OperationalError: pass
+                conn.commit()
+                logger.info("Database initialized with PRAGMA settings (DELETE, NORMAL, mmap=0).")
+            except Exception as e:
+                logger.critical(f"❌ CRITICAL DB INIT ERROR: {e}", exc_info=True)
+                raise
 
     def add_job(self, event_type: str, payload: dict):
         with sqlite3.connect(DB_PATH, timeout=30) as conn:
-            conn.execute(
-                "INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)",
-                (event_type, json.dumps(payload), 'PENDING')
-            )
-
+            try:
+                conn.execute(
"INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)", + (event_type, json.dumps(payload), 'PENDING') + ) + conn.commit() + logger.debug(f"Job added: {event_type}") + except Exception as e: + logger.error(f"❌ Failed to add job: {e}", exc_info=True) + conn.rollback() + raise def update_entity_name(self, job_id, name, associate_name=None): with sqlite3.connect(DB_PATH, timeout=30) as conn: - if associate_name: - conn.execute( - "UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?", - (str(name), str(associate_name), job_id) - ) - else: - conn.execute( - "UPDATE jobs SET entity_name = ?, updated_at = datetime('now') WHERE id = ?", - (str(name), job_id) - ) - + try: + if associate_name: + conn.execute( + "UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?", + (str(name), str(associate_name), job_id) + ) + else: + conn.execute( + "UPDATE jobs SET entity_name = ?, updated_at = datetime('now') WHERE id = ?", + (str(name), job_id) + ) + conn.commit() + logger.debug(f"Entity name updated for job {job_id}") + except Exception as e: + logger.error(f"❌ Failed to update entity name for job {job_id}: {e}", exc_info=True) + conn.rollback() + raise def get_next_job(self): - """ - Atomically fetches the next pending job where next_try_at is reached. - """ job = None with sqlite3.connect(DB_PATH, timeout=30) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() - - cursor.execute(""" - SELECT id, event_type, payload, created_at - FROM jobs - WHERE status = 'PENDING' - AND (next_try_at IS NULL OR next_try_at <= datetime('now')) - ORDER BY created_at ASC - LIMIT 1 - """) - row = cursor.fetchone() - - if row: - job = dict(row) - # Mark as processing - cursor.execute( - "UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?", - (job['id'],) - ) - conn.commit() + try: + cursor.execute(""" + SELECT id, event_type, payload, created_at + FROM jobs + WHERE status = 'PENDING' + AND (next_try_at IS NULL OR next_try_at <= datetime('now')) + ORDER BY created_at ASC + LIMIT 1 + """) + row = cursor.fetchone() + if row: + job = dict(row) + cursor.execute( + "UPDATE jobs SET status = 'PROCESSING', updated_at = datetime('now') WHERE id = ?", + (job['id'],) + ) + conn.commit() + logger.info(f"Fetched and marked job {job['id']} as PROCESSING.") + + except Exception as e: + logger.error(f"❌ Failed to get next job or mark as PROCESSING: {e}", exc_info=True) + conn.rollback() + if job: try: job['payload'] = json.loads(job['payload']) except Exception as e: - logger.error(f"Failed to parse payload for job {job['id']}: {e}") + logger.error(f"❌ Failed to parse payload for job {job['id']}: {e}") return None return job @@ -100,52 +121,70 @@ class JobQueue: def retry_job_later(self, job_id, delay_seconds=60, error_msg=None): next_try = datetime.utcnow() + timedelta(seconds=delay_seconds) with sqlite3.connect(DB_PATH, timeout=30) as conn: - if error_msg: - conn.execute( - "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ? WHERE id = ?", - (next_try, str(error_msg), job_id) - ) - else: - conn.execute( - "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now') WHERE id = ?", - (next_try, job_id) - ) - + try: + if error_msg: + conn.execute( + "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ? 
+                        (next_try, str(error_msg), job_id)
+                    )
+                else:
+                    conn.execute(
+                        "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now') WHERE id = ?",
+                        (next_try, job_id)
+                    )
+                conn.commit()
+                logger.warning(f"Job {job_id} set to RETRY. Next attempt at {next_try}.")
+            except Exception as e:
+                logger.error(f"❌ Failed to set job {job_id} to RETRY: {e}", exc_info=True)
+                conn.rollback()
     def complete_job(self, job_id):
         with sqlite3.connect(DB_PATH, timeout=30) as conn:
-            conn.execute(
-                "UPDATE jobs SET status = 'COMPLETED', updated_at = datetime('now') WHERE id = ?",
-                (job_id,)
-            )
-
+            try:
+                conn.execute(
+                    "UPDATE jobs SET status = 'COMPLETED', updated_at = datetime('now') WHERE id = ?",
+                    (job_id,)
+                )
+                conn.commit()
+                logger.info(f"Job {job_id} COMPLETED.")
+            except Exception as e:
+                logger.error(f"❌ Failed to set job {job_id} to COMPLETED: {e}", exc_info=True)
+                conn.rollback()
     def skip_job(self, job_id, reason):
-        """
-        Marks a job as SKIPPED (intentionally not processed).
-        Reason is stored in error_msg for visibility.
-        """
         with sqlite3.connect(DB_PATH, timeout=30) as conn:
-            conn.execute(
-                "UPDATE jobs SET status = 'SKIPPED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
-                (str(reason), job_id)
-            )
-
+            try:
+                conn.execute(
+                    "UPDATE jobs SET status = 'SKIPPED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
+                    (str(reason), job_id)
+                )
+                conn.commit()
+                logger.info(f"Job {job_id} SKIPPED: {reason}")
+            except Exception as e:
+                logger.error(f"❌ Failed to set job {job_id} to SKIPPED: {e}", exc_info=True)
+                conn.rollback()
    def mark_as_deleted(self, job_id, reason):
-        """
-        Marks a job as DELETED (entity no longer exists in source system).
-        """
         with sqlite3.connect(DB_PATH, timeout=30) as conn:
-            conn.execute(
-                "UPDATE jobs SET status = 'DELETED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
-                (str(reason), job_id)
-            )
-
+            try:
+                conn.execute(
+                    "UPDATE jobs SET status = 'DELETED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
+                    (str(reason), job_id)
+                )
+                conn.commit()
+                logger.info(f"Job {job_id} DELETED: {reason}")
+            except Exception as e:
+                logger.error(f"❌ Failed to set job {job_id} to DELETED: {e}", exc_info=True)
+                conn.rollback()
    def fail_job(self, job_id, error_msg):
         with sqlite3.connect(DB_PATH, timeout=30) as conn:
-            conn.execute(
-                "UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
-                (str(error_msg), job_id)
-            )
-
+            try:
+                conn.execute(
+                    "UPDATE jobs SET status = 'FAILED', error_msg = ?, updated_at = datetime('now') WHERE id = ?",
+                    (str(error_msg), job_id)
+                )
+                conn.commit()
+                logger.error(f"Job {job_id} FAILED: {error_msg}")
+            except Exception as e:
+                logger.critical(f"❌ CRITICAL: Failed to set job {job_id} to FAILED: {e}", exc_info=True)
+                conn.rollback()
    def get_stats(self):
         with sqlite3.connect(DB_PATH, timeout=30) as conn:
             cursor = conn.cursor()
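
Reviewer note, not part of the patch: of the three PRAGMAs, only journal_mode
survives beyond _init_db(). Switching away from WAL is recorded in the
database file itself, and DELETE is the per-connection default anyway, whereas
synchronous=NORMAL and mmap_size=0 apply only to the connection that executed
them, and every other method above opens its own connection. A minimal sketch
for checking the effective values on the Synology volume, assuming a stock
sqlite3 build and the same hardcoded DB_PATH:

    import sqlite3

    DB_PATH = "/data/connector_queue.db"  # same hardcoded path as queue_manager.py

    with sqlite3.connect(DB_PATH, timeout=30) as conn:
        # A fresh connection should report journal_mode "delete", but
        # synchronous falls back to the compile-time default (usually 2,
        # i.e. FULL) and mmap_size to 0, because only _init_db()'s own
        # connection executed those PRAGMAs.
        for pragma in ("journal_mode", "synchronous", "mmap_size"):
            value = conn.execute(f"PRAGMA {pragma}").fetchone()[0]
            print(f"{pragma} = {value}")

If synchronous=NORMAL is meant to hold for every write, the per-connection
PRAGMAs would have to move into a shared connection helper used by all
methods, not just _init_db().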