Files
Brancheneinstufung2/connector-superoffice/worker.py
Floke d7fca780c2 fix: [30388f42] Worker v1.9.10 - Präzisiere Personen-Filter für Stammdaten
- Erweitert den Filter für  Events auf Änderungen in , ,  oder .
- Schließt die Lücke, die  Events in eine Endlosschleife schicken konnte.
- Stellt sicher, dass nur wirklich relevante Personen-Updates eine Verarbeitung triggern.
2026-03-06 18:16:15 +00:00

507 lines
16 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import logging
import os
import requests
import json
from datetime import datetime
from queue_manager import JobQueue
from superoffice_client import SuperOfficeClient, ContactNotFoundException
from config import settings
# --- Logging configuration ---------------------------------------------------
# All worker output goes through a single named logger so log lines can be
# filtered by component.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("connector-worker")

# How long (in seconds) the main loop sleeps when the queue is empty.
POLL_INTERVAL = 5
def safe_get_udfs(entity_data):
    """
    Return the "UserDefinedFields" mapping of *entity_data*, or {} on failure.

    Works around the 'TypeError: unhashable type: dict' bug observed in the
    SuperOffice Prod API, where the UDF structure can come back corrupted.
    A falsy entity (None / empty dict) also yields {}.
    """
    if not entity_data:
        return {}
    try:
        udfs = entity_data.get("UserDefinedFields", {})
    except TypeError:
        # Known API bug: corrupted UDF structure — treat as if no UDFs exist.
        logger.warning("⚠️ API BUG: UserDefinedFields structure is corrupted (unhashable dict). Treating as empty.")
        return {}
    except Exception as e:
        logger.error(f"Error reading UDFs: {e}")
        return {}
    return udfs
def clean_text_for_so(text, limit=200):
    """
    Normalize *text* for storage in a SuperOffice UDF.

    Falsy values and the literal string "null" become "".  Anything else is
    stringified, stripped of surrounding whitespace, and truncated to *limit*
    characters (default 200, a safe UDF length).
    """
    if not text:
        return ""
    if text == "null":
        return ""
    cleaned = str(text).strip()
    return cleaned[:limit]
