import time
import logging
import os
import json
from datetime import datetime

import requests

from queue_manager import JobQueue
from superoffice_client import SuperOfficeClient
from config import settings

# --- Logging setup ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("connector-worker")

# Queue poll interval in seconds.
POLL_INTERVAL = 5

# Timeout (seconds) for outbound HTTP calls to Company Explorer. Without it a
# hung connection would stall this single-threaded worker indefinitely.
HTTP_TIMEOUT = 30


def process_job(job, so_client: SuperOfficeClient):
    """Process a single webhook job from the queue.

    Extracts Person/Contact IDs from the (webhook or legacy) payload, filters
    out irrelevant events, provisions the contact via the Company Explorer
    service, and writes any detected changes (vertical, address, VAT, AI
    openers, texts) back to SuperOffice.

    Args:
        job: Queue row dict with at least 'id', 'event_type' and 'payload'.
        so_client: Authenticated SuperOffice API client.

    Returns:
        "SUCCESS" — job done (or intentionally skipped as noise).
        "RETRY"   — transient condition; caller should re-queue with delay.
        "FAILED"  — permanent failure; caller should mark the job failed.

    Raises:
        ValueError: when no ContactId can be identified in the payload.
        Exception: on SuperOffice / Company Explorer API failures, so the
            caller's generic handler marks the job failed (and may retry).
    """
    logger.info(f"Processing Job {job['id']} ({job['event_type']})")
    payload = job['payload']
    event_low = job['event_type'].lower()

    # --- 1. Extract IDs early (crucial for logging and logic) ---
    person_id = None
    contact_id = None
    job_title = payload.get("JobTitle")

    # Webhook payloads carry IDs in FieldValues (more reliable).
    field_values = payload.get("FieldValues", {})
    if "person_id" in field_values:
        person_id = int(field_values["person_id"])
    if "contact_id" in field_values:
        contact_id = int(field_values["contact_id"])
    if "title" in field_values and not job_title:
        job_title = field_values["title"]

    # Fallback to the older payload structure if not found above.
    if not person_id:
        if "PersonId" in payload:
            person_id = int(payload["PersonId"])
        elif "PrimaryKey" in payload and "person" in event_low:
            person_id = int(payload["PrimaryKey"])
    if not contact_id:
        if "ContactId" in payload:
            contact_id = int(payload["ContactId"])
        elif "PrimaryKey" in payload and "contact" in event_low:
            contact_id = int(payload["PrimaryKey"])

    # Deep lookup: fetch the person only when critical info is still missing.
    if person_id and (not job_title or not contact_id):
        try:
            # NOTE(review): this $select uses "FirstName"/"LastName" while the
            # simulation step below reads "Firstname"/"Lastname" from the same
            # API — one of the two casings is probably wrong; confirm against
            # the SuperOffice REST schema.
            person_details = so_client.get_person(
                person_id,
                select=["JobTitle", "Title", "Contact/ContactId", "FirstName",
                        "LastName", "UserDefinedFields", "Position"]
            )
            if person_details:
                if not job_title:
                    job_title = person_details.get("JobTitle") or person_details.get("Title")
                # Robust extraction of ContactId (nested or flat shape).
                if not contact_id:
                    contact_obj = person_details.get("Contact")
                    if contact_obj and isinstance(contact_obj, dict):
                        contact_id = contact_obj.get("ContactId")
                    elif "ContactId" in person_details:  # Sometimes flat
                        contact_id = person_details.get("ContactId")
        except Exception as e:
            # Best-effort enrichment only — the fallbacks below still apply.
            logger.warning(f"Failed to fetch person details for {person_id}: {e}")

    # --- 2. Noise reduction ---
    if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]):
        logger.info(f"Skipping irrelevant event type: {job['event_type']}")
        return "SUCCESS"

    changes = [c.lower() for c in payload.get("Changes", [])]
    if changes:
        relevant_contact = ["name", "department", "urladdress", "number1", "number2", "userdefinedfields"]
        if settings.UDF_VERTICAL:
            relevant_contact.append(settings.UDF_VERTICAL.lower())
        relevant_person = ["jobtitle", "position", "title", "userdefinedfields", "person_id"]
        # Bookkeeping fields SuperOffice touches on every save — never a
        # reason to sync by themselves.
        technical_fields = ["updated", "updated_associate_id", "contact_id",
                            "person_id", "registered", "registered_associate_id"]
        actual_changes = [c for c in changes if c not in technical_fields]
        is_relevant = False
        if "contact" in event_low:
            logger.info(f"Checking relevance for Contact {contact_id or 'Unknown'}. Changes: {actual_changes}")
            if any(f in actual_changes for f in relevant_contact):
                is_relevant = True
            elif "urls" in actual_changes:
                is_relevant = True
        if "person" in event_low:
            logger.info(f"Checking relevance for Person {person_id or 'Unknown'}. Changes: {actual_changes}")
            if any(f in actual_changes for f in relevant_person):
                is_relevant = True
        if not is_relevant:
            logger.info(f"Skipping technical/irrelevant changes: {changes}")
            return "SUCCESS"
        else:
            logger.info("Change is deemed RELEVANT. Proceeding...")

    if not contact_id:
        raise ValueError(f"Could not identify ContactId in payload: {payload}")

    logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}")

    # --- Cascading: a company-level event fans out one job per person ---
    if "contact" in event_low and not person_id:
        logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.")
        try:
            persons = so_client.search(f"Person?$select=PersonId&$filter=contact/contactId eq {contact_id}")
            if persons:
                q = JobQueue()
                for p in persons:
                    p_id = p.get("PersonId")
                    if p_id:
                        logger.info(f"Cascading: Enqueueing job for Person {p_id}")
                        # "Source": "Cascade" marks derived jobs so they do
                        # not re-trigger the simulation appointment below.
                        q.add_job("person.changed", {"PersonId": p_id, "ContactId": contact_id, "Source": "Cascade"})
        except Exception as e:
            logger.warning(f"Failed to cascade to persons for contact {contact_id}: {e}")

    # --- 1b. Fetch full contact details ('Double Truth' master-data check) ---
    crm_name = None
    crm_website = None
    crm_industry_name = None
    contact_details = None
    try:
        contact_details = so_client.get_contact(
            contact_id,
            select=["Name", "UrlAddress", "Urls", "UserDefinedFields", "Address", "OrgNr"]
        )
        if not contact_details:
            raise ValueError(f"Contact {contact_id} not found (API returned None)")
        crm_name = contact_details.get("Name")
        crm_website = contact_details.get("UrlAddress")
        if not crm_website and "Urls" in contact_details and contact_details["Urls"]:
            crm_website = contact_details["Urls"][0].get("Value")
        if settings.UDF_VERTICAL:
            udfs = contact_details.get("UserDefinedFields", {})
            so_vertical_val = udfs.get(settings.UDF_VERTICAL)
            if so_vertical_val:
                # List-type UDF values appear wrapped as "[I:<id>]" — strip
                # the wrapper to get the raw id string.
                val_str = str(so_vertical_val)
                if val_str.startswith("[I:"):
                    val_str = val_str.split(":")[1].strip("]")
                try:
                    vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
                    # Reverse the name->id map to translate id -> name.
                    vertical_map_rev = {str(v): k for k, v in vertical_map.items()}
                    if val_str in vertical_map_rev:
                        crm_industry_name = vertical_map_rev[val_str]
                        logger.info(f"Detected CRM Vertical Override: {so_vertical_val} -> {crm_industry_name}")
                except Exception as ex:
                    logger.error(f"Error mapping vertical ID {val_str}: {ex}")
    except Exception as e:
        logger.error(f"Failed to fetch contact details for {contact_id}: {e}")
        # Critical failure: without contact details we cannot provision
        # correctly. Raising triggers the caller's failure/retry path.
        raise Exception(f"SuperOffice API Failure: {e}") from e

    # --- 3. Company Explorer provisioning ---
    ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
    ce_req = {
        "so_contact_id": contact_id,
        "so_person_id": person_id,
        "job_title": job_title,
        "crm_name": crm_name,
        "crm_website": crm_website,
        "crm_industry_name": crm_industry_name
    }
    # SECURITY(review): hardcoded default credentials ("admin"/"gemini") —
    # acceptable only if the env vars are always set in deployment; confirm.
    ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
    try:
        resp = requests.post(ce_url, json=ce_req, auth=ce_auth, timeout=HTTP_TIMEOUT)
        if resp.status_code == 404:
            logger.warning("Company Explorer returned 404. Retrying later.")
            return "RETRY"
        resp.raise_for_status()
        provisioning_data = resp.json()
        if provisioning_data.get("status") == "processing":
            logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.")
            return "RETRY"
    except requests.exceptions.RequestException as e:
        raise Exception(f"Company Explorer API failed: {e}") from e

    logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")

    # Fetch fresh contact data for change comparison (full record this time,
    # since it is pushed back wholesale in step D).
    try:
        contact_data = so_client.get_contact(contact_id)
        if not contact_data:
            logger.error(f"Contact {contact_id} not found in SuperOffice.")
            return "FAILED"
    except Exception as e:
        logger.error(f"Failed to fetch contact {contact_id}: {e}")
        return "RETRY"

    dirty_standard = False  # Standard fields (address, OrgNr) changed
    dirty_udfs = False      # User-defined fields changed
    if "UserDefinedFields" not in contact_data:
        contact_data["UserDefinedFields"] = {}

    # --- A. Vertical sync ---
    vertical_name = provisioning_data.get("vertical_name")
    if vertical_name:
        try:
            vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
            vertical_id = vertical_map.get(vertical_name)
            if vertical_id:
                udf_key = settings.UDF_VERTICAL
                current_val = contact_data["UserDefinedFields"].get(udf_key, "")
                # Strip the "[I:<id>]" list-UDF wrapper before comparing.
                if current_val and str(current_val).startswith("[I:"):
                    current_val = str(current_val).split(":")[1].strip("]")
                if str(current_val) != str(vertical_id):
                    logger.info(f"Change detected: Vertical {current_val} -> {vertical_id}")
                    contact_data["UserDefinedFields"][udf_key] = str(vertical_id)
                    dirty_udfs = True
        except Exception as e:
            logger.error(f"Vertical sync error: {e}")

    # --- B. Address & VAT sync ---
    ce_city = provisioning_data.get("address_city")
    ce_street = provisioning_data.get("address_street")
    ce_zip = provisioning_data.get("address_zip")
    ce_vat = provisioning_data.get("vat_id")
    if ce_city or ce_street or ce_zip:
        # Ensure the nested Address/Postal/Street dicts exist before writing.
        if "Address" not in contact_data or contact_data["Address"] is None:
            contact_data["Address"] = {"Street": {}, "Postal": {}}
        addr_obj = contact_data["Address"]
        if "Postal" not in addr_obj or addr_obj["Postal"] is None:
            addr_obj["Postal"] = {}
        if "Street" not in addr_obj or addr_obj["Street"] is None:
            addr_obj["Street"] = {}

        def update_addr_field(field_name, new_val, log_name):
            # Write new_val into both Postal and Street addresses, flagging
            # the record dirty on any actual difference.
            nonlocal dirty_standard
            if new_val:
                for type_key in ["Postal", "Street"]:
                    cur = addr_obj[type_key].get(field_name, "")
                    if cur != new_val:
                        logger.info(f"Change detected: {type_key} {log_name} '{cur}' -> '{new_val}'")
                        addr_obj[type_key][field_name] = new_val
                        dirty_standard = True

        update_addr_field("City", ce_city, "City")
        update_addr_field("Address1", ce_street, "Street")
        update_addr_field("Zipcode", ce_zip, "Zip")

    if ce_vat:
        current_vat = contact_data.get("OrgNr", "")
        if current_vat != ce_vat:
            logger.info(f"Change detected: VAT '{current_vat}' -> '{ce_vat}'")
            contact_data["OrgNr"] = ce_vat
            dirty_standard = True

    # --- C. AI openers sync ---
    ce_opener = provisioning_data.get("opener")
    ce_opener_secondary = provisioning_data.get("opener_secondary")
    # Guard against the literal string "null" leaking through JSON layers.
    if ce_opener and ce_opener != "null":
        current_opener = contact_data["UserDefinedFields"].get(settings.UDF_OPENER, "")
        if current_opener != ce_opener:
            logger.info("Change detected: Primary Opener")
            contact_data["UserDefinedFields"][settings.UDF_OPENER] = ce_opener
            dirty_udfs = True
    if ce_opener_secondary and ce_opener_secondary != "null":
        current_opener_sec = contact_data["UserDefinedFields"].get(settings.UDF_OPENER_SECONDARY, "")
        if current_opener_sec != ce_opener_secondary:
            logger.info("Change detected: Secondary Opener")
            contact_data["UserDefinedFields"][settings.UDF_OPENER_SECONDARY] = ce_opener_secondary
            dirty_udfs = True

    # --- D. Apply updates (single PUT so all changes land atomically) ---
    if dirty_standard or dirty_udfs:
        logger.info(f"Pushing combined updates for Contact {contact_id}...")
        try:
            so_client._put(f"Contact/{contact_id}", contact_data)
            logger.info("✅ Contact Update Successful.")
        except Exception as e:
            logger.error(f"Failed to update Contact {contact_id}: {e}")
            raise

    # --- 2d. Sync person position (role) — only when a person is involved ---
    role_name = provisioning_data.get("role_name")
    if person_id and role_name:
        try:
            persona_map = json.loads(settings.PERSONA_MAP_JSON)
            position_id = persona_map.get(role_name)
            if position_id:
                logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}")
                so_client.update_person_position(person_id, int(position_id))
        except Exception as e:
            # Position sync is best-effort; never fails the whole job.
            logger.error(f"Error syncing position for Person {person_id}: {e}")

    # --- 3. Update SuperOffice texts (person-level only) ---
    if not person_id:
        logger.info("Sync complete (Company only).")
        return "SUCCESS"

    texts = provisioning_data.get("texts", {})
    if not any(texts.values()):
        logger.info("No texts returned from Matrix yet.")
        return "SUCCESS"

    udf_update = {}
    if texts.get("subject"):
        udf_update[settings.UDF_SUBJECT] = texts["subject"]
    if texts.get("intro"):
        udf_update[settings.UDF_INTRO] = texts["intro"]
    if texts.get("social_proof"):
        udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]

    if udf_update:
        try:
            current_person = so_client.get_person(person_id)
            current_udfs = current_person.get("UserDefinedFields", {})
            needs_update = any(
                current_udfs.get(key, "") != new_val
                for key, new_val in udf_update.items()
            )
            # Simulation trigger: either texts changed, OR this is a direct
            # manual trigger (cascade jobs carry "Source" and are excluded).
            # person_id is guaranteed truthy here (early return above).
            if needs_update or "Source" not in payload:
                if needs_update:
                    logger.info(f"Applying text update to Person {person_id}.")
                    so_client.update_entity_udfs(person_id, "Person", udf_update)
                else:
                    logger.info(f"Texts already in sync for Person {person_id}, but triggering simulation.")

                # --- 4. Create email simulation appointment ---
                try:
                    opener = provisioning_data.get("opener", "")
                    intro = texts.get("intro", "")
                    proof = texts.get("social_proof", "")
                    subject = texts.get("subject", "No Subject")
                    salutation = "Hallo"
                    p_data = so_client.get_person(person_id)
                    if p_data:
                        # NOTE(review): reads "Firstname"/"Lastname" — the
                        # earlier $select uses "FirstName"/"LastName"; verify
                        # the correct casing against the API.
                        fname = p_data.get("Firstname", "")
                        lname = p_data.get("Lastname", "")
                        if fname or lname:
                            salutation = f"Hallo {fname} {lname}".strip()
                    cta = (
                        "H\u00e4tten Sie am kommenden Mittwoch gegen 11 Uhr kurz Zeit, f\u00fcr einen kurzen Austausch hierzu?\n"
                        "Gerne k\u00f6nnen Sie auch einen alternativen Termin in meinem Kalender buchen. (bookings Link)"
                    )
                    email_body = (
                        f"{salutation},\n\n"
                        f"{opener.strip()}\n\n"
                        f"{intro.strip()}\n\n"
                        f"{cta.strip()}\n\n"
                        f"{proof.strip()}\n\n"
                        "(Generated via Gemini Marketing Engine)"
                    )
                    now_str = datetime.now().strftime("%H:%M")
                    appt_title = f"[{now_str}] KI: {subject}"
                    so_client.create_appointment(
                        subject=appt_title,
                        description=email_body,
                        contact_id=contact_id,
                        person_id=person_id
                    )
                except Exception as e:
                    # Simulation is a nice-to-have; never fails the job.
                    logger.error(f"Failed to create email simulation appointment: {e}")
            else:
                logger.info(f"Skipping update for Person {person_id}: Values match.")
        except Exception as e:
            logger.error(f"Error during Person update: {e}")
            raise

    logger.info("Job successfully processed.")
    return "SUCCESS"


