386 lines
16 KiB
Python
386 lines
16 KiB
Python
import time
|
||
import logging
|
||
import os
|
||
import requests
|
||
import json
|
||
from datetime import datetime
|
||
from queue_manager import JobQueue
|
||
from superoffice_client import SuperOfficeClient
|
||
from config import settings
|
||
|
||
# --- Logging -----------------------------------------------------------------
# Configure the root logger once at import time: INFO level, timestamped lines
# that include the logger name.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger("connector-worker")

# --- Queue polling -----------------------------------------------------------
# Seconds to sleep between queue polls when no job is available.
POLL_INTERVAL = 5
|
||
|
||
def safe_get_udfs(entity_data):
    """
    Safely retrieve the UserDefinedFields mapping from a SuperOffice entity.

    Handles the 'TypeError: unhashable type: dict' bug in SuperOffice Prod API,
    and also normalises the other unusable shapes a caller could otherwise
    trip over:
    a missing/empty entity, a ``UserDefinedFields`` key that is present but
    ``null``, and a ``UserDefinedFields`` value that is not a dict.

    Args:
        entity_data: Entity payload returned by the SuperOffice client
            (expected to be a dict), or None.

    Returns:
        dict: The UDF mapping (the same object held by ``entity_data``), or
            an empty dict when it is absent or unusable. Callers can always
            call ``.get()`` on the result.
    """
    if not entity_data:
        return {}

    try:
        udfs = entity_data.get("UserDefinedFields")
    except TypeError:
        logger.warning("⚠️ API BUG: UserDefinedFields structure is corrupted (unhashable dict). Treating as empty.")
        return {}
    except Exception as e:
        logger.error(f"Error reading UDFs: {e}")
        return {}

    # A JSON null arrives as None; previously this was returned as-is and
    # broke every caller's subsequent `.get()`.
    if udfs is None:
        return {}
    if not isinstance(udfs, dict):
        logger.warning(
            f"⚠️ API BUG: UserDefinedFields has unexpected type {type(udfs).__name__}. Treating as empty."
        )
        return {}
    return udfs
|
||
|
||
# Lower-cased field names whose change warrants a re-analysis, keyed by the
# lower-cased event type. Event types not listed here are not field-filtered.
_RELEVANT_CHANGE_FIELDS = {
    'contact.changed': ('name', 'urladdress', 'urls', 'orgnr', 'userdef_id', 'country_id'),
    'person.changed': ('jobtitle', 'title', 'position_id', 'userdef_id'),
}

# Event-type fragments that are skipped without any enrichment.
_IGNORED_EVENT_FRAGMENTS = ("sale.", "project.", "appointment.", "document.", "selection.")

# (connect, read) timeout in seconds for the Company Explorer provisioning call.
# Without one, a stalled CE would block the polling loop indefinitely.
_CE_TIMEOUT = (10, 60)


def _strip_list_id(value):
    """Strip SuperOffice list-item markup from a UDF value, e.g. '[I:12]' -> '12'."""
    return str(value).replace("[I:", "").replace("]", "")


def _filter_field_changes(event_type, payload):
    """
    Decide whether a field-filtered '*.changed' event touched a relevant field.

    Args:
        event_type: Lower-cased event type of the job.
        payload: Webhook payload; its optional 'Changes' list names the changed fields.

    Returns:
        A skip message when the event type is field-filtered and none of its
        relevant fields changed; otherwise None (the job should proceed).
    """
    relevant_fields = _RELEVANT_CHANGE_FIELDS.get(event_type)
    if relevant_fields is None:
        return None

    changes = payload.get('Changes') or []
    changed = {str(c).lower() for c in changes}
    hit_fields = [f for f in relevant_fields if f in changed]
    if not hit_fields:
        msg = f"Skipping '{event_type}': No relevant fields affected. (Changes: {changes})"
        logger.info(f"⏭️ {msg}")
        return msg

    logger.info(f"🎯 Relevant change detected in fields: {hit_fields}")
    return None


