Files
Brancheneinstufung2/connector-superoffice/worker.py
Floke b41b6c38b8 Enhance: Address/VAT Sync & Infrastructure Hardening [30e88f42]
- Implemented Address (City) and VAT (OrgNumber) sync back to SuperOffice.
- Hardened Infrastructure: Removed Pydantic dependency in config for better Docker compatibility.
- Improved SuperOffice Client error logging and handled empty SO_ENVIRONMENT variables.
- Updated Matrix Generator: Switched to gemini-2.0-flash, added industry filtering, and robust JSON parsing.
- Updated Documentation with session findings and troubleshooting steps.
2026-02-21 21:26:57 +00:00

390 lines
16 KiB
Python

import time
import logging
import os
import requests
import json
from queue_manager import JobQueue
from superoffice_client import SuperOfficeClient
from config import settings
# Configure the root logger once for the whole process: INFO and above,
# each record prefixed with timestamp, logger name and level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("connector-worker")

# Seconds the worker sleeps between polls when the queue is empty
# (and after an unexpected error in the poll loop).
POLL_INTERVAL = 5
# Event-type fragments that never require provisioning (noise reduction).
_IRRELEVANT_EVENT_MARKERS = ("sale.", "project.", "appointment.", "document.", "selection.")

# (connect, read) timeout in seconds for Company Explorer calls. Without a
# timeout, a hung CE request would block the poll loop indefinitely.
# NOTE(review): the 120s read timeout is a conservative guess; tune it if
# provisioning legitimately takes longer.
_CE_TIMEOUT = (10, 120)


def _normalize_list_id(value):
    """Return a SuperOffice list value as a plain ID string, e.g. "[I:23]" -> "23"."""
    text = str(value)
    if text.startswith("[I:"):
        text = text.split(":")[1].strip("]")
    return text


def _norm_url(url):
    """Return *url* lowercased with http(s):// and surrounding slashes removed ("" if falsy)."""
    if not url:
        return ""
    return str(url).lower().replace("https://", "").replace("http://", "").strip("/")


def _load_json_map(raw, setting_name):
    """Parse a JSON object from config; log an error and return {} if it is missing or malformed."""
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):  # None/non-str config, or invalid JSON
        parsed = None
    if not isinstance(parsed, dict):
        logger.error(f"Failed to parse {setting_name} from config.")
        return {}
    return parsed


def _has_relevant_changes(event_low, changes):
    """Return True if the webhook's changed-field list touches a field that drives re-evaluation."""
    # Company: Name/Department (identity), Urls (source), Numbers (matching).
    relevant_contact = ["name", "department", "urladdress", "number1", "number2"]
    # Vertical UDF, plus the generic "userdefinedfields" in case the specific key is not reported.
    if settings.UDF_VERTICAL:
        relevant_contact.append(settings.UDF_VERTICAL.lower())
    relevant_contact.append("userdefinedfields")
    # Person: JobTitle (persona logic), Position (role logic).
    relevant_person = ["jobtitle", "position"]

    if "contact" in event_low:
        # The website may also live in the Urls collection.
        if "urls" in changes or any(f in changes for f in relevant_contact):
            return True
    if "person" in event_low and any(f in changes for f in relevant_person):
        return True
    return False


def _extract_ids(payload, event_low, so_client):
    """Resolve (person_id, contact_id) from the payload, looking up the person's contact if needed.

    Raises:
        ValueError: If no ContactId can be determined.
    """
    person_id = None
    contact_id = None
    if "PersonId" in payload:
        person_id = int(payload["PersonId"])
    elif "PrimaryKey" in payload and "person" in event_low:
        person_id = int(payload["PrimaryKey"])
    if "ContactId" in payload:
        contact_id = int(payload["ContactId"])
    elif "PrimaryKey" in payload and "contact" in event_low:
        contact_id = int(payload["PrimaryKey"])

    # Fallback / deep lookup: the payload may only identify the person.
    if not contact_id and person_id:
        person_data = so_client.get_person(person_id)
        if person_data:
            contact_id = (person_data.get("Contact") or {}).get("ContactId")

    if not contact_id:
        raise ValueError(f"Could not identify ContactId in payload: {payload}")
    return person_id, contact_id


