fix(connector): [31e88f42] Implement de-duplication for contact.created
SuperOffice sends two 'contact.created' webhooks for a single new contact. This caused the connector to process the same entity twice, leading to duplicate entries and logs. This commit introduces a de-duplication shield in the worker: - A new method is added to to check for jobs with the same company name that are either 'PROCESSING' or 'COMPLETED' within the last 5 minutes. - The worker now fetches the company name upon receiving a job, updates the job record with the name, and then calls the new de-duplication method. - If a duplicate event is detected, the job is skipped, preventing redundant processing.
This commit is contained in:
@@ -185,6 +185,31 @@ class JobQueue:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"❌ CRITICAL: Failed to set job {job_id} to FAILED: {e}", exc_info=True)
|
logger.critical(f"❌ CRITICAL: Failed to set job {job_id} to FAILED: {e}", exc_info=True)
|
||||||
conn.rollback()
|
conn.rollback()
|
||||||
|
|
||||||
|
def check_for_recent_duplicate(self, entity_name: str, current_job_id: int) -> bool:
|
||||||
|
"""
|
||||||
|
Checks if another job with the same entity_name has been processed or is processing recently.
|
||||||
|
"""
|
||||||
|
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
|
||||||
|
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||||
|
try:
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute("""
|
||||||
|
SELECT id FROM jobs
|
||||||
|
WHERE entity_name = ?
|
||||||
|
AND id != ?
|
||||||
|
AND status IN ('COMPLETED', 'PROCESSING')
|
||||||
|
AND created_at >= ?
|
||||||
|
""", (entity_name, current_job_id, five_minutes_ago))
|
||||||
|
|
||||||
|
if cursor.fetchone():
|
||||||
|
logger.warning(f"Found recent duplicate job for entity '{entity_name}' (related to job {current_job_id}).")
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"❌ Failed to check for duplicate jobs for entity '{entity_name}': {e}", exc_info=True)
|
||||||
|
return False # Fail safe
|
||||||
|
|
||||||
def get_stats(self):
|
def get_stats(self):
|
||||||
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
with sqlite3.connect(DB_PATH, timeout=30) as conn:
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
|
|||||||
@@ -189,6 +189,13 @@ def process_job(job, so_client: SuperOfficeClient, queue: JobQueue):
|
|||||||
assoc = contact_details.get("Associate") or {}
|
assoc = contact_details.get("Associate") or {}
|
||||||
aname = assoc.get("Name", "").upper().strip()
|
aname = assoc.get("Name", "").upper().strip()
|
||||||
queue.update_entity_name(job['id'], crm_name, associate_name=aname)
|
queue.update_entity_name(job['id'], crm_name, associate_name=aname)
|
||||||
|
|
||||||
|
# --- DE-DUPLICATION SHIELD (Added March 2026) ---
|
||||||
|
if "contact.created" in event_low:
|
||||||
|
if queue.check_for_recent_duplicate(crm_name, job['id']):
|
||||||
|
msg = f"Duplicate 'contact.created' event for '{crm_name}'. This job will be skipped."
|
||||||
|
logger.info(f"🛡️ {msg}")
|
||||||
|
return ("SKIPPED", msg)
|
||||||
|
|
||||||
# ROBOPLANET FILTER
|
# ROBOPLANET FILTER
|
||||||
is_robo = False
|
is_robo = False
|
||||||
|
|||||||
Reference in New Issue
Block a user