Files
Brancheneinstufung2/connector-superoffice/superoffice_client.py
Floke b7265ff1e7 [2ff88f42] 1. Umfassende Entitäten-Erstellung: Wir haben erfolgreich Methoden implementiert, um die Kern-SuperOffice-Entitäten per API zu erstellen:
1. Umfassende Entitäten-Erstellung: Wir haben erfolgreich Methoden implementiert, um die Kern-SuperOffice-Entitäten per API zu erstellen:
       * Firmen (`Contact`)
       * Personen (`Person`)
       * Verkäufe (`Sale`) (entspricht D365 Opportunity)
       * Projekte (`Project`) (entspricht D365 Campaign), inklusive der Verknüpfung von Personen als Projektmitglieder.
   2. Robuste UDF-Aktualisierung: Wir haben eine generische und fehlertolerante Methode (update_entity_udfs) implementiert, die benutzerdefinierte Felder (UDFs) für sowohl Contact- als
      auch Person-Entitäten aktualisieren kann. Diese Methode ruft zuerst das bestehende Objekt ab, um die Konsistenz zu gewährleisten.
   3. UDF-ID-Discovery: Durch eine iterative Inspektionsmethode haben wir erfolgreich alle internen SuperOffice-IDs für die Listenwerte deines MA Status-Feldes (Ready_to_Send, Sent_Week1,
      Sent_Week2, Bounced, Soft_Denied, Interested, Out_of_Office, Unsubscribed) ermittelt und im Connector hinterlegt.
   4. Vollständiger End-to-End Test-Workflow: Unser main.py-Skript demonstriert nun einen kompletten Ablauf, der alle diese Schritte von der Erstellung bis zur UDF-Aktualisierung umfasst.
   5. Architekturplan für Marketing Automation: Wir haben einen detaillierten "Butler-Service"-Architekturplan für die Marketing-Automatisierung entworfen, der den Connector für die
      Textgenerierung und SuperOffice für den Versand und das Status-Management nutzt.
   6. Identifikation des E-Mail-Blockers: Wir haben festgestellt, dass das Erstellen von E-Mail-Aktivitäten per API in deiner aktuellen SuperOffice-Entwicklungsumgebung aufgrund fehlender
      Lizenzierung/Konfiguration des E-Mail-Moduls blockiert ist (500 Internal Server Error).
2026-02-10 11:06:32 +00:00

404 lines
14 KiB
Python