def _cascade_to_persons(so_client, contact_id):
    """Enqueue a 'person.changed' job for every person of the contact (best effort; errors are logged)."""
    logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.")
    try:
        persons = so_client.search(f"Person?$filter=contact/contactId eq {contact_id}")
        if not persons:
            return
        queue = JobQueue()
        for person in persons:
            p_id = person.get("PersonId")
            if p_id:
                logger.info(f"Cascading: Enqueueing job for Person {p_id}")
                queue.add_job("person.changed", {"PersonId": p_id, "ContactId": contact_id, "Source": "Cascade"})
    except Exception as e:
        logger.warning(f"Failed to cascade to persons for contact {contact_id}: {e}")


def _crm_vertical_override(contact_details):
    """Map the contact's vertical UDF (a list ID) back to a vertical name via VERTICAL_MAP_JSON, or None."""
    udfs = contact_details.get("UserDefinedFields") or {}
    so_vertical_val = udfs.get(settings.UDF_VERTICAL)
    if not so_vertical_val:
        return None
    val_str = _normalize_list_id(so_vertical_val)
    try:
        vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
        # Reverse map ID -> name.
        vertical_map_rev = {str(v): k for k, v in vertical_map.items()}
    except Exception as ex:
        logger.error(f"Error mapping vertical ID {val_str}: {ex}")
        return None
    crm_industry_name = vertical_map_rev.get(val_str)
    if crm_industry_name is not None:
        logger.info(f"Detected CRM Vertical Override: {so_vertical_val} -> {crm_industry_name}")
    return crm_industry_name


def _fetch_crm_master_data(so_client, contact_id):
    """Return (name, website, vertical-name override) from the SuperOffice contact.

    Values that cannot be read stay None; any error is logged as a warning
    and the values collected so far are returned.
    """
    crm_name = None
    crm_website = None
    crm_industry_name = None
    try:
        contact_details = so_client.get_contact(contact_id)
        if contact_details:
            crm_name = contact_details.get("Name")
            crm_website = contact_details.get("UrlAddress")
            if not crm_website and contact_details.get("Urls"):
                crm_website = contact_details["Urls"][0].get("Value")
            if settings.UDF_VERTICAL:
                crm_industry_name = _crm_vertical_override(contact_details)
    except Exception as e:
        logger.warning(f"Failed to fetch contact details for {contact_id}: {e}")
    return crm_name, crm_website, crm_industry_name


def _request_provisioning(payload, contact_id, person_id, crm_name, crm_website, crm_industry_name):
    """Call the Company Explorer provisioning API for this contact/person.

    Returns:
        The decoded JSON response, or None when the job should be retried
        (CE answered 404, or reports status "processing").

    Raises:
        Exception: If the HTTP request fails (connection error, timeout,
            HTTP error status).
    """
    ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
    ce_req = {
        "so_contact_id": contact_id,
        "so_person_id": person_id,
        "job_title": payload.get("JobTitle"),
        "crm_name": crm_name,
        "crm_website": crm_website,
        "crm_industry_name": crm_industry_name,
    }
    # Simple Basic Auth for the internal API.
    # NOTE(review): falls back to hard-coded default credentials when
    # API_USER / API_PASSWORD are unset; consider failing fast instead.
    ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
    try:
        resp = requests.post(ce_url, json=ce_req, auth=ce_auth, timeout=_CE_TIMEOUT)
        if resp.status_code == 404:
            logger.warning("Company Explorer returned 404. Retrying later.")
            return None
        resp.raise_for_status()
        provisioning_data = resp.json()
    except requests.exceptions.RequestException as e:
        raise Exception(f"Company Explorer API failed: {e}") from e

    if provisioning_data.get("status") == "processing":
        logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.")
        return None
    return provisioning_data


