This commit restores the full functionality of the script within the module. Several instances were resolved by implementing missing methods in (e.g., , , , , , , ) and correcting argument passing. The data extraction logic in was adjusted to correctly parse the structure returned by the SuperOffice API (e.g., and ). A dedicated SuperOffice API health check script () was introduced to quickly verify basic API connectivity (reading contacts and persons). This script confirmed that read operations for and entities are functional, while the endpoint continues to return a , which is now handled gracefully as a warning, allowing other tests to proceed. The now successfully executes all SuperOffice-specific POC steps, including creating contacts, persons, sales, projects, and updating UDFs.
256 lines
9.6 KiB
Python
256 lines
9.6 KiB
Python
import os
|
|
import requests
|
|
import json
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv(override=True)
|
|
|
|
class SuperOfficeClient:
|
|
"""A client for interacting with the SuperOffice REST API."""
|
|
|
|
def __init__(self):
|
|
self.client_id = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD")
|
|
self.client_secret = os.getenv("SO_CLIENT_SECRET")
|
|
self.refresh_token = os.getenv("SO_REFRESH_TOKEN")
|
|
self.redirect_uri = os.getenv("SO_REDIRECT_URI", "http://localhost")
|
|
self.env = os.getenv("SO_ENVIRONMENT", "sod")
|
|
self.cust_id = os.getenv("SO_CONTEXT_IDENTIFIER", "Cust55774") # Fallback for your dev
|
|
|
|
if not all([self.client_id, self.client_secret, self.refresh_token]):
|
|
raise ValueError("SuperOffice credentials missing in .env file.")
|
|
|
|
self.base_url = f"https://app-{self.env}.superoffice.com/{self.cust_id}/api/v1"
|
|
self.access_token = self._refresh_access_token()
|
|
if not self.access_token:
|
|
raise Exception("Failed to authenticate with SuperOffice.")
|
|
|
|
self.headers = {
|
|
"Authorization": f"Bearer {self.access_token}",
|
|
"Content-Type": "application/json",
|
|
"Accept": "application/json"
|
|
}
|
|
print("✅ SuperOffice Client initialized and authenticated.")
|
|
|
|
def _refresh_access_token(self):
|
|
"""Refreshes and returns a new access token."""
|
|
url = f"https://{self.env}.superoffice.com/login/common/oauth/tokens"
|
|
data = {
|
|
"grant_type": "refresh_token",
|
|
"client_id": self.client_id,
|
|
"client_secret": self.client_secret,
|
|
"refresh_token": self.refresh_token,
|
|
"redirect_uri": self.redirect_uri
|
|
}
|
|
try:
|
|
resp = requests.post(url, data=data)
|
|
resp.raise_for_status()
|
|
return resp.json().get("access_token")
|
|
except requests.exceptions.HTTPError as e:
|
|
print(f"❌ Token Refresh Error: {e.response.text}")
|
|
return None
|
|
except Exception as e:
|
|
print(f"❌ Connection Error during token refresh: {e}")
|
|
return None
|
|
|
|
def _get(self, endpoint):
|
|
"""Generic GET request."""
|
|
try:
|
|
resp = requests.get(f"{self.base_url}/{endpoint}", headers=self.headers)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
print(f"❌ API GET Error for {endpoint}: {e.response.text}")
|
|
return None
|
|
|
|
def _put(self, endpoint, payload):
|
|
"""Generic PUT request."""
|
|
try:
|
|
resp = requests.put(f"{self.base_url}/{endpoint}", headers=self.headers, json=payload)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
print(f"❌ API PUT Error for {endpoint}: {e.response.text}")
|
|
return None
|
|
|
|
def get_person(self, person_id):
|
|
"""Gets a single person by ID."""
|
|
return self._get(f"Person/{person_id}")
|
|
|
|
def get_contact(self, contact_id):
|
|
"""Gets a single contact (company) by ID."""
|
|
return self._get(f"Contact/{contact_id}")
|
|
|
|
def update_udfs(self, entity: str, entity_id: int, udf_payload: dict):
|
|
"""
|
|
Updates the UserDefinedFields for a given entity (Person or Contact).
|
|
|
|
Args:
|
|
entity (str): "Person" or "Contact".
|
|
entity_id (int): The ID of the entity.
|
|
udf_payload (dict): A dictionary of ProgId:Value pairs.
|
|
"""
|
|
endpoint = f"{entity}/{entity_id}"
|
|
|
|
# 1. GET the full entity object
|
|
existing_data = self._get(endpoint)
|
|
if not existing_data:
|
|
return False # Error is printed in _get
|
|
|
|
# 2. Merge the UDF payload
|
|
if "UserDefinedFields" not in existing_data:
|
|
existing_data["UserDefinedFields"] = {}
|
|
existing_data["UserDefinedFields"].update(udf_payload)
|
|
|
|
# 3. PUT the full object back
|
|
print(f"Updating {entity} {entity_id} with new UDFs...")
|
|
result = self._put(endpoint, existing_data)
|
|
|
|
if result:
|
|
print(f"✅ Successfully updated {entity} {entity_id}")
|
|
return True
|
|
return False
|
|
|
|
|
|
def search(self, query_string: str):
|
|
"""
|
|
Performs a search using OData syntax and handles pagination.
|
|
Example: "Person?$select=personId&$filter=lastname eq 'Godelmann'"
|
|
"""
|
|
all_results = []
|
|
next_page_url = f"{self.base_url}/{query_string}"
|
|
|
|
while next_page_url:
|
|
try:
|
|
resp = requests.get(next_page_url, headers=self.headers)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
|
|
# Add the items from the current page
|
|
all_results.extend(data.get('value', []))
|
|
|
|
# Check for the next page link
|
|
next_page_url = data.get('next_page_url', None)
|
|
|
|
except requests.exceptions.HTTPError as e:
|
|
print(f"❌ API Search Error for {query_string}: {e.response.text}")
|
|
return None
|
|
|
|
return all_results
|
|
|
|
def find_contact_by_criteria(self, name=None, org_nr=None, url=None):
|
|
"""
|
|
Finds a contact (company) by name, OrgNr, or URL.
|
|
Returns the first matching contact or None.
|
|
"""
|
|
filter_parts = []
|
|
if name:
|
|
filter_parts.append(f"Name eq '{name}'")
|
|
if org_nr:
|
|
filter_parts.append(f"OrgNr eq '{org_nr}'")
|
|
if url:
|
|
filter_parts.append(f"UrlAddress eq '{url}'")
|
|
|
|
if not filter_parts:
|
|
print("❌ No criteria provided for contact search.")
|
|
return None
|
|
|
|
query_string = "Contact?$filter=" + " or ".join(filter_parts)
|
|
results = self.search(query_string)
|
|
if results:
|
|
return results[0] # Return the first match
|
|
return None
|
|
|
|
def _post(self, endpoint, payload):
|
|
"""Generic POST request."""
|
|
try:
|
|
resp = requests.post(f"{self.base_url}/{endpoint}", headers=self.headers, json=payload)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
print(f"❌ API POST Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}")
|
|
return None
|
|
except Exception as e:
|
|
print(f"❌ Connection Error during POST for {endpoint}: {e}")
|
|
return None
|
|
|
|
def create_contact(self, name: str, url: str = None, org_nr: str = None):
|
|
"""Creates a new contact (company)."""
|
|
payload = {"Name": name}
|
|
if url:
|
|
payload["UrlAddress"] = url
|
|
if org_nr:
|
|
payload["OrgNr"] = org_nr
|
|
|
|
print(f"Creating new contact: {name} with payload: {payload}...") # Added payload to log
|
|
return self._post("Contact", payload)
|
|
|
|
def create_person(self, first_name: str, last_name: str, contact_id: int, email: str = None):
|
|
"""Creates a new person linked to a contact."""
|
|
payload = {
|
|
"Firstname": first_name,
|
|
"Lastname": last_name,
|
|
"Contact": {"ContactId": contact_id}
|
|
}
|
|
if email:
|
|
payload["EmailAddress"] = email
|
|
|
|
print(f"Creating new person: {first_name} {last_name} for Contact ID {contact_id}...")
|
|
return self._post("Person", payload)
|
|
|
|
def create_sale(self, title: str, contact_id: int, person_id: int, amount: float = None):
|
|
"""Creates a new sale (opportunity) linked to a contact and person."""
|
|
payload = {
|
|
"Heading": title,
|
|
"Contact": {"ContactId": contact_id},
|
|
"Person": {"PersonId": person_id}
|
|
}
|
|
if amount:
|
|
payload["Amount"] = amount
|
|
|
|
print(f"Creating new sale: {title}...")
|
|
return self._post("Sale", payload)
|
|
|
|
def create_project(self, name: str, contact_id: int, person_id: int = None):
|
|
"""Creates a new project linked to a contact, and optionally adds a person."""
|
|
payload = {
|
|
"Name": name,
|
|
"Contact": {"ContactId": contact_id}
|
|
}
|
|
if person_id:
|
|
# Adding a person to a project requires a ProjectMember object
|
|
payload["ProjectMembers"] = [
|
|
{
|
|
"Person": {"PersonId": person_id},
|
|
"Role": "Member" # Default role, can be configured if needed
|
|
}
|
|
]
|
|
|
|
print(f"Creating new project: {name}...")
|
|
return self._post("Project", payload)
|
|
|
|
def update_entity_udfs(self, entity_id: int, entity_type: str, udf_data: dict):
|
|
"""
|
|
Updates UDFs for a given entity (Contact or Person).
|
|
Args:
|
|
entity_id (int): ID of the entity.
|
|
entity_type (str): 'Contact' or 'Person'.
|
|
udf_data (dict): Dictionary with ProgId:Value pairs for UDFs.
|
|
Returns:
|
|
dict: The updated entity object from the API, or None on failure.
|
|
"""
|
|
# We need to GET the existing entity, update its UDFs, then PUT it back.
|
|
endpoint = f"{entity_type}/{entity_id}"
|
|
existing_entity = self._get(endpoint)
|
|
if not existing_entity:
|
|
print(f"❌ Failed to retrieve existing {entity_type} {entity_id} for UDF update.")
|
|
return None
|
|
|
|
if "UserDefinedFields" not in existing_entity:
|
|
existing_entity["UserDefinedFields"] = {}
|
|
|
|
existing_entity["UserDefinedFields"].update(udf_data)
|
|
|
|
print(f"Updating {entity_type} {entity_id} UDFs: {udf_data}...")
|
|
return self._put(endpoint, existing_entity)
|
|
|