Files
Brancheneinstufung2/connector-superoffice/superoffice_client.py
Floke 005c947dae [2ff88f42] refactor(connector-superoffice): finalize production readiness cleanup
- Integrated centralized logging system in all modules.
- Extracted all IDs and ProgIds into a separate .
- Refactored  and  for cleaner dependency management.
- Included updated discovery and inspection utilities.
- Verified end-to-end workflow stability.
2026-02-10 12:43:26 +00:00

312 lines
13 KiB
Python

import requests
import logging
from auth_handler import AuthHandler
from config import Config
from logging_config import setup_logging
# Use the centralized logging configuration
logger = setup_logging(__name__)
class SuperOfficeClient:
"""
A client for interacting with the SuperOffice REST API using OAuth 2.0 Bearer tokens.
"""
def __init__(self, auth_handler: AuthHandler):
self.auth_handler = auth_handler
self.session = requests.Session()
# Load mappings from Config
self.udf_contact_mapping = Config.UDF_CONTACT_MAPPING
self.udf_person_mapping = Config.UDF_PERSON_MAPPING
self.ma_status_id_map = Config.MA_STATUS_ID_MAP
def _get_headers(self):
"""Returns the authorization headers with Bearer token."""
access_token, _ = self.auth_handler.get_ticket()
return {
'Authorization': f'Bearer {access_token}',
'Accept': 'application/json',
'Content-Type': 'application/json'
}
def _get_url(self, path):
"""Constructs the full URL for a given API path."""
_, webapi_url = self.auth_handler.get_ticket()
base = webapi_url.rstrip('/')
p = path.lstrip('/')
return f"{base}/api/{p}"
def test_connection(self):
"""Tests the connection by fetching the current user."""
url = self._get_url("v1/User/currentPrincipal")
try:
resp = self.session.get(url, headers=self._get_headers())
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"Connection test failed: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def find_contact_by_criteria(self, name=None, url=None, org_nr=None):
"""Searches for a contact by OrgNr, URL, or Name."""
filters = []
if org_nr:
filters.append(f"orgNr eq '{org_nr}'")
if url:
filters.append(f"urlAddress eq '{url}'")
if not filters and name:
filters.append(f"name contains '{name}'")
if not filters:
return None
query = " and ".join(filters)
path = f"v1/Contact?$filter={query}"
try:
full_url = self._get_url(path)
resp = self.session.get(full_url, headers=self._get_headers())
resp.raise_for_status()
data = resp.json()
results = data.get("value", [])
if results:
logger.info(f"Found {len(results)} matching contacts.")
return results[0]
return None
except Exception as e:
logger.error(f"Error searching for contact: {e}")
return None
def create_contact(self, name, url=None, org_nr=None):
"""Creates a new contact (company) in SuperOffice with basic details."""
url = self._get_url("v1/Contact")
payload = {
"Name": name,
"OrgNr": org_nr,
"UrlAddress": url,
"ActivePublications": [], # Required field, can be empty
"Emails": [], # Required field, can be empty
"Phones": [] # Required field, can be empty
}
# Remove None values
payload = {k: v for k, v in payload.items() if v is not None}
try:
logger.info(f"Attempting to create contact: {name}")
resp = self.session.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
created_contact = resp.json()
logger.info(f"Successfully created contact: {created_contact.get('Name')} (ID: {created_contact.get('ContactId')})")
return created_contact
except Exception as e:
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def create_person(self, first_name, last_name, contact_id, email=None):
"""Creates a new person linked to a contact (company)."""
url = self._get_url("v1/Person")
payload = {
"Firstname": first_name,
"Lastname": last_name,
"Contact": {
"ContactId": contact_id
},
"Emails": []
}
if email:
payload["Emails"].append({
"Value": email,
"Rank": 1,
"Description": "Work" # Optional description
})
try:
logger.info(f"Attempting to create person: {first_name} {last_name} for Contact ID {contact_id}")
resp = self.session.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
created_person = resp.json()
logger.info(f"Successfully created person: {created_person.get('Firstname')} {created_person.get('Lastname')} (ID: {created_person.get('PersonId')})")
return created_person
except Exception as e:
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def create_sale(self, title, contact_id, person_id=None, amount=0.0):
"""Creates a new Sale (Opportunity) linked to a contact and optionally a person."""
url = self._get_url("v1/Sale")
payload = {
"Heading": title,
"Contact": {
"ContactId": contact_id
},
"Amount": amount,
"SaleType": { # Assuming default ID 1 exists
"Id": 1
},
"SaleStage": { # Assuming default ID for the first stage is 1
"Id": 1
},
"Probability": 10 # Default probability
}
if person_id:
payload["Person"] = {
"PersonId": person_id
}
try:
logger.info(f"Attempting to create sale: '{title}' for Contact ID {contact_id}")
resp = self.session.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
created_sale = resp.json()
logger.info(f"Successfully created sale: {created_sale.get('Heading')} (ID: {created_sale.get('SaleId')})")
return created_sale
except Exception as e:
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def create_project(self, name, contact_id, person_id=None):
"""Creates a new Project linked to a contact and optionally adds a person as a member."""
url = self._get_url("v1/Project")
payload = {
"Name": name,
"Contact": {
"ContactId": contact_id
},
"ProjectType": { # Assuming default ID 1 exists
"Id": 1
},
"ProjectStatus": { # Assuming default ID 1 for 'In progress' exists
"Id": 1
},
"ProjectMembers": []
}
if person_id:
payload["ProjectMembers"].append({
"PersonId": person_id
})
try:
logger.info(f"Attempting to create project: '{name}' for Contact ID {contact_id}")
resp = self.session.post(url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
created_project = resp.json()
logger.info(f"Successfully created project: {created_project.get('Name')} (ID: {created_project.get('ProjectId')})")
return created_project
except Exception as e:
logger.error(f"Error creating project: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
def update_entity_udfs(self, entity_id, entity_type, udf_data: dict):
"""Updates user-defined fields for a given entity (Contact or Person)."""
if entity_type not in ["Contact", "Person"]:
logger.error(f"Invalid entity_type: {entity_type}. Must be 'Contact' or 'Person'.")
return None
# 1. Retrieve the existing entity to ensure all required fields are present in the PUT payload
get_url = self._get_url(f"v1/{entity_type}/{entity_id}")
try:
get_resp = self.session.get(get_url, headers=self._get_headers())
get_resp.raise_for_status()
existing_entity = get_resp.json()
logger.info(f"Successfully retrieved existing {entity_type} ID {entity_id}.")
except Exception as e:
logger.error(f"Error retrieving existing {entity_type} ID {entity_id}: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
# Use the existing entity data as the base for the PUT payload
payload = existing_entity
if "UserDefinedFields" not in payload:
payload["UserDefinedFields"] = {}
# Select the correct mapping based on entity type
udf_mapping = self.udf_contact_mapping if entity_type == "Contact" else self.udf_person_mapping
for key, value in udf_data.items():
prog_id = udf_mapping.get(key)
if prog_id:
if key == "ma_status" and entity_type == "Person":
# For MA Status, we need to send the internal ID directly as an integer
internal_id = self.ma_status_id_map.get(value)
if internal_id:
payload["UserDefinedFields"][prog_id] = internal_id
else:
logger.warning(f"Unknown MA Status value '{value}'. Skipping update for {key}.")
else:
# For other UDFs, send the value directly
payload["UserDefinedFields"][prog_id] = value
else:
logger.warning(f"Unknown UDF key for {entity_type}: {key}. Skipping.")
if not payload["UserDefinedFields"]:
logger.info(f"No valid UDF data to update for {entity_type} ID {entity_id}.")
return None
# 2. Send the updated entity (including all original fields + modified UDFs) via PUT
put_url = self._get_url(f"v1/{entity_type}/{entity_id}")
try:
logger.info(f"Attempting to update UDFs for {entity_type} ID {entity_id} with: {payload['UserDefinedFields']}")
resp = self.session.put(put_url, headers=self._get_headers(), json=payload)
resp.raise_for_status()
updated_entity = resp.json()
logger.info(f"Successfully updated UDFs for {entity_type} ID {entity_id}.")
return updated_entity
except Exception as e:
logger.error(f"Error updating UDFs for {entity_type} ID {entity_id}: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
return None
# NOTE: The create_email_activity method is currently blocked due to SuperOffice environment limitations.
# Attempting to create an Email Activity via API results in a 500 Internal Server Error,
# likely because the email module is not licensed or configured in the SOD environment.
# This method is temporarily commented out.
#
# def create_email_activity(self, person_id, contact_id, subject, body):
# """Creates an Email Activity linked to a person and contact."""
# url = self._get_url("v1/Activity")
#
# payload = {
# "Type": { # Assuming ID 2 for "Email" ActivityType
# "Id": 2
# },
# "Title": subject,
# "Details": body,
# "Person": {
# "PersonId": person_id
# },
# "Contact": {
# "ContactId": contact_id
# }
# }
#
# try:
# logger.info(f"Attempting to create Email Activity with subject '{subject}' for Person ID {person_id} and Contact ID {contact_id}")
# resp = self.session.post(url, headers=self._get_headers(), json=payload)
# resp.raise_for_status()
# created_activity = resp.json()
# logger.info(f"Successfully created Email Activity: '{created_activity.get('Title')}' (ID: {created_activity.get('ActivityId')})")
# return created_activity
# except Exception as e:
# logger.error(f"Error creating Email Activity: {e}")
# if hasattr(e, 'response') and e.response is not None:
# logger.error(f"Response: {e.response.text}")
# return None