[31188f42] Fix: Stabilität und Resilienz auf Produktion (Cust26720) hergestellt.

- worker.py: Circuit Breaker implementiert (Ignoriert Associate ID 528), um Ping-Pong-Loops zu verhindern.
- worker.py: Resiliente UDF-Behandlung hinzugefügt (behebt 'unhashable type: dict' API-Antwort-Problem).
- tools/: Umfangreiche Test- und Diagnose-Suite hinzugefügt.
Die Anreicherung für 'Bremer Abenteuerland' wurde erfolgreich verifiziert.
This commit is contained in:
2026-03-04 16:46:16 +00:00
parent 81921e144d
commit f486b981e9
5 changed files with 184 additions and 55 deletions

View File

@@ -18,6 +18,21 @@ logger = logging.getLogger("connector-worker")
# Poll Interval
POLL_INTERVAL = 5 # Seconds
def safe_get_udfs(entity_data):
"""
Safely retrieves UserDefinedFields from an entity dictionary.
Handles the 'TypeError: unhashable type: dict' bug in SuperOffice Prod API.
"""
if not entity_data: return {}
try:
return entity_data.get("UserDefinedFields", {})
except TypeError:
logger.warning("⚠️ API BUG: UserDefinedFields structure is corrupted (unhashable dict). Treating as empty.")
return {}
except Exception as e:
logger.error(f"Error reading UDFs: {e}")
return {}
def process_job(job, so_client: SuperOfficeClient):
"""
Core logic for processing a single job.
@@ -26,6 +41,17 @@ def process_job(job, so_client: SuperOfficeClient):
payload = job['payload']
event_low = job['event_type'].lower()
# 0. Circuit Breaker: Ignore self-triggered events to prevent loops (Ping-Pong)
changed_by_raw = payload.get("ChangedByAssociateId")
logger.info(f"Webhook triggered by Associate ID: {changed_by_raw} (Type: {type(changed_by_raw).__name__})")
try:
if changed_by_raw and int(changed_by_raw) == 528:
logger.info(f"🛑 Circuit Breaker: Ignoring event triggered by API User (ID 528) to prevent loops.")
return "SUCCESS"
except (ValueError, TypeError):
pass
# 1. Extract IDs Early
person_id = None
contact_id = None
@@ -104,7 +130,9 @@ def process_job(job, so_client: SuperOfficeClient):
# We fetch the person again specifically for UDFs to ensure we get DisplayTexts
person_details = so_client.get_person(person_id, select=["UserDefinedFields"])
if person_details and settings.UDF_CAMPAIGN:
udfs = person_details.get("UserDefinedFields", {})
# SAFE GET
udfs = safe_get_udfs(person_details)
# SuperOffice REST returns DisplayText for lists as 'ProgID:DisplayText'
display_key = f"{settings.UDF_CAMPAIGN}:DisplayText"
campaign_tag = udfs.get(display_key)
@@ -123,7 +151,8 @@ def process_job(job, so_client: SuperOfficeClient):
logger.warning(f"Could not fetch campaign tag: {e}")
if settings.UDF_VERTICAL:
udfs = contact_details.get("UserDefinedFields", {})
# SAFE GET
udfs = safe_get_udfs(contact_details)
so_vertical_val = udfs.get(settings.UDF_VERTICAL)
if so_vertical_val:
val_str = str(so_vertical_val).replace("[I:","").replace("]","")
@@ -169,6 +198,10 @@ def process_job(job, so_client: SuperOfficeClient):
# Fetch fresh Contact Data for comparison
contact_data = so_client.get_contact(contact_id)
if not contact_data: return "FAILED"
# SAFE GET FOR COMPARISON
current_udfs = safe_get_udfs(contact_data)
contact_patch = {}
# --- A. Vertical Sync ---
@@ -179,7 +212,7 @@ def process_job(job, so_client: SuperOfficeClient):
vertical_id = vertical_map.get(vertical_name)
if vertical_id:
udf_key = settings.UDF_VERTICAL
current_val = contact_data.get("UserDefinedFields", {}).get(udf_key, "")
current_val = current_udfs.get(udf_key, "")
if str(current_val).replace("[I:","").replace("]","") != str(vertical_id):
logger.info(f"Change detected: Vertical -> {vertical_id}")
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
@@ -208,16 +241,16 @@ def process_job(job, so_client: SuperOfficeClient):
ce_opener_secondary = provisioning_data.get("opener_secondary")
ce_summary = provisioning_data.get("summary")
if ce_opener and ce_opener != "null" and contact_data.get("UserDefinedFields", {}).get(settings.UDF_OPENER) != ce_opener:
if ce_opener and ce_opener != "null" and current_udfs.get(settings.UDF_OPENER) != ce_opener:
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
contact_patch["UserDefinedFields"][settings.UDF_OPENER] = ce_opener
if ce_opener_secondary and ce_opener_secondary != "null" and contact_data.get("UserDefinedFields", {}).get(settings.UDF_OPENER_SECONDARY) != ce_opener_secondary:
if ce_opener_secondary and ce_opener_secondary != "null" and current_udfs.get(settings.UDF_OPENER_SECONDARY) != ce_opener_secondary:
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
contact_patch["UserDefinedFields"][settings.UDF_OPENER_SECONDARY] = ce_opener_secondary
if ce_summary and ce_summary != "null":
short_summary = (ce_summary[:132] + "...") if len(ce_summary) > 135 else ce_summary
if contact_data.get("UserDefinedFields", {}).get(settings.UDF_SUMMARY) != short_summary:
if current_udfs.get(settings.UDF_SUMMARY) != short_summary:
logger.info("Change detected: AI Summary")
if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
contact_patch["UserDefinedFields"][settings.UDF_SUMMARY] = short_summary