def process_job(job, so_client: SuperOfficeClient, queue: JobQueue):
    """
    Core logic for processing a single job.

    job: queue row with 'id', 'event_type' and 'payload' keys.
    so_client: authenticated SuperOffice API client.
    queue: job queue, used here only to record the resolved entity name.

    Returns: (STATUS, MESSAGE)
    STATUS: 'SUCCESS', 'SKIPPED', 'DELETED', 'RETRY', 'FAILED'
    Raises ValueError when no ContactId can be resolved, and re-raises
    wrapped exceptions on SuperOffice / Company Explorer API failures.
    """
    # NOTE(review): banner still says v1.9.9 although the commit message says
    # v1.9.10 — confirm whether the version tag should be bumped.
    logger.info(f"--- [WORKER v1.9.9 - ABSOLUTE FILTERS] Processing Job {job['id']} ({job['event_type']}) ---")
    payload = job['payload']
    event_low = job['event_type'].lower()
    # --- NOISE REDUCTION: STRICT STAMMDATEN FILTER (v1.9.9) ---
    # We ONLY react to changes in critical fields.
    # For .created events, we also check if name/urladdress are explicitly in the payload changes.
    changes = [c.lower() for c in payload.get("Changes", [])]
    # RULE: If it's a person event, we ONLY care if name, email, or jobtitle changed (for role mapping).
    # If it's a contact event, we ONLY care if name or website changed.
    if "person" in event_low:
        if "name" not in changes and "email" not in changes and "jobtitle" not in changes and "contact_id" not in changes:
            msg = f"Skipping person event: No relevant changes (Name/Email/JobTitle/Mapping) in {changes}."
            logger.info(f"⏭️ {msg}")
            return ("SKIPPED", msg)
    elif "contact" in event_low:
        if "name" not in changes and "urladdress" not in changes:
            msg = f"Skipping contact event: No relevant changes (Name/Website) in {changes}."
            logger.info(f"⏭️ {msg}")
            return ("SKIPPED", msg)
    # ---------------------------------------------------------
    # --- CIRCUIT BREAKER: DETECT ECHOES ---
    # Associate 528 is presumably the connector's own integration user, so an
    # event it triggered may be an echo of our own write — TODO confirm id.
    changed_by = payload.get("ChangedByAssociateId")
    if changed_by == 528:
        logger.info(f" Potential Echo: Event triggered by Associate 528. Proceeding to check for meaningful changes.")
    # --------------------------------------------
    # 0. ID Extraction & Early Exit for irrelevant jobs
    person_id = None
    contact_id = None
    job_title = payload.get("JobTitle")
    field_values = payload.get("FieldValues", {})
    if "person_id" in field_values:
        person_id = int(field_values["person_id"])
    if "contact_id" in field_values:
        contact_id = int(field_values["contact_id"])
    if "title" in field_values and not job_title:
        job_title = field_values["title"]
    # Fall back to top-level payload keys when FieldValues did not carry the ids.
    if not person_id:
        if "PersonId" in payload:
            person_id = int(payload["PersonId"])
        elif "PrimaryKey" in payload and "person" in event_low:
            person_id = int(payload["PrimaryKey"])
    if not contact_id:
        if "ContactId" in payload:
            contact_id = int(payload["ContactId"])
        elif "PrimaryKey" in payload and "contact" in event_low:
            contact_id = int(payload["PrimaryKey"])
    if not person_id and not contact_id:
        msg = f"Skipping job: No ContactId or PersonId identified."
        logger.warning(msg)
        return ("SKIPPED", msg)
    # Fallback Lookup: resolve a missing job title / contact id from the person record.
    if person_id and (not job_title or not contact_id):
        try:
            person_details = so_client.get_person(person_id, select=["JobTitle", "Title", "Contact/ContactId"])
            if person_details:
                if not job_title: job_title = person_details.get("JobTitle") or person_details.get("Title")
                if not contact_id:
                    contact_obj = person_details.get("Contact")
                    if contact_obj and isinstance(contact_obj, dict): contact_id = contact_obj.get("ContactId")
        except ContactNotFoundException:
            return ("DELETED", f"Person {person_id} not found.")
        except Exception as e:
            logger.warning(f"Failed to fetch person details for {person_id}: {e}")
    if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]):
        return ("SKIPPED", f"Irrelevant event type: {job['event_type']}")
    if not contact_id: raise ValueError(f"No ContactId found.")
    logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}")
    # 1b. Fetch full contact details
    crm_name, crm_website, crm_industry_name, contact_details, campaign_tag = None, None, None, None, None
    try:
        contact_details = so_client.get_contact(contact_id, select=["Name", "UrlAddress", "Urls", "UserDefinedFields", "Address", "OrgNr", "Associate"])
        crm_name = contact_details.get("Name", "Unknown")
        assoc = contact_details.get("Associate") or {}
        aname = assoc.get("Name", "").upper().strip()
        queue.update_entity_name(job['id'], crm_name, associate_name=aname)
        # Only process companies whose owner matches the whitelist by name OR id.
        if aname not in settings.ROBOPLANET_WHITELIST and assoc.get("AssociateId") not in settings.ROBOPLANET_WHITELIST:
            return ("SKIPPED", f"Owner '{aname}' not in Roboplanet whitelist.")
        crm_website = contact_details.get("UrlAddress")
        # Campaign Tag
        if person_id:
            try:
                person_details = so_client.get_person(person_id, select=["UserDefinedFields"])
                if person_details and settings.UDF_CAMPAIGN:
                    udfs = safe_get_udfs(person_details)
                    campaign_tag = udfs.get(f"{settings.UDF_CAMPAIGN}:DisplayText") or udfs.get(settings.UDF_CAMPAIGN)
            except Exception: pass
        # Current Vertical
        if settings.UDF_VERTICAL:
            udfs = safe_get_udfs(contact_details)
            so_vertical_val = udfs.get(settings.UDF_VERTICAL)
            if so_vertical_val:
                # List-type UDF values arrive wrapped as "[I:<id>]"; strip the wrapper.
                val_str = str(so_vertical_val).replace("[I:","").replace("]","")
                try:
                    v_map = json.loads(settings.VERTICAL_MAP_JSON)
                    # Reverse-map the numeric list id back to the vertical name.
                    crm_industry_name = {str(v): k for k, v in v_map.items()}.get(val_str)
                except Exception: pass
    except ContactNotFoundException:
        return ("DELETED", f"Contact {contact_id} not found.")
    except Exception as e:
        raise Exception(f"SuperOffice API Failure: {e}")
    # --- 3. Company Explorer Provisioning ---
    ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
    ce_req = {"so_contact_id": contact_id, "so_person_id": person_id, "job_title": job_title, "crm_name": crm_name, "crm_website": crm_website, "crm_industry_name": crm_industry_name, "campaign_tag": campaign_tag}
    try:
        resp = requests.post(ce_url, json=ce_req, auth=(os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini")))
        # 404 / "processing" mean CE is not ready yet — retry later rather than fail.
        if resp.status_code == 404: return ("RETRY", "CE 404")
        resp.raise_for_status()
        provisioning_data = resp.json()
        if provisioning_data.get("status") == "processing": return ("RETRY", "CE processing")
    except Exception as e:
        raise Exception(f"Company Explorer API failed: {e}")
    # Fetch fresh Contact for comparison
    contact_data = so_client.get_contact(contact_id)
    if not contact_data: return ("SKIPPED", "Contact deleted post-analysis")
    current_udfs = safe_get_udfs(contact_data)
    contact_patch = {}
    # --- A. Vertical Sync ---
    vertical_name = provisioning_data.get("vertical_name")
    if vertical_name:
        v_map = json.loads(settings.VERTICAL_MAP_JSON)
        vertical_id = v_map.get(vertical_name)
        if vertical_id:
            current_val = str(current_udfs.get(settings.UDF_VERTICAL, "")).replace("[I:","").replace("]","")
            logger.info(f"Checking Vertical: CRM='{current_val}' vs AI='{vertical_id}'")
            if current_val != str(vertical_id):
                logger.info(f" -> UPDATE: Vertical")
                contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_VERTICAL] = str(vertical_id)
    # --- B. Address & VAT Sync ---
    ce_city, ce_street, ce_zip, ce_vat = provisioning_data.get("address_city"), provisioning_data.get("address_street"), provisioning_data.get("address_zip"), provisioning_data.get("vat_id")
    if ce_city or ce_street or ce_zip:
        # Mirror the discovered address into both the Postal and Street blocks.
        for type_key in ["Postal", "Street"]:
            cur_addr = (contact_data.get("Address") or {}).get(type_key, {})
            logger.info(f"Checking Address ({type_key}): CRM='{cur_addr.get('City')}' vs AI='{ce_city}'")
            if ce_city and cur_addr.get("City") != ce_city: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["City"] = ce_city
            if ce_street and cur_addr.get("Address1") != ce_street: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Address1"] = ce_street
            if ce_zip and cur_addr.get("Zipcode") != ce_zip: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Zipcode"] = ce_zip
    if ce_vat and contact_data.get("OrgNr") != ce_vat:
        logger.info(f"Checking VAT: CRM='{contact_data.get('OrgNr')}' vs AI='{ce_vat}'")
        contact_patch["OrgNr"] = ce_vat
    # --- C. AI Openers & Summary Sync ---
    p_opener = clean_text_for_so(provisioning_data.get("opener"), 200)
    s_opener = clean_text_for_so(provisioning_data.get("opener_secondary"), 200)
    ai_sum = clean_text_for_so(provisioning_data.get("summary"), 132)
    logger.info(f"Checking Opener Primary: CRM='{current_udfs.get(settings.UDF_OPENER)}' vs AI='{p_opener}'")
    if p_opener and current_udfs.get(settings.UDF_OPENER) != p_opener:
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER] = p_opener
    logger.info(f"Checking Opener Secondary: CRM='{current_udfs.get(settings.UDF_OPENER_SECONDARY)}' vs AI='{s_opener}'")
    if s_opener and current_udfs.get(settings.UDF_OPENER_SECONDARY) != s_opener:
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_OPENER_SECONDARY] = s_opener
    logger.info(f"Checking AI Summary: CRM='{current_udfs.get(settings.UDF_SUMMARY)}' vs AI='{ai_sum}'")
    if ai_sum and current_udfs.get(settings.UDF_SUMMARY) != ai_sum:
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_SUMMARY] = ai_sum
    # --- D. Timestamps ---
    if settings.UDF_LAST_UPDATE and contact_patch:
        # SuperOffice date-UDF wire format: "[D:MM/DD/YYYY HH:MM:SS]".
        now_so = f"[D:{datetime.now().strftime('%m/%d/%Y %H:%M:%S')}]"
        contact_patch.setdefault("UserDefinedFields", {})[settings.UDF_LAST_UPDATE] = now_so
    if ce_website := provisioning_data.get("website"):
        current_urls = contact_data.get("Urls") or []
        if not any(u.get("Value") == ce_website for u in current_urls):
            contact_patch.setdefault("Urls", []).append({"Value": ce_website, "Description": "AI Discovered"})
    if contact_patch:
        logger.info(f"🚀 Pushing combined PATCH for Contact {contact_id}: {list(contact_patch.get('UserDefinedFields', {}).keys())}")
        so_client.patch_contact(contact_id, contact_patch)
        return ("SUCCESS", "Contact updated")
    logger.info(f"✅ No changes detected for Contact {contact_id}.")
    return ("SUCCESS", "Idempotent - no changes needed")
    # NOTE(review): everything below is UNREACHABLE — both branches above
    # return. Person position sync (2d), person text UDFs (3) and the email
    # simulation appointment (4) never execute. Confirm whether the early
    # returns above are intentional or whether these sections should run first.
    # 2d. Sync Person Position
    role_name = provisioning_data.get("role_name")
    if person_id and role_name:
        try:
            persona_map = json.loads(settings.PERSONA_MAP_JSON)
            position_id = persona_map.get(role_name)
            if position_id:
                so_client.update_person_position(person_id, int(position_id))
        except Exception as e:
            logger.error(f"Error syncing position: {e}")
    # 3. Update SuperOffice Texts (Person)
    if person_id:
        texts = provisioning_data.get("texts", {})
        unsubscribe_link = provisioning_data.get("unsubscribe_link")
        udf_update = {}
        if texts.get("subject"): udf_update[settings.UDF_SUBJECT] = texts["subject"]
        if texts.get("intro"): udf_update[settings.UDF_INTRO] = texts["intro"]
        if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
        if unsubscribe_link and settings.UDF_UNSUBSCRIBE_LINK:
            udf_update[settings.UDF_UNSUBSCRIBE_LINK] = unsubscribe_link
        if udf_update:
            logger.info(f"Applying text update to Person {person_id}.")
            so_client.update_entity_udfs(person_id, "Person", udf_update)
    # --- 4. Create Email Simulation Appointment ---
    # NOTE(review): `texts` is only bound inside the `if person_id:` block above;
    # with a falsy person_id this would raise NameError (swallowed by the except).
    try:
        opener = provisioning_data.get("opener") or ""
        intro = texts.get("intro") or ""
        proof = texts.get("social_proof") or ""
        subject = texts.get("subject", "No Subject")
        email_body = f"Betreff: {subject}\n\n{opener}\n\n{intro}\n\n{proof}\n\n(Generated via Gemini Marketing Engine)"
        so_client.create_appointment(f"KI: {subject}", email_body, contact_id, person_id)
    except Exception as e:
        logger.error(f"Failed simulation: {e}")
    return ("SUCCESS", None)