import requests
import logging
from auth_handler import AuthHandler
logger = logging.getLogger(__name__)
class SuperOfficeClient:
"""
A client for interacting with the SuperOffice REST API using OAuth 2.0 Bearer tokens.
"""
def __init__(self, auth_handler: AuthHandler):
self.auth_handler = auth_handler
self.session = requests.Session()
# Mapping for UDF fields for Contact entity
self.udf_contact_mapping = {
"ai_challenge_sentence": "SuperOffice:1",
"ai_sentence_timestamp": "SuperOffice:2",
"ai_sentence_source_hash": "SuperOffice:3",
"ai_last_outreach_date": "SuperOffice:4"
}
# Mapping for UDF fields for Person entity
self.udf_person_mapping = {
"ai_email_draft": "SuperOffice:1", # NOTE: This is currently a Date field in SO and needs to be changed to Text (Long/Memo)
"ma_status": "SuperOffice:2"
}
# Mapping for MA Status list values (Text Label -> SO ID)
self.ma_status_id_map = {
"Ready_to_Send": 11,
"Sent_Week1": 12,
"Sent_Week2": 13,
"Bounced": 14,
"Soft_Denied": 15,
"Interested": 16,
"Out_of_Office": 17,
"Unsubscribed": 18
}
def _get_headers(self):
"""Returns the authorization headers with Bearer token."""
access_token, _ = self.auth_handler.get_ticket()
return {
'Authorization': f'Bearer {access_token}',
'Accept': 'application/json',
'Content-Type': 'application/json'
}
def _get_url(self, path):
"""Constructs the full URL for a given API path."""
_, webapi_url = self.auth_handler.get_ticket()
base = webapi_url.rstrip('/')
p = path.lstrip('/')
return f"{base}/api/{p}"
def test_connection(self):
"""Tests the connection by fetching the current user."""
url = self._get_url("v1/User/currentPrincipal")
try:
resp = self.session.get(url, headers=self._get_headers())
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"Connection test failed: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def find_contact_by_criteria(self, name=None, url=None, org_nr=None):
"""Searches for a contact by OrgNr, URL, or Name."""
filters = []
if org_nr:
filters.append(f"orgNr eq '{org_nr}'")
if url:
filters.append(f"urlAddress eq '{url}'")
if not filters and name:
filters.append(f"name contains '{name}'")
if not filters:
return None
query = " and ".join(filters)
path = f"v1/Contact?$filter={query}"
try:
full_url = self._get_url(path)
resp = self.session.get(full_url, headers=self._get_headers())
resp.raise_for_status()
data = resp.json()
results = data.get("value", [])
if results:
logger.info(f"Found {len(results)} matching contacts.")
return results[0]
return None
except Exception as e:
logger.error(f"Error searching for contact: {e}")
return None
def create_contact(self, name, url=None, org_nr=None):
"""Creates a new contact (company) in SuperOffice with basic details."""
url = self._get_url("v1/Contact")
payload = {
"Name": name,
"OrgNr": org_nr,
"UrlAddress": url,
"ActivePublications": [], # Required field, can be empty
"Emails": [], # Required field, can be empty
"Phones": [] # Required field, can be empty
}
# Remove None values
payload = {k: v for k, v in payload.items() if v is not None}
try:
logger.info(f"Attempting to create contact: {name}")
resp = self.session.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
created_contact = resp.json()
logger.info(f"Successfully created contact: {created_contact.get('Name')} (ID: {created_contact.get('ContactId')})")
return created_contact
except Exception as e:
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def create_person(self, first_name, last_name, contact_id, email=None):
"""Creates a new person linked to a contact (company)."""
url = self._get_url("v1/Person")
payload = {
"Firstname": first_name,
"Lastname": last_name,
"Contact": {
"ContactId": contact_id
},
"Emails": []
}
if email:
payload["Emails"].append({
"Value": email,
"Rank": 1,
"Description": "Work" # Optional description
})
try:
logger.info(f"Attempting to create person: {first_name} {last_name} for Contact ID {contact_id}")
resp = self.session.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
created_person = resp.json()
logger.info(f"Successfully created person: {created_person.get('Firstname')} {created_person.get('Lastname')} (ID: {created_person.get('PersonId')})")
return created_person
except Exception as e:
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def create_sale(self, title, contact_id, person_id=None, amount=0.0):
"""Creates a new Sale (Opportunity) linked to a contact and optionally a person."""
url = self._get_url("v1/Sale")
payload = {
"Heading": title,
"Contact": {
"ContactId": contact_id
},
"Amount": amount,
"SaleType": { # Assuming default ID 1 exists
"Id": 1
},
"SaleStage": { # Assuming default ID for the first stage is 1
"Id": 1
},
"Probability": 10 # Default probability
}
if person_id:
payload["Person"] = {
"PersonId": person_id
}
try:
logger.info(f"Attempting to create sale: '{title}' for Contact ID {contact_id}")
resp = self.session.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
created_sale = resp.json()
logger.info(f"Successfully created sale: {created_sale.get('Heading')} (ID: {created_sale.get('SaleId')})")
return created_sale
except Exception as e:
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def create_project(self, name, contact_id, person_id=None):
"""Creates a new Project linked to a contact and optionally adds a person as a member."""
url = self._get_url("v1/Project")
payload = {
"Name": name,
"Contact": {
"ContactId": contact_id
},
"ProjectType": { # Assuming default ID 1 exists
"Id": 1
},
"ProjectStatus": { # Assuming default ID 1 for 'In progress' exists
"Id": 1
},
"ProjectMembers": []
}
if person_id:
payload["ProjectMembers"].append({
"PersonId": person_id
})
try:
logger.info(f"Attempting to create project: '{name}' for Contact ID {contact_id}")
resp = self.session.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
created_project = resp.json()
logger.info(f"Successfully created project: {created_project.get('Name')} (ID: {created_project.get('ProjectId')})")
return created_project
except Exception as e:
logger.error(f"Error creating project: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def update_entity_udfs(self, entity_id, entity_type, udf_data: dict):
"""Updates user-defined fields for a given entity (Contact or Person)."""
if entity_type not in ["Contact", "Person"]:
logger.error(f"Invalid entity_type: {entity_type}. Must be 'Contact' or 'Person'.")
return None
# 1. Retrieve the existing entity to ensure all required fields are present in the PUT payload
get_url = self._get_url(f"v1/{entity_type}/{entity_id}")
try:
get_resp = self.session.get(get_url, headers=self._get_headers())
get_resp.raise_for_status()
existing_entity = get_resp.json()
logger.info(f"Successfully retrieved existing {entity_type} ID {entity_id}.")
except Exception as e:
logger.error(f"Error retrieving existing {entity_type} ID {entity_id}: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
# Use the existing entity data as the base for the PUT payload
payload = existing_entity
if "UserDefinedFields" not in payload:
payload["UserDefinedFields"] = {}
# Select the correct mapping based on entity type
udf_mapping = self.udf_contact_mapping if entity_type == "Contact" else self.udf_person_mapping
for key, value in udf_data.items():
prog_id = udf_mapping.get(key)
if prog_id:
if key == "ma_status" and entity_type == "Person":
# For MA Status, we need to send the internal ID directly as an integer
internal_id = self.ma_status_id_map.get(value)
if internal_id:
payload["UserDefinedFields"][prog_id] = internal_id
else:
logger.warning(f"Unknown MA Status value '{value}'. Skipping update for {key}.")
else:
# For other UDFs, send the value directly
payload["UserDefinedFields"][prog_id] = value
else:
logger.warning(f"Unknown UDF key for {entity_type}: {key}. Skipping.")
if not payload["UserDefinedFields"]:
logger.info(f"No valid UDF data to update for {entity_type} ID {entity_id}.")
return None
# 2. Send the updated entity (including all original fields + modified UDFs) via PUT
put_url = self._get_url(f"v1/{entity_type}/{entity_id}")
try:
logger.info(f"Attempting to update UDFs for {entity_type} ID {entity_id} with: {payload['UserDefinedFields']}")
resp = self.session.put(put_url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
updated_entity = resp.json()
logger.info(f"Successfully updated UDFs for {entity_type} ID {entity_id}.")
return updated_entity
except Exception as e:
logger.error(f"Error updating UDFs for {entity_type} ID {entity_id}: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
# NOTE: The create_email_activity method is currently blocked due to SuperOffice environment limitations.
# Attempting to create an Email Activity via API results in a 500 Internal Server Error,
# likely because the email module is not licensed or configured in the SOD environment.
# This method is temporarily commented out.
#
# def create_email_activity(self, person_id, contact_id, subject, body):
# """Creates an Email Activity linked to a person and contact."""
# url = self._get_url("v1/Activity")
#
# payload = {
# "Type": { # Assuming ID 2 for "Email" ActivityType
# "Id": 2
# },
# "Title": subject,
# "Details": body,
# "Person": {
# "PersonId": person_id
# },
# "Contact": {
# "ContactId": contact_id
# }
# }
#
# try:
# logger.info(f"Attempting to create Email Activity with subject '{subject}' for Person ID {person_id} and Contact ID {contact_id}")
# resp = self.session.post(url, headers=self._get_headers(), json=payload)
# resp.raise_for_status()
# created_activity = resp.json()
# logger.info(f"Successfully created Email Activity: '{created_activity.get('Title')}' (ID: {created_activity.get('ActivityId')})")
# return created_activity
# except Exception as e:
# logger.error(f"Error creating Email Activity: {e}")
# if hasattr(e, 'response') and e.response is not None:
# logger.error(f"Response: {e.response.text}")
# return None