[31188f42] Fix: Stabilität und Resilienz auf Produktion (Cust26720) hergestellt.
- worker.py: Circuit Breaker implementiert (Ignoriert Associate ID 528), um Ping-Pong-Loops zu verhindern. - worker.py: Resiliente UDF-Behandlung hinzugefügt (behebt 'unhashable type: dict' API-Antwort-Problem). - tools/: Umfangreiche Test- und Diagnose-Suite hinzugefügt. Die Anreicherung für 'Bremer Abenteuerland' wurde erfolgreich verifiziert.
This commit is contained in:
@@ -18,6 +18,21 @@ logger = logging.getLogger("connector-worker")
|
||||
# Poll Interval
|
||||
POLL_INTERVAL = 5 # Seconds
|
||||
|
||||
def safe_get_udfs(entity_data):
|
||||
"""
|
||||
Safely retrieves UserDefinedFields from an entity dictionary.
|
||||
Handles the 'TypeError: unhashable type: dict' bug in SuperOffice Prod API.
|
||||
"""
|
||||
if not entity_data: return {}
|
||||
try:
|
||||
return entity_data.get("UserDefinedFields", {})
|
||||
except TypeError:
|
||||
logger.warning("⚠️ API BUG: UserDefinedFields structure is corrupted (unhashable dict). Treating as empty.")
|
||||
return {}
|
||||
except Exception as e:
|
||||
logger.error(f"Error reading UDFs: {e}")
|
||||
return {}
|
||||
|
||||
def process_job(job, so_client: SuperOfficeClient):
|
||||
"""
|
||||
Core logic for processing a single job.
|
||||
@@ -26,6 +41,17 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
payload = job['payload']
|
||||
event_low = job['event_type'].lower()
|
||||
|
||||
# 0. Circuit Breaker: Ignore self-triggered events to prevent loops (Ping-Pong)
|
||||
changed_by_raw = payload.get("ChangedByAssociateId")
|
||||
logger.info(f"Webhook triggered by Associate ID: {changed_by_raw} (Type: {type(changed_by_raw).__name__})")
|
||||
|
||||
try:
|
||||
if changed_by_raw and int(changed_by_raw) == 528:
|
||||
logger.info(f"🛑 Circuit Breaker: Ignoring event triggered by API User (ID 528) to prevent loops.")
|
||||
return "SUCCESS"
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
# 1. Extract IDs Early
|
||||
person_id = None
|
||||
contact_id = None
|
||||
@@ -104,7 +130,9 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
# We fetch the person again specifically for UDFs to ensure we get DisplayTexts
|
||||
person_details = so_client.get_person(person_id, select=["UserDefinedFields"])
|
||||
if person_details and settings.UDF_CAMPAIGN:
|
||||
udfs = person_details.get("UserDefinedFields", {})
|
||||
# SAFE GET
|
||||
udfs = safe_get_udfs(person_details)
|
||||
|
||||
# SuperOffice REST returns DisplayText for lists as 'ProgID:DisplayText'
|
||||
display_key = f"{settings.UDF_CAMPAIGN}:DisplayText"
|
||||
campaign_tag = udfs.get(display_key)
|
||||
@@ -123,7 +151,8 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
logger.warning(f"Could not fetch campaign tag: {e}")
|
||||
|
||||
if settings.UDF_VERTICAL:
|
||||
udfs = contact_details.get("UserDefinedFields", {})
|
||||
# SAFE GET
|
||||
udfs = safe_get_udfs(contact_details)
|
||||
so_vertical_val = udfs.get(settings.UDF_VERTICAL)
|
||||
if so_vertical_val:
|
||||
val_str = str(so_vertical_val).replace("[I:","").replace("]","")
|
||||
@@ -169,6 +198,10 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
# Fetch fresh Contact Data for comparison
|
||||
contact_data = so_client.get_contact(contact_id)
|
||||
if not contact_data: return "FAILED"
|
||||
|
||||
# SAFE GET FOR COMPARISON
|
||||
current_udfs = safe_get_udfs(contact_data)
|
||||
|
||||
contact_patch = {}
|
||||
|
||||
# --- A. Vertical Sync ---
|
||||
@@ -179,7 +212,7 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
vertical_id = vertical_map.get(vertical_name)
|
||||
if vertical_id:
|
||||
udf_key = settings.UDF_VERTICAL
|
||||
current_val = contact_data.get("UserDefinedFields", {}).get(udf_key, "")
|
||||
current_val = current_udfs.get(udf_key, "")
|
||||
if str(current_val).replace("[I:","").replace("]","") != str(vertical_id):
|
||||
logger.info(f"Change detected: Vertical -> {vertical_id}")
|
||||
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
|
||||
@@ -208,16 +241,16 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
ce_opener_secondary = provisioning_data.get("opener_secondary")
|
||||
ce_summary = provisioning_data.get("summary")
|
||||
|
||||
if ce_opener and ce_opener != "null" and contact_data.get("UserDefinedFields", {}).get(settings.UDF_OPENER) != ce_opener:
|
||||
if ce_opener and ce_opener != "null" and current_udfs.get(settings.UDF_OPENER) != ce_opener:
|
||||
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
|
||||
contact_patch["UserDefinedFields"][settings.UDF_OPENER] = ce_opener
|
||||
if ce_opener_secondary and ce_opener_secondary != "null" and contact_data.get("UserDefinedFields", {}).get(settings.UDF_OPENER_SECONDARY) != ce_opener_secondary:
|
||||
if ce_opener_secondary and ce_opener_secondary != "null" and current_udfs.get(settings.UDF_OPENER_SECONDARY) != ce_opener_secondary:
|
||||
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
|
||||
contact_patch["UserDefinedFields"][settings.UDF_OPENER_SECONDARY] = ce_opener_secondary
|
||||
|
||||
if ce_summary and ce_summary != "null":
|
||||
short_summary = (ce_summary[:132] + "...") if len(ce_summary) > 135 else ce_summary
|
||||
if contact_data.get("UserDefinedFields", {}).get(settings.UDF_SUMMARY) != short_summary:
|
||||
if current_udfs.get(settings.UDF_SUMMARY) != short_summary:
|
||||
logger.info("Change detected: AI Summary")
|
||||
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
|
||||
contact_patch["UserDefinedFields"][settings.UDF_SUMMARY] = short_summary
|
||||
|
||||
Reference in New Issue
Block a user