Files
Brancheneinstufung2/connector-superoffice/worker.py
Floke 735cd77b68 fix: [30388f42] Unterbreche Webhook-Endlosschleife
- Aktualisiert den Zeitstempel in SuperOffice nur noch dann, wenn auch andere inhaltliche Änderungen vorliegen.
- Dies verhindert, dass der Worker durch seine eigene Zeitstempel-Aktualisierung ständig neue Webhooks triggert (besonders relevant, da User und Bot die gleiche ID 528 teilen).
- Beruhigt das System und führt zu stabilen 'SUCCESS' Zuständen im Dashboard.
2026-03-06 14:33:53 +00:00

411 lines
18 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import logging
import os
import requests
import json
from datetime import datetime
from queue_manager import JobQueue
from superoffice_client import SuperOfficeClient, ContactNotFoundException
from config import settings
# Setup Logging: all worker output goes to stdout with timestamps so the
# container/systemd log captures it.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("connector-worker")

# Poll Interval: seconds to sleep between queue polls when no job is pending
# (see run_worker()).
POLL_INTERVAL = 5 # Seconds
def safe_get_udfs(entity_data):
    """Return the 'UserDefinedFields' mapping of an entity dict, or {}.

    Defensive accessor that works around a SuperOffice Prod API quirk where
    the UDF structure can be corrupted ('TypeError: unhashable type: dict').
    Any failure while reading the field degrades to an empty dict instead of
    propagating, so callers never have to guard the lookup themselves.
    """
    if not entity_data:
        return {}
    try:
        udfs = entity_data.get("UserDefinedFields", {})
    except TypeError:
        # Known upstream API corruption: log loudly and pretend no UDFs exist.
        logger.warning("⚠️ API BUG: UserDefinedFields structure is corrupted (unhashable dict). Treating as empty.")
        return {}
    except Exception as e:
        logger.error(f"Error reading UDFs: {e}")
        return {}
    return udfs