def _sync_vertical(so_client, contact_id, vertical_name):
    """Write the CE vertical's list ID into the contact's vertical UDF when it differs (errors logged)."""
    vertical_map = _load_json_map(getattr(settings, "VERTICAL_MAP_JSON", None), "VERTICAL_MAP_JSON")
    vertical_id = vertical_map.get(vertical_name)
    if not vertical_id:
        logger.warning(f"Vertical '{vertical_name}' not found in internal mapping.")
        return
    logger.info(f"Identified Vertical '{vertical_name}' -> ID {vertical_id}")

    udf_key = settings.UDF_VERTICAL
    if not udf_key:
        # Without a configured UDF key there is nothing to write to.
        logger.warning(f"UDF_VERTICAL not configured; cannot sync vertical for Contact {contact_id}.")
        return
    try:
        # Compare with the current value first to avoid update loops.
        current_contact = so_client.get_contact(contact_id)
        current_udfs = current_contact.get("UserDefinedFields") or {}
        # str() first: the UDF value is not guaranteed to be a string.
        current_val = _normalize_list_id(current_udfs.get(udf_key, ""))
        if current_val != str(vertical_id):
            logger.info(f"Updating Contact {contact_id} Vertical: {current_val} -> {vertical_id}")
            so_client.update_entity_udfs(contact_id, "Contact", {udf_key: str(vertical_id)})
        else:
            logger.info(f"Vertical for Contact {contact_id} already in sync ({vertical_id}).")
    except Exception as e:
        logger.error(f"Failed to sync vertical for Contact {contact_id}: {e}")


def _sync_address_and_vat(so_client, contact_id, ce_city, ce_vat):
    """Write CE city / VAT into the contact's PostalAddress.City / OrgNumber when they differ (errors logged)."""
    # NOTE(review): verify these field names against the SuperOffice
    # ContactEntity schema (the REST reference appears to use `OrgNr` and a
    # nested `Address.Postal` structure). If the keys read here never exist,
    # the comparison never matches and every job re-PUTs the contact.
    # CE may also return `address_country`; it is not synced yet.
    try:
        # Re-fetch so the full-entity PUT below starts from current data
        # (earlier steps of this job may have written to the contact).
        contact_data = so_client.get_contact(contact_id)
        changed = False

        if ce_city:
            postal = contact_data.get("PostalAddress")
            if not isinstance(postal, dict):  # absent or null
                postal = {}
            current_city = postal.get("City", "")
            if current_city != ce_city:
                postal["City"] = ce_city
                contact_data["PostalAddress"] = postal
                changed = True
                logger.info(f"Updating City: {current_city} -> {ce_city}")

        if ce_vat:
            current_vat = contact_data.get("OrgNumber", "")
            if current_vat != ce_vat:
                contact_data["OrgNumber"] = ce_vat
                changed = True
                logger.info(f"Updating VAT: {current_vat} -> {ce_vat}")

        if changed:
            logger.info(f"Pushing standard field updates for Contact {contact_id}...")
            so_client._put(f"Contact/{contact_id}", contact_data)
    except Exception as e:
        logger.error(f"Failed to sync Address/VAT for Contact {contact_id}: {e}")


def _sync_website(so_client, contact_id, website):
    """Write the CE website into the contact's rank-1 URL (and UrlAddress if empty) when it differs."""
    # NOTE(review): the comparison uses UrlAddress, but UrlAddress is only
    # overwritten when empty. Unless SuperOffice derives UrlAddress from the
    # rank-1 Urls entry, a differing non-empty UrlAddress is never fixed and
    # every run PUTs again (likely the loop this sync was disabled for).
    try:
        contact_data = so_client.get_contact(contact_id)
        current_url = contact_data.get("UrlAddress", "")
        if _norm_url(current_url) == _norm_url(website):
            logger.info(f"Website for Contact {contact_id} already in sync.")
            return

        logger.info(f"Updating Website for Contact {contact_id}: {current_url} -> {website}")
        # Update the rank-1 entry of the Urls collection, or append one.
        new_urls = []
        found = False
        for url_entry in contact_data.get("Urls") or []:
            if url_entry.get("Rank") == 1:
                url_entry["Value"] = website
                found = True
            new_urls.append(url_entry)
        if not found:
            new_urls.append({"Value": website, "Rank": 1, "Description": "Website"})
        contact_data["Urls"] = new_urls
        if not current_url:
            contact_data["UrlAddress"] = website
        so_client._put(f"Contact/{contact_id}", contact_data)
    except Exception as e:
        logger.error(f"Failed to sync website for Contact {contact_id}: {e}")


