353 lines
16 KiB
Python
353 lines
16 KiB
Python
import time
|
|
import logging
|
|
import os
|
|
import requests
|
|
import json
|
|
from datetime import datetime
|
|
from queue_manager import JobQueue
|
|
from superoffice_client import SuperOfficeClient, ContactNotFoundException, SuperOfficeAuthenticationError
|
|
from config import settings
|
|
|
|
# Setup Logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger("connector-worker")
|
|
|
|
# Poll Interval
|
|
POLL_INTERVAL = 5 # Seconds
|
|
|
|
def safe_get_udfs(entity_data):
|
|
"""
|
|
Safely retrieves UserDefinedFields from an entity dictionary.
|
|
Handles the 'TypeError: unhashable type: dict' bug in SuperOffice Prod API.
|
|
"""
|
|
if not entity_data: return {}
|
|
try:
|
|
return entity_data.get("UserDefinedFields", {})
|
|
except TypeError:
|
|
logger.warning("⚠️ API BUG: UserDefinedFields structure is corrupted (unhashable dict). Treating as empty.")
|
|
return {}
|
|
except Exception as e:
|
|
logger.error(f"Error reading UDFs: {e}")
|
|
return {}
|
|
|
|
def clean_text_for_so(text, limit=200):
|
|
"""Clean and truncate text for SuperOffice UDF compatibility."""
|
|
if not text or text == "null": return ""
|
|
# Strip whitespace and truncate to safe limit
|
|
return str(text).strip()[:limit]
|
|
|
|
def run_worker():
|
|
queue = JobQueue()
|
|
so_client = None
|
|
self_associate_id = None
|
|
|
|
while not so_client:
|
|
try:
|
|
so_client = SuperOfficeClient()
|
|
if not so_client.access_token: raise Exception("Auth failed")
|
|
|
|
# Dynamic ID Fetch for Echo Prevention
|
|
try:
|
|
me = so_client._get("Associate/Me")
|
|
if me:
|
|
self_associate_id = me.get("AssociateId")
|
|
logger.info(f"✅ Worker Identity Confirmed: Associate ID {self_associate_id}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not fetch own Associate ID: {e}. Echo prevention might be limited to ID 528.")
|
|
self_associate_id = 528 # Fallback
|
|
|
|
except Exception as e:
|
|
logger.critical(f"Failed to initialize SO Client. Retrying in 30s...")
|
|
time.sleep(30)
|
|
|
|
logger.info("Worker started. Polling queue...")
|
|
while True:
|
|
try:
|
|
job = queue.get_next_job()
|
|
if job:
|
|
try:
|
|
# Pass self_id to process_job
|
|
job['self_associate_id'] = self_associate_id
|
|
|
|
status, msg = process_job(job, so_client, queue)
|
|
|
|
if status == "RETRY":
|
|
queue.retry_job_later(job['id'], job['retry_count'], delay_seconds=120, error_msg=msg)
|
|
elif status == "FAILED":
|
|
queue.fail_job(job['id'], msg or "Job failed status")
|
|
elif status == "SKIPPED":
|
|
queue.skip_job(job['id'], msg or "Skipped")
|
|
elif status == "DELETED":
|
|
queue.mark_as_deleted(job['id'], msg or "Deleted in SuperOffice")
|
|
else:
|
|
queue.complete_job(job['id'])
|
|
|
|
except Exception as e:
|
|
logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
|
|
queue.fail_job(job['id'], str(e))
|
|
else:
|
|
time.sleep(POLL_INTERVAL)
|
|
except Exception as e:
|
|
logger.error(f"Worker loop error: {e}")
|
|
time.sleep(POLL_INTERVAL)
|
|
|
|
def process_job(job, so_client: SuperOfficeClient, queue: JobQueue):
|
|
"""
|
|
Core logic for processing a single job.
|
|
Returns: (STATUS, MESSAGE)
|
|
STATUS: 'SUCCESS', 'SKIPPED', 'DELETED', 'RETRY', 'FAILED'
|
|
"""
|
|
logger.info(f"--- [WORKER v2.1.1 - FULL LOGIC RESTORED] Processing Job {job['id']} ({job['event_type']}) ---")
|
|
payload = job['payload']
|
|
event_low = job['event_type'].lower()
|
|
|
|
# --- Strict Whitelist Filter ---
|
|
# A job is only processed if the webhook indicates a change to a relevant field.
|
|
# This is the primary defense against noise and echoes.
|
|
|
|
changes = [c.lower() for c in payload.get("Changes", [])]
|
|
|
|
# Define fields that are significant enough to trigger a full re-assessment.
|
|
trigger_fields = ["name", "urladdress", "urls"]
|
|
|
|
# The ProgID for the Vertical UDF. A change to this field MUST trigger a resync.
|
|
vertical_udf_key = settings.UDF_VERTICAL.lower() if settings.UDF_VERTICAL else "superoffice:83"
|
|
|
|
# Check if any of the trigger fields or the specific vertical UDF is in the changes list.
|
|
has_trigger_field = any(f in changes for f in trigger_fields) or vertical_udf_key in changes
|
|
|
|
# For '...created' events, we always process. For all other events, we check the whitelist.
|
|
if "created" not in event_low and not has_trigger_field:
|
|
msg = f"⏭️ Skipping event '{event_low}': No relevant field changes in {changes}."
|
|
logger.info(msg)
|
|
return ("SKIPPED", msg)
|
|
|
|
# 0. ID Extraction & Early Exit for irrelevant jobs
|
|
person_id = None
|
|
contact_id = None
|
|
job_title = payload.get("JobTitle")
|
|
|
|
field_values = payload.get("FieldValues", {})
|
|
if "person_id" in field_values:
|
|
person_id = int(field_values["person_id"])
|
|
if "contact_id" in field_values:
|
|
contact_id = int(field_values["contact_id"])
|
|
if "title" in field_values and not job_title:
|
|
job_title = field_values["title"]
|
|
|
|
if not person_id:
|
|
if "PersonId" in payload:
|
|
person_id = int(payload["PersonId"])
|
|
elif "PrimaryKey" in payload and "person" in event_low:
|
|
person_id = int(payload["PrimaryKey"])
|
|
|
|
if not contact_id:
|
|
if "ContactId" in payload:
|
|
contact_id = int(payload["ContactId"])
|
|
elif "PrimaryKey" in payload and "contact" in event_low:
|
|
contact_id = int(payload["PrimaryKey"])
|
|
|
|
if not person_id and not contact_id:
|
|
msg = f"Skipping job: No ContactId or PersonId identified."
|
|
logger.warning(msg)
|
|
return ("SKIPPED", msg)
|
|
|
|
# Fallback Lookup
|
|
if person_id and (not job_title or not contact_id):
|
|
try:
|
|
person_details = so_client.get_person(person_id, select=["JobTitle", "Title", "Contact/ContactId"])
|
|
if person_details:
|
|
if not job_title: job_title = person_details.get("JobTitle") or person_details.get("Title")
|
|
if not contact_id:
|
|
contact_obj = person_details.get("Contact")
|
|
if contact_obj and isinstance(contact_obj, dict): contact_id = contact_obj.get("ContactId")
|
|
except ContactNotFoundException:
|
|
return ("DELETED", f"Person {person_id} not found.")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to fetch person details for {person_id}: {e}")
|
|
|
|
if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]):
|
|
return ("SKIPPED", f"Irrelevant event type: {job['event_type']}")
|
|
|
|
if not contact_id: raise ValueError(f"No ContactId found.")
|
|
|
|
logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}")
|
|
|
|
# 1b. Fetch full contact details
|
|
crm_name, crm_website, crm_industry_name, contact_details, campaign_tag = None, None, None, None, None
|
|
|
|
try:
|
|
contact_details = so_client.get_contact(contact_id, select=["Name", "UrlAddress", "Urls", "UserDefinedFields", "Address", "OrgNr", "Associate"])
|
|
crm_name = contact_details.get("Name", "Unknown")
|
|
assoc = contact_details.get("Associate") or {}
|
|
aname = assoc.get("Name", "").upper().strip()
|
|
queue.update_entity_name(job['id'], crm_name, associate_name=aname)
|
|
|
|
# --- DE-DUPLICATION SHIELD (Added March 2026) ---
|
|
if "contact.created" in event_low:
|
|
if queue.check_for_recent_duplicate(crm_name, job['id']):
|
|
msg = f"Duplicate 'contact.created' event for '{crm_name}'. This job will be skipped."
|
|
logger.info(f"🛡️ {msg}")
|
|
return ("SKIPPED", msg)
|
|
|
|
# ROBOPLANET FILTER
|
|
is_robo = False
|
|
if aname in settings.ROBOPLANET_WHITELIST:
|
|
is_robo = True
|
|
else:
|
|
try:
|
|
aid = assoc.get("AssociateId")
|
|
if aid and int(aid) in settings.ROBOPLANET_WHITELIST:
|
|
is_robo = True
|
|
except (ValueError, TypeError): pass
|
|
|
|
if not is_robo:
|
|
msg = f"Skipped, Wackler. Contact {contact_id} ('{crm_name}'): Owner '{aname}' is not in Roboplanet whitelist."
|
|
logger.info(f"⏭️ {msg}")
|
|
return ("SKIPPED", msg)
|
|
|
|
crm_website = contact_details.get("UrlAddress")
|
|
|
|
# Campaign Tag
|
|
if person_id:
|
|
try:
|
|
person_details = so_client.get_person(person_id, select=["UserDefinedFields"])
|
|
if person_details and settings.UDF_CAMPAIGN:
|
|
udfs = safe_get_udfs(person_details)
|
|
campaign_tag = udfs.get(f"{settings.UDF_CAMPAIGN}:DisplayText") or udfs.get(settings.UDF_CAMPAIGN)
|
|
except Exception: pass
|
|
|
|
# Current Vertical
|
|
if settings.UDF_VERTICAL:
|
|
udfs = safe_get_udfs(contact_details)
|
|
so_vertical_val = udfs.get(settings.UDF_VERTICAL)
|
|
if so_vertical_val:
|
|
val_str = str(so_vertical_val).replace("[I:" ,"").replace("]","")
|
|
try:
|
|
v_map = json.loads(settings.VERTICAL_MAP_JSON)
|
|
crm_industry_name = {str(v): k for k, v in v_map.items()}.get(val_str)
|
|
except Exception: pass
|
|
|
|
except ContactNotFoundException:
|
|
return ("DELETED", f"Contact {contact_id} not found.")
|
|
except Exception as e:
|
|
raise Exception(f"SuperOffice API Failure: {e}")
|
|
|
|
# --- 3. Company Explorer Provisioning ---
|
|
ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
|
|
ce_req = {"so_contact_id": contact_id, "so_person_id": person_id, "job_title": job_title, "crm_name": crm_name, "crm_website": crm_website, "crm_industry_name": crm_industry_name, "campaign_tag": campaign_tag}
|
|
|
|
try:
|
|
auth_tuple = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
|
|
resp = requests.post(ce_url, json=ce_req, auth=auth_tuple)
|
|
if resp.status_code == 404: return ("RETRY", "CE 404")
|
|
resp.raise_for_status()
|
|
provisioning_data = resp.json()
|
|
if provisioning_data.get("status") == "processing": return ("RETRY", "CE processing")
|
|
except Exception as e:
|
|
raise Exception(f"Company Explorer API failed: {e}")
|
|
|
|
# Fetch fresh Contact for comparison
|
|
contact_data = so_client.get_contact(contact_id)
|
|
if not contact_data: return ("SKIPPED", "Contact deleted post-analysis")
|
|
current_udfs = safe_get_udfs(contact_data)
|
|
contact_patch = {}
|
|
|
|
# --- A. Vertical Sync ---
|
|
vertical_name = provisioning_data.get("vertical_name")
|
|
if vertical_name:
|
|
v_map = json.loads(settings.VERTICAL_MAP_JSON)
|
|
vertical_id = v_map.get(vertical_name)
|
|
if vertical_id:
|
|
current_val = str(current_udfs.get(settings.UDF_VERTICAL, "")).replace("[I:" ,"").replace("]","")
|
|
if current_val != str(vertical_id):
|
|
contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_VERTICAL] = str(vertical_id)
|
|
|
|
# --- B. Address & VAT Sync ---
|
|
ce_city, ce_street, ce_zip, ce_vat = provisioning_data.get("address_city"), provisioning_data.get("address_street"), provisioning_data.get("address_zip"), provisioning_data.get("vat_id")
|
|
if ce_city or ce_street or ce_zip:
|
|
for type_key in ["Postal", "Street"]:
|
|
cur_addr = (contact_data.get("Address") or {}).get(type_key, {})
|
|
if ce_city and cur_addr.get("City") != ce_city: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["City"] = ce_city
|
|
if ce_street and cur_addr.get("Address1") != ce_street: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Address1"] = ce_street
|
|
if ce_zip and cur_addr.get("Zipcode") != ce_zip: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Zipcode"] = ce_zip
|
|
|
|
if ce_vat and contact_data.get("OrgNr") != ce_vat:
|
|
contact_patch["OrgNr"] = ce_vat
|
|
|
|
# --- C. AI Openers & Summary Sync ---
|
|
p_opener = clean_text_for_so(provisioning_data.get("opener"), 200)
|
|
s_opener = clean_text_for_so(provisioning_data.get("opener_secondary"), 200)
|
|
ai_sum = clean_text_for_so(provisioning_data.get("summary"), 132)
|
|
|
|
if p_opener and current_udfs.get(settings.UDF_OPENER) != p_opener:
|
|
contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER] = p_opener
|
|
|
|
if s_opener and current_udfs.get(settings.UDF_OPENER_SECONDARY) != s_opener:
|
|
contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER_SECONDARY] = s_opener
|
|
|
|
if ai_sum and current_udfs.get(settings.UDF_SUMMARY) != ai_sum:
|
|
contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_SUMMARY] = ai_sum
|
|
|
|
# --- D. Timestamps ---
|
|
if settings.UDF_LAST_UPDATE and contact_patch:
|
|
now_so = f"[D:{datetime.now().strftime('%m/%d/%Y %H:%M:%S')}]"
|
|
contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_LAST_UPDATE] = now_so
|
|
|
|
if ce_website := provisioning_data.get("website"):
|
|
current_urls = contact_data.get("Urls") or []
|
|
if not any(u.get("Value") == ce_website for u in current_urls):
|
|
contact_patch.setdefault("Urls", []).append({"Value": ce_website, "Description": "AI Discovered"})
|
|
|
|
if contact_patch:
|
|
logger.info(f"🚀 Pushing combined PATCH for Contact {contact_id}: {list(contact_patch.get('UserDefinedFields', {}).keys())}")
|
|
so_client.patch_contact(contact_id, contact_patch)
|
|
else:
|
|
logger.info(f"✅ No changes detected for Contact {contact_id}.")
|
|
|
|
# 2d. Sync Person Position
|
|
role_name = provisioning_data.get("role_name")
|
|
if person_id and role_name:
|
|
try:
|
|
persona_map = json.loads(settings.PERSONA_MAP_JSON)
|
|
position_id = persona_map.get(role_name)
|
|
if position_id:
|
|
so_client.update_person_position(person_id, int(position_id))
|
|
except Exception as e:
|
|
logger.error(f"Error syncing position: {e}")
|
|
|
|
# 3. Update SuperOffice Texts (Person)
|
|
if person_id:
|
|
texts = provisioning_data.get("texts", {})
|
|
unsubscribe_link = provisioning_data.get("unsubscribe_link")
|
|
|
|
udf_update = {}
|
|
if texts.get("subject"): udf_update[settings.UDF_SUBJECT] = texts["subject"]
|
|
if texts.get("intro"): udf_update[settings.UDF_INTRO] = texts["intro"]
|
|
if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
|
|
if unsubscribe_link and settings.UDF_UNSUBSCRIBE_LINK:
|
|
udf_update[settings.UDF_UNSUBSCRIBE_LINK] = unsubscribe_link
|
|
|
|
if udf_update:
|
|
logger.info(f"Applying text update to Person {person_id}.")
|
|
so_client.update_entity_udfs(person_id, "Person", udf_update)
|
|
|
|
# --- 4. Create Email Simulation Appointment ---
|
|
try:
|
|
opener = provisioning_data.get("opener") or ""
|
|
intro = texts.get("intro") or ""
|
|
proof = texts.get("social_proof") or ""
|
|
subject = texts.get("subject", "No Subject")
|
|
email_body = f"Betreff: {subject}\n\n{opener}\n\n{intro}\n\n{proof}\n\n(Generated via Gemini Marketing Engine)"
|
|
so_client.create_appointment(f"KI: {subject}", email_body, contact_id, person_id)
|
|
except Exception as e:
|
|
logger.error(f"Failed simulation: {e}")
|
|
|
|
return ("SUCCESS", "Processing complete")
|
|
|
|
if __name__ == "__main__":
|
|
run_worker()
|