import time
import logging
import os
import json

import requests

from queue_manager import JobQueue
from superoffice_client import SuperOfficeClient
from config import settings

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("connector-worker")

# Queue poll interval (seconds)
POLL_INTERVAL = 5

# Timeout for outbound HTTP calls to the Company Explorer API (seconds).
# Without a timeout, a hung endpoint would block the worker forever.
CE_REQUEST_TIMEOUT = 30

# Event types we never act on (substring match against the lowercased event type).
_IGNORED_EVENT_TYPES = ("sale.", "project.", "appointment.", "document.", "selection.")

# Contact fields whose change triggers re-evaluation:
# Name/Department (identity), UrlAddress (source), Number1/2 (matching).
_RELEVANT_CONTACT_FIELDS = ("name", "department", "urladdress", "number1", "number2")

# Person fields whose change triggers re-evaluation:
# JobTitle (persona logic), Position (role logic).
_RELEVANT_PERSON_FIELDS = ("jobtitle", "position")


def _is_relevant_change(event_low, payload):
    """Return False when the webhook's 'Changes' list contains only ignored fields.

    Events that carry no 'Changes' list are always treated as relevant.
    """
    changes = [c.lower() for c in payload.get("Changes", [])]
    if not changes:
        return True
    if "contact" in event_low:
        # Website might arrive via the Urls collection rather than UrlAddress.
        if any(f in changes for f in _RELEVANT_CONTACT_FIELDS) or "urls" in changes:
            return True
    if "person" in event_low:
        if any(f in changes for f in _RELEVANT_PERSON_FIELDS):
            return True
    logger.info(f"Skipping irrelevant changes: {changes}")
    return False


def _extract_ids(payload, event_low, so_client):
    """Resolve (person_id, contact_id) from the webhook payload.

    Falls back to a SuperOffice person lookup when only a PersonId is present.

    Raises:
        ValueError: when no ContactId can be determined at all.
    """
    person_id = None
    contact_id = None
    if "PersonId" in payload:
        person_id = int(payload["PersonId"])
    elif "PrimaryKey" in payload and "person" in event_low:
        person_id = int(payload["PrimaryKey"])
    if "ContactId" in payload:
        contact_id = int(payload["ContactId"])
    elif "PrimaryKey" in payload and "contact" in event_low:
        contact_id = int(payload["PrimaryKey"])

    # Fallback/deep lookup: derive the contact from the person record.
    if not contact_id and person_id:
        person_data = so_client.get_person(person_id)
        if person_data and "Contact" in person_data:
            contact_id = person_data["Contact"].get("ContactId")

    if not contact_id:
        raise ValueError(f"Could not identify ContactId in payload: {payload}")
    return person_id, contact_id


def _cascade_to_persons(so_client, contact_id):
    """Enqueue a 'person.changed' job for every person of the given contact.

    Best-effort: failures are logged as warnings and never fail the parent job.
    """
    try:
        persons = so_client.search(f"Person?$filter=contact/contactId eq {contact_id}")
        if persons:
            q = JobQueue()
            for p in persons:
                p_id = p.get("PersonId")
                if p_id:
                    logger.info(f"Cascading: Enqueueing job for Person {p_id}")
                    q.add_job("person.changed", {"PersonId": p_id, "ContactId": contact_id, "Source": "Cascade"})
    except Exception as e:
        logger.warning(f"Failed to cascade to persons for contact {contact_id}: {e}")


def _fetch_crm_master_data(so_client, contact_id):
    """Best-effort fetch of the contact's (name, website) for the 'double truth' check.

    Returns (None, None) when the lookup fails; the sync continues without it.
    """
    crm_name = None
    crm_website = None
    try:
        contact_details = so_client.get_contact(contact_id)
        if contact_details:
            crm_name = contact_details.get("Name")
            crm_website = contact_details.get("UrlAddress")
            # Fall back to the first entry of the Urls collection.
            if not crm_website and contact_details.get("Urls"):
                crm_website = contact_details["Urls"][0].get("Value")
    except Exception as e:
        logger.warning(f"Failed to fetch contact details for {contact_id}: {e}")
    return crm_name, crm_website