def process_job(job, so_client: SuperOfficeClient):
    """
    Core logic for processing a single job.

    Pipeline:
    1. Filter noise.
    2. Resolve the person/contact IDs.
    3. Read CRM context (name, website, vertical, campaign tag).
    4. Ask Company Explorer (CE) to provision the contact.
    5. Write the results back to SuperOffice: one contact PATCH, then the
       person position and texts.
    6. Create an email-simulation appointment.

    Args:
        job: Queue job dict with 'id', 'event_type' and 'payload' keys.
        so_client: SuperOffice client used for all CRM reads and writes.

    Returns: (STATUS, MESSAGE)
        STATUS: 'SUCCESS', 'SKIPPED', 'RETRY', 'FAILED'

    Raises:
        ValueError: If no ContactId can be resolved from the payload or the
            person lookup.
        Exception: 'SuperOffice API Failure: ...' when the contact context
            cannot be read, or 'Company Explorer API failed: ...' on a
            non-timeout request error against CE.
    """
    logger.info(f"--- [WORKER v1.8] Processing Job {job['id']} ({job['event_type']}) ---")
    payload = job['payload']
    event_low = job['event_type'].lower()

    # 0. Noise reduction (field changes and event types). Both run before any
    # API call, so skipped jobs cost no SuperOffice traffic.
    skip_msg = _filter_field_changes(event_low, payload)
    if skip_msg:
        return ("SKIPPED", skip_msg)

    if any(x in event_low for x in _IGNORED_EVENT_FRAGMENTS):
        msg = f"Skipping irrelevant event type: {job['event_type']}"
        logger.info(msg)
        return ("SKIPPED", msg)

    # 1. Extract IDs early: FieldValues first, then top-level payload keys.
    person_id = None
    contact_id = None
    job_title = payload.get("JobTitle")

    field_values = payload.get("FieldValues") or {}
    if "person_id" in field_values:
        person_id = int(field_values["person_id"])
    if "contact_id" in field_values:
        contact_id = int(field_values["contact_id"])
    if "title" in field_values and not job_title:
        job_title = field_values["title"]

    if not person_id:
        if "PersonId" in payload:
            person_id = int(payload["PersonId"])
        elif "PrimaryKey" in payload and "person" in event_low:
            person_id = int(payload["PrimaryKey"])

    if not contact_id:
        if "ContactId" in payload:
            contact_id = int(payload["ContactId"])
        elif "PrimaryKey" in payload and "contact" in event_low:
            contact_id = int(payload["PrimaryKey"])

    # Fallback/deep lookup: fill in JobTitle and/or ContactId from the person.
    if person_id and (not job_title or not contact_id):
        try:
            person_details = so_client.get_person(
                person_id,
                select=["JobTitle", "Title", "Contact/ContactId", "FirstName", "LastName", "UserDefinedFields", "Position"]
            )
            if person_details:
                if not job_title:
                    job_title = person_details.get("JobTitle") or person_details.get("Title")
                if not contact_id:
                    contact_obj = person_details.get("Contact")
                    if contact_obj and isinstance(contact_obj, dict):
                        contact_id = contact_obj.get("ContactId")
                    elif "ContactId" in person_details:
                        contact_id = person_details.get("ContactId")
        except Exception as e:
            logger.warning(f"Failed to fetch person details for {person_id}: {e}")

    if not contact_id:
        raise ValueError(f"Could not identify ContactId in payload: {payload}")

    logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}")

    # 1b. Fetch full contact details for the 'Double Truth' check.
    crm_name = None
    crm_website = None
    crm_industry_name = None
    campaign_tag = None

    try:
        contact_details = so_client.get_contact(
            contact_id,
            select=["Name", "UrlAddress", "Urls", "UserDefinedFields", "Address", "OrgNr"]
        )
        if not contact_details:
            raise ValueError(f"Contact {contact_id} not found (API returned None)")

        crm_name = contact_details.get("Name")
        crm_website = contact_details.get("UrlAddress")

        # --- Person UDFs for the campaign tag (best effort) ---
        if person_id:
            try:
                # Fetch the person again, specifically for its UDFs, to get the DisplayTexts.
                person_udf_details = so_client.get_person(person_id, select=["UserDefinedFields"])
                if person_udf_details and settings.UDF_CAMPAIGN:
                    udfs = safe_get_udfs(person_udf_details)
                    # SuperOffice REST returns DisplayText for lists as 'ProgID:DisplayText'.
                    campaign_tag = udfs.get(f"{settings.UDF_CAMPAIGN}:DisplayText")
                    if not campaign_tag:
                        # Fall back to the raw value if DisplayText is missing.
                        raw_tag = udfs.get(settings.UDF_CAMPAIGN, "")
                        if raw_tag:
                            campaign_tag = str(raw_tag).strip()

                    if campaign_tag:
                        logger.info(f"🎯 CAMPAIGN DETECTED: '{campaign_tag}'")
                    else:
                        logger.info("ℹ️ No Campaign Tag found (Field is empty).")
            except Exception as e:
                logger.warning(f"Could not fetch campaign tag: {e}")

        # --- Current vertical, mapped from its list ID back to a name ---
        if settings.UDF_VERTICAL:
            udfs = safe_get_udfs(contact_details)
            so_vertical_val = udfs.get(settings.UDF_VERTICAL)
            if so_vertical_val:
                val_str = _strip_list_id(so_vertical_val)
                try:
                    vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
                    vertical_map_rev = {str(v): k for k, v in vertical_map.items()}
                    crm_industry_name = vertical_map_rev.get(val_str)
                except Exception as ex:
                    logger.error(f"Error mapping vertical ID {val_str}: {ex}")

    except Exception as e:
        logger.error(f"Failed to fetch contact details for {contact_id}: {e}")
        raise Exception(f"SuperOffice API Failure: {e}") from e

    # --- 3. Company Explorer Provisioning ---
    ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
    ce_req = {
        "so_contact_id": contact_id,
        "so_person_id": person_id,
        "job_title": job_title,
        "crm_name": crm_name,
        "crm_website": crm_website,
        "crm_industry_name": crm_industry_name,
        "campaign_tag": campaign_tag,
    }
    # NOTE(review): falls back to hard-coded default credentials when the env
    # vars are unset; consider failing fast instead of shipping a default password.
    ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))

    try:
        resp = requests.post(ce_url, json=ce_req, auth=ce_auth, timeout=_CE_TIMEOUT)
        if resp.status_code == 404:
            return ("RETRY", "CE returned 404")
        resp.raise_for_status()
        provisioning_data = resp.json()
    except requests.exceptions.Timeout as e:
        # A slow CE is treated as transient: retry later instead of failing the job.
        logger.warning(f"Company Explorer timed out for Contact {contact_id}: {e}")
        return ("RETRY", "CE request timed out")
    except requests.exceptions.RequestException as e:
        raise Exception(f"Company Explorer API failed: {e}") from e

    if provisioning_data.get("status") == "processing":
        return ("RETRY", "CE is processing")

    logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")

    # Fetch fresh contact data so the patch contains only real changes.
    contact_data = so_client.get_contact(contact_id)
    if not contact_data:
        return ("FAILED", "Could not fetch contact for patch")

    current_udfs = safe_get_udfs(contact_data)
    contact_patch = {}

    # --- A. Vertical Sync ---
    vertical_name = provisioning_data.get("vertical_name")
    udf_key = settings.UDF_VERTICAL
    # Without a configured UDF key there is nowhere to write the vertical.
    if vertical_name and udf_key:
        try:
            vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
            vertical_id = vertical_map.get(vertical_name)
            if vertical_id and _strip_list_id(current_udfs.get(udf_key, "")) != str(vertical_id):
                logger.info(f"Change detected: Vertical -> {vertical_id}")
                contact_patch.setdefault("UserDefinedFields", {})[udf_key] = str(vertical_id)
        except Exception as e:
            logger.error(f"Vertical sync error: {e}")

    # --- B. Address & VAT Sync ---
    ce_city = provisioning_data.get("address_city")
    ce_street = provisioning_data.get("address_street")
    ce_zip = provisioning_data.get("address_zip")
    ce_vat = provisioning_data.get("vat_id")

    if ce_city or ce_street or ce_zip:
        current_address = contact_data.get("Address") or {}
        for type_key in ("Postal", "Street"):
            cur_addr = current_address.get(type_key) or {}
            for so_field, ce_value in (("City", ce_city), ("Address1", ce_street), ("Zipcode", ce_zip)):
                if ce_value and cur_addr.get(so_field) != ce_value:
                    contact_patch.setdefault("Address", {}).setdefault(type_key, {})[so_field] = ce_value

    if ce_vat and contact_data.get("OrgNr") != ce_vat:
        contact_patch["OrgNr"] = ce_vat

    # --- C. AI Openers & Summary Sync ---
    # The literal string "null" coming back from CE is treated as "no value".
    ce_opener = provisioning_data.get("opener")
    ce_opener_secondary = provisioning_data.get("opener_secondary")
    ce_summary = provisioning_data.get("summary")

    if ce_opener and ce_opener != "null" and current_udfs.get(settings.UDF_OPENER) != ce_opener:
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER] = ce_opener
    if (ce_opener_secondary and ce_opener_secondary != "null"
            and current_udfs.get(settings.UDF_OPENER_SECONDARY) != ce_opener_secondary):
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER_SECONDARY] = ce_opener_secondary

    if ce_summary and ce_summary != "null":
        # Cap at 135 chars (132 + "...") — presumably the UDF length limit; TODO confirm.
        short_summary = (ce_summary[:132] + "...") if len(ce_summary) > 135 else ce_summary
        if current_udfs.get(settings.UDF_SUMMARY) != short_summary:
            logger.info("Change detected: AI Summary")
            contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_SUMMARY] = short_summary

    # --- D. Timestamps & Website Sync ---
    if settings.UDF_LAST_UPDATE:
        now_so = f"[D:{datetime.now().strftime('%m/%d/%Y %H:%M:%S')}]"
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_LAST_UPDATE] = now_so

    ce_website = provisioning_data.get("website")
    current_urls = contact_data.get("Urls") or []
    if ce_website and (not current_urls or settings.ENABLE_WEBSITE_SYNC):
        if not any(isinstance(u, dict) and u.get("Value") == ce_website for u in current_urls):
            logger.info(f"Syncing Website: {ce_website}")
            contact_patch["Urls"] = [{"Value": ce_website, "Description": "AI Discovered"}] + list(current_urls)

    # --- E. Apply Updates (Single PATCH) ---
    if contact_patch:
        logger.info(f"Pushing combined PATCH for Contact {contact_id}: {list(contact_patch.keys())}")
        so_client.patch_contact(contact_id, contact_patch)
        logger.info("✅ Contact Update Successful.")

    # 2d. Sync Person Position
    role_name = provisioning_data.get("role_name")
    if person_id and role_name:
        try:
            persona_map = json.loads(settings.PERSONA_MAP_JSON)
            position_id = persona_map.get(role_name)
            if position_id:
                so_client.update_person_position(person_id, int(position_id))
        except Exception as e:
            logger.error(f"Error syncing position: {e}")

    # 3./4. Person texts and email simulation. Both are person-level; the
    # simulation previously ran outside this branch and, for contact-only
    # jobs, always failed because `texts` was never bound.
    if person_id:
        texts = provisioning_data.get("texts") or {}
        unsubscribe_link = provisioning_data.get("unsubscribe_link")

        udf_update = {}
        if texts.get("subject"):
            udf_update[settings.UDF_SUBJECT] = texts["subject"]
        if texts.get("intro"):
            udf_update[settings.UDF_INTRO] = texts["intro"]
        if texts.get("social_proof"):
            udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
        if unsubscribe_link and settings.UDF_UNSUBSCRIBE_LINK:
            udf_update[settings.UDF_UNSUBSCRIBE_LINK] = unsubscribe_link

        if udf_update:
            logger.info(f"Applying text update to Person {person_id}.")
            so_client.update_entity_udfs(person_id, "Person", udf_update)

        # --- 4. Create Email Simulation Appointment (best effort) ---
        try:
            opener = provisioning_data.get("opener") or ""
            intro = texts.get("intro") or ""
            proof = texts.get("social_proof") or ""
            subject = texts.get("subject") or "No Subject"
            email_body = f"Betreff: {subject}\n\n{opener}\n\n{intro}\n\n{proof}\n\n(Generated via Gemini Marketing Engine)"
            so_client.create_appointment(f"KI: {subject}", email_body, contact_id, person_id)
        except Exception as e:
            logger.error(f"Failed simulation: {e}")

    return ("SUCCESS", None)
