Docs: Aktualisierung der Dokumentation für Task [31e88f42]
This commit is contained in:
@@ -38,10 +38,18 @@ This directory contains Python scripts designed to integrate with the SuperOffic
|
|||||||
* Before extensive processing, the worker now checks `connector_queue.db` via `queue_manager.check_for_recent_duplicate()`.
|
* Before extensive processing, the worker now checks `connector_queue.db` via `queue_manager.check_for_recent_duplicate()`.
|
||||||
* If a `contact.created` event for the same company name is already `PROCESSING` or has been `COMPLETED` within the last 5 minutes, the current job is immediately `SKIPPED` as a duplicate.
|
* If a `contact.created` event for the same company name is already `PROCESSING` or has been `COMPLETED` within the last 5 minutes, the current job is immediately `SKIPPED` as a duplicate.
|
||||||
|
|
||||||
|
#### 💊 E. Job Stability (Poison Pill)
|
||||||
|
* **Problem:** API errors or transient issues could cause a job to enter an infinite retry loop, blocking the entire queue indefinitely.
|
||||||
|
* **Solution:** A **"poison pill" mechanism** was added to the queue manager.
|
||||||
|
* A `retry_count` is now tracked for every job.
|
||||||
|
* If a job fails and is retried, this counter is incremented.
|
||||||
|
* If the counter has already reached the maximum threshold (currently 5 retries) when the job fails again, the job is automatically moved to the `FAILED` state and will not be retried again. This ensures that problematic jobs are cleared and do not block new incoming events.
|
||||||
|
|
||||||
### 3. Advanced API Handling (Critical Fixes)
|
### 3. Advanced API Handling (Critical Fixes)
|
||||||
|
|
||||||
* **OData Pagination:** We implemented `odata.nextLink` support (Manuel Zierl's advice) to correctly handle large result sets (>1000 records).
|
* **OData Pagination:** We implemented `odata.nextLink` support (Manuel Zierl's advice) to correctly handle large result sets (>1000 records).
|
||||||
* **Case Sensitivity:** We learned that SuperOffice API keys are case-sensitive in responses (e.g., `contactId` vs `ContactId`) depending on the endpoint. Our code now handles both.
|
* **Case Sensitivity:** We learned that SuperOffice API keys are case-sensitive in responses (e.g., `contactId` vs `ContactId`) depending on the endpoint. Our code now handles both.
|
||||||
|
* **Robust Authentication:** The API client was hardened to raise exceptions on authentication failures instead of returning `None`. This ensures that auth errors are properly caught and jobs are failed or retried correctly.
|
||||||
* **Auth Consistency:** Always use `load_dotenv(override=True)` to prevent stale environment variables (like expired tokens) from lingering in the shell process.
|
* **Auth Consistency:** Always use `load_dotenv(override=True)` to prevent stale environment variables (like expired tokens) from lingering in the shell process.
|
||||||
* **Identity Crisis:** The `Associate/Me` and `Associate/{id}` endpoints return 500 errors if the API user lacks a linked Person record. This is a known configuration blocker for automated mailing.
|
* **Identity Crisis:** The `Associate/Me` and `Associate/{id}` endpoints return 500 errors if the API user lacks a linked Person record. This is a known configuration blocker for automated mailing.
|
||||||
|
|
||||||
|
|||||||
@@ -30,7 +30,8 @@ class JobQueue:
|
|||||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||||
error_msg TEXT,
|
error_msg TEXT,
|
||||||
next_try_at TIMESTAMP
|
next_try_at TIMESTAMP,
|
||||||
|
retry_count INTEGER DEFAULT 0
|
||||||
)
|
)
|
||||||
""")
|
""")
|
||||||
# Migration for existing DBs
|
# Migration for existing DBs
|
||||||
@@ -42,6 +43,9 @@ class JobQueue:
|
|||||||
|
|
||||||
try: conn.execute("ALTER TABLE jobs ADD COLUMN associate_name TEXT")
|
try: conn.execute("ALTER TABLE jobs ADD COLUMN associate_name TEXT")
|
||||||
except sqlite3.OperationalError: pass
|
except sqlite3.OperationalError: pass
|
||||||
|
|
||||||
|
try: conn.execute("ALTER TABLE jobs ADD COLUMN retry_count INTEGER DEFAULT 0")
|
||||||
|
except sqlite3.OperationalError: pass
|
||||||
conn.commit()
|
conn.commit()
|
||||||
logger.info("Database initialized with PRAGMA settings (DELETE, NORMAL, mmap=0).")
|
logger.info("Database initialized with PRAGMA settings (DELETE, NORMAL, mmap=0).")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -87,7 +91,7 @@ class JobQueue:
|
|||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
try:
|
try:
|
||||||
cursor.execute("""
|
cursor.execute("""
|
||||||
SELECT id, event_type, payload, created_at
|
SELECT id, event_type, payload, created_at, retry_count
|
||||||
FROM jobs
|
FROM jobs
|
||||||
WHERE status = 'PENDING'
|
WHERE status = 'PENDING'
|
||||||
AND (next_try_at IS NULL OR next_try_at <= datetime('now'))
|
AND (next_try_at IS NULL OR next_try_at <= datetime('now'))
|
||||||
@@ -118,22 +122,25 @@ class JobQueue:
|
|||||||
|
|
||||||
return job
|
return job
|
||||||
|
|
||||||
def retry_job_later(self, job_id, delay_seconds=60, error_msg=None):
|
def retry_job_later(self, job_id, current_retry_count, delay_seconds=60, error_msg=None):
|
||||||
|
MAX_RETRIES = 5
|
||||||
|
if current_retry_count >= MAX_RETRIES:
|
||||||
|
fail_msg = f"Job reached maximum retry limit of {MAX_RETRIES}. Last error: {error_msg}"
|
||||||
|
logger.error(f"Job {job_id} FAILED permanently: {fail_msg}")
|
||||||
|
self.fail_job(job_id, fail_msg)
|
||||||
|
return
|
||||||
|
|
||||||
next_try = datetime.utcnow() + timedelta(seconds=delay_seconds)
|
next_try = datetime.utcnow() + timedelta(seconds=delay_seconds)
|
||||||
|
new_retry_count = current_retry_count + 1
|
||||||
|
|
||||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||||
try:
|
try:
|
||||||
if error_msg:
|
conn.execute(
|
||||||
conn.execute(
|
"UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ?, retry_count = ? WHERE id = ?",
|
||||||
"UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now'), error_msg = ? WHERE id = ?",
|
(next_try, str(error_msg), new_retry_count, job_id)
|
||||||
(next_try, str(error_msg), job_id)
|
)
|
||||||
)
|
|
||||||
else:
|
|
||||||
conn.execute(
|
|
||||||
"UPDATE jobs SET status = 'PENDING', next_try_at = ?, updated_at = datetime('now') WHERE id = ?",
|
|
||||||
(next_try, job_id)
|
|
||||||
)
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
logger.warning(f"Job {job_id} set to RETRY. Next attempt at {next_try}.")
|
logger.warning(f"Job {job_id} set to RETRY (Attempt {new_retry_count}/{MAX_RETRIES}). Next attempt at {next_try}.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"❌ Failed to set job {job_id} to RETRY: {e}", exc_info=True)
|
logger.error(f"❌ Failed to set job {job_id} to RETRY: {e}", exc_info=True)
|
||||||
conn.rollback()
|
conn.rollback()
|
||||||
@@ -271,7 +278,6 @@ class JobQueue:
|
|||||||
if entity_id in id_to_runs:
|
if entity_id in id_to_runs:
|
||||||
for run in id_to_runs[entity_id]:
|
for run in id_to_runs[entity_id]:
|
||||||
run_latest_time = datetime.strptime(run['updated_at'], "%Y-%m-%d %H:%M:%S")
|
run_latest_time = datetime.strptime(run['updated_at'], "%Y-%m-%d %H:%M:%S")
|
||||||
# If this job is within 15 mins of the run's activity
|
|
||||||
if abs((run_latest_time - job_time).total_seconds()) < 900:
|
if abs((run_latest_time - job_time).total_seconds()) < 900:
|
||||||
target_run = run
|
target_run = run
|
||||||
break
|
break
|
||||||
@@ -316,7 +322,12 @@ class JobQueue:
|
|||||||
end = datetime.strptime(target_run["updated_at"], "%Y-%m-%d %H:%M:%S")
|
end = datetime.strptime(target_run["updated_at"], "%Y-%m-%d %H:%M:%S")
|
||||||
diff = end - start
|
diff = end - start
|
||||||
seconds = int(diff.total_seconds())
|
seconds = int(diff.total_seconds())
|
||||||
target_run["duration"] = f"{seconds}s" if seconds < 60 else f"{seconds // 60}m {seconds % 60}s"
|
|
||||||
|
# Display a minimal duration for skipped runs to avoid confusion
|
||||||
|
if target_run["status"] == "SKIPPED":
|
||||||
|
target_run["duration"] = "~1s"
|
||||||
|
else:
|
||||||
|
target_run["duration"] = f"{seconds}s" if seconds < 60 else f"{seconds // 60}m {seconds % 60}s"
|
||||||
except: pass
|
except: pass
|
||||||
|
|
||||||
# Resolve Name & Associate (if not already set from a newer job in this cluster)
|
# Resolve Name & Associate (if not already set from a newer job in this cluster)
|
||||||
|
|||||||
@@ -13,6 +13,10 @@ class ContactNotFoundException(Exception):
|
|||||||
"""Custom exception for 404 errors on Contact/Person lookups."""
|
"""Custom exception for 404 errors on Contact/Person lookups."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class SuperOfficeAuthenticationError(Exception):
|
||||||
|
"""Custom exception for auth/token refresh failures."""
|
||||||
|
pass
|
||||||
|
|
||||||
class SuperOfficeClient:
|
class SuperOfficeClient:
|
||||||
"""A client for interacting with the SuperOffice REST API."""
|
"""A client for interacting with the SuperOffice REST API."""
|
||||||
|
|
||||||
@@ -86,7 +90,7 @@ class SuperOfficeClient:
|
|||||||
"""Helper to handle 401 Unauthorized with auto-refresh."""
|
"""Helper to handle 401 Unauthorized with auto-refresh."""
|
||||||
if not self.access_token:
|
if not self.access_token:
|
||||||
if not self._refresh_access_token():
|
if not self._refresh_access_token():
|
||||||
return None
|
raise SuperOfficeAuthenticationError("FATAL: Could not get initial access token.")
|
||||||
|
|
||||||
url = f"{self.base_url}/{endpoint}"
|
url = f"{self.base_url}/{endpoint}"
|
||||||
try:
|
try:
|
||||||
@@ -113,7 +117,7 @@ class SuperOfficeClient:
|
|||||||
return self._request_with_retry(method, endpoint, payload, retry=False)
|
return self._request_with_retry(method, endpoint, payload, retry=False)
|
||||||
else:
|
else:
|
||||||
logger.error("❌ Token Refresh failed during retry.")
|
logger.error("❌ Token Refresh failed during retry.")
|
||||||
return None
|
raise SuperOfficeAuthenticationError("Could not refresh token during retry.")
|
||||||
|
|
||||||
if resp.status_code == 204:
|
if resp.status_code == 204:
|
||||||
return True
|
return True
|
||||||
@@ -128,10 +132,10 @@ class SuperOfficeClient:
|
|||||||
raise ContactNotFoundException(f"Entity not found at {endpoint}") from e
|
raise ContactNotFoundException(f"Entity not found at {endpoint}") from e
|
||||||
|
|
||||||
logger.error(f"❌ API {method} Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}")
|
logger.error(f"❌ API {method} Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}")
|
||||||
return None
|
raise # Re-raise the original HTTPError
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"❌ Connection Error during {method} for {endpoint}: {e}")
|
logger.error(f"❌ Connection Error during {method} for {endpoint}: {e}")
|
||||||
return None
|
raise # Re-raise other exceptions
|
||||||
|
|
||||||
def _get(self, endpoint):
|
def _get(self, endpoint):
|
||||||
return self._request_with_retry("GET", endpoint)
|
return self._request_with_retry("GET", endpoint)
|
||||||
|
|||||||
@@ -233,7 +233,7 @@ def dashboard():
|
|||||||
<td>${phasesHtml}</td>
|
<td>${phasesHtml}</td>
|
||||||
<td><span class="meta">${acc.duration || '0s'}</span></td>
|
<td><span class="meta">${acc.duration || '0s'}</span></td>
|
||||||
<td><span class="status status-${acc.status}">${acc.status}</span></td>
|
<td><span class="status status-${acc.status}">${acc.status}</span></td>
|
||||||
<td>${new Date(acc.updated_at + "Z").toLocaleTimeString()}</td>
|
<td>${new Date(acc.updated_at + "Z").toLocaleString('de-DE', { day: '2-digit', month: '2-digit', hour: '2-digit', minute: '2-digit' })}</td>
|
||||||
<td><pre>${acc.error_msg || 'No issues'}</pre></td>
|
<td><pre>${acc.error_msg || 'No issues'}</pre></td>
|
||||||
`;
|
`;
|
||||||
tbody.appendChild(tr);
|
tbody.appendChild(tr);
|
||||||
@@ -256,7 +256,7 @@ def dashboard():
|
|||||||
tr.innerHTML = `
|
tr.innerHTML = `
|
||||||
<td>#${job.id}</td>
|
<td>#${job.id}</td>
|
||||||
<td><span class="status status-${job.status}">${job.status}</span></td>
|
<td><span class="status status-${job.status}">${job.status}</span></td>
|
||||||
<td>${new Date(job.updated_at + "Z").toLocaleTimeString()}</td>
|
<td>${new Date(job.updated_at + "Z").toLocaleString('de-DE', { day: '2-digit', month: '2-digit', hour: '2-digit', minute: '2-digit' })}</td>
|
||||||
<td>${job.event_type}</td>
|
<td>${job.event_type}</td>
|
||||||
<td><pre>${details}</pre></td>
|
<td><pre>${details}</pre></td>
|
||||||
`;
|
`;
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import requests
|
|||||||
import json
|
import json
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from queue_manager import JobQueue
|
from queue_manager import JobQueue
|
||||||
from superoffice_client import SuperOfficeClient, ContactNotFoundException
|
from superoffice_client import SuperOfficeClient, ContactNotFoundException, SuperOfficeAuthenticationError
|
||||||
from config import settings
|
from config import settings
|
||||||
|
|
||||||
# Setup Logging
|
# Setup Logging
|
||||||
@@ -75,7 +75,7 @@ def run_worker():
|
|||||||
status, msg = process_job(job, so_client, queue)
|
status, msg = process_job(job, so_client, queue)
|
||||||
|
|
||||||
if status == "RETRY":
|
if status == "RETRY":
|
||||||
queue.retry_job_later(job['id'], delay_seconds=120, error_msg=msg)
|
queue.retry_job_later(job['id'], job['retry_count'], delay_seconds=120, error_msg=msg)
|
||||||
elif status == "FAILED":
|
elif status == "FAILED":
|
||||||
queue.fail_job(job['id'], msg or "Job failed status")
|
queue.fail_job(job['id'], msg or "Job failed status")
|
||||||
elif status == "SKIPPED":
|
elif status == "SKIPPED":
|
||||||
@@ -104,30 +104,26 @@ def process_job(job, so_client: SuperOfficeClient, queue: JobQueue):
|
|||||||
payload = job['payload']
|
payload = job['payload']
|
||||||
event_low = job['event_type'].lower()
|
event_low = job['event_type'].lower()
|
||||||
|
|
||||||
# --- 1. HARD ECHO SHIELD (Who triggered this?) ---
|
# --- Strict Whitelist Filter ---
|
||||||
changed_by = payload.get("ChangedByAssociateId")
|
# A job is only processed if the webhook indicates a change to a relevant field.
|
||||||
self_id = job.get('self_associate_id')
|
# This is the primary defense against noise and echoes.
|
||||||
|
|
||||||
if changed_by and self_id and int(changed_by) == int(self_id):
|
|
||||||
msg = f"🛡️ ECHO DETECTED: Event triggered by myself (ID {self_id}). Stopping immediately."
|
|
||||||
logger.info(msg)
|
|
||||||
return ("SKIPPED", msg)
|
|
||||||
|
|
||||||
# --- 2. NOISE REDUCTION: FIELD FILTER (What changed?) ---
|
|
||||||
changes = [c.lower() for c in payload.get("Changes", [])]
|
changes = [c.lower() for c in payload.get("Changes", [])]
|
||||||
|
|
||||||
if "person" in event_low:
|
# Define fields that are significant enough to trigger a full re-assessment.
|
||||||
# We allow contact_id changes (linking to company) and basic identity changes
|
trigger_fields = ["name", "urladdress", "urls"]
|
||||||
if "name" not in changes and "email" not in changes and "jobtitle" not in changes and "contact_id" not in changes:
|
|
||||||
msg = f"Skipping person event: No relevant changes (Name/Email/JobTitle/Mapping) in {changes}."
|
|
||||||
logger.info(f"⏭️ {msg}")
|
|
||||||
return ("SKIPPED", msg)
|
|
||||||
|
|
||||||
elif "contact" in event_low:
|
# The ProgID for the Vertical UDF. A change to this field MUST trigger a resync.
|
||||||
if "name" not in changes and "urladdress" not in changes:
|
vertical_udf_key = settings.UDF_VERTICAL.lower() if settings.UDF_VERTICAL else "superoffice:83"
|
||||||
msg = f"Skipping contact event: No relevant changes (Name/Website) in {changes}."
|
|
||||||
logger.info(f"⏭️ {msg}")
|
# Check if any of the trigger fields or the specific vertical UDF is in the changes list.
|
||||||
return ("SKIPPED", msg)
|
has_trigger_field = any(f in changes for f in trigger_fields) or vertical_udf_key in changes
|
||||||
|
|
||||||
|
# For '...created' events, we always process. For all other events, we check the whitelist.
|
||||||
|
if "created" not in event_low and not has_trigger_field:
|
||||||
|
msg = f"⏭️ Skipping event '{event_low}': No relevant field changes in {changes}."
|
||||||
|
logger.info(msg)
|
||||||
|
return ("SKIPPED", msg)
|
||||||
|
|
||||||
# 0. ID Extraction & Early Exit for irrelevant jobs
|
# 0. ID Extraction & Early Exit for irrelevant jobs
|
||||||
person_id = None
|
person_id = None
|
||||||
|
|||||||
Reference in New Issue
Block a user