def _call_company_explorer(contact_id, person_id, payload, crm_name, crm_website):
    """Call the Company Explorer provisioning API.

    Returns:
        The parsed response dict, or None when the job should be retried
        (endpoint returned 404, or the company is still being processed).

    Raises:
        Exception: on hard API failures (connection errors, HTTP errors).
    """
    ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
    ce_req = {
        "so_contact_id": contact_id,
        "so_person_id": person_id,
        "job_title": payload.get("JobTitle"),
        "crm_name": crm_name,
        "crm_website": crm_website
    }
    # Simple basic auth for the internal API.
    # SECURITY: hard-coded fallback credentials ("admin"/"gemini") are a known
    # secret in the codebase — require the env vars instead of defaulting.
    ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
    try:
        # Timeout added so a hung endpoint cannot block the worker indefinitely.
        resp = requests.post(ce_url, json=ce_req, auth=ce_auth, timeout=CE_REQUEST_TIMEOUT)
        if resp.status_code == 404:
            logger.warning("Company Explorer returned 404. Retrying later.")
            return None
        resp.raise_for_status()
        provisioning_data = resp.json()
        if provisioning_data.get("status") == "processing":
            logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.")
            return None
    except requests.exceptions.RequestException as e:
        raise Exception(f"Company Explorer API failed: {e}")
    return provisioning_data


def _sync_vertical(so_client, contact_id, vertical_name):
    """Sync the identified vertical into the contact's UDF, skipping no-op writes."""
    try:
        vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
    except Exception:
        vertical_map = {}
        logger.error("Failed to parse VERTICAL_MAP_JSON from config.")
    vertical_id = vertical_map.get(vertical_name)
    if not vertical_id:
        logger.warning(f"Vertical '{vertical_name}' not found in internal mapping.")
        return
    logger.info(f"Identified Vertical '{vertical_name}' -> ID {vertical_id}")
    try:
        # Read the current value first to avoid webhook update loops.
        current_contact = so_client.get_contact(contact_id)
        current_udfs = current_contact.get("UserDefinedFields", {})
        udf_key = settings.UDF_VERTICAL
        current_val = current_udfs.get(udf_key, "")
        # Normalize the SO list ID format (e.g. "[I:26]" -> "26").
        if current_val and current_val.startswith("[I:"):
            current_val = current_val.split(":")[1].strip("]")
        if str(current_val) != str(vertical_id):
            logger.info(f"Updating Contact {contact_id} Vertical: {current_val} -> {vertical_id}")
            so_client.update_entity_udfs(contact_id, "Contact", {udf_key: str(vertical_id)})
        else:
            logger.info(f"Vertical for Contact {contact_id} already in sync ({vertical_id}).")
    except Exception as e:
        logger.error(f"Failed to sync vertical for Contact {contact_id}: {e}")


def _normalize_url(u):
    """Strip scheme, case, and trailing slash so URLs compare loop-safely."""
    return str(u).lower().replace("https://", "").replace("http://", "").strip("/") if u else ""


def _sync_website(so_client, contact_id, website):
    """Sync the provisioned website into the contact's Urls collection (Rank 1)."""
    if not website or website == "k.A.":
        return
    try:
        contact_data = so_client.get_contact(contact_id)
        current_url = contact_data.get("UrlAddress", "")
        if _normalize_url(current_url) == _normalize_url(website):
            logger.info(f"Website for Contact {contact_id} already in sync.")
            return
        logger.info(f"Updating Website for Contact {contact_id}: {current_url} -> {website}")
        # Replace the Rank-1 entry in the Urls collection, or append one.
        new_urls = []
        found = False
        for u in contact_data.get("Urls") or []:
            if u.get("Rank") == 1:
                u["Value"] = website
                found = True
            new_urls.append(u)
        if not found:
            new_urls.append({"Value": website, "Rank": 1, "Description": "Website"})
        contact_data["Urls"] = new_urls
        if not current_url:
            contact_data["UrlAddress"] = website
        # NOTE(review): relies on the client's private _put — consider exposing
        # a public contact-update method on SuperOfficeClient instead.
        so_client._put(f"Contact/{contact_id}", contact_data)
    except Exception as e:
        logger.error(f"Failed to sync website for Contact {contact_id}: {e}")


def _sync_position(so_client, person_id, role_name):
    """Sync the identified role into the person's Position field via the persona map."""
    try:
        persona_map = json.loads(settings.PERSONA_MAP_JSON)
    except Exception:
        persona_map = {}
        logger.error("Failed to parse PERSONA_MAP_JSON from config.")
    position_id = persona_map.get(role_name)
    if not position_id:
        logger.info(f"Role '{role_name}' has no mapped Position ID in config. Skipping update.")
        return
    logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}")
    try:
        success = so_client.update_person_position(person_id, int(position_id))
        if not success:
            logger.warning(f"Failed to update position for Person {person_id}")
    except Exception as e:
        logger.error(f"Error syncing position for Person {person_id}: {e}")


