feat(connector-superoffice): implement OAuth 2.0 flow and S2S architecture
Completed POC for SuperOffice integration with the following key achievements: - Switched from RSA/SOAP to OAuth 2.0 (Refresh Token Flow) for better compatibility with SOD environment. - Implemented robust token refreshing and caching mechanism in . - Solved 'Wrong Subdomain' issue by enforcing for tenant . - Created for REST API interaction (Search, Create, Update UDFs). - Added helper scripts: , , . - Documented usage and configuration in . - Updated configuration requirements. [2ff88f42]
This commit is contained in:
@@ -1,55 +1,91 @@
|
||||
import requests
|
||||
import logging
|
||||
from .auth_handler import AuthHandler
|
||||
from auth_handler import AuthHandler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class SuperOfficeClient:
|
||||
"""
|
||||
A client for interacting with the SuperOffice API.
|
||||
A client for interacting with the SuperOffice REST API using OAuth 2.0 Bearer tokens.
|
||||
"""
|
||||
def __init__(self, auth_handler: AuthHandler, tenant_id: str):
|
||||
# Base URL for the SuperOffice REST API, including tenant_id
|
||||
self.base_url = f"https://sod.superoffice.com/{tenant_id}/api"
|
||||
def __init__(self, auth_handler: AuthHandler):
|
||||
self.auth_handler = auth_handler
|
||||
self.tenant_id = tenant_id
|
||||
self.session = requests.Session()
|
||||
|
||||
# Mapping for UDF fields (These are typical technical names, but might need adjustment)
|
||||
self.udf_mapping = {
|
||||
"robotics_potential": "x_robotics_potential",
|
||||
"industry": "x_ai_industry",
|
||||
"summary": "x_ai_summary",
|
||||
"last_update": "x_ai_last_update",
|
||||
"status": "x_ai_status"
|
||||
}
|
||||
|
||||
# Mapping for list values (Explorer -> SO ID)
|
||||
self.potential_id_map = {
|
||||
"High": 1,
|
||||
"Medium": 2,
|
||||
"Low": 3,
|
||||
"None": 4
|
||||
}
|
||||
|
||||
def _get_auth_headers(self):
|
||||
"""Returns the authorization headers with a valid token and context."""
|
||||
access_token = self.auth_handler.get_access_token()
|
||||
def _get_headers(self):
|
||||
"""Returns the authorization headers with Bearer token."""
|
||||
access_token, _ = self.auth_handler.get_ticket()
|
||||
return {
|
||||
'Authorization': f'Bearer {access_token}',
|
||||
'Accept': 'application/json',
|
||||
'X-SuperOffice-Context': f'TenantId={self.tenant_id}' # Crucial for multi-tenant environments
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
def _get_url(self, path):
|
||||
"""Constructs the full URL for a given API path."""
|
||||
_, webapi_url = self.auth_handler.get_ticket()
|
||||
base = webapi_url.rstrip('/')
|
||||
p = path.lstrip('/')
|
||||
return f"{base}/api/{p}"
|
||||
|
||||
def test_connection(self):
|
||||
"""
|
||||
Performs a simple API call to test the connection and authentication.
|
||||
Fetches the current user principal.
|
||||
"""
|
||||
endpoint = "/v1/User/currentPrincipal"
|
||||
test_url = f"{self.base_url}{endpoint}"
|
||||
|
||||
logger.info(f"Attempting to test connection to: {test_url}")
|
||||
|
||||
"""Tests the connection by fetching the current user."""
|
||||
url = self._get_url("v1/User/currentPrincipal")
|
||||
try:
|
||||
headers = self._get_auth_headers()
|
||||
response = self.session.get(test_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
user_data = response.json()
|
||||
logger.info("Successfully connected to SuperOffice API.")
|
||||
logger.info(f"Authenticated as: {user_data.get('Name', 'N/A')} ({user_data.get('Associate', 'N/A')})")
|
||||
return user_data
|
||||
|
||||
except requests.exceptions.HTTPError as http_err:
|
||||
logger.error(f"HTTP error during connection test: {http_err} - {http_err.response.text}")
|
||||
resp = self.session.get(url, headers=self._get_headers())
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
except Exception as e:
|
||||
logger.error(f"Connection test failed: {e}")
|
||||
if hasattr(e, 'response') and e.response is not None:
|
||||
logger.error(f"Response: {e.response.text}")
|
||||
return None
|
||||
except requests.exceptions.RequestException as req_err:
|
||||
logger.error(f"Request error during connection test: {req_err}")
|
||||
|
||||
def find_contact_by_criteria(self, name=None, url=None, org_nr=None):
|
||||
"""Searches for a contact by OrgNr, URL, or Name."""
|
||||
filters = []
|
||||
if org_nr:
|
||||
filters.append(f"orgNr eq '{org_nr}'")
|
||||
if url:
|
||||
filters.append(f"urlAddress eq '{url}'")
|
||||
|
||||
if not filters and name:
|
||||
filters.append(f"name contains '{name}'")
|
||||
|
||||
if not filters:
|
||||
return None
|
||||
|
||||
query = " and ".join(filters)
|
||||
path = f"v1/Contact?$filter={query}"
|
||||
|
||||
try:
|
||||
full_url = self._get_url(path)
|
||||
resp = self.session.get(full_url, headers=self._get_headers())
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
results = data.get("value", [])
|
||||
if results:
|
||||
logger.info(f"Found {len(results)} matching contacts.")
|
||||
return results[0]
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"An unexpected error occurred during connection test: {e}")
|
||||
logger.error(f"Error searching for contact: {e}")
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user