feat: Build complete POC for Butler model (client, matrix, daemon)

This commit is contained in:
Jarvis
2026-02-12 14:18:52 +00:00
parent 5b1939c881
commit 5a19a9c85f
13 changed files with 1288 additions and 352 deletions

View File

@@ -1,312 +1,138 @@
import os
import requests
import logging
from auth_handler import AuthHandler
from config import Config
from logging_config import setup_logging
import json
from dotenv import load_dotenv
# Use the centralized logging configuration
logger = setup_logging(__name__)
load_dotenv(override=True)
class SuperOfficeClient:
"""
A client for interacting with the SuperOffice REST API using OAuth 2.0 Bearer tokens.
"""
def __init__(self, auth_handler: AuthHandler):
self.auth_handler = auth_handler
self.session = requests.Session()
"""A client for interacting with the SuperOffice REST API."""
def __init__(self):
self.client_id = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD")
self.client_secret = os.getenv("SO_CLIENT_SECRET")
self.refresh_token = os.getenv("SO_REFRESH_TOKEN")
self.redirect_uri = os.getenv("SO_REDIRECT_URI", "http://localhost")
self.env = os.getenv("SO_ENVIRONMENT", "sod")
self.cust_id = os.getenv("SO_CONTEXT_IDENTIFIER", "Cust55774") # Fallback for your dev
if not all([self.client_id, self.client_secret, self.refresh_token]):
raise ValueError("SuperOffice credentials missing in .env file.")
self.base_url = f"https://app-{self.env}.superoffice.com/{self.cust_id}/api/v1"
self.access_token = self._refresh_access_token()
if not self.access_token:
raise Exception("Failed to authenticate with SuperOffice.")
# Load mappings from Config
self.udf_contact_mapping = Config.UDF_CONTACT_MAPPING
self.udf_person_mapping = Config.UDF_PERSON_MAPPING
self.ma_status_id_map = Config.MA_STATUS_ID_MAP
def _get_headers(self):
"""Returns the authorization headers with Bearer token."""
access_token, _ = self.auth_handler.get_ticket()
return {
'Authorization': f'Bearer {access_token}',
'Accept': 'application/json',
'Content-Type': 'application/json'
self.headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
"Accept": "application/json"
}
print("✅ SuperOffice Client initialized and authenticated.")
def _get_url(self, path):
"""Constructs the full URL for a given API path."""
_, webapi_url = self.auth_handler.get_ticket()
base = webapi_url.rstrip('/')
p = path.lstrip('/')
return f"{base}/api/{p}"
def test_connection(self):
"""Tests the connection by fetching the current user."""
url = self._get_url("v1/User/currentPrincipal")
def _refresh_access_token(self):
"""Refreshes and returns a new access token."""
url = f"https://{self.env}.superoffice.com/login/common/oauth/tokens"
data = {
"grant_type": "refresh_token",
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token,
"redirect_uri": self.redirect_uri
}
try:
resp = self.session.get(url, headers=self._get_headers())
resp = requests.post(url, data=data)
resp.raise_for_status()
return resp.json().get("access_token")
except requests.exceptions.HTTPError as e:
print(f"❌ Token Refresh Error: {e.response.text}")
return None
except Exception as e:
print(f"❌ Connection Error during token refresh: {e}")
return None
def _get(self, endpoint):
"""Generic GET request."""
try:
resp = requests.get(f"{self.base_url}/{endpoint}", headers=self.headers)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"Connection test failed: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
except requests.exceptions.HTTPError as e:
print(f"❌ API GET Error for {endpoint}: {e.response.text}")
return None
def find_contact_by_criteria(self, name=None, url=None, org_nr=None):
"""Searches for a contact by OrgNr, URL, or Name."""
filters = []
if org_nr:
filters.append(f"orgNr eq '{org_nr}'")
if url:
filters.append(f"urlAddress eq '{url}'")
if not filters and name:
filters.append(f"name contains '{name}'")
if not filters:
return None
query = " and ".join(filters)
path = f"v1/Contact?$filter={query}"
def _put(self, endpoint, payload):
"""Generic PUT request."""
try:
full_url = self._get_url(path)
resp = self.session.get(full_url, headers=self._get_headers())
resp = requests.put(f"{self.base_url}/{endpoint}", headers=self.headers, json=payload)
resp.raise_for_status()
data = resp.json()
results = data.get("value", [])
if results:
logger.info(f"Found {len(results)} matching contacts.")
return results[0]
return None
except Exception as e:
logger.error(f"Error searching for contact: {e}")
return resp.json()
except requests.exceptions.HTTPError as e:
print(f"❌ API PUT Error for {endpoint}: {e.response.text}")
return None
def create_contact(self, name, url=None, org_nr=None):
"""Creates a new contact (company) in SuperOffice with basic details."""
url = self._get_url("v1/Contact")
payload = {
"Name": name,
"OrgNr": org_nr,
"UrlAddress": url,
"ActivePublications": [], # Required field, can be empty
"Emails": [], # Required field, can be empty
"Phones": [] # Required field, can be empty
}
def get_person(self, person_id):
"""Gets a single person by ID."""
return self._get(f"Person/{person_id}")
def get_contact(self, contact_id):
"""Gets a single contact (company) by ID."""
return self._get(f"Contact/{contact_id}")
def update_udfs(self, entity: str, entity_id: int, udf_payload: dict):
"""
Updates the UserDefinedFields for a given entity (Person or Contact).
# Remove None values
payload = {k: v for k, v in payload.items() if v is not None}
try:
logger.info(f"Attempting to create contact: {name}")
resp = self.session.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
created_contact = resp.json()
logger.info(f"Successfully created contact: {created_contact.get('Name')} (ID: {created_contact.get('ContactId')})")
return created_contact
except Exception as e:
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def create_person(self, first_name, last_name, contact_id, email=None):
"""Creates a new person linked to a contact (company)."""
url = self._get_url("v1/Person")
Args:
entity (str): "Person" or "Contact".
entity_id (int): The ID of the entity.
udf_payload (dict): A dictionary of ProgId:Value pairs.
"""
endpoint = f"{entity}/{entity_id}"
payload = {
"Firstname": first_name,
"Lastname": last_name,
"Contact": {
"ContactId": contact_id
},
"Emails": []
}
# 1. GET the full entity object
existing_data = self._get(endpoint)
if not existing_data:
return False # Error is printed in _get
# 2. Merge the UDF payload
if "UserDefinedFields" not in existing_data:
existing_data["UserDefinedFields"] = {}
existing_data["UserDefinedFields"].update(udf_payload)
# 3. PUT the full object back
print(f"Updating {entity} {entity_id} with new UDFs...")
result = self._put(endpoint, existing_data)
if result:
print(f"✅ Successfully updated {entity} {entity_id}")
return True
return False
def search(self, query_string: str):
"""
Performs a search using OData syntax and handles pagination.
Example: "Person?$select=personId&$filter=lastname eq 'Godelmann'"
"""
all_results = []
next_page_url = f"{self.base_url}/{query_string}"
while next_page_url:
try:
resp = requests.get(next_page_url, headers=self.headers)
resp.raise_for_status()
data = resp.json()
# Add the items from the current page
all_results.extend(data.get('value', []))
# Check for the next page link
next_page_url = data.get('next_page_url', None)
except requests.exceptions.HTTPError as e:
print(f"❌ API Search Error for {query_string}: {e.response.text}")
return None
if email:
payload["Emails"].append({
"Value": email,
"Rank": 1,
"Description": "Work" # Optional description
})
try:
logger.info(f"Attempting to create person: {first_name} {last_name} for Contact ID {contact_id}")
resp = self.session.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
created_person = resp.json()
logger.info(f"Successfully created person: {created_person.get('Firstname')} {created_person.get('Lastname')} (ID: {created_person.get('PersonId')})")
return created_person
except Exception as e:
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def create_sale(self, title, contact_id, person_id=None, amount=0.0):
"""Creates a new Sale (Opportunity) linked to a contact and optionally a person."""
url = self._get_url("v1/Sale")
payload = {
"Heading": title,
"Contact": {
"ContactId": contact_id
},
"Amount": amount,
"SaleType": { # Assuming default ID 1 exists
"Id": 1
},
"SaleStage": { # Assuming default ID for the first stage is 1
"Id": 1
},
"Probability": 10 # Default probability
}
if person_id:
payload["Person"] = {
"PersonId": person_id
}
try:
logger.info(f"Attempting to create sale: '{title}' for Contact ID {contact_id}")
resp = self.session.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
created_sale = resp.json()
logger.info(f"Successfully created sale: {created_sale.get('Heading')} (ID: {created_sale.get('SaleId')})")
return created_sale
except Exception as e:
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def create_project(self, name, contact_id, person_id=None):
"""Creates a new Project linked to a contact and optionally adds a person as a member."""
url = self._get_url("v1/Project")
payload = {
"Name": name,
"Contact": {
"ContactId": contact_id
},
"ProjectType": { # Assuming default ID 1 exists
"Id": 1
},
"ProjectStatus": { # Assuming default ID 1 for 'In progress' exists
"Id": 1
},
"ProjectMembers": []
}
if person_id:
payload["ProjectMembers"].append({
"PersonId": person_id
})
try:
logger.info(f"Attempting to create project: '{name}' for Contact ID {contact_id}")
resp = self.session.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
created_project = resp.json()
logger.info(f"Successfully created project: {created_project.get('Name')} (ID: {created_project.get('ProjectId')})")
return created_project
except Exception as e:
logger.error(f"Error creating project: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def update_entity_udfs(self, entity_id, entity_type, udf_data: dict):
"""Updates user-defined fields for a given entity (Contact or Person)."""
if entity_type not in ["Contact", "Person"]:
logger.error(f"Invalid entity_type: {entity_type}. Must be 'Contact' or 'Person'.")
return None
# 1. Retrieve the existing entity to ensure all required fields are present in the PUT payload
get_url = self._get_url(f"v1/{entity_type}/{entity_id}")
try:
get_resp = self.session.get(get_url, headers=self._get_headers())
get_resp.raise_for_status()
existing_entity = get_resp.json()
logger.info(f"Successfully retrieved existing {entity_type} ID {entity_id}.")
except Exception as e:
logger.error(f"Error retrieving existing {entity_type} ID {entity_id}: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
# Use the existing entity data as the base for the PUT payload
payload = existing_entity
if "UserDefinedFields" not in payload:
payload["UserDefinedFields"] = {}
# Select the correct mapping based on entity type
udf_mapping = self.udf_contact_mapping if entity_type == "Contact" else self.udf_person_mapping
for key, value in udf_data.items():
prog_id = udf_mapping.get(key)
if prog_id:
if key == "ma_status" and entity_type == "Person":
# For MA Status, we need to send the internal ID directly as an integer
internal_id = self.ma_status_id_map.get(value)
if internal_id:
payload["UserDefinedFields"][prog_id] = internal_id
else:
logger.warning(f"Unknown MA Status value '{value}'. Skipping update for {key}.")
else:
# For other UDFs, send the value directly
payload["UserDefinedFields"][prog_id] = value
else:
logger.warning(f"Unknown UDF key for {entity_type}: {key}. Skipping.")
if not payload["UserDefinedFields"]:
logger.info(f"No valid UDF data to update for {entity_type} ID {entity_id}.")
return None
# 2. Send the updated entity (including all original fields + modified UDFs) via PUT
put_url = self._get_url(f"v1/{entity_type}/{entity_id}")
try:
logger.info(f"Attempting to update UDFs for {entity_type} ID {entity_id} with: {payload['UserDefinedFields']}")
resp = self.session.put(put_url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
updated_entity = resp.json()
logger.info(f"Successfully updated UDFs for {entity_type} ID {entity_id}.")
return updated_entity
except Exception as e:
logger.error(f"Error updating UDFs for {entity_type} ID {entity_id}: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
# NOTE: The create_email_activity method is currently blocked due to SuperOffice environment limitations.
# Attempting to create an Email Activity via API results in a 500 Internal Server Error,
# likely because the email module is not licensed or configured in the SOD environment.
# This method is temporarily commented out.
#
# def create_email_activity(self, person_id, contact_id, subject, body):
# """Creates an Email Activity linked to a person and contact."""
# url = self._get_url("v1/Activity")
#
# payload = {
# "Type": { # Assuming ID 2 for "Email" ActivityType
# "Id": 2
# },
# "Title": subject,
# "Details": body,
# "Person": {
# "PersonId": person_id
# },
# "Contact": {
# "ContactId": contact_id
# }
# }
#
# try:
# logger.info(f"Attempting to create Email Activity with subject '{subject}' for Person ID {person_id} and Contact ID {contact_id}")
# resp = self.session.post(url, headers=self._get_headers(), json=payload)
# resp.raise_for_status()
# created_activity = resp.json()
# logger.info(f"Successfully created Email Activity: '{created_activity.get('Title')}' (ID: {created_activity.get('ActivityId')})")
# return created_activity
# except Exception as e:
# logger.error(f"Error creating Email Activity: {e}")
# if hasattr(e, 'response') and e.response is not None:
# logger.error(f"Response: {e.response.text}")
# return None
return all_results