feat(connector-superoffice): implement OAuth 2.0 flow and S2S architecture

Completed POC for SuperOffice integration with the following key achievements:
- Switched from RSA/SOAP to OAuth 2.0 (Refresh Token Flow) for better compatibility with SOD environment.
- Implemented robust token refreshing and caching mechanism in .
- Solved 'Wrong Subdomain' issue by enforcing  for tenant .
- Created  for REST API interaction (Search, Create, Update UDFs).
- Added helper scripts: , , .
- Documented usage and configuration in .
- Updated  configuration requirements.

[2ff88f42]
This commit is contained in:
2026-02-09 16:04:16 +00:00
parent 5d6ce40a7e
commit 938a81fd97
10 changed files with 458 additions and 185 deletions

View File

@@ -0,0 +1,75 @@
# SuperOffice Connector (POC)
Dies ist der Microservice zur Anbindung von **SuperOffice CRM** (SOD / Cloud) an die **GTM-Engine**.
Der Connector nutzt die **REST API (v1)** und authentifiziert sich via **OAuth 2.0** (Refresh Token Flow).
## 1. Status Quo (Februar 2026)
Der **Proof of Concept (POC)** ist erfolgreich abgeschlossen.
- **Verbindung:** Erfolgreich hergestellt (`Cust55774`).
- **Authentifizierung:** Funktioniert automatisch via Refresh Token.
- **Datenabruf:** Stammdaten (Contact ID 1) können gelesen werden.
- **Daten schreiben:** Vorbereitet (Code ist fertig), erfordert jedoch noch das Anlegen der UDF-Felder im SuperOffice Admin-Panel.
## 2. Einrichtung & Installation
### Voraussetzungen
* Python 3.11+
* Zugriff auf die `.env` Datei im Root-Verzeichnis (siehe unten).
### Installation
```bash
cd connector-superoffice
../.venv/bin/pip install -r requirements.txt
```
### Konfiguration (.env)
Die Authentifizierung erfordert folgende Variablen in der zentralen `.env`:
```ini
# SuperOffice Client Configuration
SO_SOD="<Client ID / App ID>" # Aus dem Developer Portal
SO_CLIENT_SECRET="<Client Secret>" # Aus dem Developer Portal
SO_CONTEXT_IDENTIFIER="CustXXXXX" # Deine Tenant ID (z.B. Cust55774)
# Authentication Token
SO_REFRESH_TOKEN="<Dein langlebiger Refresh Token>"
# (Optional / Legacy S2S Configuration - aktuell nicht genutzt)
# SO_PRIVATE_KEY="..."
# SO_SYSTEM_USER_TOKEN="..."
```
## 3. Nutzung
### Verbindungstest (Main Script)
Führt einen Login durch und sucht nach einer Test-Firma.
```bash
cd connector-superoffice
../.venv/bin/python main.py
```
### Felder entdecken
Listet alle verfügbaren Felder (inkl. UDFs) auf, um die technischen Namen (`ProgId`) für das Mapping zu finden.
```bash
../.venv/bin/python discover_fields.py
```
## 4. Technische Details & "Gotchas"
### Subdomain-Auflösung (`app-sod`)
Eine wichtige Erkenntnis des POCs war, dass die Standard-API-URL `sod.superoffice.com` nicht für alle Tenants funktioniert.
Der `AuthHandler` wurde so angepasst, dass er (für diesen Tenant) **`https://app-sod.superoffice.com`** als Basis-URL erzwingt.
* *Code-Stelle:* `auth_handler.py` -> `refresh_access_token`
### OAuth vs. System User
Ursprünglich war ein "Server-to-Server" (S2S) Flow mittels RSA-Zertifikaten geplant. Da der Zugriff auf den System-User-Token im SOD-Tenant `Cust55774` aufgrund von UI-Änderungen und Berechtigungen erschwert war, wurde auf den **OAuth 2.0 Refresh Token Flow** ("Plan B") gewechselt.
* **Vorteil:** Einfacher einzurichten, Token ist "ewig" gültig (solange er genutzt wird).
* **Nachteil:** Muss einmalig manuell via Browser generiert werden (bereits erledigt).
## 5. Nächste Schritte
1. **Felder anlegen:** In SuperOffice Admin -> Felder -> Firma die UDFs (`AI Robotics Potential`, `AI Summary` etc.) anlegen.
2. **Mapping aktualisieren:** Die technischen Namen (`ProgId`) in `superoffice_client.py` eintragen.
3. **Schreib-Test:** `main.py` ausführen, um Daten zurückzuschreiben.

