diff --git a/connector-superoffice/worker.py b/connector-superoffice/worker.py index 1fa3bab0..9eda3be0 100644 --- a/connector-superoffice/worker.py +++ b/connector-superoffice/worker.py @@ -40,303 +40,378 @@ def clean_text_for_so(text, limit=200): return str(text).strip()[:limit] def process_job(job, so_client: SuperOfficeClient, queue: JobQueue): + """ + Core logic for processing a single job. + Returns: (STATUS, MESSAGE) + STATUS: 'SUCCESS', 'SKIPPED', 'DELETED', 'RETRY', 'FAILED' + """ - logger.info(f"--- [WORKER v1.9.3] Processing Job {job['id']} ({job['event_type']}) ---") + + logger.info(f"--- [WORKER v1.9.5] Processing Job {job['id']} ({job['event_type']}) ---") + payload = job['payload'] + event_low = job['event_type'].lower() + + # --- CIRCUIT BREAKER: DETECT ECHOES --- - # We log if the event was triggered by our own API user (Associate 528) - # but we NO LONGER SKIP IT, to allow manual changes by the user who shares the same ID. - # The idempotency logic below will prevent infinite loops. + changed_by = payload.get("ChangedByAssociateId") + if changed_by == 528: + logger.info(f"ℹ️ Potential Echo: Event triggered by Associate 528. Proceeding to check for meaningful changes.") + # -------------------------------------------- + + # 0. ID Extraction & Early Exit for irrelevant jobs + person_id = None + contact_id = None + job_title = payload.get("JobTitle") + + field_values = payload.get("FieldValues", {}) + if "person_id" in field_values: + person_id = int(field_values["person_id"]) + if "contact_id" in field_values: + contact_id = int(field_values["contact_id"]) + if "title" in field_values and not job_title: + job_title = field_values["title"] + + if not person_id: + if "PersonId" in payload: + person_id = int(payload["PersonId"]) + elif "PrimaryKey" in payload and "person" in event_low: + person_id = int(payload["PrimaryKey"]) + + if not contact_id: + if "ContactId" in payload: + contact_id = int(payload["ContactId"]) + elif "PrimaryKey" in payload and "contact" in event_low: + contact_id = int(payload["PrimaryKey"]) - # If after all checks, we have no ID, we can't process it. + + if not person_id and not contact_id: - msg = f"Skipping job: No ContactId or PersonId could be identified in the payload." + + msg = f"Skipping job: No ContactId or PersonId identified." + logger.warning(msg) + return ("SKIPPED", msg) - # Fallback/Deep Lookup & Fetch JobTitle if missing + + + # Fallback Lookup + if person_id and (not job_title or not contact_id): + try: - person_details = so_client.get_person( - person_id, - select=["JobTitle", "Title", "Contact/ContactId", "FirstName", "LastName", "UserDefinedFields", "Position"] - ) + + person_details = so_client.get_person(person_id, select=["JobTitle", "Title", "Contact/ContactId"]) + if person_details: - if not job_title: - job_title = person_details.get("JobTitle") or person_details.get("Title") + + if not job_title: job_title = person_details.get("JobTitle") or person_details.get("Title") + if not contact_id: + contact_obj = person_details.get("Contact") - if contact_obj and isinstance(contact_obj, dict): - contact_id = contact_obj.get("ContactId") - elif "ContactId" in person_details: - contact_id = person_details.get("ContactId") + + if contact_obj and isinstance(contact_obj, dict): contact_id = contact_obj.get("ContactId") + except ContactNotFoundException: - msg = f"Skipping job because Person ID {person_id} was not found in SuperOffice (likely deleted)." - logger.warning(msg) - return ("DELETED", msg) + + return ("DELETED", f"Person {person_id} not found.") + except Exception as e: + logger.warning(f"Failed to fetch person details for {person_id}: {e}") - # 2. Noise Reduction Logic (Event Type) - if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]): - msg = f"Skipping irrelevant event type: {job['event_type']}" - logger.info(msg) - return ("SKIPPED", msg) - if not contact_id: - raise ValueError(f"Could not identify ContactId in payload: {payload}") + + if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]): + + return ("SKIPPED", f"Irrelevant event type: {job['event_type']}") + + + + if not contact_id: raise ValueError(f"No ContactId found.") + + logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}") - # 1b. Fetch full contact details for 'Double Truth' check - crm_name = None - crm_website = None - crm_industry_name = None - contact_details = None - campaign_tag = None + + + # 1b. Fetch full contact details + + crm_name, crm_website, crm_industry_name, contact_details, campaign_tag = None, None, None, None, None + + try: - # Request Associate details explicitly - contact_details = so_client.get_contact( - contact_id, - select=["Name", "UrlAddress", "Urls", "UserDefinedFields", "Address", "OrgNr", "Associate"] - ) - - crm_name = contact_details.get("Name", "Unknown") - - # Safely get Associate object - assoc = contact_details.get("Associate") or {} - aid = assoc.get("AssociateId") - aname = assoc.get("Name", "").upper().strip() if assoc.get("Name") else "" - # PERSIST DETAILS TO DASHBOARD early + contact_details = so_client.get_contact(contact_id, select=["Name", "UrlAddress", "Urls", "UserDefinedFields", "Address", "OrgNr", "Associate"]) + + crm_name = contact_details.get("Name", "Unknown") + + assoc = contact_details.get("Associate") or {} + + aname = assoc.get("Name", "").upper().strip() + queue.update_entity_name(job['id'], crm_name, associate_name=aname) + - # --- ROBOPLANET FILTER LOGIC --- - - # Check both numerical ID and shortname - is_robo = False - if aname in settings.ROBOPLANET_WHITELIST: - is_robo = True - else: - try: - if aid and int(aid) in settings.ROBOPLANET_WHITELIST: - is_robo = True - except (ValueError, TypeError): - pass + + if aname not in settings.ROBOPLANET_WHITELIST and assoc.get("AssociateId") not in settings.ROBOPLANET_WHITELIST: + + return ("SKIPPED", f"Owner '{aname}' not in Roboplanet whitelist.") + - if not is_robo: - msg = f"WACKLER FILTER: Contact {contact_id} ('{crm_name}') belongs to Associate '{aname}' (not in whitelist). Skipping." - logger.info(f"⏭️ {msg}") - return ("SKIPPED", msg) - - logger.info(f"✅ Filter Passed: Contact '{crm_name}' belongs to Roboplanet Associate '{aname}'.") - # ------------------------------- crm_website = contact_details.get("UrlAddress") - - # --- Fetch Person UDFs for Campaign Tag --- + + + + # Campaign Tag + if person_id: + try: - # We fetch the person again specifically for UDFs to ensure we get DisplayTexts + person_details = so_client.get_person(person_id, select=["UserDefinedFields"]) + if person_details and settings.UDF_CAMPAIGN: - # SAFE GET + udfs = safe_get_udfs(person_details) - - # SuperOffice REST returns DisplayText for lists as 'ProgID:DisplayText' - display_key = f"{settings.UDF_CAMPAIGN}:DisplayText" - campaign_tag = udfs.get(display_key) - - if not campaign_tag: - # Fallback to manual resolution if DisplayText is missing - raw_tag = udfs.get(settings.UDF_CAMPAIGN, "") - if raw_tag: - campaign_tag = str(raw_tag).strip() - - if campaign_tag: - logger.info(f"🎯 CAMPAIGN DETECTED: '{campaign_tag}'") - else: - logger.info("ℹ️ No Campaign Tag found (Field is empty).") - except ContactNotFoundException: - # This is not critical, we can proceed without a campaign tag - logger.warning(f"Could not fetch campaign tag: Person {person_id} not found.") - except Exception as e: - logger.warning(f"Could not fetch campaign tag: {e}") + + campaign_tag = udfs.get(f"{settings.UDF_CAMPAIGN}:DisplayText") or udfs.get(settings.UDF_CAMPAIGN) + + except Exception: pass + + + + # Current Vertical if settings.UDF_VERTICAL: - # SAFE GET + udfs = safe_get_udfs(contact_details) + so_vertical_val = udfs.get(settings.UDF_VERTICAL) + if so_vertical_val: + val_str = str(so_vertical_val).replace("[I:","").replace("]","") + try: - vertical_map = json.loads(settings.VERTICAL_MAP_JSON) - vertical_map_rev = {str(v): k for k, v in vertical_map.items()} - if val_str in vertical_map_rev: - crm_industry_name = vertical_map_rev[val_str] - except Exception as ex: - logger.error(f"Error mapping vertical ID {val_str}: {ex}") - + + v_map = json.loads(settings.VERTICAL_MAP_JSON) + + crm_industry_name = {str(v): k for k, v in v_map.items()}.get(val_str) + + except Exception: pass + + + except ContactNotFoundException: - msg = f"Skipping job because Contact ID {contact_id} was not found in SuperOffice (likely deleted)." - logger.warning(msg) - return ("DELETED", msg) + + return ("DELETED", f"Contact {contact_id} not found.") + except Exception as e: - logger.error(f"Failed to fetch contact details for {contact_id}: {e}") + raise Exception(f"SuperOffice API Failure: {e}") + # --- 3. Company Explorer Provisioning --- + ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact" - ce_req = { - "so_contact_id": contact_id, - "so_person_id": person_id, - "job_title": job_title, - "crm_name": crm_name, - "crm_website": crm_website, - "crm_industry_name": crm_industry_name, - "campaign_tag": campaign_tag - } - - ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini")) + + ce_req = {"so_contact_id": contact_id, "so_person_id": person_id, "job_title": job_title, "crm_name": crm_name, "crm_website": crm_website, "crm_industry_name": crm_industry_name, "campaign_tag": campaign_tag} + + try: - resp = requests.post(ce_url, json=ce_req, auth=ce_auth) - if resp.status_code == 404: - return ("RETRY", "CE returned 404") + + resp = requests.post(ce_url, json=ce_req, auth=(os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))) + + if resp.status_code == 404: return ("RETRY", "CE 404") + resp.raise_for_status() + provisioning_data = resp.json() - if provisioning_data.get("status") == "processing": - return ("RETRY", "CE is processing") - except requests.exceptions.RequestException as e: + + if provisioning_data.get("status") == "processing": return ("RETRY", "CE processing") + + except Exception as e: + raise Exception(f"Company Explorer API failed: {e}") - logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}") - # Fetch fresh Contact Data for comparison + + # Fetch fresh Contact for comparison + contact_data = so_client.get_contact(contact_id) - if not contact_data: - # This can happen if the contact was deleted between the CE call and now - logger.warning(f"Could not re-fetch contact {contact_id} for patch (deleted?). Skipping patch.") - return ("SKIPPED", "Contact deleted post-analysis") - - # SAFE GET FOR COMPARISON + + if not contact_data: return ("SKIPPED", "Contact deleted post-analysis") + current_udfs = safe_get_udfs(contact_data) - + contact_patch = {} + + # --- A. Vertical Sync --- + vertical_name = provisioning_data.get("vertical_name") + if vertical_name: - try: - vertical_map = json.loads(settings.VERTICAL_MAP_JSON) - vertical_id = vertical_map.get(vertical_name) - if vertical_id: - udf_key = settings.UDF_VERTICAL - current_val = current_udfs.get(udf_key, "") - if str(current_val).replace("[I:","").replace("]","") != str(vertical_id): - logger.info(f"Change detected: Vertical -> {vertical_id}") - if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {} - contact_patch["UserDefinedFields"][udf_key] = str(vertical_id) - except Exception as e: - logger.error(f"Vertical sync error: {e}") + + v_map = json.loads(settings.VERTICAL_MAP_JSON) + + vertical_id = v_map.get(vertical_name) + + if vertical_id: + + current_val = str(current_udfs.get(settings.UDF_VERTICAL, "")).replace("[I:","").replace("]","") + + logger.info(f"Checking Vertical: CRM='{current_val}' vs AI='{vertical_id}'") + + if current_val != str(vertical_id): + + logger.info(f" -> UPDATE: Vertical") + + contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_VERTICAL] = str(vertical_id) + + # --- B. Address & VAT Sync --- - ce_city = provisioning_data.get("address_city") - ce_street = provisioning_data.get("address_street") - ce_zip = provisioning_data.get("address_zip") - ce_vat = provisioning_data.get("vat_id") - + + ce_city, ce_street, ce_zip, ce_vat = provisioning_data.get("address_city"), provisioning_data.get("address_street"), provisioning_data.get("address_zip"), provisioning_data.get("vat_id") + if ce_city or ce_street or ce_zip: + for type_key in ["Postal", "Street"]: + cur_addr = (contact_data.get("Address") or {}).get(type_key, {}) + + logger.info(f"Checking Address ({type_key}): CRM='{cur_addr.get('City')}' vs AI='{ce_city}'") + if ce_city and cur_addr.get("City") != ce_city: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["City"] = ce_city + if ce_street and cur_addr.get("Address1") != ce_street: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Address1"] = ce_street + if ce_zip and cur_addr.get("Zipcode") != ce_zip: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Zipcode"] = ce_zip + + if ce_vat and contact_data.get("OrgNr") != ce_vat: + + logger.info(f"Checking VAT: CRM='{contact_data.get('OrgNr')}' vs AI='{ce_vat}'") + contact_patch["OrgNr"] = ce_vat + + # --- C. AI Openers & Summary Sync --- - # TRUNCATION TO 200 CHARS TO PREVENT INFINITE LOOPS (SO UDF LIMITS) - ce_opener = clean_text_for_so(provisioning_data.get("opener"), limit=200) - ce_opener_secondary = clean_text_for_so(provisioning_data.get("opener_secondary"), limit=200) - ce_summary = clean_text_for_so(provisioning_data.get("summary"), limit=132) # Summary is very short in SO + + p_opener = clean_text_for_so(provisioning_data.get("opener"), 200) + + s_opener = clean_text_for_so(provisioning_data.get("opener_secondary"), 200) + + ai_sum = clean_text_for_so(provisioning_data.get("summary"), 132) + - if ce_opener and current_udfs.get(settings.UDF_OPENER) != ce_opener: - logger.info(f"Change detected: Opener Primary (Length: {len(ce_opener)})") - if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {} - contact_patch["UserDefinedFields"][settings.UDF_OPENER] = ce_opener + + logger.info(f"Checking Opener Primary: CRM='{current_udfs.get(settings.UDF_OPENER)}' vs AI='{p_opener}'") + + if p_opener and current_udfs.get(settings.UDF_OPENER) != p_opener: + + contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER] = p_opener + - if ce_opener_secondary and current_udfs.get(settings.UDF_OPENER_SECONDARY) != ce_opener_secondary: - logger.info(f"Change detected: Opener Secondary (Length: {len(ce_opener_secondary)})") - if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {} - contact_patch["UserDefinedFields"][settings.UDF_OPENER_SECONDARY] = ce_opener_secondary - if ce_summary and current_udfs.get(settings.UDF_SUMMARY) != ce_summary: - logger.info(f"Change detected: AI Summary (Length: {len(ce_summary)})") - if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {} - contact_patch["UserDefinedFields"][settings.UDF_SUMMARY] = ce_summary + logger.info(f"Checking Opener Secondary: CRM='{current_udfs.get(settings.UDF_OPENER_SECONDARY)}' vs AI='{s_opener}'") + + if s_opener and current_udfs.get(settings.UDF_OPENER_SECONDARY) != s_opener: + + contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER_SECONDARY] = s_opener + + + + logger.info(f"Checking AI Summary: CRM='{current_udfs.get(settings.UDF_SUMMARY)}' vs AI='{ai_sum}'") + + if ai_sum and current_udfs.get(settings.UDF_SUMMARY) != ai_sum: + + contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_SUMMARY] = ai_sum + + + + # --- D. Timestamps --- - # --- D. Timestamps & Website Sync --- - # CRITICAL: We only update the timestamp if we actually have OTHER changes to push. - # Otherwise, we create an infinite loop of self-triggering webhooks. if settings.UDF_LAST_UPDATE and contact_patch: + now_so = f"[D:{datetime.now().strftime('%m/%d/%Y %H:%M:%S')}]" - if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {} - contact_patch["UserDefinedFields"][settings.UDF_LAST_UPDATE] = now_so - ce_website = provisioning_data.get("website") - if ce_website and (not contact_data.get("Urls") or settings.ENABLE_WEBSITE_SYNC): + contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_LAST_UPDATE] = now_so + + + + if ce_website := provisioning_data.get("website"): + current_urls = contact_data.get("Urls") or [] - if not any(u.get("Value") == ce_website for u in current_urls): - logger.info(f"Syncing Website: {ce_website}") - if "Urls" not in contact_patch: contact_patch["Urls"] = [] - contact_patch["Urls"] = [{"Value": ce_website, "Description": "AI Discovered"}] + current_urls - # --- E. Apply Updates (Single PATCH) --- + if not any(u.get("Value") == ce_website for u in current_urls): + + contact_patch.setdefault("Urls", []).append({"Value": ce_website, "Description": "AI Discovered"}) + + + if contact_patch: - logger.info(f"Pushing combined PATCH for Contact {contact_id}: {list(contact_patch.keys())}") + + logger.info(f"🚀 Pushing combined PATCH for Contact {contact_id}: {list(contact_patch.get('UserDefinedFields', {}).keys())}") + so_client.patch_contact(contact_id, contact_patch) - logger.info("✅ Contact Update Successful.") - else: - logger.info(f"ℹ️ No changes detected for Contact {contact_id}. Skipping PATCH.") + + return ("SUCCESS", "Contact updated") + + + + logger.info(f"✅ No changes detected for Contact {contact_id}.") + + return ("SUCCESS", "Idempotent - no changes needed") + + # 2d. Sync Person Position role_name = provisioning_data.get("role_name")