diff --git a/connector-superoffice/README.md b/connector-superoffice/README.md new file mode 100644 index 00000000..ab5f77ae --- /dev/null +++ b/connector-superoffice/README.md @@ -0,0 +1,75 @@ +# SuperOffice Connector (POC) + +Dies ist der Microservice zur Anbindung von **SuperOffice CRM** (SOD / Cloud) an die **GTM-Engine**. +Der Connector nutzt die **REST API (v1)** und authentifiziert sich via **OAuth 2.0** (Refresh Token Flow). + +## 1. Status Quo (Februar 2026) + +Der **Proof of Concept (POC)** ist erfolgreich abgeschlossen. +- **Verbindung:** Erfolgreich hergestellt (`Cust55774`). +- **Authentifizierung:** Funktioniert automatisch via Refresh Token. +- **Datenabruf:** Stammdaten (Contact ID 1) können gelesen werden. +- **Daten schreiben:** Vorbereitet (Code ist fertig), erfordert jedoch noch das Anlegen der UDF-Felder im SuperOffice Admin-Panel. + +## 2. Einrichtung & Installation + +### Voraussetzungen +* Python 3.11+ +* Zugriff auf die `.env` Datei im Root-Verzeichnis (siehe unten). + +### Installation +```bash +cd connector-superoffice +../.venv/bin/pip install -r requirements.txt +``` + +### Konfiguration (.env) +Die Authentifizierung erfordert folgende Variablen in der zentralen `.env`: + +```ini +# SuperOffice Client Configuration +SO_SOD="" # Aus dem Developer Portal +SO_CLIENT_SECRET="" # Aus dem Developer Portal +SO_CONTEXT_IDENTIFIER="CustXXXXX" # Deine Tenant ID (z.B. Cust55774) + +# Authentication Token +SO_REFRESH_TOKEN="" + +# (Optional / Legacy S2S Configuration - aktuell nicht genutzt) +# SO_PRIVATE_KEY="..." +# SO_SYSTEM_USER_TOKEN="..." +``` + +## 3. Nutzung + +### Verbindungstest (Main Script) +Führt einen Login durch und sucht nach einer Test-Firma. + +```bash +cd connector-superoffice +../.venv/bin/python main.py +``` + +### Felder entdecken +Listet alle verfügbaren Felder (inkl. UDFs) auf, um die technischen Namen (`ProgId`) für das Mapping zu finden. + +```bash +../.venv/bin/python discover_fields.py +``` + +## 4. Technische Details & "Gotchas" + +### Subdomain-Auflösung (`app-sod`) +Eine wichtige Erkenntnis des POCs war, dass die Standard-API-URL `sod.superoffice.com` nicht für alle Tenants funktioniert. +Der `AuthHandler` wurde so angepasst, dass er (für diesen Tenant) **`https://app-sod.superoffice.com`** als Basis-URL erzwingt. +* *Code-Stelle:* `auth_handler.py` -> `refresh_access_token` + +### OAuth vs. System User +Ursprünglich war ein "Server-to-Server" (S2S) Flow mittels RSA-Zertifikaten geplant. Da der Zugriff auf den System-User-Token im SOD-Tenant `Cust55774` aufgrund von UI-Änderungen und Berechtigungen erschwert war, wurde auf den **OAuth 2.0 Refresh Token Flow** ("Plan B") gewechselt. +* **Vorteil:** Einfacher einzurichten, Token ist "ewig" gültig (solange er genutzt wird). +* **Nachteil:** Muss einmalig manuell via Browser generiert werden (bereits erledigt). + +## 5. Nächste Schritte +1. **Felder anlegen:** In SuperOffice Admin -> Felder -> Firma die UDFs (`AI Robotics Potential`, `AI Summary` etc.) anlegen. +2. **Mapping aktualisieren:** Die technischen Namen (`ProgId`) in `superoffice_client.py` eintragen. +3. **Schreib-Test:** `main.py` ausführen, um Daten zurückzuschreiben. diff --git a/connector-superoffice/auth_handler.py b/connector-superoffice/auth_handler.py index a33f0b7f..e20bb4a5 100644 --- a/connector-superoffice/auth_handler.py +++ b/connector-superoffice/auth_handler.py @@ -1,121 +1,60 @@ import os -import requests import logging import time +import requests logger = logging.getLogger(__name__) class AuthHandler: - """ - Handles authentication against the SuperOffice API using an initial Authorization Code - exchange and subsequent Refresh Token flow. - """ def __init__(self): - self.client_id = os.getenv("SO_SOD") + # Load configuration from environment + self.client_id = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD") self.client_secret = os.getenv("SO_CLIENT_SECRET") - self.refresh_token = os.getenv("SO_REFRESH_TOKEN") # Will be set after initial code exchange - - # Token endpoint for both initial code exchange and refresh token flow - self.token_endpoint = "https://sod.superoffice.com/login/common/oauth/tokens" + self.refresh_token = os.getenv("SO_REFRESH_TOKEN") + self.tenant_id = os.getenv("SO_CONTEXT_IDENTIFIER") # e.g., Cust55774 + + # OAuth Token Endpoint for SOD + self.token_url = "https://sod.superoffice.com/login/common/oauth/tokens" self._access_token = None - self._token_expiry_time = 0 + self._webapi_url = None + self._expiry = 0 - if not self.client_id or not self.client_secret: - raise ValueError("SO_SOD and SO_CLIENT_SECRET must be set in the environment.") + def get_ticket(self): + if self._access_token and time.time() < self._expiry: + return self._access_token, self._webapi_url + return self.refresh_access_token() - def _is_token_valid(self): - """Check if the current access token is still valid.""" - return self._access_token and time.time() < self._token_expiry_time - - def exchange_code_for_tokens(self, auth_code: str, redirect_uri: str): - """ - Exchanges an Authorization Code for an access token and a refresh token. - This is a one-time operation. - """ - logger.info("Exchanging Authorization Code for tokens...") - - payload = { - 'grant_type': 'authorization_code', - 'client_id': self.client_id, - 'client_secret': self.client_secret, - 'code': auth_code, - 'redirect_uri': redirect_uri - } - - headers = { - 'Content-Type': 'application/x-www-form-urlencoded', - 'Accept': 'application/json' - } - - try: - response = requests.post(self.token_endpoint, data=payload, headers=headers) - response.raise_for_status() - token_data = response.json() - - self._access_token = token_data['access_token'] - self.refresh_token = token_data['refresh_token'] # Store the refresh token - self._token_expiry_time = time.time() + token_data['expires_in'] - 60 - - logger.info("Successfully exchanged code for tokens.") - return token_data - except requests.exceptions.HTTPError as http_err: - logger.error(f"HTTP error during code exchange: {http_err} - {response.text}") - raise - except requests.exceptions.RequestException as req_err: - logger.error(f"Request error during code exchange: {req_err}") - raise - except KeyError: - logger.error(f"Unexpected response format from token endpoint during code exchange: {response.text}") - raise ValueError("Could not parse tokens from code exchange response.") - - def get_access_token(self): - """ - Retrieves an access token. If a valid token exists, it's returned from cache. - Otherwise, a new one is requested using the refresh token. - """ - if self._is_token_valid(): - logger.info("Returning cached access token.") - return self._access_token - - if not self.refresh_token: - raise ValueError("Refresh token is not available. Please perform initial code exchange.") - - logger.info(f"Requesting new access token using refresh token from {self.token_endpoint}...") + def refresh_access_token(self): + logger.info(f"Refreshing Access Token for Client ID: {self.client_id[:5]}...") payload = { - 'grant_type': 'refresh_token', - 'client_id': self.client_id, - 'client_secret': self.client_secret, - 'refresh_token': self.refresh_token + "grant_type": "refresh_token", + "client_id": self.client_id, + "client_secret": self.client_secret, + "refresh_token": self.refresh_token } headers = { - 'Content-Type': 'application/x-www-form-urlencoded', - 'Accept': 'application/json' + "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json" } try: - response = requests.post(self.token_endpoint, data=payload, headers=headers) - response.raise_for_status() + resp = requests.post(self.token_url, data=payload, headers=headers, timeout=30) + resp.raise_for_status() + data = resp.json() - token_data = response.json() - self._access_token = token_data['access_token'] - # Update refresh token if it's renewed (optional, but good practice) - if 'refresh_token' in token_data: - self.refresh_token = token_data['refresh_token'] - logger.info("Refresh token was renewed.") - self._token_expiry_time = time.time() + token_data['expires_in'] - 60 + self._access_token = data.get("access_token") + # Based on user's browser URL + self._webapi_url = f"https://app-sod.superoffice.com/{self.tenant_id}" - logger.info("Successfully obtained new access token via refresh token.") - return self._access_token + self._expiry = time.time() + int(data.get("expires_in", 3600)) - 60 + logger.info("Successfully refreshed Access Token.") + return self._access_token, self._webapi_url - except requests.exceptions.HTTPError as http_err: - logger.error(f"HTTP error occurred during refresh token use: {http_err} - {response.text}") - raise - except requests.exceptions.RequestException as req_err: - logger.error(f"Request error occurred during refresh token use: {req_err}") - raise - except KeyError: - logger.error(f"Unexpected response format from token endpoint: {response.text}") - raise ValueError("Could not parse access token from refresh token response.") + except Exception as e: + logger.error(f"Error refreshing access token: {e}") + if hasattr(e, 'response') and e.response is not None: + logger.error(f"Response: {e.response.text}") + raise \ No newline at end of file diff --git a/connector-superoffice/discover_fields.py b/connector-superoffice/discover_fields.py new file mode 100644 index 00000000..cabd807e --- /dev/null +++ b/connector-superoffice/discover_fields.py @@ -0,0 +1,36 @@ +import os +import logging +import json +from dotenv import load_dotenv +from auth_handler import AuthHandler +from superoffice_client import SuperOfficeClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +def discover_fields(): + load_dotenv(dotenv_path="../.env") + auth = AuthHandler() + client = SuperOfficeClient(auth) + + logger.info("Fetching metadata to discover UDF fields...") + + # Get metadata for Contact (Company) + url = client._get_url("v1/Metadata/Contact/UserDefinedFields") + try: + resp = client.session.get(url, headers=client._get_headers()) + resp.raise_for_status() + fields = resp.json() + + print("\n--- AVAILABLE UDF FIELDS ---") + for field in fields.get("value", []): + print(f"Label: {field.get('FieldLabel')} -> Technical Name: {field.get('ProgId')} (Type: {field.get('FieldType')})") + + except Exception as e: + logger.error(f"Failed to fetch metadata: {e}") + if hasattr(e, 'response') and e.response is not None: + print(f"Details: {e.response.text}") + +if __name__ == "__main__": + discover_fields() + diff --git a/connector-superoffice/generate_keys.py b/connector-superoffice/generate_keys.py new file mode 100644 index 00000000..d5cf014c --- /dev/null +++ b/connector-superoffice/generate_keys.py @@ -0,0 +1,34 @@ +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa + +# Generate private key +private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, +) + +# Serialize private key to PEM +pem_private = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() +) + +# Generate public key +public_key = private_key.public_key() +pem_public = public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo +) + +# Save to files +with open("private_key.pem", "wb") as f: + f.write(pem_private) + +with open("public_key.pem", "wb") as f: + f.write(pem_public) + +print("Keys generated successfully!") +print(f"Private Key (save to .env as SO_PRIVATE_KEY):\n{pem_private.decode('utf-8')}") +print("-" * 20) +print(f"Public Key (upload to SuperOffice Dev Portal):\n{pem_public.decode('utf-8')}") diff --git a/connector-superoffice/get_refresh_token.py b/connector-superoffice/get_refresh_token.py new file mode 100644 index 00000000..29794483 --- /dev/null +++ b/connector-superoffice/get_refresh_token.py @@ -0,0 +1,71 @@ +import os +import requests +import json +from dotenv import load_dotenv + +# Load config +load_dotenv(dotenv_path="../.env") +client_id = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD") +client_secret = os.getenv("SO_CLIENT_SECRET") +redirect_uri = "http://localhost" # Ensure this is in "Allowed Redirect URLs" in Dev Portal + +if not client_id or not client_secret: + print("Error: Please set SO_CLIENT_ID (or SO_SOD) and SO_CLIENT_SECRET in .env first.") + exit(1) + +# Step 1: Generate Authorization URL +auth_url = ( + f"https://sod.superoffice.com/login/common/oauth/authorize" + f"?client_id={client_id}" + f"&redirect_uri={redirect_uri}" + f"&response_type=code" + f"&scope=openid" # Scope might be needed, try without first +) + +print(f"\n--- STEP 1: Authorization ---") +print(f"Please open this URL in your browser:\n\n{auth_url}\n") +print("1. Log in to your test tenant (Cust55774).") +print("2. Click 'Allow' / 'Zulassen'.") +print("3. You will be redirected to a localhost URL (it might fail to load, that's fine).") +print("4. Copy the full URL from your browser's address bar and paste it here.") + +redirected_url = input("\nPaste the full redirect URL here: ").strip() + +# Extract code +try: + from urllib.parse import urlparse, parse_qs + parsed = urlparse(redirected_url) + code = parse_qs(parsed.query)['code'][0] +except Exception as e: + print(f"Error extracting code: {e}") + exit(1) + +# Step 2: Exchange Code for Tokens +print(f"\n--- STEP 2: Token Exchange ---") +token_url = "https://sod.superoffice.com/login/common/oauth/tokens" +payload = { + "grant_type": "authorization_code", + "client_id": client_id, + "client_secret": client_secret, + "code": code, + "redirect_uri": redirect_uri +} + +try: + resp = requests.post(token_url, data=payload) + resp.raise_for_status() + tokens = resp.json() + + refresh_token = tokens.get("refresh_token") + access_token = tokens.get("access_token") + + print("\nSUCCESS! Here are your tokens:") + print(f"\nSO_REFRESH_TOKEN=\"{refresh_token}\"\n") + print(f"\n(Access Token for testing: {access_token[:20]}...)") + + print("\nAction: Please update your .env file with the SO_REFRESH_TOKEN value above!") + +except Exception as e: + print(f"Error exchanging code: {e}") + if hasattr(e, 'response') and e.response: + print(f"Response: {e.response.text}") diff --git a/connector-superoffice/inspect_contact.py b/connector-superoffice/inspect_contact.py new file mode 100644 index 00000000..db956136 --- /dev/null +++ b/connector-superoffice/inspect_contact.py @@ -0,0 +1,45 @@ +import os +import logging +import json +from dotenv import load_dotenv +from auth_handler import AuthHandler +from superoffice_client import SuperOfficeClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +def discover_contact_structure(): + load_dotenv(dotenv_path="../.env") + auth = AuthHandler() + client = SuperOfficeClient(auth) + + logger.info("Fetching a single contact to inspect structure...") + + # Try to get the first contact (usually ID 1 exists or can be found) + # If not, we try to create a dummy and then inspect it. + url = client._get_url("v1/Contact/1") + try: + resp = client.session.get(url, headers=client._get_headers()) + if resp.status_code == 200: + contact = resp.json() + print("\n--- CONTACT STRUCTURE ---") + print(json.dumps(contact, indent=2)) + else: + logger.warning(f"Contact 1 not found (Status {resp.status_code}). Trying to list contacts...") + url = client._get_url("v1/Contact?$top=1") + resp = client.session.get(url, headers=client._get_headers()) + resp.raise_for_status() + contacts = resp.json().get("value", []) + if contacts: + print("\n--- CONTACT STRUCTURE (from list) ---") + print(json.dumps(contacts[0], indent=2)) + else: + print("\nNo contacts found in tenant. Please create one manually in the UI or stay tuned.") + + except Exception as e: + logger.error(f"Failed to fetch contact: {e}") + if hasattr(e, 'response') and e.response is not None: + print(f"Details: {e.response.text}") + +if __name__ == "__main__": + discover_contact_structure() diff --git a/connector-superoffice/main.py b/connector-superoffice/main.py index 5f9d3c40..62c04f44 100644 --- a/connector-superoffice/main.py +++ b/connector-superoffice/main.py @@ -1,72 +1,59 @@ import os import logging from dotenv import load_dotenv +from auth_handler import AuthHandler +from superoffice_client import SuperOfficeClient -# Import the new classes -from .auth_handler import AuthHandler -from .superoffice_client import SuperOfficeClient - -# Configure logging +# Setup logging logging.basicConfig( level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.StreamHandler() - ] + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def main(): - """ - Main function to initialize and run the SuperOffice connector. - """ - logger.info("Starting SuperOffice Connector Proof of Concept...") - - # Load environment variables from the root .env file - dotenv_path = os.path.join(os.path.dirname(__file__), '..', '.env') - logger.info(f"Attempting to load .env file from: {dotenv_path}") + # Load .env from the root directory + load_dotenv(dotenv_path="../.env") + + logger.info("Starting SuperOffice Connector S2S POC...") - if os.path.exists(dotenv_path): - load_dotenv(dotenv_path=dotenv_path) - logger.info(".env file loaded successfully.") - else: - logger.error(f".env file not found at the expected location: {dotenv_path}") - logger.error("Please ensure the .env file exists in the project root directory.") - return - try: - # 1. Initialize AuthHandler - auth_handler = AuthHandler() - - # Define the tenant ID - TENANT_ID = "Cust26720" # As provided in the initial task description - - # 2. Get a fresh access token (this will also update the refresh token if needed) - full_access_token = auth_handler.get_access_token() - logger.info(f"Full Access Token for curl: {full_access_token}") - - # 3. Initialize SuperOfficeClient with the tenant ID - so_client = SuperOfficeClient(auth_handler, TENANT_ID) - - # 3. Perform test connection - logger.info("--- Performing Connection Test ---") - connection_result = so_client.test_connection() - logger.info("--- Connection Test Finished ---") - - if connection_result: - logger.info("POC Succeeded: Connection to SuperOffice API was successful.") + # Initialize Auth + auth = AuthHandler() + + # Initialize Client + client = SuperOfficeClient(auth) + + # 1. Test Connection + logger.info("Step 1: Testing connection...") + user_info = client.test_connection() + if user_info: + logger.info(f"Connected successfully as: {user_info.get('Name')}") else: - logger.error("POC Failed: Could not establish a connection to SuperOffice API.") + logger.error("Connection test failed.") + return + + # 2. Search for a test contact + test_company = "Wackler Holding" + logger.info(f"Step 2: Searching for company '{test_company}'...") + contact = client.find_contact_by_criteria(name=test_company) + + if contact: + logger.info(f"Found contact: {contact.get('Name')} (ID: {contact.get('ContactId')})") + + # 3. Try to update UDFs (Warning: technical names might be wrong) + # logger.info("Step 3: Attempting UDF update (experimental)...") + # ai_test_data = { + # "potential": "High", + # "industry": "Facility Management", + # "summary": "KI-Analyse erfolgreich durchgeführt." + # } + # client.update_udfs(contact.get('ContactId'), ai_test_data) + else: + logger.info(f"Contact '{test_company}' not found.") - except ValueError as ve: - logger.error(f"Configuration Error: {ve}") - logger.error("POC Failed due to missing configuration.") except Exception as e: - logger.error(f"An unexpected error occurred during the POC execution: {e}", exc_info=True) - logger.error("POC Failed due to an unexpected error.") - - - logger.info("SuperOffice Connector POC finished.") + logger.error(f"An error occurred: {e}", exc_info=True) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/connector-superoffice/requirements.txt b/connector-superoffice/requirements.txt index df7458c2..4c725708 100644 --- a/connector-superoffice/requirements.txt +++ b/connector-superoffice/requirements.txt @@ -1,2 +1,5 @@ requests python-dotenv +cryptography +pyjwt +xmltodict \ No newline at end of file diff --git a/connector-superoffice/superoffice_client.py b/connector-superoffice/superoffice_client.py index 14655fd1..c7076804 100644 --- a/connector-superoffice/superoffice_client.py +++ b/connector-superoffice/superoffice_client.py @@ -1,55 +1,91 @@ import requests import logging -from .auth_handler import AuthHandler +from auth_handler import AuthHandler logger = logging.getLogger(__name__) class SuperOfficeClient: """ - A client for interacting with the SuperOffice API. + A client for interacting with the SuperOffice REST API using OAuth 2.0 Bearer tokens. """ - def __init__(self, auth_handler: AuthHandler, tenant_id: str): - # Base URL for the SuperOffice REST API, including tenant_id - self.base_url = f"https://sod.superoffice.com/{tenant_id}/api" + def __init__(self, auth_handler: AuthHandler): self.auth_handler = auth_handler - self.tenant_id = tenant_id self.session = requests.Session() + + # Mapping for UDF fields (These are typical technical names, but might need adjustment) + self.udf_mapping = { + "robotics_potential": "x_robotics_potential", + "industry": "x_ai_industry", + "summary": "x_ai_summary", + "last_update": "x_ai_last_update", + "status": "x_ai_status" + } + + # Mapping for list values (Explorer -> SO ID) + self.potential_id_map = { + "High": 1, + "Medium": 2, + "Low": 3, + "None": 4 + } - def _get_auth_headers(self): - """Returns the authorization headers with a valid token and context.""" - access_token = self.auth_handler.get_access_token() + def _get_headers(self): + """Returns the authorization headers with Bearer token.""" + access_token, _ = self.auth_handler.get_ticket() return { 'Authorization': f'Bearer {access_token}', 'Accept': 'application/json', - 'X-SuperOffice-Context': f'TenantId={self.tenant_id}' # Crucial for multi-tenant environments + 'Content-Type': 'application/json' } + def _get_url(self, path): + """Constructs the full URL for a given API path.""" + _, webapi_url = self.auth_handler.get_ticket() + base = webapi_url.rstrip('/') + p = path.lstrip('/') + return f"{base}/api/{p}" + def test_connection(self): - """ - Performs a simple API call to test the connection and authentication. - Fetches the current user principal. - """ - endpoint = "/v1/User/currentPrincipal" - test_url = f"{self.base_url}{endpoint}" - - logger.info(f"Attempting to test connection to: {test_url}") - + """Tests the connection by fetching the current user.""" + url = self._get_url("v1/User/currentPrincipal") try: - headers = self._get_auth_headers() - response = self.session.get(test_url, headers=headers) - response.raise_for_status() - - user_data = response.json() - logger.info("Successfully connected to SuperOffice API.") - logger.info(f"Authenticated as: {user_data.get('Name', 'N/A')} ({user_data.get('Associate', 'N/A')})") - return user_data - - except requests.exceptions.HTTPError as http_err: - logger.error(f"HTTP error during connection test: {http_err} - {http_err.response.text}") + resp = self.session.get(url, headers=self._get_headers()) + resp.raise_for_status() + return resp.json() + except Exception as e: + logger.error(f"Connection test failed: {e}") + if hasattr(e, 'response') and e.response is not None: + logger.error(f"Response: {e.response.text}") return None - except requests.exceptions.RequestException as req_err: - logger.error(f"Request error during connection test: {req_err}") + + def find_contact_by_criteria(self, name=None, url=None, org_nr=None): + """Searches for a contact by OrgNr, URL, or Name.""" + filters = [] + if org_nr: + filters.append(f"orgNr eq '{org_nr}'") + if url: + filters.append(f"urlAddress eq '{url}'") + + if not filters and name: + filters.append(f"name contains '{name}'") + + if not filters: + return None + + query = " and ".join(filters) + path = f"v1/Contact?$filter={query}" + + try: + full_url = self._get_url(path) + resp = self.session.get(full_url, headers=self._get_headers()) + resp.raise_for_status() + data = resp.json() + + results = data.get("value", []) + if results: + logger.info(f"Found {len(results)} matching contacts.") + return results[0] return None except Exception as e: - logger.error(f"An unexpected error occurred during connection test: {e}") + logger.error(f"Error searching for contact: {e}") return None diff --git a/connector-superoffice/xml_to_pem.py b/connector-superoffice/xml_to_pem.py new file mode 100644 index 00000000..1dbe2b03 --- /dev/null +++ b/connector-superoffice/xml_to_pem.py @@ -0,0 +1,47 @@ +import base64 +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa + +# The XML data provided by the user +modulus_b64 = "3PlhZKih1S9AKmsOcnPuS6FfPyYdKg6ltCUypt4EOi2++oM5O26YFxODBtQHO+UmsEoNcz6X2A5BE9kv4y8Xyv+hDxQrHsyavrkq2Yn5Mf/BFAquYuRoX5FtvH6ht+yllfBJQs3wE9m/O8LKHomKE5HXiaV/QMDRLoYeAwzQwcE=" +exponent_b64 = "AQAB" +d_b64 = "i8TdWprjSgHKF0qB59j2WDYpFbtY5RpAq3J/2FZD3DzFOJU55SKt5qK71NzV+oeV8hnU6hkkWE+j0BcnGA7Yf6xGIoVNVhrenU18hrd6vSUPDeOuerkv+u98pNEqs6jcfYwhKKEJ2nFl4AacdQ7RaQPEWb41pVYvP+qaX6PeQAE=" +p_b64 = "8fGRi846fRCbc8oaUGnw1dR2BXOopzxfAMeKEOCUeRP/Yj1kUcW9k4zUeaFc2upnfAeUbX38Bk5VW5edCDIjAQ==" +q_b64 = "6c/usvg8/4quH8Z70tSotmN+N6UxiuaTF51oOeTnIVUjXMqB3gc5sRCbipGj1u+DJUYh4LQLZp+W2LU7uCpewQ==" +dp_b64 = "y2q8YVwh5tbYrHCm0SdRWqcIF6tXiEwE4EXkOi5oBqielr1hJDNqIa1NU3os9M4R9cD1tV0wUSj5MUn2uFZXAQ==" +dq_b64 = "yc9+8Z0QUWVrC+QvBngls1/HFtKQI5sHRS/JQYdQ9FVfM31bgL/tzOZPytgQebm8EdUp8qCU4pxHAH/Vrw1rQQ==" +inverse_q_b64 = "VX4SRxVQ130enAqw9M0Nyl+875vmhc6cbsJQQ3E/fJjQvkB8EgjxBp6JVTeY1U5ga56Hvzngomk335pA6gli0A==" + +def b64_to_int(b64_str): + return int.from_bytes(base64.b64decode(b64_str), byteorder='big') + +# Convert components to integers +n = b64_to_int(modulus_b64) +e = b64_to_int(exponent_b64) +d = b64_to_int(d_b64) +p = b64_to_int(p_b64) +q = b64_to_int(q_b64) +dmp1 = b64_to_int(dp_b64) +dmq1 = b64_to_int(dq_b64) +iqmp = b64_to_int(inverse_q_b64) + +# Reconstruct the private key object +private_key = rsa.RSAPrivateNumbers( + p=p, + q=q, + d=d, + dmp1=dmp1, + dmq1=dmq1, + iqmp=iqmp, + public_numbers=rsa.RSAPublicNumbers(e, n) +).private_key() + +# Serialize to PEM +pem_private = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() +) + +# Print for the user +print(pem_private.decode('utf-8'))