Files
Brancheneinstufung2/connector-superoffice/auth_handler.py
Floke 53b92c76de feat(superoffice): POC API handshake & auth flow [2ff88f42]
Establishes the initial structure for the SuperOffice connector. Implements the complete, iterative authentication process, culminating in a successful refresh token exchange. Documents the process and the final blocker (API authorization) in the integration plan, awaiting IT action to change the application type to 'Server to server'.
2026-02-06 13:52:44 +00:00

122 lines
4.9 KiB
Python

import os
import requests
import logging
import time
logger = logging.getLogger(__name__)
class AuthHandler:
"""
Handles authentication against the SuperOffice API using an initial Authorization Code
exchange and subsequent Refresh Token flow.
"""
def __init__(self):
self.client_id = os.getenv("SO_SOD")
self.client_secret = os.getenv("SO_CLIENT_SECRET")
self.refresh_token = os.getenv("SO_REFRESH_TOKEN") # Will be set after initial code exchange
# Token endpoint for both initial code exchange and refresh token flow
self.token_endpoint = "https://sod.superoffice.com/login/common/oauth/tokens"
self._access_token = None
self._token_expiry_time = 0
if not self.client_id or not self.client_secret:
raise ValueError("SO_SOD and SO_CLIENT_SECRET must be set in the environment.")
def _is_token_valid(self):
"""Check if the current access token is still valid."""
return self._access_token and time.time() < self._token_expiry_time
def exchange_code_for_tokens(self, auth_code: str, redirect_uri: str):
"""
Exchanges an Authorization Code for an access token and a refresh token.
This is a one-time operation.
"""
logger.info("Exchanging Authorization Code for tokens...")
payload = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': auth_code,
'redirect_uri': redirect_uri
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
}
try:
response = requests.post(self.token_endpoint, data=payload, headers=headers)
response.raise_for_status()
token_data = response.json()
self._access_token = token_data['access_token']
self.refresh_token = token_data['refresh_token'] # Store the refresh token
self._token_expiry_time = time.time() + token_data['expires_in'] - 60
logger.info("Successfully exchanged code for tokens.")
return token_data
except requests.exceptions.HTTPError as http_err:
logger.error(f"HTTP error during code exchange: {http_err} - {response.text}")
raise
except requests.exceptions.RequestException as req_err:
logger.error(f"Request error during code exchange: {req_err}")
raise
except KeyError:
logger.error(f"Unexpected response format from token endpoint during code exchange: {response.text}")
raise ValueError("Could not parse tokens from code exchange response.")
def get_access_token(self):
"""
Retrieves an access token. If a valid token exists, it's returned from cache.
Otherwise, a new one is requested using the refresh token.
"""
if self._is_token_valid():
logger.info("Returning cached access token.")
return self._access_token
if not self.refresh_token:
raise ValueError("Refresh token is not available. Please perform initial code exchange.")
logger.info(f"Requesting new access token using refresh token from {self.token_endpoint}...")
payload = {
'grant_type': 'refresh_token',
'client_id': self.client_id,
'client_secret': self.client_secret,
'refresh_token': self.refresh_token
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
}
try:
response = requests.post(self.token_endpoint, data=payload, headers=headers)
response.raise_for_status()
token_data = response.json()
self._access_token = token_data['access_token']
# Update refresh token if it's renewed (optional, but good practice)
if 'refresh_token' in token_data:
self.refresh_token = token_data['refresh_token']
logger.info("Refresh token was renewed.")
self._token_expiry_time = time.time() + token_data['expires_in'] - 60
logger.info("Successfully obtained new access token via refresh token.")
return self._access_token
except requests.exceptions.HTTPError as http_err:
logger.error(f"HTTP error occurred during refresh token use: {http_err} - {response.text}")
raise
except requests.exceptions.RequestException as req_err:
logger.error(f"Request error occurred during refresh token use: {req_err}")
raise
except KeyError:
logger.error(f"Unexpected response format from token endpoint: {response.text}")
raise ValueError("Could not parse access token from refresh token response.")