feat(superoffice): POC API handshake & auth flow [2ff88f42]

Establishes the initial structure for the SuperOffice connector. Implements the complete, iterative authentication process, culminating in a successful refresh token exchange. Documents the process and the final blocker (API authorization) in the integration plan, awaiting IT action to change the application type to 'Server to server'.
This commit is contained in:
2026-02-06 13:52:44 +00:00
parent e6a3a24750
commit ff62024ef7
10 changed files with 310 additions and 24 deletions

View File

@@ -0,0 +1,121 @@
import os
import requests
import logging
import time
logger = logging.getLogger(__name__)
class AuthHandler:
"""
Handles authentication against the SuperOffice API using an initial Authorization Code
exchange and subsequent Refresh Token flow.
"""
def __init__(self):
self.client_id = os.getenv("SO_SOD")
self.client_secret = os.getenv("SO_CLIENT_SECRET")
self.refresh_token = os.getenv("SO_REFRESH_TOKEN") # Will be set after initial code exchange
# Token endpoint for both initial code exchange and refresh token flow
self.token_endpoint = "https://sod.superoffice.com/login/common/oauth/tokens"
self._access_token = None
self._token_expiry_time = 0
if not self.client_id or not self.client_secret:
raise ValueError("SO_SOD and SO_CLIENT_SECRET must be set in the environment.")
def _is_token_valid(self):
"""Check if the current access token is still valid."""
return self._access_token and time.time() < self._token_expiry_time
def exchange_code_for_tokens(self, auth_code: str, redirect_uri: str):
"""
Exchanges an Authorization Code for an access token and a refresh token.
This is a one-time operation.
"""
logger.info("Exchanging Authorization Code for tokens...")
payload = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': auth_code,
'redirect_uri': redirect_uri
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
}
try:
response = requests.post(self.token_endpoint, data=payload, headers=headers)
response.raise_for_status()
token_data = response.json()
self._access_token = token_data['access_token']
self.refresh_token = token_data['refresh_token'] # Store the refresh token
self._token_expiry_time = time.time() + token_data['expires_in'] - 60
logger.info("Successfully exchanged code for tokens.")
return token_data
except requests.exceptions.HTTPError as http_err:
logger.error(f"HTTP error during code exchange: {http_err} - {response.text}")
raise
except requests.exceptions.RequestException as req_err:
logger.error(f"Request error during code exchange: {req_err}")
raise
except KeyError:
logger.error(f"Unexpected response format from token endpoint during code exchange: {response.text}")
raise ValueError("Could not parse tokens from code exchange response.")
def get_access_token(self):
"""
Retrieves an access token. If a valid token exists, it's returned from cache.
Otherwise, a new one is requested using the refresh token.
"""
if self._is_token_valid():
logger.info("Returning cached access token.")
return self._access_token
if not self.refresh_token:
raise ValueError("Refresh token is not available. Please perform initial code exchange.")
logger.info(f"Requesting new access token using refresh token from {self.token_endpoint}...")
payload = {
'grant_type': 'refresh_token',
'client_id': self.client_id,
'client_secret': self.client_secret,
'refresh_token': self.refresh_token
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
}
try:
response = requests.post(self.token_endpoint, data=payload, headers=headers)
response.raise_for_status()
token_data = response.json()
self._access_token = token_data['access_token']
# Update refresh token if it's renewed (optional, but good practice)
if 'refresh_token' in token_data:
self.refresh_token = token_data['refresh_token']
logger.info("Refresh token was renewed.")
self._token_expiry_time = time.time() + token_data['expires_in'] - 60
logger.info("Successfully obtained new access token via refresh token.")
return self._access_token
except requests.exceptions.HTTPError as http_err:
logger.error(f"HTTP error occurred during refresh token use: {http_err} - {response.text}")
raise
except requests.exceptions.RequestException as req_err:
logger.error(f"Request error occurred during refresh token use: {req_err}")
raise
except KeyError:
logger.error(f"Unexpected response format from token endpoint: {response.text}")
raise ValueError("Could not parse access token from refresh token response.")