def process_job(job, so_client: SuperOfficeClient, queue: JobQueue):
    """
    Core logic for processing a single job.

    Flow: extract person/contact IDs from the webhook payload, filter noise,
    fetch CRM details, call the Company Explorer (CE) provisioning endpoint,
    then push any resulting changes back to SuperOffice in one PATCH.

    Args:
        job: Queue row dict; must provide 'id', 'event_type' and 'payload'.
        so_client: Authenticated SuperOffice REST client.
        queue: Job queue; used here only to persist display info for the dashboard.

    Returns: (STATUS, MESSAGE)
    STATUS: 'SUCCESS', 'SKIPPED', 'DELETED', 'RETRY', 'FAILED'
    """
    logger.info(f"--- [WORKER v1.9.3] Processing Job {job['id']} ({job['event_type']}) ---")
    payload = job['payload']
    event_low = job['event_type'].lower()

    # --- CIRCUIT BREAKER: DETECT ECHOES ---
    # We log if the event was triggered by our own API user (Associate 528)
    # but we NO LONGER SKIP IT, to allow manual changes by the user who shares the same ID.
    changed_by = payload.get("ChangedByAssociateId")
    if changed_by == 528:
        logger.info(f" Potential Echo: Event triggered by Associate 528. Proceeding to allow manual user updates.")
    # --------------------------------------------

    # 0. ID Extraction & Early Exit for irrelevant jobs
    # Webhook payloads may carry IDs either under 'FieldValues' or at top level.
    person_id = None
    contact_id = None
    job_title = payload.get("JobTitle")
    field_values = payload.get("FieldValues", {})
    if "person_id" in field_values:
        person_id = int(field_values["person_id"])
    if "contact_id" in field_values:
        contact_id = int(field_values["contact_id"])
    if "title" in field_values and not job_title:
        job_title = field_values["title"]
    if not person_id:
        if "PersonId" in payload:
            person_id = int(payload["PersonId"])
        elif "PrimaryKey" in payload and "person" in event_low:
            # 'PrimaryKey' is ambiguous; the event type decides what it identifies.
            person_id = int(payload["PrimaryKey"])
    if not contact_id:
        if "ContactId" in payload:
            contact_id = int(payload["ContactId"])
        elif "PrimaryKey" in payload and "contact" in event_low:
            contact_id = int(payload["PrimaryKey"])

    # If after all checks, we have no ID, we can't process it.
    if not person_id and not contact_id:
        msg = f"Skipping job: No ContactId or PersonId could be identified in the payload."
        logger.warning(msg)
        return ("SKIPPED", msg)

    # Fallback/Deep Lookup & Fetch JobTitle if missing
    if person_id and (not job_title or not contact_id):
        try:
            person_details = so_client.get_person(
                person_id,
                select=["JobTitle", "Title", "Contact/ContactId", "FirstName", "LastName", "UserDefinedFields", "Position"]
            )
            if person_details:
                if not job_title:
                    job_title = person_details.get("JobTitle") or person_details.get("Title")
                if not contact_id:
                    # The owning contact may arrive nested ('Contact') or flat ('ContactId').
                    contact_obj = person_details.get("Contact")
                    if contact_obj and isinstance(contact_obj, dict):
                        contact_id = contact_obj.get("ContactId")
                    elif "ContactId" in person_details:
                        contact_id = person_details.get("ContactId")
        except ContactNotFoundException:
            msg = f"Skipping job because Person ID {person_id} was not found in SuperOffice (likely deleted)."
            logger.warning(msg)
            return ("DELETED", msg)
        except Exception as e:
            # Non-fatal: continue with whatever IDs/title we already have.
            logger.warning(f"Failed to fetch person details for {person_id}: {e}")

    # 2. Noise Reduction Logic (Event Type)
    if any(x in event_low for x in ["sale.", "project.", "appointment.", "document.", "selection."]):
        msg = f"Skipping irrelevant event type: {job['event_type']}"
        logger.info(msg)
        return ("SKIPPED", msg)
    if not contact_id:
        # No contact could be resolved at all; caller (run_worker) turns this into a FAILED job.
        raise ValueError(f"Could not identify ContactId in payload: {payload}")
    logger.info(f"Target Identified -> Person: {person_id}, Contact: {contact_id}, JobTitle: {job_title}")

    # 1b. Fetch full contact details for 'Double Truth' check
    crm_name = None
    crm_website = None
    crm_industry_name = None
    contact_details = None
    campaign_tag = None
    try:
        # Request Associate details explicitly
        contact_details = so_client.get_contact(
            contact_id,
            select=["Name", "UrlAddress", "Urls", "UserDefinedFields", "Address", "OrgNr", "Associate"]
        )
        crm_name = contact_details.get("Name", "Unknown")
        # Safely get Associate object
        assoc = contact_details.get("Associate") or {}
        aid = assoc.get("AssociateId")
        aname = assoc.get("Name", "").upper().strip() if assoc.get("Name") else ""
        # PERSIST DETAILS TO DASHBOARD early
        queue.update_entity_name(job['id'], crm_name, associate_name=aname)

        # --- ROBOPLANET FILTER LOGIC ---
        # Check both numerical ID and shortname
        is_robo = False
        if aname in settings.ROBOPLANET_WHITELIST:
            is_robo = True
        else:
            try:
                if aid and int(aid) in settings.ROBOPLANET_WHITELIST:
                    is_robo = True
            except (ValueError, TypeError):
                # Associate ID not numeric / not comparable — treated as "not whitelisted".
                pass
        if not is_robo:
            msg = f"WACKLER FILTER: Contact {contact_id} ('{crm_name}') belongs to Associate '{aname}' (not in whitelist). Skipping."
            logger.info(f"⏭️ {msg}")
            return ("SKIPPED", msg)
        logger.info(f"✅ Filter Passed: Contact '{crm_name}' belongs to Roboplanet Associate '{aname}'.")
        # -------------------------------
        crm_website = contact_details.get("UrlAddress")

        # --- Fetch Person UDFs for Campaign Tag ---
        if person_id:
            try:
                # We fetch the person again specifically for UDFs to ensure we get DisplayTexts
                person_details = so_client.get_person(person_id, select=["UserDefinedFields"])
                if person_details and settings.UDF_CAMPAIGN:
                    # SAFE GET
                    udfs = safe_get_udfs(person_details)
                    # SuperOffice REST returns DisplayText for lists as 'ProgID:DisplayText'
                    display_key = f"{settings.UDF_CAMPAIGN}:DisplayText"
                    campaign_tag = udfs.get(display_key)
                    if not campaign_tag:
                        # Fallback to manual resolution if DisplayText is missing
                        raw_tag = udfs.get(settings.UDF_CAMPAIGN, "")
                        if raw_tag:
                            campaign_tag = str(raw_tag).strip()
                    if campaign_tag:
                        logger.info(f"🎯 CAMPAIGN DETECTED: '{campaign_tag}'")
                    else:
                        logger.info(" No Campaign Tag found (Field is empty).")
            except ContactNotFoundException:
                # This is not critical, we can proceed without a campaign tag
                logger.warning(f"Could not fetch campaign tag: Person {person_id} not found.")
            except Exception as e:
                logger.warning(f"Could not fetch campaign tag: {e}")

        # --- Reverse-map the existing vertical UDF to an industry name for CE ---
        if settings.UDF_VERTICAL:
            # SAFE GET
            udfs = safe_get_udfs(contact_details)
            so_vertical_val = udfs.get(settings.UDF_VERTICAL)
            if so_vertical_val:
                # Strip SuperOffice's '[I:<id>]' integer wrapper to get the raw list ID.
                val_str = str(so_vertical_val).replace("[I:","").replace("]","")
                try:
                    vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
                    vertical_map_rev = {str(v): k for k, v in vertical_map.items()}
                    if val_str in vertical_map_rev:
                        crm_industry_name = vertical_map_rev[val_str]
                except Exception as ex:
                    logger.error(f"Error mapping vertical ID {val_str}: {ex}")
    except ContactNotFoundException:
        msg = f"Skipping job because Contact ID {contact_id} was not found in SuperOffice (likely deleted)."
        logger.warning(msg)
        return ("DELETED", msg)
    except Exception as e:
        logger.error(f"Failed to fetch contact details for {contact_id}: {e}")
        # Re-raise so run_worker marks the job FAILED with the API error.
        raise Exception(f"SuperOffice API Failure: {e}")

    # --- 3. Company Explorer Provisioning ---
    ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
    ce_req = {
        "so_contact_id": contact_id,
        "so_person_id": person_id,
        "job_title": job_title,
        "crm_name": crm_name,
        "crm_website": crm_website,
        "crm_industry_name": crm_industry_name,
        "campaign_tag": campaign_tag
    }
    # Basic auth for the CE endpoint; credentials come from the environment.
    ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
    try:
        resp = requests.post(ce_url, json=ce_req, auth=ce_auth)
        if resp.status_code == 404:
            # CE does not know this contact yet — requeue and try again later.
            return ("RETRY", "CE returned 404")
        resp.raise_for_status()
        provisioning_data = resp.json()
        if provisioning_data.get("status") == "processing":
            # CE analysis is still running — requeue.
            return ("RETRY", "CE is processing")
    except requests.exceptions.RequestException as e:
        raise Exception(f"Company Explorer API failed: {e}")
    logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")

    # Fetch fresh Contact Data for comparison
    contact_data = so_client.get_contact(contact_id)
    if not contact_data:
        # This can happen if the contact was deleted between the CE call and now
        logger.warning(f"Could not re-fetch contact {contact_id} for patch (deleted?). Skipping patch.")
        return ("SKIPPED", "Contact deleted post-analysis")
    # SAFE GET FOR COMPARISON
    current_udfs = safe_get_udfs(contact_data)
    # contact_patch accumulates only fields that actually changed, so we PATCH once (or not at all).
    contact_patch = {}

    # --- A. Vertical Sync ---
    vertical_name = provisioning_data.get("vertical_name")
    if vertical_name:
        try:
            vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
            vertical_id = vertical_map.get(vertical_name)
            if vertical_id:
                udf_key = settings.UDF_VERTICAL
                current_val = current_udfs.get(udf_key, "")
                # Compare after stripping the '[I:...]' wrapper so unchanged values are not re-pushed.
                if str(current_val).replace("[I:","").replace("]","") != str(vertical_id):
                    logger.info(f"Change detected: Vertical -> {vertical_id}")
                    if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
                    contact_patch["UserDefinedFields"][udf_key] = str(vertical_id)
        except Exception as e:
            logger.error(f"Vertical sync error: {e}")

    # --- B. Address & VAT Sync ---
    ce_city = provisioning_data.get("address_city")
    ce_street = provisioning_data.get("address_street")
    ce_zip = provisioning_data.get("address_zip")
    ce_vat = provisioning_data.get("vat_id")
    if ce_city or ce_street or ce_zip:
        # Apply the same values to both postal and street addresses.
        for type_key in ["Postal", "Street"]:
            cur_addr = (contact_data.get("Address") or {}).get(type_key, {})
            if ce_city and cur_addr.get("City") != ce_city: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["City"] = ce_city
            if ce_street and cur_addr.get("Address1") != ce_street: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Address1"] = ce_street
            if ce_zip and cur_addr.get("Zipcode") != ce_zip: contact_patch.setdefault("Address", {}).setdefault(type_key, {})["Zipcode"] = ce_zip
    if ce_vat and contact_data.get("OrgNr") != ce_vat:
        contact_patch["OrgNr"] = ce_vat

    # --- C. AI Openers & Summary Sync ---
    # NOTE: the literal string "null" is treated as 'no value' (presumably CE
    # can serialize JSON null as text — TODO confirm against the CE API).
    ce_opener = provisioning_data.get("opener")
    ce_opener_secondary = provisioning_data.get("opener_secondary")
    ce_summary = provisioning_data.get("summary")
    if ce_opener and ce_opener != "null" and current_udfs.get(settings.UDF_OPENER) != ce_opener:
        if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
        contact_patch["UserDefinedFields"][settings.UDF_OPENER] = ce_opener
    if ce_opener_secondary and ce_opener_secondary != "null" and current_udfs.get(settings.UDF_OPENER_SECONDARY) != ce_opener_secondary:
        if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
        contact_patch["UserDefinedFields"][settings.UDF_OPENER_SECONDARY] = ce_opener_secondary
    if ce_summary and ce_summary != "null":
        # Truncate to at most 135 chars (132 + ellipsis) — presumably a UDF length limit.
        short_summary = (ce_summary[:132] + "...") if len(ce_summary) > 135 else ce_summary
        if current_udfs.get(settings.UDF_SUMMARY) != short_summary:
            logger.info("Change detected: AI Summary")
            if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
            contact_patch["UserDefinedFields"][settings.UDF_SUMMARY] = short_summary

    # --- D. Timestamps & Website Sync ---
    # CRITICAL: We only update the timestamp if we actually have OTHER changes to push.
    # Otherwise, we create an infinite loop of self-triggering webhooks.
    if settings.UDF_LAST_UPDATE and contact_patch:
        # '[D:...]' is the SuperOffice wire format for date-type UDF values.
        now_so = f"[D:{datetime.now().strftime('%m/%d/%Y %H:%M:%S')}]"
        if "UserDefinedFields" not in contact_patch: contact_patch["UserDefinedFields"] = {}
        contact_patch["UserDefinedFields"][settings.UDF_LAST_UPDATE] = now_so
    # NOTE(review): a website-only change below is pushed WITHOUT refreshing the
    # timestamp, since the timestamp decision above was already taken — confirm intended.
    ce_website = provisioning_data.get("website")
    if ce_website and (not contact_data.get("Urls") or settings.ENABLE_WEBSITE_SYNC):
        current_urls = contact_data.get("Urls") or []
        if not any(u.get("Value") == ce_website for u in current_urls):
            logger.info(f"Syncing Website: {ce_website}")
            if "Urls" not in contact_patch: contact_patch["Urls"] = []
            # Prepend the AI-discovered URL; existing URLs are preserved.
            contact_patch["Urls"] = [{"Value": ce_website, "Description": "AI Discovered"}] + current_urls

    # --- E. Apply Updates (Single PATCH) ---
    if contact_patch:
        logger.info(f"Pushing combined PATCH for Contact {contact_id}: {list(contact_patch.keys())}")
        so_client.patch_contact(contact_id, contact_patch)
        logger.info("✅ Contact Update Successful.")

    # 2d. Sync Person Position
    role_name = provisioning_data.get("role_name")
    if person_id and role_name:
        try:
            persona_map = json.loads(settings.PERSONA_MAP_JSON)
            position_id = persona_map.get(role_name)
            if position_id:
                so_client.update_person_position(person_id, int(position_id))
        except Exception as e:
            # Position sync is best-effort; never fail the whole job for it.
            logger.error(f"Error syncing position: {e}")

    # 3. Update SuperOffice Texts (Person)
    if person_id:
        texts = provisioning_data.get("texts", {})
        unsubscribe_link = provisioning_data.get("unsubscribe_link")
        udf_update = {}
        if texts.get("subject"): udf_update[settings.UDF_SUBJECT] = texts["subject"]
        if texts.get("intro"): udf_update[settings.UDF_INTRO] = texts["intro"]
        if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"]
        if unsubscribe_link and settings.UDF_UNSUBSCRIBE_LINK:
            udf_update[settings.UDF_UNSUBSCRIBE_LINK] = unsubscribe_link
        if udf_update:
            logger.info(f"Applying text update to Person {person_id}.")
            so_client.update_entity_udfs(person_id, "Person", udf_update)

        # --- 4. Create Email Simulation Appointment ---
        # NOTE(review): nested under 'if person_id:' because it reads 'texts',
        # which is only bound in this branch — confirm against original layout.
        try:
            opener = provisioning_data.get("opener") or ""
            intro = texts.get("intro") or ""
            proof = texts.get("social_proof") or ""
            subject = texts.get("subject", "No Subject")
            email_body = f"Betreff: {subject}\n\n{opener}\n\n{intro}\n\n{proof}\n\n(Generated via Gemini Marketing Engine)"
            so_client.create_appointment(f"KI: {subject}", email_body, contact_id, person_id)
        except Exception as e:
            # The simulation appointment is cosmetic; never fail the job for it.
            logger.error(f"Failed simulation: {e}")
    return ("SUCCESS", None)