|
||
|
||
def run_worker():
    """
    Run the connector worker: authenticate, then poll the job queue forever.

    Client initialisation is retried every 30 seconds until a
    SuperOfficeClient with a non-empty ``access_token`` is obtained.

    Each job is passed to ``process_job()``, and its status is mapped to a
    queue transition:
    - RETRY: retried in 120 s.
    - FAILED: failed.
    - SKIPPED: skipped.
    - anything else: completed.

    If ``process_job()`` raises, the job is failed. If the loop itself
    raises, the error is logged and polling resumes after POLL_INTERVAL
    seconds.
    """
    queue = JobQueue()

    so_client = None
    while so_client is None:
        try:
            candidate = SuperOfficeClient()
            if not candidate.access_token:
                raise Exception("Auth failed")
            # Publish the client only after the token check, so an
            # unauthenticated client can never leave this loop.
            so_client = candidate
        except Exception as e:
            logger.critical(f"Failed to initialize SO Client: {e}. Retrying in 30s...")
            time.sleep(30)

    logger.info("Worker started. Polling queue...")

    while True:
        try:
            job = queue.get_next_job()
            if not job:
                time.sleep(POLL_INTERVAL)
                continue

            try:
                # process_job returns a tuple (STATUS, MESSAGE).
                status, msg = process_job(job, so_client)

                if status == "RETRY":
                    queue.retry_job_later(job['id'], delay_seconds=120, error_msg=msg)
                elif status == "FAILED":
                    queue.fail_job(job['id'], msg or "Job failed status")
                elif status == "SKIPPED":
                    queue.skip_job(job['id'], msg or "Skipped")
                else:
                    queue.complete_job(job['id'])
            except Exception as e:
                logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
                queue.fail_job(job['id'], str(e))

        except Exception as e:
            logger.error(f"Worker loop error: {e}", exc_info=True)
            time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    run_worker()