2 Commits

Author SHA1 Message Date
73d29bd5cd [31e88f42] Keine neuen Commits in dieser Session.
Keine neuen Commits in dieser Session.
2026-03-09 13:25:43 +00:00
65aaff3936 feat(connector): [31e88f42] Implement robust de-duplication at ingress
This commit addresses the issue of duplicate jobs being created by the SuperOffice connector.

The root cause was identified as a race condition where SuperOffice would send multiple  webhooks in quick succession for the same entity, leading to multiple identical jobs in the queue.

The solution involves several layers of improvement:
1.  **Ingress De-duplication:** The  now checks for existing  jobs for the same entity *before* adding a new job to the queue. This is the primary fix and prevents duplicates at the source.
2.  **DB Schema Enhancement:** The  table schema in  was extended with an  column to allow for reliable and efficient checking of duplicate entities.
3.  **Improved Logging:** The log messages in  for job retries (e.g., when waiting for the Company Explorer) have been made more descriptive to avoid confusion and false alarms.
2026-03-09 12:50:32 +00:00
4 changed files with 62 additions and 13 deletions

View File

@@ -1 +1 @@
{"task_id": "31e88f42-8544-8024-ad7c-da1733e94f9a", "token": "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", "readme_path": "connector-superoffice/README.md", "session_start_time": "2026-03-09T12:38:07.040119"} {"task_id": "31e88f42-8544-8024-ad7c-da1733e94f9a", "token": "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", "readme_path": "connector-superoffice/README.md", "session_start_time": "2026-03-09T13:25:40.399888"}

View File

@@ -44,6 +44,9 @@ class JobQueue:
try: conn.execute("ALTER TABLE jobs ADD COLUMN associate_name TEXT") try: conn.execute("ALTER TABLE jobs ADD COLUMN associate_name TEXT")
except sqlite3.OperationalError: pass except sqlite3.OperationalError: pass
try: conn.execute("ALTER TABLE jobs ADD COLUMN entity_id INTEGER")
except sqlite3.OperationalError: pass
try: conn.execute("ALTER TABLE jobs ADD COLUMN retry_count INTEGER DEFAULT 0") try: conn.execute("ALTER TABLE jobs ADD COLUMN retry_count INTEGER DEFAULT 0")
except sqlite3.OperationalError: pass except sqlite3.OperationalError: pass
conn.commit() conn.commit()
@@ -53,22 +56,57 @@ class JobQueue:
raise raise
def add_job(self, event_type: str, payload: dict): def add_job(self, event_type: str, payload: dict):
entity_id = payload.get("PrimaryKey")
with sqlite3.connect(DB_PATH, timeout=30) as conn: with sqlite3.connect(DB_PATH, timeout=30) as conn:
try: try:
conn.execute( conn.execute(
"INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)", "INSERT INTO jobs (event_type, payload, status, entity_id) VALUES (?, ?, ?, ?)",
(event_type, json.dumps(payload), 'PENDING') (event_type, json.dumps(payload), 'PENDING', entity_id)
) )
conn.commit() conn.commit()
logger.debug(f"Job added: {event_type}") logger.debug(f"Job added: {event_type} for entity {entity_id}")
except Exception as e: except Exception as e:
logger.error(f"❌ Failed to add job: {e}", exc_info=True) logger.error(f"❌ Failed to add job: {e}", exc_info=True)
conn.rollback() conn.rollback()
raise raise
def update_entity_name(self, job_id, name, associate_name=None):
with sqlite3.connect(DB_PATH, timeout=30) as conn: def is_duplicate_pending(self, event_type: str, payload: dict) -> bool:
try: """
if associate_name: Checks if a job with the same event_type and entity_id is already PENDING.
This is to prevent adding duplicate jobs from webhooks that are sent multiple times.
"""
# We only want to de-duplicate creation events, as change events can be validly stacked.
if "created" not in event_type.lower():
return False
entity_id = payload.get("PrimaryKey")
if not entity_id:
return False # Cannot de-duplicate without a key
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
with sqlite3.connect(DB_PATH, timeout=30) as conn:
try:
cursor = conn.cursor()
cursor.execute("""
SELECT id FROM jobs
WHERE event_type = ?
AND entity_id = ?
AND status = 'PENDING'
AND created_at >= ?
""", (event_type, entity_id, five_minutes_ago))
if existing_job := cursor.fetchone():
logger.warning(f"Found PENDING duplicate of job {existing_job[0]} for event '{event_type}' and entity '{entity_id}'. Ignoring new webhook.")
return True
return False
except Exception as e:
logger.error(f"❌ Failed to check for PENDING duplicate jobs for entity '{entity_id}': {e}", exc_info=True)
return False # Fail safe: better to have a duplicate than to miss an event.
def update_entity_name(self, job_id, name, associate_name=None):
with sqlite3.connect(DB_PATH, timeout=30) as conn:
try:
if associate_name:
conn.execute( conn.execute(
"UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?", "UPDATE jobs SET entity_name = ?, associate_name = ?, updated_at = datetime('now') WHERE id = ?",
(str(name), str(associate_name), job_id) (str(name), str(associate_name), job_id)

View File

@@ -34,6 +34,11 @@ async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
event_type = payload.get("Event", "unknown") event_type = payload.get("Event", "unknown")
# --- DEDUPLICATION AT INGRESS (Added March 2026) ---
# Before adding a job, check if an identical one is already pending.
if queue.is_duplicate_pending(event_type, payload):
return {"status": "skipped_duplicate"}
# Add to local Queue # Add to local Queue
queue.add_job(event_type, payload) queue.add_job(event_type, payload)

View File

@@ -243,10 +243,16 @@ def process_job(job, so_client: SuperOfficeClient, queue: JobQueue):
try: try:
auth_tuple = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini")) auth_tuple = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
resp = requests.post(ce_url, json=ce_req, auth=auth_tuple) resp = requests.post(ce_url, json=ce_req, auth=auth_tuple)
if resp.status_code == 404: return ("RETRY", "CE 404") if resp.status_code == 404:
msg = f"Company Explorer API returned 404 for '{crm_name}'"
logger.warning(msg)
return ("RETRY", msg)
resp.raise_for_status() resp.raise_for_status()
provisioning_data = resp.json() provisioning_data = resp.json()
if provisioning_data.get("status") == "processing": return ("RETRY", "CE processing") if provisioning_data.get("status") == "processing":
msg = f"Company Explorer is still processing '{crm_name}'. This is expected for new companies. Will retry."
logger.info(msg)
return ("RETRY", msg)
except Exception as e: except Exception as e:
raise Exception(f"Company Explorer API failed: {e}") raise Exception(f"Company Explorer API failed: {e}")