def run_worker():
    """
    Worker entry point.

    Initializes the SuperOffice client, retrying every 30s until a client
    with a valid access token is obtained, then polls the job queue forever.
    Each job is dispatched to process_job() and its (STATUS, MESSAGE) result
    is mapped to the corresponding queue transition; any unexpected exception
    marks the job as failed without killing the worker loop.
    """
    queue = JobQueue()
    so_client = None
    while so_client is None:
        try:
            candidate = SuperOfficeClient()
            if not candidate.access_token:
                raise Exception("Auth failed")
            # BUGFIX: only commit the client once authentication is confirmed.
            # Previously `so_client` was assigned before the token check, so a
            # failed auth still terminated the retry loop and the worker ran
            # with an unauthenticated client.
            so_client = candidate
        except Exception as e:
            # Include the cause: the original message swallowed the exception detail.
            logger.critical(f"Failed to initialize SO Client ({e}). Retrying in 30s...")
            time.sleep(30)
    logger.info("Worker started. Polling queue...")
    while True:
        try:
            job = queue.get_next_job()
            if job:
                try:
                    # process_job now takes (job, client, queue)
                    status, msg = process_job(job, so_client, queue)
                    if status == "RETRY":
                        queue.retry_job_later(job['id'], delay_seconds=120, error_msg=msg)
                    elif status == "FAILED":
                        queue.fail_job(job['id'], msg or "Job failed status")
                    elif status == "SKIPPED":
                        queue.skip_job(job['id'], msg or "Skipped")
                    elif status == "DELETED":
                        queue.mark_as_deleted(job['id'], msg or "Deleted in SuperOffice")
                    else:
                        queue.complete_job(job['id'])
                except Exception as e:
                    # Job-level failure: record it and keep the worker alive.
                    logger.error(f"Job {job['id']} failed: {e}", exc_info=True)
                    queue.fail_job(job['id'], str(e))
            else:
                # Queue empty: back off before polling again.
                time.sleep(POLL_INTERVAL)
        except Exception as e:
            # Loop-level failure (e.g. queue/DB hiccup): log and keep running.
            logger.error(f"Worker loop error: {e}")
            time.sleep(POLL_INTERVAL)
# Script entry point: start the long-running worker loop.
if __name__ == "__main__":
    run_worker()