def _write_back_texts(so_client, person_id, texts):
    """Write generated texts into the person's UDFs, skipping no-op updates.

    Raises on write failure so the worker loop marks the job as failed.
    """
    udf_update = {}
    if texts.get("subject"):
        udf_update[settings.UDF_SUBJECT] = texts["subject"]
    if texts.get("intro"):
        udf_update[settings.UDF_INTRO] = texts["intro"]
    if texts.get("social_proof"):
        udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
    if not udf_update:
        return
    try:
        # Loop prevention: only write when at least one value actually differs.
        current_person = so_client.get_person(person_id)
        current_udfs = current_person.get("UserDefinedFields", {})
        needs_update = any(
            current_udfs.get(key, "") != new_val for key, new_val in udf_update.items()
        )
        if needs_update:
            logger.info(f"Applying update to Person {person_id} (Changes detected).")
            success = so_client.update_entity_udfs(person_id, "Person", udf_update)
            if not success:
                raise Exception("Failed to update SuperOffice UDFs")
        else:
            logger.info(f"Skipping update for Person {person_id}: Values match (Loop Prevention).")
    except Exception as e:
        logger.error(f"Error during pre-update check: {e}")
        raise


def process_job(job, so_client: SuperOfficeClient):
    """Process a single queued webhook job.

    Returns "SUCCESS" when done (or the event is irrelevant) and "RETRY" when
    the Company Explorer backend is not ready yet. Raises on hard failures so
    the worker loop can mark the job as failed.
    """
    logger.info(f"Processing Job {job['id']} ({job['event_type']})")
    payload = job['payload']
    event_low = job['event_type'].lower()

    # 0. Fast-fail on irrelevant event types (noise reduction).
    if any(x in event_low for x in _IGNORED_EVENT_TYPES):
        logger.info(f"Skipping irrelevant event type: {job['event_type']}")
        return "SUCCESS"

    # 0b. Fast-fail on irrelevant field changes (only when the webhook
    # provides a 'Changes' list).
    if not _is_relevant_change(event_low, payload):
        return "SUCCESS"

    # 1. Extract IDs from the webhook payload.
    person_id, contact_id = _extract_ids(payload, event_low, so_client)
    logger.info(f"Target: Person {person_id}, Contact {contact_id}")

    # --- Cascading logic ---
    # If a company changes, we want to update all its persons eventually.
    if "contact" in event_low and not person_id:
        logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.")
        _cascade_to_persons(so_client, contact_id)

    # 1b. Fetch full contact details for the 'double truth' check (master data sync).
    crm_name, crm_website = _fetch_crm_master_data(so_client, contact_id)

    # 2. Call the Company Explorer provisioning API.
    provisioning_data = _call_company_explorer(contact_id, person_id, payload, crm_name, crm_website)
    if provisioning_data is None:
        return "RETRY"
    logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")

    # 2b. Sync vertical to SuperOffice (company level).
    vertical_name = provisioning_data.get("vertical_name")
    if vertical_name:
        _sync_vertical(so_client, contact_id, vertical_name)

    # 2c. Sync website (company level).
    # TEMPORARILY DISABLED TO PREVENT LOOP (SO API read-after-write latency
    # or field mapping issue). Re-enable via config if needed.
    if settings.ENABLE_WEBSITE_SYNC:
        _sync_website(so_client, contact_id, provisioning_data.get("website"))

    # 2d. Sync person position (role) — only if a person is involved.
    role_name = provisioning_data.get("role_name")
    if person_id and role_name:
        _sync_position(so_client, person_id, role_name)

    # 3. Write generated texts back to SuperOffice (only with a person_id).
    if not person_id:
        logger.info("Sync complete (Company only). No texts to write back.")
        return "SUCCESS"

    texts = provisioning_data.get("texts", {})
    if not any(texts.values()):
        logger.info("No texts returned from Matrix (yet). Skipping write-back.")
        return "SUCCESS"

    _write_back_texts(so_client, person_id, texts)

    logger.info("Job successfully processed.")
    return "SUCCESS"


def run_worker():
    """Main worker loop: poll the queue and process jobs until the process dies."""
    queue = JobQueue()

    # Initialize the SuperOffice client, retrying until auth succeeds.
    # BUGFIX: the original assigned so_client before checking access_token, so
    # a failed auth check still left the loop with a broken (truthy) client.
    # Only bind so_client once the auth check has passed.
    so_client = None
    while so_client is None:
        try:
            client = SuperOfficeClient()
            if not client.access_token:  # check if auth worked
                raise Exception("Auth failed")
            so_client = client
        except Exception as e:
            logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...")
            time.sleep(30)

    logger.info("Worker started. Polling queue...")
    while True:
        try:
            job = queue.get_next_job()
            if job:
                try:
                    result = process_job(job, so_client)
                    if result == "RETRY":
                        queue.retry_job_later(job['id'], delay_seconds=120)
                    else:
                        queue.complete_job(job['id'])
                except Exception as e:
                    logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
                    queue.fail_job(job['id'], str(e))
            else:
                time.sleep(POLL_INTERVAL)
        except Exception as e:
            logger.error(f"Worker loop error: {e}")
            time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    run_worker()