import logging
from urllib.parse import quote

import requests

from auth_handler import AuthHandler
from config import Config
from logging_config import setup_logging

# Use the centralized logging configuration
logger = setup_logging(__name__)


class SuperOfficeClient:
    """
    A client for interacting with the SuperOffice REST API using OAuth 2.0 Bearer tokens.

    Every public method is best-effort: request failures are logged and
    reported to the caller as ``None`` instead of being raised.
    """

    # Default number of seconds to wait for the server before giving up.
    # Without a timeout, requests blocks indefinitely on a stalled connection.
    DEFAULT_TIMEOUT = 30

    # Effective timeout; defined on the class so it is always available and
    # overridden per instance by __init__ when a value is supplied.
    timeout = DEFAULT_TIMEOUT

    def __init__(self, auth_handler: AuthHandler, timeout=None):
        """
        Args:
            auth_handler: Object whose ``get_ticket()`` returns a tuple of
                ``(access_token, webapi_url)``.
            timeout: Optional per-request timeout in seconds (or a
                ``(connect, read)`` tuple, as accepted by requests).
                Defaults to ``DEFAULT_TIMEOUT``.
        """
        self.auth_handler = auth_handler
        self.session = requests.Session()
        if timeout is not None:
            self.timeout = timeout

        # Load mappings from Config
        self.udf_contact_mapping = Config.UDF_CONTACT_MAPPING
        self.udf_person_mapping = Config.UDF_PERSON_MAPPING
        self.ma_status_id_map = Config.MA_STATUS_ID_MAP

    @staticmethod
    def _log_failure(message, exc):
        """Logs ``message`` with the exception, plus the HTTP response body if the exception carries one."""
        logger.error(f"{message}: {exc}")
        response = getattr(exc, 'response', None)
        if response is not None:
            logger.error(f"Response: {response.text}")

    def _get_headers(self):
        """Returns the authorization headers with Bearer token."""
        access_token, _ = self.auth_handler.get_ticket()
        return {
            'Authorization': f'Bearer {access_token}',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }

    def _get_url(self, path):
        """Constructs the full URL for a given API path."""
        _, webapi_url = self.auth_handler.get_ticket()
        base = webapi_url.rstrip('/')
        p = path.lstrip('/')
        return f"{base}/api/{p}"

    def test_connection(self):
        """
        Tests the connection by fetching the current user.

        Returns:
            The decoded JSON response, or ``None`` if the request failed.
        """
        url = self._get_url("v1/User/currentPrincipal")
        try:
            resp = self.session.get(url, headers=self._get_headers(), timeout=self.timeout)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            self._log_failure("Connection test failed", e)
            return None

    def find_contact_by_criteria(self, name=None, url=None, org_nr=None):
        """
        Searches for a contact by OrgNr, URL, or Name.

        ``org_nr`` and ``url`` are combined with ``and`` when both are given;
        ``name`` is only used as a fallback when neither of them is supplied.

        Returns:
            The first matching contact, or ``None`` when no criteria were
            given, nothing matched, or the request failed.
        """
        filters = []
        if org_nr:
            filters.append(f"orgNr eq '{org_nr}'")
        if url:
            filters.append(f"urlAddress eq '{url}'")
        if not filters and name:
            filters.append(f"name contains '{name}'")

        if not filters:
            return None

        query = " and ".join(filters)
        # Percent-encode the filter so characters such as '&', '#', '+' or '%'
        # inside a value cannot terminate or corrupt the query string.
        # NOTE(review): single quotes inside a value are not escaped at the
        # filter-syntax level - confirm the escape convention the API expects.
        encoded_query = quote(query, safe="")
        path = f"v1/Contact?$filter={encoded_query}"

        try:
            full_url = self._get_url(path)
            resp = self.session.get(full_url, headers=self._get_headers(), timeout=self.timeout)
            resp.raise_for_status()
            data = resp.json()

            results = data.get("value", [])
            if results:
                logger.info(f"Found {len(results)} matching contacts.")
                return results[0]
            return None
        except Exception as e:
            self._log_failure("Error searching for contact", e)
            return None

    def create_contact(self, name, url=None, org_nr=None):
        """
        Creates a new contact (company) in SuperOffice with basic details.

        Args:
            name: Company name.
            url: Optional website address of the company.
            org_nr: Optional organisation number.

        Returns:
            The created contact as decoded JSON, or ``None`` on failure.
        """
        # Keep the endpoint in its own variable: reusing `url` here would
        # overwrite the caller's website address before it reaches the payload.
        endpoint = self._get_url("v1/Contact")
        payload = {
            "Name": name,
            "OrgNr": org_nr,
            "UrlAddress": url,
            "ActivePublications": [],  # Required field, can be empty
            "Emails": [],  # Required field, can be empty
            "Phones": []  # Required field, can be empty
        }

        # Remove None values
        payload = {k: v for k, v in payload.items() if v is not None}

        try:
            logger.info(f"Attempting to create contact: {name}")
            resp = self.session.post(endpoint, headers=self._get_headers(), json=payload, timeout=self.timeout)
            resp.raise_for_status()
            created_contact = resp.json()
            logger.info(f"Successfully created contact: {created_contact.get('Name')} (ID: {created_contact.get('ContactId')})")
            return created_contact
        except Exception as e:
            self._log_failure("Error creating contact", e)
            return None

    def create_person(self, first_name, last_name, contact_id, email=None):
        """
        Creates a new person linked to a contact (company).

        Returns:
            The created person as decoded JSON, or ``None`` on failure.
        """
        url = self._get_url("v1/Person")

        payload = {
            "Firstname": first_name,
            "Lastname": last_name,
            "Contact": {
                "ContactId": contact_id
            },
            "Emails": []
        }

        if email:
            payload["Emails"].append({
                "Value": email,
                "Rank": 1,
                "Description": "Work"  # Optional description
            })

        try:
            logger.info(f"Attempting to create person: {first_name} {last_name} for Contact ID {contact_id}")
            resp = self.session.post(url, headers=self._get_headers(), json=payload, timeout=self.timeout)
            resp.raise_for_status()
            created_person = resp.json()
            logger.info(f"Successfully created person: {created_person.get('Firstname')} {created_person.get('Lastname')} (ID: {created_person.get('PersonId')})")
            return created_person
        except Exception as e:
            self._log_failure("Error creating person", e)
            return None

    def create_sale(self, title, contact_id, person_id=None, amount=0.0):
        """
        Creates a new Sale (Opportunity) linked to a contact and optionally a person.

        Returns:
            The created sale as decoded JSON, or ``None`` on failure.
        """
        url = self._get_url("v1/Sale")

        payload = {
            "Heading": title,
            "Contact": {
                "ContactId": contact_id
            },
            "Amount": amount,
            "SaleType": {  # Assuming default ID 1 exists
                "Id": 1
            },
            "SaleStage": {  # Assuming default ID for the first stage is 1
                "Id": 1
            },
            "Probability": 10  # Default probability
        }

        if person_id:
            payload["Person"] = {
                "PersonId": person_id
            }

        try:
            logger.info(f"Attempting to create sale: '{title}' for Contact ID {contact_id}")
            resp = self.session.post(url, headers=self._get_headers(), json=payload, timeout=self.timeout)
            resp.raise_for_status()
            created_sale = resp.json()
            logger.info(f"Successfully created sale: {created_sale.get('Heading')} (ID: {created_sale.get('SaleId')})")
            return created_sale
        except Exception as e:
            self._log_failure("Error creating sale", e)
            return None

    def create_project(self, name, contact_id, person_id=None):
        """
        Creates a new Project linked to a contact and optionally adds a person as a member.

        Returns:
            The created project as decoded JSON, or ``None`` on failure.
        """
        url = self._get_url("v1/Project")

        payload = {
            "Name": name,
            "Contact": {
                "ContactId": contact_id
            },
            "ProjectType": {  # Assuming default ID 1 exists
                "Id": 1
            },
            "ProjectStatus": {  # Assuming default ID 1 for 'In progress' exists
                "Id": 1
            },
            "ProjectMembers": []
        }

        if person_id:
            payload["ProjectMembers"].append({
                "PersonId": person_id
            })

        try:
            logger.info(f"Attempting to create project: '{name}' for Contact ID {contact_id}")
            resp = self.session.post(url, headers=self._get_headers(), json=payload, timeout=self.timeout)
            resp.raise_for_status()
            created_project = resp.json()
            logger.info(f"Successfully created project: {created_project.get('Name')} (ID: {created_project.get('ProjectId')})")
            return created_project
        except Exception as e:
            self._log_failure("Error creating project", e)
            return None

    def update_entity_udfs(self, entity_id, entity_type, udf_data: dict):
        """
        Updates user-defined fields for a given entity (Contact or Person).

        The existing entity is fetched first and sent back in full via PUT
        with the mapped UDF values merged in.

        Args:
            entity_id: ID of the entity to update.
            entity_type: Either ``"Contact"`` or ``"Person"``.
            udf_data: Mapping of logical UDF keys to new values. Keys missing
                from the configured mapping for ``entity_type`` are skipped
                with a warning.

        Returns:
            The updated entity as decoded JSON, or ``None`` when
            ``entity_type`` is invalid, nothing in ``udf_data`` could be
            mapped, or a request failed.
        """
        if entity_type not in ["Contact", "Person"]:
            logger.error(f"Invalid entity_type: {entity_type}. Must be 'Contact' or 'Person'.")
            return None

        # 1. Retrieve the existing entity to ensure all required fields are present in the PUT payload
        get_url = self._get_url(f"v1/{entity_type}/{entity_id}")
        try:
            get_resp = self.session.get(get_url, headers=self._get_headers(), timeout=self.timeout)
            get_resp.raise_for_status()
            existing_entity = get_resp.json()
            logger.info(f"Successfully retrieved existing {entity_type} ID {entity_id}.")
        except Exception as e:
            self._log_failure(f"Error retrieving existing {entity_type} ID {entity_id}", e)
            return None

        # Use the existing entity data as the base for the PUT payload
        payload = existing_entity
        # The field may be missing or null in the response; normalise to a dict.
        if not isinstance(payload.get("UserDefinedFields"), dict):
            payload["UserDefinedFields"] = {}

        # Select the correct mapping based on entity type
        udf_mapping = self.udf_contact_mapping if entity_type == "Contact" else self.udf_person_mapping

        # Collect only the fields this call actually changes. The entity's
        # existing UDFs are usually non-empty, so checking the merged dict
        # cannot tell whether any valid update was supplied.
        changes = {}
        for key, value in udf_data.items():
            prog_id = udf_mapping.get(key)
            if not prog_id:
                logger.warning(f"Unknown UDF key for {entity_type}: {key}. Skipping.")
                continue

            if key == "ma_status" and entity_type == "Person":
                # For MA Status, we need to send the internal ID directly as an integer
                internal_id = self.ma_status_id_map.get(value)
                if internal_id is None:
                    logger.warning(f"Unknown MA Status value '{value}'. Skipping update for {key}.")
                    continue
                changes[prog_id] = internal_id
            else:
                # For other UDFs, send the value directly
                changes[prog_id] = value

        if not changes:
            logger.info(f"No valid UDF data to update for {entity_type} ID {entity_id}.")
            return None

        payload["UserDefinedFields"].update(changes)

        # 2. Send the updated entity (including all original fields + modified UDFs) via PUT
        put_url = self._get_url(f"v1/{entity_type}/{entity_id}")
        try:
            logger.info(f"Attempting to update UDFs for {entity_type} ID {entity_id} with: {payload['UserDefinedFields']}")
            resp = self.session.put(put_url, headers=self._get_headers(), json=payload, timeout=self.timeout)
            resp.raise_for_status()
            updated_entity = resp.json()
            logger.info(f"Successfully updated UDFs for {entity_type} ID {entity_id}.")
            return updated_entity
        except Exception as e:
            self._log_failure(f"Error updating UDFs for {entity_type} ID {entity_id}", e)
            return None

    # NOTE: The create_email_activity method is currently blocked due to SuperOffice environment limitations.
    # Attempting to create an Email Activity via API results in a 500 Internal Server Error,
    # likely because the email module is not licensed or configured in the SOD environment.
    # This method is temporarily commented out.
# # def create_email_activity(self, person_id, contact_id, subject, body): # """Creates an Email Activity linked to a person and contact.""" # url = self._get_url("v1/Activity") # # payload = { # "Type": { # Assuming ID 2 for "Email" ActivityType # "Id": 2 # }, # "Title": subject, # "Details": body, # "Person": { # "PersonId": person_id # }, # "Contact": { # "ContactId": contact_id # } # } # # try: # logger.info(f"Attempting to create Email Activity with subject '{subject}' for Person ID {person_id} and Contact ID {contact_id}") # resp = self.session.post(url, headers=self._get_headers(), json=payload) # resp.raise_for_status() # created_activity = resp.json() # logger.info(f"Successfully created Email Activity: '{created_activity.get('Title')}' (ID: {created_activity.get('ActivityId')})") # return created_activity # except Exception as e: # logger.error(f"Error creating Email Activity: {e}") # if hasattr(e, 'response') and e.response is not None: # logger.error(f"Response: {e.response.text}") # return None