# Connector worker: polls the local job queue and synchronizes SuperOffice
# contacts/persons with the Company Explorer provisioning service.
import json
import logging
import os
import time
from datetime import datetime

import requests

from config import settings
from queue_manager import JobQueue
from superoffice_client import SuperOfficeClient
# Configure root logging once for the whole worker process.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("connector-worker")

# Seconds to sleep between queue polls when no job is available.
POLL_INTERVAL = 5
def process_job(job, so_client: SuperOfficeClient):
    """Process a single queued SuperOffice webhook job end-to-end.

    Steps: extract person/contact IDs from the payload, filter out irrelevant
    ("noise") events, cascade company-level events to all persons, call the
    Company Explorer provisioning API, write back vertical/address/VAT/opener
    changes to the SuperOffice contact, sync the person's position, update the
    person's UDF texts, and optionally create an email-simulation appointment.

    Args:
        job: Queue row dict with at least 'id', 'event_type' and 'payload'.
        so_client: Authenticated SuperOffice API client.

    Returns:
        "SUCCESS" when the job is done (including deliberate skips),
        "RETRY" when Company Explorer is still processing or temporarily
        unavailable, "FAILED" when the contact no longer exists.

    Raises:
        ValueError: if no ContactId can be determined from the payload.
        Exception: on SuperOffice / Company Explorer API failures (triggers
            a queue-level retry via the caller).
    """
    logger.info(f"Processing Job {job['id']} ({job['event_type']})")
    payload = job['payload']
    event_low = job['event_type'].lower()

    # 1. Extract IDs early (crucial for logging and logic).
    person_id = None
    contact_id = None
    job_title = payload.get("JobTitle")

    if "PersonId" in payload:
        person_id = int(payload["PersonId"])
    elif "PrimaryKey" in payload and "person" in event_low:
        person_id = int(payload["PrimaryKey"])

    if "ContactId" in payload:
        contact_id = int(payload["ContactId"])
    elif "PrimaryKey" in payload and "contact" in event_low:
        contact_id = int(payload["PrimaryKey"])

    # Fallback/deep lookup: fetch JobTitle and ContactId from the person
    # record when the webhook payload did not carry them.
    if person_id:
        try:
            person_details = so_client.get_person(person_id)
            if person_details:
                if not job_title:
                    job_title = person_details.get("JobTitle") or person_details.get("Title")
                if not contact_id and "Contact" in person_details:
                    contact_id = person_details["Contact"].get("ContactId")
        except Exception as e:
            # Best-effort enrichment only; the job can still proceed.
            logger.warning(f"Failed to fetch person details for {person_id}: {e}")

    # 2. Noise reduction: entity types we never act on.
    if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]):
        logger.info(f"Skipping irrelevant event type: {job['event_type']}")
        return "SUCCESS"

    changes = [c.lower() for c in payload.get("Changes", [])]
    if changes:
        relevant_contact = ["name", "department", "urladdress", "number1", "number2", "userdefinedfields"]
        if settings.UDF_VERTICAL:
            relevant_contact.append(settings.UDF_VERTICAL.lower())

        relevant_person = ["jobtitle", "position", "title", "userdefinedfields", "person_id"]
        # Audit columns that change on every save; never relevant by themselves.
        technical_fields = ["updated", "updated_associate_id", "contact_id", "person_id", "registered", "registered_associate_id"]
        actual_changes = [c for c in changes if c not in technical_fields]

        is_relevant = False

        if "contact" in event_low:
            logger.info(f"Checking relevance for Contact {contact_id or 'Unknown'}. Changes: {actual_changes}")
            if any(f in actual_changes for f in relevant_contact):
                is_relevant = True
            elif "urls" in actual_changes:
                is_relevant = True

        if "person" in event_low:
            logger.info(f"Checking relevance for Person {person_id or 'Unknown'}. Changes: {actual_changes}")
            if any(f in actual_changes for f in relevant_person):
                is_relevant = True

        if not is_relevant:
            logger.info(f"Skipping technical/irrelevant changes: {changes}")
            return "SUCCESS"
        else:
            logger.info("Change is deemed RELEVANT. Proceeding...")

    if not contact_id:
        raise ValueError(f"Could not identify ContactId in payload: {payload}")

    logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}")

    # --- Cascading Logic ---
    # A company-level event (no person) fans out one job per person of the
    # contact, so each person gets its own sync pass.
    if "contact" in event_low and not person_id:
        logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.")
        try:
            persons = so_client.search(f"Person?$filter=contact/contactId eq {contact_id}")
            if persons:
                q = JobQueue()
                for p in persons:
                    p_id = p.get("PersonId")
                    if p_id:
                        logger.info(f"Cascading: Enqueueing job for Person {p_id}")
                        # "Source": "Cascade" marks derived jobs so they don't
                        # re-trigger the simulation appointment below.
                        q.add_job("person.changed", {"PersonId": p_id, "ContactId": contact_id, "Source": "Cascade"})
        except Exception as e:
            logger.warning(f"Failed to cascade to persons for contact {contact_id}: {e}")

    # 1b. Fetch full contact details for the 'Double Truth' check (master data sync).
    crm_name = None
    crm_website = None
    crm_industry_name = None
    contact_details = None

    try:
        contact_details = so_client.get_contact(contact_id)
        if not contact_details:
            raise ValueError(f"Contact {contact_id} not found (API returned None)")

        crm_name = contact_details.get("Name")
        crm_website = contact_details.get("UrlAddress")
        if not crm_website and "Urls" in contact_details and contact_details["Urls"]:
            crm_website = contact_details["Urls"][0].get("Value")

        if settings.UDF_VERTICAL:
            udfs = contact_details.get("UserDefinedFields", {})
            so_vertical_val = udfs.get(settings.UDF_VERTICAL)

            if so_vertical_val:
                val_str = str(so_vertical_val)
                # SuperOffice list UDFs serialize as "[I:<id>]"; unwrap the id.
                if val_str.startswith("[I:"):
                    val_str = val_str.split(":")[1].strip("]")

                try:
                    vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
                    # Reverse map: numeric list-id (as str) -> vertical name.
                    vertical_map_rev = {str(v): k for k, v in vertical_map.items()}
                    if val_str in vertical_map_rev:
                        crm_industry_name = vertical_map_rev[val_str]
                        logger.info(f"Detected CRM Vertical Override: {so_vertical_val} -> {crm_industry_name}")
                except Exception as ex:
                    logger.error(f"Error mapping vertical ID {val_str}: {ex}")

    except Exception as e:
        logger.error(f"Failed to fetch contact details for {contact_id}: {e}")
        # Critical failure: without contact details we cannot provision
        # correctly. Raising triggers a queue retry.
        raise Exception(f"SuperOffice API Failure: {e}")

    # --- 3. Company Explorer Provisioning ---
    ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
    ce_req = {
        "so_contact_id": contact_id,
        "so_person_id": person_id,
        "job_title": job_title,
        "crm_name": crm_name,
        "crm_website": crm_website,
        "crm_industry_name": crm_industry_name
    }

    ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))

    try:
        # FIX: timeout added — without it a hung CE endpoint blocks the
        # single-threaded worker forever.
        resp = requests.post(ce_url, json=ce_req, auth=ce_auth, timeout=60)
        if resp.status_code == 404:
            logger.warning("Company Explorer returned 404. Retrying later.")
            return "RETRY"

        resp.raise_for_status()
        provisioning_data = resp.json()

        if provisioning_data.get("status") == "processing":
            logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.")
            return "RETRY"

    except requests.exceptions.RequestException as e:
        raise Exception(f"Company Explorer API failed: {e}")

    logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")

    # Fetch fresh contact data for comparison (CE call may be slow; avoid
    # clobbering concurrent CRM edits with the stale snapshot from above).
    try:
        contact_data = so_client.get_contact(contact_id)
        if not contact_data:
            logger.error(f"Contact {contact_id} not found in SuperOffice.")
            return "FAILED"
    except Exception as e:
        logger.error(f"Failed to fetch contact {contact_id}: {e}")
        return "RETRY"

    dirty_standard = False  # any top-level contact field changed
    dirty_udfs = False      # any user-defined field changed

    if "UserDefinedFields" not in contact_data: contact_data["UserDefinedFields"] = {}

    # --- A. Vertical Sync ---
    vertical_name = provisioning_data.get("vertical_name")
    if vertical_name:
        try:
            vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
            vertical_id = vertical_map.get(vertical_name)
            if vertical_id:
                udf_key = settings.UDF_VERTICAL
                current_val = contact_data["UserDefinedFields"].get(udf_key, "")
                # Unwrap "[I:<id>]" list-UDF encoding before comparing.
                if current_val and str(current_val).startswith("[I:"):
                    current_val = str(current_val).split(":")[1].strip("]")

                if str(current_val) != str(vertical_id):
                    logger.info(f"Change detected: Vertical {current_val} -> {vertical_id}")
                    contact_data["UserDefinedFields"][udf_key] = str(vertical_id)
                    dirty_udfs = True
        except Exception as e:
            logger.error(f"Vertical sync error: {e}")

    # --- B. Address & VAT Sync ---
    ce_city = provisioning_data.get("address_city")
    ce_street = provisioning_data.get("address_street")
    ce_zip = provisioning_data.get("address_zip")
    ce_vat = provisioning_data.get("vat_id")

    if ce_city or ce_street or ce_zip:
        if "Address" not in contact_data or contact_data["Address"] is None:
            contact_data["Address"] = {"Street": {}, "Postal": {}}

        addr_obj = contact_data["Address"]
        if "Postal" not in addr_obj or addr_obj["Postal"] is None: addr_obj["Postal"] = {}
        if "Street" not in addr_obj or addr_obj["Street"] is None: addr_obj["Street"] = {}

        def update_addr_field(field_name, new_val, log_name):
            # Write the CE value into both Postal and Street address variants
            # when it differs; flags the contact as dirty.
            nonlocal dirty_standard
            if new_val:
                for type_key in ["Postal", "Street"]:
                    cur = addr_obj[type_key].get(field_name, "")
                    if cur != new_val:
                        logger.info(f"Change detected: {type_key} {log_name} '{cur}' -> '{new_val}'")
                        addr_obj[type_key][field_name] = new_val
                        dirty_standard = True

        update_addr_field("City", ce_city, "City")
        update_addr_field("Address1", ce_street, "Street")
        update_addr_field("Zipcode", ce_zip, "Zip")

    if ce_vat:
        current_vat = contact_data.get("OrgNr", "")
        if current_vat != ce_vat:
            logger.info(f"Change detected: VAT '{current_vat}' -> '{ce_vat}'")
            contact_data["OrgNr"] = ce_vat
            dirty_standard = True

    # --- C. AI Openers Sync ---
    ce_opener = provisioning_data.get("opener")
    ce_opener_secondary = provisioning_data.get("opener_secondary")

    # Note: the service has been observed sending the literal string "null".
    if ce_opener and ce_opener != "null":
        current_opener = contact_data["UserDefinedFields"].get(settings.UDF_OPENER, "")
        if current_opener != ce_opener:
            logger.info("Change detected: Primary Opener")
            contact_data["UserDefinedFields"][settings.UDF_OPENER] = ce_opener
            dirty_udfs = True

    if ce_opener_secondary and ce_opener_secondary != "null":
        current_opener_sec = contact_data["UserDefinedFields"].get(settings.UDF_OPENER_SECONDARY, "")
        if current_opener_sec != ce_opener_secondary:
            logger.info("Change detected: Secondary Opener")
            contact_data["UserDefinedFields"][settings.UDF_OPENER_SECONDARY] = ce_opener_secondary
            dirty_udfs = True

    # --- D. Apply Updates (Single Transaction) ---
    if dirty_standard or dirty_udfs:
        logger.info(f"Pushing combined updates for Contact {contact_id}...")
        try:
            so_client._put(f"Contact/{contact_id}", contact_data)
            logger.info("✅ Contact Update Successful.")
        except Exception as e:
            logger.error(f"Failed to update Contact {contact_id}: {e}")
            raise

    # 2d. Sync person position (role) — only when a person is involved.
    role_name = provisioning_data.get("role_name")
    if person_id and role_name:
        try:
            persona_map = json.loads(settings.PERSONA_MAP_JSON)
            position_id = persona_map.get(role_name)
            if position_id:
                logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}")
                so_client.update_person_position(person_id, int(position_id))
        except Exception as e:
            logger.error(f"Error syncing position for Person {person_id}: {e}")

    # 3. Update SuperOffice texts (only if a person is present).
    if not person_id:
        logger.info("Sync complete (Company only).")
        return "SUCCESS"

    texts = provisioning_data.get("texts", {})
    if not any(texts.values()):
        logger.info("No texts returned from Matrix yet.")
        return "SUCCESS"

    udf_update = {}
    if texts.get("subject"): udf_update[settings.UDF_SUBJECT] = texts["subject"]
    if texts.get("intro"): udf_update[settings.UDF_INTRO] = texts["intro"]
    if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]

    if udf_update:
        try:
            current_person = so_client.get_person(person_id)
            current_udfs = current_person.get("UserDefinedFields", {})
            needs_update = any(
                current_udfs.get(key, "") != new_val
                for key, new_val in udf_update.items()
            )

            # Simulation trigger: texts changed, OR a direct (non-cascade)
            # event. person_id is guaranteed truthy here (guard above), so
            # only the payload "Source" marker matters.
            if needs_update or "Source" not in payload:
                if needs_update:
                    logger.info(f"Applying text update to Person {person_id}.")
                    so_client.update_entity_udfs(person_id, "Person", udf_update)
                else:
                    logger.info(f"Texts already in sync for Person {person_id}, but triggering simulation.")

                # --- 4. Create Email Simulation Appointment ---
                try:
                    # FIX: `or ""` guards — the service may return explicit
                    # nulls, and None.strip() would crash the block.
                    opener = provisioning_data.get("opener") or ""
                    intro = texts.get("intro") or ""
                    proof = texts.get("social_proof") or ""
                    subject = texts.get("subject") or "No Subject"

                    salutation = "Hallo"
                    p_data = so_client.get_person(person_id)
                    if p_data:
                        fname = p_data.get("Firstname", "")
                        lname = p_data.get("Lastname", "")
                        if fname or lname:
                            salutation = f"Hallo {fname} {lname}".strip()

                    cta = (
                        "H\u00e4tten Sie am kommenden Mittwoch gegen 11 Uhr kurz Zeit, f\u00fcr einen kurzen Austausch hierzu?\n"
                        "Gerne k\u00f6nnen Sie auch einen alternativen Termin in meinem Kalender buchen. (bookings Link)"
                    )

                    email_body = (
                        f"{salutation},\n\n"
                        f"{opener.strip()}\n\n"
                        f"{intro.strip()}\n\n"
                        f"{cta.strip()}\n\n"
                        f"{proof.strip()}\n\n"
                        "(Generated via Gemini Marketing Engine)"
                    )

                    now_str = datetime.now().strftime("%H:%M")
                    appt_title = f"[{now_str}] KI: {subject}"

                    so_client.create_appointment(
                        subject=appt_title,
                        description=email_body,
                        contact_id=contact_id,
                        person_id=person_id
                    )
                except Exception as e:
                    # Appointment creation is best-effort; never fail the job.
                    logger.error(f"Failed to create email simulation appointment: {e}")

            else:
                logger.info(f"Skipping update for Person {person_id}: Values match.")

        except Exception as e:
            logger.error(f"Error during Person update: {e}")
            raise

    logger.info("Job successfully processed.")
    return "SUCCESS"
