[31188f42] Keine neuen Commits in dieser Session.
Keine neuen Commits in dieser Session.
This commit is contained in:
@@ -36,62 +36,49 @@ def safe_get_udfs(entity_data):
|
||||
def process_job(job, so_client: SuperOfficeClient):
|
||||
"""
|
||||
Core logic for processing a single job.
|
||||
Returns: (STATUS, MESSAGE)
|
||||
STATUS: 'SUCCESS', 'SKIPPED', 'RETRY', 'FAILED'
|
||||
"""
|
||||
logger.info(f"--- [WORKER v1.8] Processing Job {job['id']} ({job['event_type']}) ---")
|
||||
payload = job['payload']
|
||||
event_low = job['event_type'].lower()
|
||||
|
||||
# 0. Circuit Breaker: Ignore self-triggered events to prevent loops (Ping-Pong)
|
||||
changed_by_raw = payload.get("ChangedByAssociateId")
|
||||
logger.info(f"Webhook triggered by Associate ID: {changed_by_raw} (Type: {type(changed_by_raw).__name__})")
|
||||
|
||||
try:
|
||||
if changed_by_raw and int(changed_by_raw) == 528:
|
||||
logger.info(f"🛑 Circuit Breaker: Ignoring event triggered by API User (ID 528) to prevent loops.")
|
||||
return "SUCCESS"
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
# 0b. Noise Reduction: Filter irrelevant field changes
|
||||
# Only re-process if core data (Name, Website) or UDFs (Vertical) changed.
|
||||
# 0. Noise Reduction: Filter irrelevant field changes
|
||||
if job['event_type'] == 'contact.changed':
|
||||
changes = payload.get('Changes', [])
|
||||
# Normalize to lower case for comparison
|
||||
changes_lower = [str(c).lower() for c in changes]
|
||||
|
||||
# Fields that trigger a re-analysis
|
||||
relevant_fields = [
|
||||
'name', # Company Name change -> Full Re-Scan
|
||||
'urladdress', # Website change -> Full Re-Scan
|
||||
'urls', # Website array change
|
||||
'orgnr', # VAT/Register ID change
|
||||
'userdef_id', # UDFs (Verticals, etc.) changed
|
||||
'country_id' # Country change
|
||||
'name', 'urladdress', 'urls', 'orgnr', 'userdef_id', 'country_id'
|
||||
]
|
||||
|
||||
# Check if ANY relevant field is in the changes list
|
||||
is_relevant = any(field in changes_lower for field in relevant_fields)
|
||||
# Identify which relevant field triggered the event
|
||||
hit_fields = [f for f in relevant_fields if f in changes_lower]
|
||||
|
||||
if not is_relevant:
|
||||
logger.info(f"⏭️ Skipping 'contact.changed': No relevant fields changed. (Changes: {changes})")
|
||||
return "SUCCESS"
|
||||
if not hit_fields:
|
||||
msg = f"Skipping 'contact.changed': No relevant fields affected. (Changes: {changes})"
|
||||
logger.info(f"⏭️ {msg}")
|
||||
return ("SKIPPED", msg)
|
||||
else:
|
||||
logger.info(f"🎯 Relevant change detected in fields: {hit_fields}")
|
||||
|
||||
if job['event_type'] == 'person.changed':
|
||||
changes = payload.get('Changes', [])
|
||||
changes_lower = [str(c).lower() for c in changes]
|
||||
|
||||
relevant_person_fields = [
|
||||
'jobtitle', # Job Title change
|
||||
'title', # Alternative title field
|
||||
'position_id', # Standard Position/Role dropdown
|
||||
'userdef_id' # UDFs (MA Status, Campaign, etc.)
|
||||
'jobtitle', 'title', 'position_id', 'userdef_id'
|
||||
]
|
||||
|
||||
is_relevant = any(field in changes_lower for field in relevant_person_fields)
|
||||
hit_fields = [f for f in relevant_person_fields if f in changes_lower]
|
||||
|
||||
if not is_relevant:
|
||||
logger.info(f"⏭️ Skipping 'person.changed': No relevant fields changed. (Changes: {changes})")
|
||||
return "SUCCESS"
|
||||
if not hit_fields:
|
||||
msg = f"Skipping 'person.changed': No relevant fields affected. (Changes: {changes})"
|
||||
logger.info(f"⏭️ {msg}")
|
||||
return ("SKIPPED", msg)
|
||||
else:
|
||||
logger.info(f"🎯 Relevant change detected in fields: {hit_fields}")
|
||||
|
||||
# 1. Extract IDs Early
|
||||
person_id = None
|
||||
@@ -137,10 +124,11 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch person details for {person_id}: {e}")
|
||||
|
||||
# 2. Noise Reduction Logic
|
||||
# 2. Noise Reduction Logic (Event Type)
|
||||
if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]):
|
||||
logger.info(f"Skipping irrelevant event type: {job['event_type']}")
|
||||
return "SUCCESS"
|
||||
msg = f"Skipping irrelevant event type: {job['event_type']}"
|
||||
logger.info(msg)
|
||||
return ("SKIPPED", msg)
|
||||
|
||||
if not contact_id:
|
||||
raise ValueError(f"Could not identify ContactId in payload: {payload}")
|
||||
@@ -226,11 +214,11 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
try:
|
||||
resp = requests.post(ce_url, json=ce_req, auth=ce_auth)
|
||||
if resp.status_code == 404:
|
||||
return "RETRY"
|
||||
return ("RETRY", "CE returned 404")
|
||||
resp.raise_for_status()
|
||||
provisioning_data = resp.json()
|
||||
if provisioning_data.get("status") == "processing":
|
||||
return "RETRY"
|
||||
return ("RETRY", "CE is processing")
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise Exception(f"Company Explorer API failed: {e}")
|
||||
|
||||
@@ -238,7 +226,7 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
|
||||
# Fetch fresh Contact Data for comparison
|
||||
contact_data = so_client.get_contact(contact_id)
|
||||
if not contact_data: return "FAILED"
|
||||
if not contact_data: return ("FAILED", "Could not fetch contact for patch")
|
||||
|
||||
# SAFE GET FOR COMPARISON
|
||||
current_udfs = safe_get_udfs(contact_data)
|
||||
@@ -354,7 +342,7 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
except Exception as e:
|
||||
logger.error(f"Failed simulation: {e}")
|
||||
|
||||
return "SUCCESS"
|
||||
return ("SUCCESS", None)
|
||||
|
||||
def run_worker():
|
||||
queue = JobQueue()
|
||||
@@ -373,10 +361,18 @@ def run_worker():
|
||||
job = queue.get_next_job()
|
||||
if job:
|
||||
try:
|
||||
result = process_job(job, so_client)
|
||||
if result == "RETRY": queue.retry_job_later(job['id'], delay_seconds=120)
|
||||
elif result == "FAILED": queue.fail_job(job['id'], "Job failed status")
|
||||
else: queue.complete_job(job['id'])
|
||||
# process_job now returns a tuple (STATUS, MESSAGE)
|
||||
status, msg = process_job(job, so_client)
|
||||
|
||||
if status == "RETRY":
|
||||
queue.retry_job_later(job['id'], delay_seconds=120, error_msg=msg)
|
||||
elif status == "FAILED":
|
||||
queue.fail_job(job['id'], msg or "Job failed status")
|
||||
elif status == "SKIPPED":
|
||||
queue.skip_job(job['id'], msg or "Skipped")
|
||||
else:
|
||||
queue.complete_job(job['id'])
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
|
||||
queue.fail_job(job['id'], str(e))
|
||||
|
||||
Reference in New Issue
Block a user