"""SuperOffice <-> Company Explorer connector worker.

Polls the local job queue for webhook-originated jobs, enriches the
referenced SuperOffice contact/person through the Company Explorer
provisioning API, and patches the enriched data back into SuperOffice.
"""

import json
import logging
import os
import time
from datetime import datetime

import requests

from config import settings
from queue_manager import JobQueue
from superoffice_client import SuperOfficeClient, ContactNotFoundException

# Setup Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("connector-worker")

# Poll Interval
POLL_INTERVAL = 5  # Seconds

# Associate id of our own API user. Webhooks carrying this id are echoes
# of our own writes and must be skipped to prevent infinite update loops.
ECHO_ASSOCIATE_ID = 528

# Event-type fragments that are irrelevant to contact enrichment.
IRRELEVANT_EVENT_PARTS = ("sale.", "project.", "appointment.", "document.", "selection.")

# Timeout (seconds) for Company Explorer HTTP calls, so a hung CE
# instance cannot stall the worker forever.
CE_REQUEST_TIMEOUT = 60


def safe_get_udfs(entity_data):
    """
    Safely retrieves UserDefinedFields from an entity dictionary.

    Handles the 'TypeError: unhashable type: dict' bug in the SuperOffice
    Prod API by treating a corrupted structure as empty.

    Args:
        entity_data: Entity dict as returned by the SuperOffice REST API
            (may be None/empty).

    Returns:
        dict: The UserDefinedFields mapping, or {} when missing/corrupted.
    """
    if not entity_data:
        return {}
    try:
        return entity_data.get("UserDefinedFields", {})
    except TypeError:
        logger.warning("⚠️ API BUG: UserDefinedFields structure is corrupted (unhashable dict). Treating as empty.")
        return {}
    except Exception as e:
        logger.error(f"Error reading UDFs: {e}")
        return {}