def run_worker():
    """Run the connector worker loop: authenticate, then poll the job queue.

    Blocks forever. Initialization retries every 30s until a client with a
    valid access token is obtained; the poll loop survives all exceptions.
    """
    queue = JobQueue()
    so_client = None
    while so_client is None:
        try:
            candidate = SuperOfficeClient()
            if not candidate.access_token:
                raise Exception("Auth failed")
            # Only adopt the client once authentication is confirmed.
            # (Assigning before the token check would end the retry loop
            # with an unauthenticated client.)
            so_client = candidate
        except Exception as e:
            logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...")
            time.sleep(30)

    logger.info("Worker started. Polling queue...")
    while True:
        try:
            job = queue.get_next_job()
            if job:
                try:
                    result = process_job(job, so_client)
                    if result == "RETRY":
                        queue.retry_job_later(job['id'], delay_seconds=120, error_msg="CE is processing...")
                    elif result == "FAILED":
                        queue.fail_job(job['id'], "Job failed with FAILED status")
                    else:
                        queue.complete_job(job['id'])
                except Exception as e:
                    logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
                    queue.fail_job(job['id'], str(e))
            else:
                time.sleep(POLL_INTERVAL)
        except Exception as e:
            # Outer guard: a queue/DB hiccup must not kill the worker.
            logger.error(f"Worker loop error: {e}")
            time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    run_worker()