def _sync_person_position(so_client, person_id, role_name):
    """Set the person's Position from the CE role via PERSONA_MAP_JSON (errors logged)."""
    persona_map = _load_json_map(getattr(settings, "PERSONA_MAP_JSON", None), "PERSONA_MAP_JSON")
    position_id = persona_map.get(role_name)
    if not position_id:
        logger.info(f"Role '{role_name}' has no mapped Position ID in config. Skipping update.")
        return
    logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}")
    try:
        success = so_client.update_person_position(person_id, int(position_id))
        if not success:
            logger.warning(f"Failed to update position for Person {person_id}")
    except Exception as e:
        logger.error(f"Error syncing position for Person {person_id}: {e}")


def _sync_person_texts(so_client, person_id, texts):
    """Write CE subject/intro/social-proof texts to the person's UDFs when any of them differ.

    Raises:
        Exception: If reading the person or writing the UDFs fails.
    """
    udf_update = {}
    if texts.get("subject"):
        udf_update[settings.UDF_SUBJECT] = texts["subject"]
    if texts.get("intro"):
        udf_update[settings.UDF_INTRO] = texts["intro"]
    if texts.get("social_proof"):
        udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
    if not udf_update:
        return

    # Loop prevention: only write when at least one value differs, so an
    # unchanged result does not cause another write (and, presumably,
    # another webhook).
    try:
        current_person = so_client.get_person(person_id)
        current_udfs = current_person.get("UserDefinedFields") or {}
    except Exception as e:
        logger.error(f"Error during pre-update check: {e}")
        raise
    if all(current_udfs.get(key, "") == new_val for key, new_val in udf_update.items()):
        logger.info(f"Skipping update for Person {person_id}: Values match (Loop Prevention).")
        return

    logger.info(f"Applying update to Person {person_id} (Changes detected).")
    if not so_client.update_entity_udfs(person_id, "Person", udf_update):
        raise Exception("Failed to update SuperOffice UDFs")