def process_job(job, so_client: SuperOfficeClient, queue: JobQueue):
    """
    Core logic for processing a single job.

    Args:
        job: Queue row with at least 'id', 'event_type' and 'payload'.
        so_client: Authenticated SuperOffice REST client.
        queue: Job queue; used here only to persist display data
            (entity/associate names) for the dashboard.

    Returns:
        (STATUS, MESSAGE)
        STATUS: 'SUCCESS', 'SKIPPED', 'RETRY', 'FAILED'

    Raises:
        ValueError: If no ContactId can be derived from the payload.
        Exception: On unrecoverable SuperOffice / Company Explorer API
            failures (caller marks the job failed).
    """
    logger.info(f"--- [WORKER v1.9.2] Processing Job {job['id']} ({job['event_type']}) ---")
    payload = job['payload']
    event_low = job['event_type'].lower()

    # --- CIRCUIT BREAKER: STOP INFINITE LOOPS ---
    # Ignore webhooks triggered by our own API user (Associate 528)
    changed_by = payload.get("ChangedByAssociateId")
    if changed_by == ECHO_ASSOCIATE_ID:
        msg = f"Skipping Echo: Event was triggered by our own API user (Associate {ECHO_ASSOCIATE_ID})."
        logger.info(f"⏭️ {msg}")
        return ("SKIPPED", msg)
    # --------------------------------------------

    # Noise Reduction (Event Type).
    # FIX: checked before any SuperOffice round-trip — the original code
    # performed the deep person lookup first, wasting an API call for
    # every irrelevant sale./project./... event.
    if any(x in event_low for x in IRRELEVANT_EVENT_PARTS):
        msg = f"Skipping irrelevant event type: {job['event_type']}"
        logger.info(msg)
        return ("SKIPPED", msg)

    # 0. Identify target entities. Webhook payload shapes vary: ids may
    # arrive in FieldValues, as top-level keys, or as PrimaryKey.
    person_id = None
    contact_id = None
    job_title = payload.get("JobTitle")
    field_values = payload.get("FieldValues", {})
    if "person_id" in field_values:
        person_id = int(field_values["person_id"])
    if "contact_id" in field_values:
        contact_id = int(field_values["contact_id"])
    if "title" in field_values and not job_title:
        job_title = field_values["title"]

    if not person_id:
        if "PersonId" in payload:
            person_id = int(payload["PersonId"])
        elif "PrimaryKey" in payload and "person" in event_low:
            person_id = int(payload["PrimaryKey"])
    if not contact_id:
        if "ContactId" in payload:
            contact_id = int(payload["ContactId"])
        elif "PrimaryKey" in payload and "contact" in event_low:
            contact_id = int(payload["PrimaryKey"])

    # Fallback/Deep Lookup & Fetch JobTitle if missing
    if person_id and (not job_title or not contact_id):
        try:
            person_details = so_client.get_person(
                person_id,
                select=["JobTitle", "Title", "Contact/ContactId", "FirstName",
                        "LastName", "UserDefinedFields", "Position"]
            )
            if person_details:
                if not job_title:
                    job_title = person_details.get("JobTitle") or person_details.get("Title")
                if not contact_id:
                    contact_obj = person_details.get("Contact")
                    if contact_obj and isinstance(contact_obj, dict):
                        contact_id = contact_obj.get("ContactId")
                    elif "ContactId" in person_details:
                        contact_id = person_details.get("ContactId")
        except ContactNotFoundException:
            msg = f"Skipping job because Person ID {person_id} was not found in SuperOffice (likely deleted)."
            logger.warning(msg)
            return ("SKIPPED", msg)
        except Exception as e:
            # Non-fatal: we may still have enough ids to proceed.
            logger.warning(f"Failed to fetch person details for {person_id}: {e}")

    if not contact_id:
        raise ValueError(f"Could not identify ContactId in payload: {payload}")

    logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}")

    # 1b. Fetch full contact details for 'Double Truth' check
    crm_name = None
    crm_website = None
    crm_industry_name = None
    contact_details = None
    campaign_tag = None
    try:
        # Request Associate details explicitly
        contact_details = so_client.get_contact(
            contact_id,
            select=["Name", "UrlAddress", "Urls", "UserDefinedFields", "Address", "OrgNr", "Associate"]
        )
        crm_name = contact_details.get("Name", "Unknown")

        # Safely get Associate object
        assoc = contact_details.get("Associate") or {}
        aid = assoc.get("AssociateId")
        aname = assoc.get("Name", "").upper().strip() if assoc.get("Name") else ""

        # PERSIST DETAILS TO DASHBOARD early
        queue.update_entity_name(job['id'], crm_name, associate_name=aname)

        # --- ROBOPLANET FILTER LOGIC ---
        # The whitelist may contain shortnames and/or numeric associate ids;
        # check both.
        is_robo = False
        if aname in settings.ROBOPLANET_WHITELIST:
            is_robo = True
        else:
            try:
                if aid and int(aid) in settings.ROBOPLANET_WHITELIST:
                    is_robo = True
            except (ValueError, TypeError):
                pass
        if not is_robo:
            msg = f"Skipped, Wackler. Contact {contact_id} ('{crm_name}'): Owner '{aname}' is not in Roboplanet whitelist."
            logger.info(f"⏭️ {msg}")
            return ("SKIPPED", msg)
        logger.info(f"✅ Filter Passed: Contact '{crm_name}' belongs to Roboplanet Associate '{aname}'.")
        # -------------------------------

        crm_website = contact_details.get("UrlAddress")

        # --- Fetch Person UDFs for Campaign Tag ---
        if person_id:
            try:
                # We fetch the person again specifically for UDFs to ensure we get DisplayTexts
                person_details = so_client.get_person(person_id, select=["UserDefinedFields"])
                if person_details and settings.UDF_CAMPAIGN:
                    udfs = safe_get_udfs(person_details)
                    # SuperOffice REST returns DisplayText for lists as 'ProgID:DisplayText'
                    display_key = f"{settings.UDF_CAMPAIGN}:DisplayText"
                    campaign_tag = udfs.get(display_key)
                    if not campaign_tag:
                        # Fallback to manual resolution if DisplayText is missing
                        raw_tag = udfs.get(settings.UDF_CAMPAIGN, "")
                        if raw_tag:
                            campaign_tag = str(raw_tag).strip()
                    if campaign_tag:
                        logger.info(f"🎯 CAMPAIGN DETECTED: '{campaign_tag}'")
                    else:
                        logger.info("ℹ️ No Campaign Tag found (Field is empty).")
            except ContactNotFoundException:
                # This is not critical, we can proceed without a campaign tag
                logger.warning(f"Could not fetch campaign tag: Person {person_id} not found.")
            except Exception as e:
                logger.warning(f"Could not fetch campaign tag: {e}")

        # --- Reverse-map the contact's vertical UDF back to its name ---
        if settings.UDF_VERTICAL:
            udfs = safe_get_udfs(contact_details)
            so_vertical_val = udfs.get(settings.UDF_VERTICAL)
            if so_vertical_val:
                # List-UDF values are wrapped as '[I:<id>]'; unwrap before mapping.
                val_str = str(so_vertical_val).replace("[I:", "").replace("]", "")
                try:
                    vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
                    vertical_map_rev = {str(v): k for k, v in vertical_map.items()}
                    if val_str in vertical_map_rev:
                        crm_industry_name = vertical_map_rev[val_str]
                except Exception as ex:
                    logger.error(f"Error mapping vertical ID {val_str}: {ex}")
    except ContactNotFoundException:
        msg = f"Skipping job because Contact ID {contact_id} was not found in SuperOffice (likely deleted)."
        logger.warning(msg)
        return ("SKIPPED", msg)
    except Exception as e:
        logger.error(f"Failed to fetch contact details for {contact_id}: {e}")
        raise Exception(f"SuperOffice API Failure: {e}")

    # --- 3. Company Explorer Provisioning ---
    ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
    ce_req = {
        "so_contact_id": contact_id,
        "so_person_id": person_id,
        "job_title": job_title,
        "crm_name": crm_name,
        "crm_website": crm_website,
        "crm_industry_name": crm_industry_name,
        "campaign_tag": campaign_tag
    }
    ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
    try:
        # FIX: explicit timeout — the original post() could hang indefinitely.
        resp = requests.post(ce_url, json=ce_req, auth=ce_auth, timeout=CE_REQUEST_TIMEOUT)
        if resp.status_code == 404:
            return ("RETRY", "CE returned 404")
        resp.raise_for_status()
        provisioning_data = resp.json()
        if provisioning_data.get("status") == "processing":
            return ("RETRY", "CE is processing")
    except requests.exceptions.RequestException as e:
        raise Exception(f"Company Explorer API failed: {e}")

    logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")

    # Fetch fresh Contact Data for comparison
    contact_data = so_client.get_contact(contact_id)
    if not contact_data:
        # This can happen if the contact was deleted between the CE call and now
        logger.warning(f"Could not re-fetch contact {contact_id} for patch (deleted?). Skipping patch.")
        return ("SKIPPED", "Contact deleted post-analysis")

    current_udfs = safe_get_udfs(contact_data)
    contact_patch = {}

    # --- A. Vertical Sync ---
    vertical_name = provisioning_data.get("vertical_name")
    if vertical_name:
        try:
            vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
            vertical_id = vertical_map.get(vertical_name)
            if vertical_id:
                udf_key = settings.UDF_VERTICAL
                current_val = current_udfs.get(udf_key, "")
                # Compare against the unwrapped '[I:<id>]' representation.
                if str(current_val).replace("[I:", "").replace("]", "") != str(vertical_id):
                    logger.info(f"Change detected: Vertical -> {vertical_id}")
                    contact_patch.setdefault("UserDefinedFields", {})[udf_key] = str(vertical_id)
        except Exception as e:
            logger.error(f"Vertical sync error: {e}")

    # --- B. Address & VAT Sync ---
    ce_city = provisioning_data.get("address_city")
    ce_street = provisioning_data.get("address_street")
    ce_zip = provisioning_data.get("address_zip")
    ce_vat = provisioning_data.get("vat_id")
    if ce_city or ce_street or ce_zip:
        # Mirror any change into both address types SuperOffice keeps.
        for type_key in ["Postal", "Street"]:
            cur_addr = (contact_data.get("Address") or {}).get(type_key, {})
            if ce_city and cur_addr.get("City") != ce_city:
                contact_patch.setdefault("Address", {}).setdefault(type_key, {})["City"] = ce_city
            if ce_street and cur_addr.get("Address1") != ce_street:
                contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Address1"] = ce_street
            if ce_zip and cur_addr.get("Zipcode") != ce_zip:
                contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Zipcode"] = ce_zip
    if ce_vat and contact_data.get("OrgNr") != ce_vat:
        contact_patch["OrgNr"] = ce_vat

    # --- C. AI Openers & Summary Sync ---
    # CE occasionally returns the literal string "null"; treat it as empty.
    ce_opener = provisioning_data.get("opener")
    ce_opener_secondary = provisioning_data.get("opener_secondary")
    ce_summary = provisioning_data.get("summary")
    if ce_opener and ce_opener != "null" and current_udfs.get(settings.UDF_OPENER) != ce_opener:
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER] = ce_opener
    if ce_opener_secondary and ce_opener_secondary != "null" and current_udfs.get(settings.UDF_OPENER_SECONDARY) != ce_opener_secondary:
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER_SECONDARY] = ce_opener_secondary
    if ce_summary and ce_summary != "null":
        # Truncate to fit the UDF field length (135 chars incl. ellipsis).
        short_summary = (ce_summary[:132] + "...") if len(ce_summary) > 135 else ce_summary
        if current_udfs.get(settings.UDF_SUMMARY) != short_summary:
            logger.info("Change detected: AI Summary")
            contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_SUMMARY] = short_summary

    # --- D. Timestamps & Website Sync ---
    if settings.UDF_LAST_UPDATE:
        # NOTE(review): naive local time in SuperOffice '[D:...]' format —
        # presumably server-local time is expected; confirm against CRM config.
        now_so = f"[D:{datetime.now().strftime('%m/%d/%Y %H:%M:%S')}]"
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_LAST_UPDATE] = now_so

    ce_website = provisioning_data.get("website")
    if ce_website and (not contact_data.get("Urls") or settings.ENABLE_WEBSITE_SYNC):
        current_urls = contact_data.get("Urls") or []
        if not any(u.get("Value") == ce_website for u in current_urls):
            logger.info(f"Syncing Website: {ce_website}")
            # Prepend the discovered URL so it becomes the primary one.
            contact_patch["Urls"] = [{"Value": ce_website, "Description": "AI Discovered"}] + current_urls

    # --- E. Apply Updates (Single PATCH) ---
    if contact_patch:
        logger.info(f"Pushing combined PATCH for Contact {contact_id}: {list(contact_patch.keys())}")
        so_client.patch_contact(contact_id, contact_patch)
        logger.info("✅ Contact Update Successful.")

    # 2d. Sync Person Position
    role_name = provisioning_data.get("role_name")
    if person_id and role_name:
        try:
            persona_map = json.loads(settings.PERSONA_MAP_JSON)
            position_id = persona_map.get(role_name)
            if position_id:
                so_client.update_person_position(person_id, int(position_id))
        except Exception as e:
            logger.error(f"Error syncing position: {e}")

    # 3. Update SuperOffice Texts (Person)
    # FIX: 'texts' is bound unconditionally. The original defined it only
    # inside 'if person_id:', so step 4 raised a (silently logged) NameError
    # whenever person_id was missing.
    texts = provisioning_data.get("texts", {})
    if person_id:
        unsubscribe_link = provisioning_data.get("unsubscribe_link")
        udf_update = {}
        if texts.get("subject"):
            udf_update[settings.UDF_SUBJECT] = texts["subject"]
        if texts.get("intro"):
            udf_update[settings.UDF_INTRO] = texts["intro"]
        if texts.get("social_proof"):
            udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
        if unsubscribe_link and settings.UDF_UNSUBSCRIBE_LINK:
            udf_update[settings.UDF_UNSUBSCRIBE_LINK] = unsubscribe_link
        if udf_update:
            logger.info(f"Applying text update to Person {person_id}.")
            so_client.update_entity_udfs(person_id, "Person", udf_update)

    # --- 4. Create Email Simulation Appointment ---
    # Best-effort: failure to create the simulation must not fail the job.
    try:
        opener = provisioning_data.get("opener") or ""
        intro = texts.get("intro") or ""
        proof = texts.get("social_proof") or ""
        subject = texts.get("subject", "No Subject")
        email_body = f"Betreff: {subject}\n\n{opener}\n\n{intro}\n\n{proof}\n\n(Generated via Gemini Marketing Engine)"
        so_client.create_appointment(f"KI: {subject}", email_body, contact_id, person_id)
    except Exception as e:
        logger.error(f"Failed simulation: {e}")

    return ("SUCCESS", None)