def run_worker():
    """
    Main worker loop.

    Initializes the job queue and an authenticated SuperOffice client (retrying
    every 30s until auth succeeds), then polls the queue forever.  Each job's
    (STATUS, msg) result from process_job() decides whether it is retried,
    failed, skipped, marked deleted, or completed.  Never returns.
    """
    queue = JobQueue()
    so_client = None
    # Retry until we hold an *authenticated* client.
    while not so_client:
        try:
            client = SuperOfficeClient()
            if not client.access_token:
                raise Exception("Auth failed")
            # BUGFIX: only publish the client after the auth check passed.
            # Previously `so_client` was assigned before the check, so a client
            # without an access token could escape this retry loop.
            so_client = client
        except Exception as e:
            logger.critical(f"Failed to initialize SO Client ({e}). Retrying in 30s...")
            time.sleep(30)
    logger.info("Worker started. Polling queue...")
    while True:
        try:
            job = queue.get_next_job()
            if job:
                try:
                    # process_job now takes (job, client, queue)
                    status, msg = process_job(job, so_client, queue)
                    if status == "RETRY":
                        queue.retry_job_later(job['id'], delay_seconds=120, error_msg=msg)
                    elif status == "FAILED":
                        queue.fail_job(job['id'], msg or "Job failed status")
                    elif status == "SKIPPED":
                        queue.skip_job(job['id'], msg or "Skipped")
                    elif status == "DELETED":
                        queue.mark_as_deleted(job['id'], msg or "Deleted in SuperOffice")
                    else:
                        queue.complete_job(job['id'])
                except Exception as e:
                    # Any unexpected error fails the job but keeps the worker alive.
                    logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
                    queue.fail_job(job['id'], str(e))
            else:
                time.sleep(POLL_INTERVAL)
        except Exception as e:
            # Queue-level errors (e.g. DB hiccups): log, back off, keep polling.
            logger.error(f"Worker loop error: {e}")
            time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
    # Script entry point: start the polling worker (blocks forever).
    run_worker()