def process_job(job, so_client: SuperOfficeClient):
    """
    Process a single queued webhook job end to end.

    Steps:
    - Skip irrelevant event types and field changes.
    - Resolve the target person and contact IDs.
    - Read CRM master data.
    - Call the Company Explorer (CE) provisioning API.
    - Cascade company events to the contact's persons.
    - Sync CE results back to SuperOffice: vertical UDF, city/VAT, the
      optional website, person position, and person text UDFs.

    Failures in the vertical, address/VAT, website and position syncs are
    logged and do not fail the job.

    Args:
        job: Queue job dict with 'id', 'event_type' and 'payload' keys.
        so_client: Authenticated SuperOffice client.

    Returns:
        "SUCCESS" when the job is done (including skipped events), or
        "RETRY" when CE answered 404 or is still processing.

    Raises:
        ValueError: If no ContactId can be resolved from the payload.
        Exception: If the CE request fails, or writing the person text UDFs fails.
    """
    logger.info(f"Processing Job {job['id']} ({job['event_type']})")
    payload = job['payload']
    event_low = job['event_type'].lower()

    # 0. Fast-fail on irrelevant event types (noise reduction).
    if any(marker in event_low for marker in _IRRELEVANT_EVENT_MARKERS):
        logger.info(f"Skipping irrelevant event type: {job['event_type']}")
        return "SUCCESS"

    # 0b. Fast-fail on irrelevant field changes. This only applies when the
    # webhook provides a 'Changes' list; cascade jobs, for example, do not.
    changes = [c.lower() for c in payload.get("Changes") or []]
    if changes and not _has_relevant_changes(event_low, changes):
        logger.info(f"Skipping irrelevant changes: {changes}")
        return "SUCCESS"

    # 1. Identify the target person/contact.
    person_id, contact_id = _extract_ids(payload, event_low, so_client)
    logger.info(f"Target: Person {person_id}, Contact {contact_id}")

    # 1b. Current CRM master data for the 'Double Truth' check.
    crm_name, crm_website, crm_industry_name = _fetch_crm_master_data(so_client, contact_id)

    # 2. Company Explorer provisioning.
    provisioning_data = _request_provisioning(
        payload, contact_id, person_id, crm_name, crm_website, crm_industry_name
    )
    if provisioning_data is None:
        return "RETRY"
    logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")

    # Cascade company events to all persons of the contact. This only runs
    # once provisioning succeeded; doing it before the CE call would
    # re-enqueue every person on each RETRY of this job.
    if "contact" in event_low and not person_id:
        _cascade_to_persons(so_client, contact_id)

    # 2b. Vertical (company level).
    vertical_name = provisioning_data.get("vertical_name")
    if vertical_name:
        _sync_vertical(so_client, contact_id, vertical_name)

    # 2b.2 Address (city) and VAT (standard fields).
    ce_city = provisioning_data.get("address_city")
    ce_vat = provisioning_data.get("vat_id")
    if ce_city or ce_vat:
        _sync_address_and_vat(so_client, contact_id, ce_city, ce_vat)

    # 2c. Website (company level). This sync is opt-in via
    # settings.ENABLE_WEBSITE_SYNC. It was disabled to prevent an update
    # loop (SO API read-after-write latency or a field mapping issue).
    if settings.ENABLE_WEBSITE_SYNC:
        website = provisioning_data.get("website")
        if website and website != "k.A.":
            _sync_website(so_client, contact_id, website)

    # 2d. Person position (role), if a person is targeted.
    role_name = provisioning_data.get("role_name")
    if person_id and role_name:
        _sync_person_position(so_client, person_id, role_name)

    # 3. Person text UDFs (only with a person).
    if not person_id:
        logger.info("Sync complete (Company only). No texts to write back.")
        return "SUCCESS"

    texts = provisioning_data.get("texts") or {}
    if not any(texts.values()):
        logger.info("No texts returned from Matrix (yet). Skipping write-back.")
        return "SUCCESS"

    _sync_person_texts(so_client, person_id, texts)
    logger.info("Job successfully processed.")
    return "SUCCESS"
def run_worker():
    """
    Run the connector worker loop forever.

    First blocks until a SuperOffice client with a non-empty access token can
    be created, retrying every 30 seconds. Then it polls the job queue:

    - Each job is handed to process_job().
    - A "RETRY" result re-queues the job with a 120-second delay.
    - Any other result completes the job.
    - An exception marks the job as failed.

    When the queue is empty, or the loop itself errors, it sleeps
    POLL_INTERVAL seconds before polling again.
    """
    queue = JobQueue()

    # Initialize the SO client with retry. A client is only kept once its
    # auth check has passed. Binding it before the check would leave a
    # (normally truthy) unauthenticated client behind after a failure, which
    # would end this loop and run the worker without valid auth.
    so_client = None
    while so_client is None:
        try:
            candidate = SuperOfficeClient()
            if not candidate.access_token:  # Check if auth worked
                raise Exception("Auth failed")
            so_client = candidate
        except Exception as e:
            logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...")
            time.sleep(30)

    logger.info("Worker started. Polling queue...")
    while True:
        try:
            job = queue.get_next_job()
            if not job:
                time.sleep(POLL_INTERVAL)
                continue
            try:
                result = process_job(job, so_client)
                if result == "RETRY":
                    queue.retry_job_later(job['id'], delay_seconds=120)
                else:
                    queue.complete_job(job['id'])
            except Exception as e:
                logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
                queue.fail_job(job['id'], str(e))
        except Exception as e:
            # Keep the worker alive on queue/infrastructure errors.
            logger.error(f"Worker loop error: {e}", exc_info=True)
            time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
    # Script entry point; run_worker() loops forever and does not return.
    run_worker()