View File

@@ -1,121 +1,60 @@
import os
import requests
import logging
import time
import requests
logger = logging.getLogger(__name__)
class AuthHandler:
"""
Handles authentication against the SuperOffice API using an initial Authorization Code
exchange and subsequent Refresh Token flow.
"""
def __init__(self):
self.client_id = os.getenv("SO_SOD")
# Load configuration from environment
self.client_id = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD")
self.client_secret = os.getenv("SO_CLIENT_SECRET")
self.refresh_token = os.getenv("SO_REFRESH_TOKEN") # Will be set after initial code exchange
self.refresh_token = os.getenv("SO_REFRESH_TOKEN")
self.tenant_id = os.getenv("SO_CONTEXT_IDENTIFIER") # e.g., Cust55774
# Token endpoint for both initial code exchange and refresh token flow
self.token_endpoint = "https://sod.superoffice.com/login/common/oauth/tokens"
# OAuth Token Endpoint for SOD
self.token_url = "https://sod.superoffice.com/login/common/oauth/tokens"
self._access_token = None
self._token_expiry_time = 0
self._webapi_url = None
self._expiry = 0
if not self.client_id or not self.client_secret:
raise ValueError("SO_SOD and SO_CLIENT_SECRET must be set in the environment.")
def get_ticket(self):
if self._access_token and time.time() < self._expiry:
return self._access_token, self._webapi_url
return self.refresh_access_token()
def _is_token_valid(self):
"""Check if the current access token is still valid."""
return self._access_token and time.time() < self._token_expiry_time
def exchange_code_for_tokens(self, auth_code: str, redirect_uri: str):
"""
Exchanges an Authorization Code for an access token and a refresh token.
This is a one-time operation.
"""
logger.info("Exchanging Authorization Code for tokens...")
def refresh_access_token(self):
logger.info(f"Refreshing Access Token for Client ID: {self.client_id[:5]}...")
payload = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': auth_code,
'redirect_uri': redirect_uri
"grant_type": "refresh_token",
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json"
}
try:
response = requests.post(self.token_endpoint, data=payload, headers=headers)
response.raise_for_status()
token_data = response.json()
resp = requests.post(self.token_url, data=payload, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
self._access_token = token_data['access_token']
self.refresh_token = token_data['refresh_token'] # Store the refresh token
self._token_expiry_time = time.time() + token_data['expires_in'] - 60
self._access_token = data.get("access_token")
# Based on user's browser URL
self._webapi_url = f"https://app-sod.superoffice.com/{self.tenant_id}"
logger.info("Successfully exchanged code for tokens.")
return token_data
except requests.exceptions.HTTPError as http_err:
logger.error(f"HTTP error during code exchange: {http_err} - {response.text}")
self._expiry = time.time() + int(data.get("expires_in", 3600)) - 60
logger.info("Successfully refreshed Access Token.")
return self._access_token, self._webapi_url
except Exception as e:
logger.error(f"Error refreshing access token: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
raise
except requests.exceptions.RequestException as req_err:
logger.error(f"Request error during code exchange: {req_err}")
raise
except KeyError:
logger.error(f"Unexpected response format from token endpoint during code exchange: {response.text}")
raise ValueError("Could not parse tokens from code exchange response.")
def get_access_token(self):
"""
Retrieves an access token. If a valid token exists, it's returned from cache.
Otherwise, a new one is requested using the refresh token.
"""
if self._is_token_valid():
logger.info("Returning cached access token.")
return self._access_token
if not self.refresh_token:
raise ValueError("Refresh token is not available. Please perform initial code exchange.")
logger.info(f"Requesting new access token using refresh token from {self.token_endpoint}...")
payload = {
'grant_type': 'refresh_token',
'client_id': self.client_id,
'client_secret': self.client_secret,
'refresh_token': self.refresh_token
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
}
try:
response = requests.post(self.token_endpoint, data=payload, headers=headers)
response.raise_for_status()
token_data = response.json()
self._access_token = token_data['access_token']
# Update refresh token if it's renewed (optional, but good practice)
if 'refresh_token' in token_data:
self.refresh_token = token_data['refresh_token']
logger.info("Refresh token was renewed.")
self._token_expiry_time = time.time() + token_data['expires_in'] - 60
logger.info("Successfully obtained new access token via refresh token.")
return self._access_token
except requests.exceptions.HTTPError as http_err:
logger.error(f"HTTP error occurred during refresh token use: {http_err} - {response.text}")
raise
except requests.exceptions.RequestException as req_err:
logger.error(f"Request error occurred during refresh token use: {req_err}")
raise
except KeyError:
logger.error(f"Unexpected response format from token endpoint: {response.text}")
raise ValueError("Could not parse access token from refresh token response.")

View File

@@ -0,0 +1,36 @@
import os
import logging
import json
from dotenv import load_dotenv
from auth_handler import AuthHandler
from superoffice_client import SuperOfficeClient
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def discover_fields():
load_dotenv(dotenv_path="../.env")
auth = AuthHandler()
client = SuperOfficeClient(auth)
logger.info("Fetching metadata to discover UDF fields...")
# Get metadata for Contact (Company)
url = client._get_url("v1/Metadata/Contact/UserDefinedFields")
try:
resp = client.session.get(url, headers=client._get_headers())
resp.raise_for_status()
fields = resp.json()
print("\n--- AVAILABLE UDF FIELDS ---")
for field in fields.get("value", []):
print(f"Label: {field.get('FieldLabel')} -> Technical Name: {field.get('ProgId')} (Type: {field.get('FieldType')})")
except Exception as e:
logger.error(f"Failed to fetch metadata: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f"Details: {e.response.text}")
if __name__ == "__main__":
discover_fields()

View File

@@ -0,0 +1,34 @@
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
# Generate private key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Serialize private key to PEM
pem_private = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
# Generate public key
public_key = private_key.public_key()
pem_public = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# Save to files
with open("private_key.pem", "wb") as f:
f.write(pem_private)
with open("public_key.pem", "wb") as f:
f.write(pem_public)
print("Keys generated successfully!")
print(f"Private Key (save to .env as SO_PRIVATE_KEY):\n{pem_private.decode('utf-8')}")
print("-" * 20)
print(f"Public Key (upload to SuperOffice Dev Portal):\n{pem_public.decode('utf-8')}")

View File

@@ -0,0 +1,71 @@
import os
import requests
import json
from dotenv import load_dotenv
# Load config
load_dotenv(dotenv_path="../.env")
client_id = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD")
client_secret = os.getenv("SO_CLIENT_SECRET")
redirect_uri = "http://localhost" # Ensure this is in "Allowed Redirect URLs" in Dev Portal
if not client_id or not client_secret:
print("Error: Please set SO_CLIENT_ID (or SO_SOD) and SO_CLIENT_SECRET in .env first.")
exit(1)
# Step 1: Generate Authorization URL
auth_url = (
f"https://sod.superoffice.com/login/common/oauth/authorize"
f"?client_id={client_id}"
f"&redirect_uri={redirect_uri}"
f"&response_type=code"
f"&scope=openid" # Scope might be needed, try without first
)
print(f"\n--- STEP 1: Authorization ---")
print(f"Please open this URL in your browser:\n\n{auth_url}\n")
print("1. Log in to your test tenant (Cust55774).")
print("2. Click 'Allow' / 'Zulassen'.")
print("3. You will be redirected to a localhost URL (it might fail to load, that's fine).")
print("4. Copy the full URL from your browser's address bar and paste it here.")
redirected_url = input("\nPaste the full redirect URL here: ").strip()
# Extract code
try:
from urllib.parse import urlparse, parse_qs
parsed = urlparse(redirected_url)
code = parse_qs(parsed.query)['code'][0]
except Exception as e:
print(f"Error extracting code: {e}")
exit(1)
# Step 2: Exchange Code for Tokens
print(f"\n--- STEP 2: Token Exchange ---")
token_url = "https://sod.superoffice.com/login/common/oauth/tokens"
payload = {
"grant_type": "authorization_code",
"client_id": client_id,
"client_secret": client_secret,
"code": code,
"redirect_uri": redirect_uri
}
try:
resp = requests.post(token_url, data=payload)
resp.raise_for_status()
tokens = resp.json()
refresh_token = tokens.get("refresh_token")
access_token = tokens.get("access_token")
print("\nSUCCESS! Here are your tokens:")
print(f"\nSO_REFRESH_TOKEN=\"{refresh_token}\"\n")
print(f"\n(Access Token for testing: {access_token[:20]}...)")
print("\nAction: Please update your .env file with the SO_REFRESH_TOKEN value above!")
except Exception as e:
print(f"Error exchanging code: {e}")
if hasattr(e, 'response') and e.response:
print(f"Response: {e.response.text}")

View File

@@ -0,0 +1,45 @@
import os
import logging
import json
from dotenv import load_dotenv
from auth_handler import AuthHandler
from superoffice_client import SuperOfficeClient
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def discover_contact_structure():
load_dotenv(dotenv_path="../.env")
auth = AuthHandler()
client = SuperOfficeClient(auth)
logger.info("Fetching a single contact to inspect structure...")
# Try to get the first contact (usually ID 1 exists or can be found)
# If not, we try to create a dummy and then inspect it.
url = client._get_url("v1/Contact/1")
try:
resp = client.session.get(url, headers=client._get_headers())
if resp.status_code == 200:
contact = resp.json()
print("\n--- CONTACT STRUCTURE ---")
print(json.dumps(contact, indent=2))
else:
logger.warning(f"Contact 1 not found (Status {resp.status_code}). Trying to list contacts...")
url = client._get_url("v1/Contact?$top=1")
resp = client.session.get(url, headers=client._get_headers())
resp.raise_for_status()
contacts = resp.json().get("value", [])
if contacts:
print("\n--- CONTACT STRUCTURE (from list) ---")
print(json.dumps(contacts[0], indent=2))
else:
print("\nNo contacts found in tenant. Please create one manually in the UI or stay tuned.")
except Exception as e:
logger.error(f"Failed to fetch contact: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f"Details: {e.response.text}")
if __name__ == "__main__":
discover_contact_structure()

View File

@@ -1,72 +1,59 @@
import os
import logging
from dotenv import load_dotenv
from auth_handler import AuthHandler
from superoffice_client import SuperOfficeClient
# Import the new classes
from .auth_handler import AuthHandler
from .superoffice_client import SuperOfficeClient
# Configure logging
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def main():
"""
Main function to initialize and run the SuperOffice connector.
"""
logger.info("Starting SuperOffice Connector Proof of Concept...")
# Load .env from the root directory
load_dotenv(dotenv_path="../.env")
# Load environment variables from the root .env file
dotenv_path = os.path.join(os.path.dirname(__file__), '..', '.env')
logger.info(f"Attempting to load .env file from: {dotenv_path}")
if os.path.exists(dotenv_path):
load_dotenv(dotenv_path=dotenv_path)
logger.info(".env file loaded successfully.")
else:
logger.error(f".env file not found at the expected location: {dotenv_path}")
logger.error("Please ensure the .env file exists in the project root directory.")
return
logger.info("Starting SuperOffice Connector S2S POC...")
try:
# 1. Initialize AuthHandler
auth_handler = AuthHandler()
# Initialize Auth
auth = AuthHandler()
# Define the tenant ID
TENANT_ID = "Cust26720" # As provided in the initial task description
# Initialize Client
client = SuperOfficeClient(auth)
# 2. Get a fresh access token (this will also update the refresh token if needed)
full_access_token = auth_handler.get_access_token()
logger.info(f"Full Access Token for curl: {full_access_token}")
# 3. Initialize SuperOfficeClient with the tenant ID
so_client = SuperOfficeClient(auth_handler, TENANT_ID)
# 3. Perform test connection
logger.info("--- Performing Connection Test ---")
connection_result = so_client.test_connection()
logger.info("--- Connection Test Finished ---")
if connection_result:
logger.info("POC Succeeded: Connection to SuperOffice API was successful.")
# 1. Test Connection
logger.info("Step 1: Testing connection...")
user_info = client.test_connection()
if user_info:
logger.info(f"Connected successfully as: {user_info.get('Name')}")
else:
logger.error("POC Failed: Could not establish a connection to SuperOffice API.")
logger.error("Connection test failed.")
return
# 2. Search for a test contact
test_company = "Wackler Holding"
logger.info(f"Step 2: Searching for company '{test_company}'...")
contact = client.find_contact_by_criteria(name=test_company)
if contact:
logger.info(f"Found contact: {contact.get('Name')} (ID: {contact.get('ContactId')})")
# 3. Try to update UDFs (Warning: technical names might be wrong)
# logger.info("Step 3: Attempting UDF update (experimental)...")
# ai_test_data = {
# "potential": "High",
# "industry": "Facility Management",
# "summary": "KI-Analyse erfolgreich durchgeführt."
# }
# client.update_udfs(contact.get('ContactId'), ai_test_data)
else:
logger.info(f"Contact '{test_company}' not found.")
except ValueError as ve:
logger.error(f"Configuration Error: {ve}")
logger.error("POC Failed due to missing configuration.")
except Exception as e:
logger.error(f"An unexpected error occurred during the POC execution: {e}", exc_info=True)
logger.error("POC Failed due to an unexpected error.")
logger.info("SuperOffice Connector POC finished.")
logger.error(f"An error occurred: {e}", exc_info=True)
if __name__ == "__main__":
main()

View File

@@ -1,2 +1,5 @@
requests
python-dotenv
cryptography
pyjwt
xmltodict

View File

@@ -1,55 +1,91 @@
import requests
import logging
from .auth_handler import AuthHandler
from auth_handler import AuthHandler
logger = logging.getLogger(__name__)
class SuperOfficeClient:
"""
A client for interacting with the SuperOffice API.
A client for interacting with the SuperOffice REST API using OAuth 2.0 Bearer tokens.
"""
def __init__(self, auth_handler: AuthHandler, tenant_id: str):
# Base URL for the SuperOffice REST API, including tenant_id
self.base_url = f"https://sod.superoffice.com/{tenant_id}/api"
def __init__(self, auth_handler: AuthHandler):
self.auth_handler = auth_handler
self.tenant_id = tenant_id
self.session = requests.Session()
def _get_auth_headers(self):
"""Returns the authorization headers with a valid token and context."""
access_token = self.auth_handler.get_access_token()
# Mapping for UDF fields (These are typical technical names, but might need adjustment)
self.udf_mapping = {
"robotics_potential": "x_robotics_potential",
"industry": "x_ai_industry",
"summary": "x_ai_summary",
"last_update": "x_ai_last_update",
"status": "x_ai_status"
}
# Mapping for list values (Explorer -> SO ID)
self.potential_id_map = {
"High": 1,
"Medium": 2,
"Low": 3,
"None": 4
}
def _get_headers(self):
"""Returns the authorization headers with Bearer token."""
access_token, _ = self.auth_handler.get_ticket()
return {
'Authorization': f'Bearer {access_token}',
'Accept': 'application/json',
'X-SuperOffice-Context': f'TenantId={self.tenant_id}' # Crucial for multi-tenant environments
'Content-Type': 'application/json'
}
def test_connection(self):
"""
Performs a simple API call to test the connection and authentication.
Fetches the current user principal.
"""
endpoint = "/v1/User/currentPrincipal"
test_url = f"{self.base_url}{endpoint}"
def _get_url(self, path):
"""Constructs the full URL for a given API path."""
_, webapi_url = self.auth_handler.get_ticket()
base = webapi_url.rstrip('/')
p = path.lstrip('/')
return f"{base}/api/{p}"
logger.info(f"Attempting to test connection to: {test_url}")
def test_connection(self):
"""Tests the connection by fetching the current user."""
url = self._get_url("v1/User/currentPrincipal")
try:
resp = self.session.get(url, headers=self._get_headers())
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"Connection test failed: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def find_contact_by_criteria(self, name=None, url=None, org_nr=None):
"""Searches for a contact by OrgNr, URL, or Name."""
filters = []
if org_nr:
filters.append(f"orgNr eq '{org_nr}'")
if url:
filters.append(f"urlAddress eq '{url}'")
if not filters and name:
filters.append(f"name contains '{name}'")
if not filters:
return None
query = " and ".join(filters)
path = f"v1/Contact?$filter={query}"
try:
headers = self._get_auth_headers()
response = self.session.get(test_url, headers=headers)
response.raise_for_status()
full_url = self._get_url(path)
resp = self.session.get(full_url, headers=self._get_headers())
resp.raise_for_status()
data = resp.json()
user_data = response.json()
logger.info("Successfully connected to SuperOffice API.")
logger.info(f"Authenticated as: {user_data.get('Name', 'N/A')} ({user_data.get('Associate', 'N/A')})")
return user_data
except requests.exceptions.HTTPError as http_err:
logger.error(f"HTTP error during connection test: {http_err} - {http_err.response.text}")
return None
except requests.exceptions.RequestException as req_err:
logger.error(f"Request error during connection test: {req_err}")
results = data.get("value", [])
if results:
logger.info(f"Found {len(results)} matching contacts.")
return results[0]
return None
except Exception as e:
logger.error(f"An unexpected error occurred during connection test: {e}")
logger.error(f"Error searching for contact: {e}")
return None

View File

@@ -0,0 +1,47 @@
import base64
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
# The XML data provided by the user
modulus_b64 = "3PlhZKih1S9AKmsOcnPuS6FfPyYdKg6ltCUypt4EOi2++oM5O26YFxODBtQHO+UmsEoNcz6X2A5BE9kv4y8Xyv+hDxQrHsyavrkq2Yn5Mf/BFAquYuRoX5FtvH6ht+yllfBJQs3wE9m/O8LKHomKE5HXiaV/QMDRLoYeAwzQwcE="
exponent_b64 = "AQAB"
d_b64 = "i8TdWprjSgHKF0qB59j2WDYpFbtY5RpAq3J/2FZD3DzFOJU55SKt5qK71NzV+oeV8hnU6hkkWE+j0BcnGA7Yf6xGIoVNVhrenU18hrd6vSUPDeOuerkv+u98pNEqs6jcfYwhKKEJ2nFl4AacdQ7RaQPEWb41pVYvP+qaX6PeQAE="
p_b64 = "8fGRi846fRCbc8oaUGnw1dR2BXOopzxfAMeKEOCUeRP/Yj1kUcW9k4zUeaFc2upnfAeUbX38Bk5VW5edCDIjAQ=="
q_b64 = "6c/usvg8/4quH8Z70tSotmN+N6UxiuaTF51oOeTnIVUjXMqB3gc5sRCbipGj1u+DJUYh4LQLZp+W2LU7uCpewQ=="
dp_b64 = "y2q8YVwh5tbYrHCm0SdRWqcIF6tXiEwE4EXkOi5oBqielr1hJDNqIa1NU3os9M4R9cD1tV0wUSj5MUn2uFZXAQ=="
dq_b64 = "yc9+8Z0QUWVrC+QvBngls1/HFtKQI5sHRS/JQYdQ9FVfM31bgL/tzOZPytgQebm8EdUp8qCU4pxHAH/Vrw1rQQ=="
inverse_q_b64 = "VX4SRxVQ130enAqw9M0Nyl+875vmhc6cbsJQQ3E/fJjQvkB8EgjxBp6JVTeY1U5ga56Hvzngomk335pA6gli0A=="
def b64_to_int(b64_str):
return int.from_bytes(base64.b64decode(b64_str), byteorder='big')
# Convert components to integers
n = b64_to_int(modulus_b64)
e = b64_to_int(exponent_b64)
d = b64_to_int(d_b64)
p = b64_to_int(p_b64)
q = b64_to_int(q_b64)
dmp1 = b64_to_int(dp_b64)
dmq1 = b64_to_int(dq_b64)
iqmp = b64_to_int(inverse_q_b64)
# Reconstruct the private key object
private_key = rsa.RSAPrivateNumbers(
p=p,
q=q,
d=d,
dmp1=dmp1,
dmq1=dmq1,
iqmp=iqmp,
public_numbers=rsa.RSAPublicNumbers(e, n)
).private_key()
# Serialize to PEM
pem_private = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
# Print for the user
print(pem_private.decode('utf-8'))