import os import requests import logging import time logger = logging.getLogger(__name__) class AuthHandler: """ Handles authentication against the SuperOffice API using an initial Authorization Code exchange and subsequent Refresh Token flow. """ def __init__(self): self.client_id = os.getenv("SO_SOD") self.client_secret = os.getenv("SO_CLIENT_SECRET") self.refresh_token = os.getenv("SO_REFRESH_TOKEN") # Will be set after initial code exchange # Token endpoint for both initial code exchange and refresh token flow self.token_endpoint = "https://sod.superoffice.com/login/common/oauth/tokens" self._access_token = None self._token_expiry_time = 0 if not self.client_id or not self.client_secret: raise ValueError("SO_SOD and SO_CLIENT_SECRET must be set in the environment.") def _is_token_valid(self): """Check if the current access token is still valid.""" return self._access_token and time.time() < self._token_expiry_time def exchange_code_for_tokens(self, auth_code: str, redirect_uri: str): """ Exchanges an Authorization Code for an access token and a refresh token. This is a one-time operation. """ logger.info("Exchanging Authorization Code for tokens...") payload = { 'grant_type': 'authorization_code', 'client_id': self.client_id, 'client_secret': self.client_secret, 'code': auth_code, 'redirect_uri': redirect_uri } headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json' } try: response = requests.post(self.token_endpoint, data=payload, headers=headers) response.raise_for_status() token_data = response.json() self._access_token = token_data['access_token'] self.refresh_token = token_data['refresh_token'] # Store the refresh token self._token_expiry_time = time.time() + token_data['expires_in'] - 60 logger.info("Successfully exchanged code for tokens.") return token_data except requests.exceptions.HTTPError as http_err: logger.error(f"HTTP error during code exchange: {http_err} - {response.text}") raise except requests.exceptions.RequestException as req_err: logger.error(f"Request error during code exchange: {req_err}") raise except KeyError: logger.error(f"Unexpected response format from token endpoint during code exchange: {response.text}") raise ValueError("Could not parse tokens from code exchange response.") def get_access_token(self): """ Retrieves an access token. If a valid token exists, it's returned from cache. Otherwise, a new one is requested using the refresh token. """ if self._is_token_valid(): logger.info("Returning cached access token.") return self._access_token if not self.refresh_token: raise ValueError("Refresh token is not available. Please perform initial code exchange.") logger.info(f"Requesting new access token using refresh token from {self.token_endpoint}...") payload = { 'grant_type': 'refresh_token', 'client_id': self.client_id, 'client_secret': self.client_secret, 'refresh_token': self.refresh_token } headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json' } try: response = requests.post(self.token_endpoint, data=payload, headers=headers) response.raise_for_status() token_data = response.json() self._access_token = token_data['access_token'] # Update refresh token if it's renewed (optional, but good practice) if 'refresh_token' in token_data: self.refresh_token = token_data['refresh_token'] logger.info("Refresh token was renewed.") self._token_expiry_time = time.time() + token_data['expires_in'] - 60 logger.info("Successfully obtained new access token via refresh token.") return self._access_token except requests.exceptions.HTTPError as http_err: logger.error(f"HTTP error occurred during refresh token use: {http_err} - {response.text}") raise except requests.exceptions.RequestException as req_err: logger.error(f"Request error occurred during refresh token use: {req_err}") raise except KeyError: logger.error(f"Unexpected response format from token endpoint: {response.text}") raise ValueError("Could not parse access token from refresh token response.")