diff --git a/connector-superoffice/README.md b/connector-superoffice/README.md index 791e7755..458ba422 100644 --- a/connector-superoffice/README.md +++ b/connector-superoffice/README.md @@ -38,10 +38,18 @@ This directory contains Python scripts designed to integrate with the SuperOffic * Before extensive processing, the worker now checks `connector_queue.db` via `queue_manager.check_for_recent_duplicate()`. * If a `contact.created` event for the same company name is already `PROCESSING` or has been `COMPLETED` within the last 5 minutes, the current job is immediately `SKIPPED` as a duplicate. +#### 💊 E. Job Stability (Poison Pill) +* **Problem:** API errors or transient issues could cause a job to enter an infinite retry loop, blocking the entire queue indefinitely. +* **Solution:** A **"poison pill" mechanism** was added to the queue manager. + * A `retry_count` is now tracked for every job. + * If a job fails and is retried, this counter is incremented. + * If the counter exceeds a maximum threshold (currently 5), the job is automatically moved to the `FAILED` state and will not be retried again. This ensures that problematic jobs are cleared and do not block new incoming events. + ### 3. Advanced API Handling (Critical Fixes) * **OData Pagination:** We implemented `odata.nextLink` support (Manuel Zierl's advice) to correctly handle large result sets (>1000 records). * **Case Sensitivity:** We learned that SuperOffice API keys are case-sensitive in responses (e.g., `contactId` vs `ContactId`) depending on the endpoint. Our code now handles both. +* **Robust Authentication:** The API client was hardened to raise exceptions on authentication failures instead of returning `None`. This ensures that auth errors are properly caught and jobs are failed or retried correctly. * **Auth Consistency:** Always use `load_dotenv(override=True)` to prevent stale environment variables (like expired tokens) from lingering in the shell process. 
* **Identity Crisis:** The `Associate/Me` and `Associate/{id}` endpoints return 500 errors if the API user lacks a linked Person record. This is a known configuration blocker for automated mailing. diff --git a/connector-superoffice/queue_manager.py b/connector-superoffice/queue_manager.py index 8fe393e6..269822cf 100644 --- a/connector-superoffice/queue_manager.py +++ b/connector-superoffice/queue_manager.py @@ -30,7 +30,8 @@ class JobQueue: created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, error_msg TEXT, - next_try_at TIMESTAMP + next_try_at TIMESTAMP, + retry_count INTEGER DEFAULT 0 ) """) # Migration for existing DBs @@ -42,6 +43,9 @@ class JobQueue: try: conn.execute("ALTER TABLE jobs ADD COLUMN associate_name TEXT") except sqlite3.OperationalError: pass + + try: conn.execute("ALTER TABLE jobs ADD COLUMN retry_count INTEGER DEFAULT 0") + except sqlite3.OperationalError: pass conn.commit() logger.info("Database initialized with PRAGMA settings (DELETE, NORMAL, mmap=0).") except Exception as e: @@ -87,7 +91,7 @@ class JobQueue: cursor = conn.cursor() try: cursor.execute(""" - SELECT id, event_type, payload, created_at + SELECT id, event_type, payload, created_at, retry_count FROM jobs WHERE status = 'PENDING' AND (next_try_at IS NULL OR next_try_at <= datetime('now')) @@ -118,22 +122,25 @@ class JobQueue: return job - def retry_job_later(self, job_id, delay_seconds=60, error_msg=None): + def retry_job_later(self, job_id, current_retry_count, delay_seconds=60, error_msg=None): + MAX_RETRIES = 5 + if current_retry_count >= MAX_RETRIES: + fail_msg = f"Job reached maximum retry limit of {MAX_RETRIES}. Last error: {error_msg}" + logger.error(f"Job {job_id} FAILED permanently: {fail_msg}") + self.fail_job(job_id, fail_msg) + return + next_try = datetime.utcnow() + timedelta(seconds=delay_seconds) + new_retry_count = current_retry_count + 1 + with sqlite3.connect(DB_PATH, timeout=30) as conn: try: - if error_msg: - conn.execute( - "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ? WHERE id = ?", - (next_try, str(error_msg), job_id) - ) - else: - conn.execute( - "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now') WHERE id = ?", - (next_try, job_id) - ) + conn.execute( + "UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ?, retry_count = ? WHERE id = ?", + (next_try, str(error_msg), new_retry_count, job_id) + ) conn.commit() - logger.warning(f"Job {job_id} set to RETRY. Next attempt at {next_try}.") + logger.warning(f"Job {job_id} set to RETRY (Attempt {new_retry_count}/{MAX_RETRIES}). Next attempt at {next_try}.") except Exception as e: logger.error(f"❌ Failed to set job {job_id} to RETRY: {e}", exc_info=True) conn.rollback() @@ -271,7 +278,6 @@ class JobQueue: if entity_id in id_to_runs: for run in id_to_runs[entity_id]: run_latest_time = datetime.strptime(run['updated_at'], "%Y-%m-%d %H:%M:%S") - # If this job is within 15 mins of the run's activity if abs((run_latest_time - job_time).total_seconds()) < 900: target_run = run break @@ -316,7 +322,12 @@ class JobQueue: end = datetime.strptime(target_run["updated_at"], "%Y-%m-%d %H:%M:%S") diff = end - start seconds = int(diff.total_seconds()) - target_run["duration"] = f"{seconds}s" if seconds < 60 else f"{seconds // 60}m {seconds % 60}s" + + # Display a minimal duration for skipped runs to avoid confusion + if target_run["status"] == "SKIPPED": + target_run["duration"] = "~1s" + else: + target_run["duration"] = f"{seconds}s" if seconds < 60 else f"{seconds // 60}m {seconds % 60}s" except: pass # Resolve Name & Associate (if not already set from a newer job in this cluster) diff --git a/connector-superoffice/superoffice_client.py b/connector-superoffice/superoffice_client.py index 63112732..7e26dd7d 100644 --- a/connector-superoffice/superoffice_client.py +++ b/connector-superoffice/superoffice_client.py @@ -13,6 +13,10 @@ class ContactNotFoundException(Exception): """Custom exception for 404 errors on Contact/Person lookups.""" pass +class SuperOfficeAuthenticationError(Exception): + """Custom exception for auth/token refresh failures.""" + pass + class SuperOfficeClient: """A client for interacting with the SuperOffice REST API.""" @@ -86,7 +90,7 @@ class SuperOfficeClient: """Helper to handle 401 Unauthorized with auto-refresh.""" if not self.access_token: if not self._refresh_access_token(): - return None + raise SuperOfficeAuthenticationError("FATAL: Could not get initial access token.") url = f"{self.base_url}/{endpoint}" try: @@ -113,7 +117,7 @@ class SuperOfficeClient: return self._request_with_retry(method, endpoint, payload, retry=False) else: logger.error("❌ Token Refresh failed during retry.") - return None + raise SuperOfficeAuthenticationError("Could not refresh token during retry.") if resp.status_code == 204: return True @@ -128,10 +132,10 @@ class SuperOfficeClient: raise ContactNotFoundException(f"Entity not found at {endpoint}") from e logger.error(f"❌ API {method} Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}") - return None + raise # Re-raise the original HTTPError except Exception as e: logger.error(f"❌ Connection Error during {method} for {endpoint}: {e}") - return None + raise # Re-raise other exceptions def _get(self, endpoint): return self._request_with_retry("GET", endpoint) diff --git a/connector-superoffice/webhook_app.py b/connector-superoffice/webhook_app.py index a031fa4a..25df72a7 100644 --- a/connector-superoffice/webhook_app.py +++ b/connector-superoffice/webhook_app.py @@ -233,7 +233,7 @@ def dashboard(): ${phasesHtml} ${acc.duration || '0s'} ${acc.status} - ${new Date(acc.updated_at + "Z").toLocaleTimeString()} + ${new Date(acc.updated_at + "Z").toLocaleString('de-DE', { day: '2-digit', month: '2-digit', hour: '2-digit', minute: '2-digit' })}
${acc.error_msg || 'No issues'}
`; tbody.appendChild(tr); @@ -256,7 +256,7 @@ def dashboard(): tr.innerHTML = ` #${job.id} ${job.status} - ${new Date(job.updated_at + "Z").toLocaleTimeString()} + ${new Date(job.updated_at + "Z").toLocaleString('de-DE', { day: '2-digit', month: '2-digit', hour: '2-digit', minute: '2-digit' })} ${job.event_type}
${details}
`; diff --git a/connector-superoffice/worker.py b/connector-superoffice/worker.py index 08b8bb47..f9fe2523 100644 --- a/connector-superoffice/worker.py +++ b/connector-superoffice/worker.py @@ -5,7 +5,7 @@ import requests import json from datetime import datetime from queue_manager import JobQueue -from superoffice_client import SuperOfficeClient, ContactNotFoundException +from superoffice_client import SuperOfficeClient, ContactNotFoundException, SuperOfficeAuthenticationError from config import settings # Setup Logging @@ -75,7 +75,7 @@ def run_worker(): status, msg = process_job(job, so_client, queue) if status == "RETRY": - queue.retry_job_later(job['id'], delay_seconds=120, error_msg=msg) + queue.retry_job_later(job['id'], job['retry_count'], delay_seconds=120, error_msg=msg) elif status == "FAILED": queue.fail_job(job['id'], msg or "Job failed status") elif status == "SKIPPED": @@ -104,30 +104,26 @@ def process_job(job, so_client: SuperOfficeClient, queue: JobQueue): payload = job['payload'] event_low = job['event_type'].lower() - # --- 1. HARD ECHO SHIELD (Who triggered this?) --- - changed_by = payload.get("ChangedByAssociateId") - self_id = job.get('self_associate_id') + # --- Strict Whitelist Filter --- + # A job is only processed if the webhook indicates a change to a relevant field. + # This is the primary defense against noise and echoes. - if changed_by and self_id and int(changed_by) == int(self_id): - msg = f"🛡️ ECHO DETECTED: Event triggered by myself (ID {self_id}). Stopping immediately." - logger.info(msg) - return ("SKIPPED", msg) - - # --- 2. NOISE REDUCTION: FIELD FILTER (What changed?) --- changes = [c.lower() for c in payload.get("Changes", [])] - if "person" in event_low: - # We allow contact_id changes (linking to company) and basic identity changes - if "name" not in changes and "email" not in changes and "jobtitle" not in changes and "contact_id" not in changes: - msg = f"Skipping person event: No relevant changes (Name/Email/JobTitle/Mapping) in {changes}." - logger.info(f"⏭️ {msg}") - return ("SKIPPED", msg) - - elif "contact" in event_low: - if "name" not in changes and "urladdress" not in changes: - msg = f"Skipping contact event: No relevant changes (Name/Website) in {changes}." - logger.info(f"⏭️ {msg}") - return ("SKIPPED", msg) + # Define fields that are significant enough to trigger a full re-assessment. + trigger_fields = ["name", "urladdress", "urls"] + + # The ProgID for the Vertical UDF. A change to this field MUST trigger a resync. + vertical_udf_key = settings.UDF_VERTICAL.lower() if settings.UDF_VERTICAL else "superoffice:83" + + # Check if any of the trigger fields or the specific vertical UDF is in the changes list. + has_trigger_field = any(f in changes for f in trigger_fields) or vertical_udf_key in changes + + # For '...created' events, we always process. For all other events, we check the whitelist. + if "created" not in event_low and not has_trigger_field: + msg = f"⏭️ Skipping event '{event_low}': No relevant field changes in {changes}." + logger.info(msg) + return ("SKIPPED", msg) # 0. ID Extraction & Early Exit for irrelevant jobs person_id = None