def run_worker():
    """Entry point: initialize the SuperOffice client, then poll forever."""
    queue = JobQueue()

    so_client = None
    while not so_client:
        try:
            so_client = SuperOfficeClient()
            if not so_client.access_token:
                raise Exception("Auth failed")
        except Exception as e:
            # FIX: reset so_client. The original left the half-initialized
            # client bound, so the retry loop exited even after the
            # 'Auth failed' raise; it also never logged the exception.
            so_client = None
            logger.critical(f"Failed to initialize SO Client ({e}). Retrying in 30s...")
            time.sleep(30)

    logger.info("Worker started. Polling queue...")
    while True:
        try:
            job = queue.get_next_job()
            if job:
                try:
                    # process_job now takes (job, client, queue)
                    status, msg = process_job(job, so_client, queue)
                    if status == "RETRY":
                        queue.retry_job_later(job['id'], delay_seconds=120, error_msg=msg)
                    elif status == "FAILED":
                        queue.fail_job(job['id'], msg or "Job failed status")
                    elif status == "SKIPPED":
                        queue.skip_job(job['id'], msg or "Skipped")
                    else:
                        queue.complete_job(job['id'])
                except Exception as e:
                    logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
                    queue.fail_job(job['id'], str(e))
            else:
                time.sleep(POLL_INTERVAL)
        except Exception as e:
            # Keep the worker alive on queue-level failures.
            logger.error(f"Worker loop error: {e}")
            time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    run_worker()