def run_worker():
    """Worker entry point: authenticate against SuperOffice, then poll the
    job queue forever, dispatching each job to process_job().

    Blocks until a SuperOffice client with a valid access token is obtained,
    retrying every 30s. Never returns under normal operation.
    """
    queue = JobQueue()

    # FIX: the original assigned `so_client = SuperOfficeClient()` before
    # validating the token; when "Auth failed" was raised, the loop condition
    # `not so_client` was already False, so the loop exited with an
    # UNAUTHENTICATED client. Assign only after validation succeeds.
    so_client = None
    while so_client is None:
        try:
            candidate = SuperOfficeClient()
            if not candidate.access_token: raise Exception("Auth failed")
            so_client = candidate
        except Exception as e:
            logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...")
            time.sleep(30)

    logger.info("Worker started. Polling queue...")
    while True:
        try:
            job = queue.get_next_job()
            if job:
                try:
                    result = process_job(job, so_client)
                    if result == "RETRY":
                        queue.retry_job_later(job['id'], delay_seconds=120, error_msg="CE is processing...")
                    elif result == "FAILED":
                        queue.fail_job(job['id'], "Job failed with FAILED status")
                    else:
                        queue.complete_job(job['id'])
                except Exception as e:
                    # Per-job failures are recorded on the job; the worker
                    # itself keeps running.
                    logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
                    queue.fail_job(job['id'], str(e))
            else:
                time.sleep(POLL_INTERVAL)
        except Exception as e:
            # Queue-level errors (e.g. DB hiccups): log, back off, keep going.
            logger.error(f"Worker loop error: {e}")
            time.sleep(POLL_INTERVAL)
# Start the polling worker when this module is executed as a script.
if __name__ == "__main__":

    run_worker()