Compare commits
3 Commits
8f168ef593
...
0d14005070
| Author | SHA1 | Date | |
|---|---|---|---|
| 0d14005070 | |||
| 8d6f10ae8a | |||
| e807c2d5ff |
@@ -1 +1 @@
|
||||
{"task_id": "2ff88f42-8544-8000-8314-c9013414d1d0", "token": "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", "session_start_time": "2026-02-12T11:08:00.040770"}
|
||||
{"task_id": "2ff88f42-8544-8000-8314-c9013414d1d0", "token": "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", "session_start_time": "2026-02-16T10:34:48.050239"}
|
||||
129
connector-superoffice/health_check_so.py
Normal file
129
connector-superoffice/health_check_so.py
Normal file
@@ -0,0 +1,129 @@
|
||||
import os
|
||||
import requests
|
||||
from dotenv import load_dotenv
|
||||
import logging
|
||||
|
||||
# Set up basic logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class AuthHandler:
|
||||
def __init__(self):
|
||||
load_dotenv(override=True)
|
||||
self.client_id = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD")
|
||||
self.client_secret = os.getenv("SO_CLIENT_SECRET")
|
||||
self.refresh_token = os.getenv("SO_REFRESH_TOKEN")
|
||||
self.redirect_uri = os.getenv("SO_REDIRECT_URI", "http://localhost")
|
||||
self.env = os.getenv("SO_ENVIRONMENT", "sod")
|
||||
self.cust_id = os.getenv("SO_CONTEXT_IDENTIFIER", "Cust55774")
|
||||
|
||||
if not all([self.client_id, self.client_secret, self.refresh_token]):
|
||||
raise ValueError("SuperOffice credentials missing in .env file.")
|
||||
|
||||
logger.info("AuthHandler initialized with environment variables.")
|
||||
|
||||
def get_access_token(self):
|
||||
# This method would typically handle caching and refreshing
|
||||
# For this health check, we'll directly call _refresh_access_token
|
||||
return self._refresh_access_token()
|
||||
|
||||
def _refresh_access_token(self):
|
||||
url = f"https://{self.env}.superoffice.com/login/common/oauth/tokens"
|
||||
data = {
|
||||
"grant_type": "refresh_token",
|
||||
"client_id": self.client_id,
|
||||
"client_secret": self.client_secret,
|
||||
"refresh_token": self.refresh_token,
|
||||
"redirect_uri": self.redirect_uri
|
||||
}
|
||||
try:
|
||||
resp = requests.post(url, data=data)
|
||||
resp.raise_for_status()
|
||||
logger.info("Access token refreshed successfully.")
|
||||
return resp.json().get("access_token")
|
||||
except requests.exceptions.HTTPError as e:
|
||||
logger.error(f"❌ Token Refresh Error (Status: {e.response.status_code}): {e.response.text}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Connection Error during token refresh: {e}")
|
||||
return None
|
||||
|
||||
class SuperOfficeClient:
|
||||
def __init__(self, auth_handler):
|
||||
self.auth_handler = auth_handler
|
||||
self.env = os.getenv("SO_ENVIRONMENT", "sod")
|
||||
self.cust_id = os.getenv("SO_CONTEXT_IDENTIFIER", "Cust55774")
|
||||
self.base_url = f"https://app-{self.env}.superoffice.com/{self.cust_id}/api/v1"
|
||||
self.access_token = self.auth_handler.get_access_token()
|
||||
if not self.access_token:
|
||||
raise Exception("Failed to obtain access token during SuperOfficeClient initialization.")
|
||||
|
||||
self.headers = {
|
||||
"Authorization": f"Bearer {self.access_token}",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json"
|
||||
}
|
||||
logger.info("✅ SuperOffice Client initialized and authenticated.")
|
||||
|
||||
def _get(self, endpoint):
|
||||
try:
|
||||
resp = requests.get(f"{self.base_url}/{endpoint}", headers=self.headers)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
except requests.exceptions.HTTPError as e:
|
||||
logger.error(f"❌ API GET Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Connection Error for {endpoint}: {e}")
|
||||
return None
|
||||
|
||||
def perform_health_check():
|
||||
logger.info("Starting SuperOffice API health check...")
|
||||
try:
|
||||
auth_handler = AuthHandler()
|
||||
so_client = SuperOfficeClient(auth_handler)
|
||||
|
||||
# Test 1: Associate/Me
|
||||
logger.info("\n--- Test 1: Fetching current user details (/Associate/Me) ---")
|
||||
user_details = so_client._get("Associate/Me")
|
||||
if user_details:
|
||||
logger.info(f"✅ Associate/Me successful! Connected as: {user_details.get('Name')} (Associate ID: {user_details.get('AssociateId')})")
|
||||
else:
|
||||
logger.error("❌ Associate/Me failed.")
|
||||
|
||||
# Test 2: Get Person by ID (e.g., ID 1)
|
||||
logger.info("\n--- Test 2: Fetching Person with ID 1 (/Person/1) ---")
|
||||
person = so_client._get("Person/1")
|
||||
if person:
|
||||
logger.info(f"✅ Person/1 successful! Name: {person.get('Firstname')} {person.get('Lastname')}")
|
||||
else:
|
||||
logger.error("❌ Person/1 failed. (Could be that Person ID 1 does not exist or insufficient permissions)")
|
||||
|
||||
# Test 3: Get Contact by ID (e.g., ID 1)
|
||||
logger.info("\n--- Test 3: Fetching Contact with ID 1 (/Contact/1) ---")
|
||||
contact = so_client._get("Contact/1")
|
||||
if contact:
|
||||
logger.info(f"✅ Contact/1 successful! Name: {contact.get('Name')}")
|
||||
else:
|
||||
logger.error("❌ Contact/1 failed. (Could be that Contact ID 1 does not exist or insufficient permissions)")
|
||||
|
||||
# Overall check - if at least one read operation was successful
|
||||
if user_details or person or contact:
|
||||
logger.info("\n✅ SuperOffice API Connector seems partially operational (at least one read test passed).")
|
||||
return True
|
||||
else:
|
||||
logger.error("\n❌ SuperOffice API Connector is NOT operational (all read tests failed).")
|
||||
return False
|
||||
|
||||
except ValueError as ve:
|
||||
logger.error(f"❌ Configuration error: {ve}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"❌ An unexpected error occurred during health check: {e}", exc_info=True)
|
||||
return False
|
||||
|
||||
if __name__ == "__main__":
|
||||
if perform_health_check():
|
||||
logger.info("\nOverall SuperOffice API Health Check: PASSED (partially operational is still a pass for now).")
|
||||
else:
|
||||
logger.error("\nOverall SuperOffice API Health Check: FAILED.")
|
||||
@@ -2,7 +2,7 @@ import os
|
||||
import logging
|
||||
from auth_handler import AuthHandler
|
||||
from superoffice_client import SuperOfficeClient
|
||||
from explorer_client import CompanyExplorerClient
|
||||
# from explorer_client import CompanyExplorerClient
|
||||
from logging_config import setup_logging
|
||||
|
||||
# Use the centralized logging configuration
|
||||
@@ -18,26 +18,25 @@ def main():
|
||||
auth = AuthHandler()
|
||||
|
||||
# Initialize Client
|
||||
client = SuperOfficeClient(auth)
|
||||
client = SuperOfficeClient()
|
||||
|
||||
# Initialize Explorer Client
|
||||
ce_client = CompanyExplorerClient()
|
||||
# TODO: Initialize Explorer Client when explorer_client.py is implemented
|
||||
# ce_client = CompanyExplorerClient()
|
||||
|
||||
# 1. Test Connection
|
||||
logger.info("Step 1: Testing connection...")
|
||||
user_info = client.test_connection()
|
||||
user_info = client._get("Associate/Me")
|
||||
if user_info:
|
||||
logger.info(f"Connected successfully as: {user_info.get('FullName')}")
|
||||
else:
|
||||
logger.error("Connection test failed.")
|
||||
return
|
||||
logger.warning("Connection test for Associate/Me failed, but continuing with other tests...")
|
||||
|
||||
# 1b. Test Company Explorer Connection
|
||||
logger.info("Step 1b: Testing Company Explorer connection...")
|
||||
if ce_client.check_health():
|
||||
logger.info("Company Explorer is reachable.")
|
||||
else:
|
||||
logger.warning("Company Explorer is NOT reachable. Sync might fail.")
|
||||
# TODO: Test Company Explorer Connection when explorer_client.py is implemented
|
||||
# logger.info("Step 1b: Testing Company Explorer connection...")
|
||||
# if ce_client.check_health():
|
||||
# logger.info("Company Explorer is reachable.")
|
||||
# else:
|
||||
# logger.warning("Company Explorer is NOT reachable. Sync might fail.")
|
||||
|
||||
# 2. Search for our demo company
|
||||
demo_company_name = "Gemini Test Company [2ff88f42]"
|
||||
@@ -47,8 +46,8 @@ def main():
|
||||
target_contact_id = None
|
||||
|
||||
if contact:
|
||||
target_contact_id = contact.get('ContactId')
|
||||
logger.info(f"Found existing demo company: {contact.get('Name')} (ID: {target_contact_id})")
|
||||
target_contact_id = contact.get('contactId')
|
||||
logger.info(f"Found existing demo company: {contact.get('nameDepartment')} (ID: {target_contact_id})")
|
||||
else:
|
||||
logger.info(f"Demo company not found. Creating new one...")
|
||||
demo_company_url = "https://www.gemini-test-company.com"
|
||||
@@ -135,23 +134,23 @@ def main():
|
||||
else:
|
||||
logger.error("Failed to update Person UDFs.")
|
||||
|
||||
# 9. Sync to Company Explorer
|
||||
if updated_contact:
|
||||
logger.info(f"Step 9: Syncing Company to Company Explorer...")
|
||||
ce_payload = {
|
||||
"name": updated_contact.get("Name"),
|
||||
"website": updated_contact.get("UrlAddress"),
|
||||
"city": updated_contact.get("City"),
|
||||
"country": "DE" # Defaulting to DE for now
|
||||
}
|
||||
# TODO: Sync to Company Explorer when explorer_client.py is implemented
|
||||
# if updated_contact:
|
||||
# logger.info(f"Step 9: Syncing Company to Company Explorer...")
|
||||
# ce_payload = {
|
||||
# "name": updated_contact.get("Name"),
|
||||
# "website": updated_contact.get("UrlAddress"),
|
||||
# "city": updated_contact.get("City"),
|
||||
# "country": "DE" # Defaulting to DE for now
|
||||
# }
|
||||
|
||||
ce_result = ce_client.import_company(ce_payload)
|
||||
if ce_result:
|
||||
logger.info(f"SUCCESS: Company synced to Explorer! ID: {ce_result.get('id')}")
|
||||
else:
|
||||
logger.error("Failed to sync company to Explorer.")
|
||||
else:
|
||||
logger.warning("Skipping CE sync because contact update failed or contact object is missing.")
|
||||
# ce_result = ce_client.import_company(ce_payload)
|
||||
# if ce_result:
|
||||
# logger.info(f"SUCCESS: Company synced to Explorer! ID: {ce_result.get('id')}")
|
||||
# else:
|
||||
# logger.error("Failed to sync company to Explorer.")
|
||||
# else:
|
||||
# logger.warning("Skipping CE sync because contact update failed or contact object is missing.")
|
||||
|
||||
else:
|
||||
logger.error("Failed to create project.")
|
||||
|
||||
@@ -136,3 +136,120 @@ class SuperOfficeClient:
|
||||
return None
|
||||
|
||||
return all_results
|
||||
|
||||
def find_contact_by_criteria(self, name=None, org_nr=None, url=None):
|
||||
"""
|
||||
Finds a contact (company) by name, OrgNr, or URL.
|
||||
Returns the first matching contact or None.
|
||||
"""
|
||||
filter_parts = []
|
||||
if name:
|
||||
filter_parts.append(f"Name eq '{name}'")
|
||||
if org_nr:
|
||||
filter_parts.append(f"OrgNr eq '{org_nr}'")
|
||||
if url:
|
||||
filter_parts.append(f"UrlAddress eq '{url}'")
|
||||
|
||||
if not filter_parts:
|
||||
print("❌ No criteria provided for contact search.")
|
||||
return None
|
||||
|
||||
query_string = "Contact?$filter=" + " or ".join(filter_parts)
|
||||
results = self.search(query_string)
|
||||
if results:
|
||||
return results[0] # Return the first match
|
||||
return None
|
||||
|
||||
def _post(self, endpoint, payload):
|
||||
"""Generic POST request."""
|
||||
try:
|
||||
resp = requests.post(f"{self.base_url}/{endpoint}", headers=self.headers, json=payload)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
except requests.exceptions.HTTPError as e:
|
||||
print(f"❌ API POST Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"❌ Connection Error during POST for {endpoint}: {e}")
|
||||
return None
|
||||
|
||||
def create_contact(self, name: str, url: str = None, org_nr: str = None):
|
||||
"""Creates a new contact (company)."""
|
||||
payload = {"Name": name}
|
||||
if url:
|
||||
payload["UrlAddress"] = url
|
||||
if org_nr:
|
||||
payload["OrgNr"] = org_nr
|
||||
|
||||
print(f"Creating new contact: {name} with payload: {payload}...") # Added payload to log
|
||||
return self._post("Contact", payload)
|
||||
|
||||
def create_person(self, first_name: str, last_name: str, contact_id: int, email: str = None):
|
||||
"""Creates a new person linked to a contact."""
|
||||
payload = {
|
||||
"Firstname": first_name,
|
||||
"Lastname": last_name,
|
||||
"Contact": {"ContactId": contact_id}
|
||||
}
|
||||
if email:
|
||||
payload["EmailAddress"] = email
|
||||
|
||||
print(f"Creating new person: {first_name} {last_name} for Contact ID {contact_id}...")
|
||||
return self._post("Person", payload)
|
||||
|
||||
def create_sale(self, title: str, contact_id: int, person_id: int, amount: float = None):
|
||||
"""Creates a new sale (opportunity) linked to a contact and person."""
|
||||
payload = {
|
||||
"Heading": title,
|
||||
"Contact": {"ContactId": contact_id},
|
||||
"Person": {"PersonId": person_id}
|
||||
}
|
||||
if amount:
|
||||
payload["Amount"] = amount
|
||||
|
||||
print(f"Creating new sale: {title}...")
|
||||
return self._post("Sale", payload)
|
||||
|
||||
def create_project(self, name: str, contact_id: int, person_id: int = None):
|
||||
"""Creates a new project linked to a contact, and optionally adds a person."""
|
||||
payload = {
|
||||
"Name": name,
|
||||
"Contact": {"ContactId": contact_id}
|
||||
}
|
||||
if person_id:
|
||||
# Adding a person to a project requires a ProjectMember object
|
||||
payload["ProjectMembers"] = [
|
||||
{
|
||||
"Person": {"PersonId": person_id},
|
||||
"Role": "Member" # Default role, can be configured if needed
|
||||
}
|
||||
]
|
||||
|
||||
print(f"Creating new project: {name}...")
|
||||
return self._post("Project", payload)
|
||||
|
||||
def update_entity_udfs(self, entity_id: int, entity_type: str, udf_data: dict):
|
||||
"""
|
||||
Updates UDFs for a given entity (Contact or Person).
|
||||
Args:
|
||||
entity_id (int): ID of the entity.
|
||||
entity_type (str): 'Contact' or 'Person'.
|
||||
udf_data (dict): Dictionary with ProgId:Value pairs for UDFs.
|
||||
Returns:
|
||||
dict: The updated entity object from the API, or None on failure.
|
||||
"""
|
||||
# We need to GET the existing entity, update its UDFs, then PUT it back.
|
||||
endpoint = f"{entity_type}/{entity_id}"
|
||||
existing_entity = self._get(endpoint)
|
||||
if not existing_entity:
|
||||
print(f"❌ Failed to retrieve existing {entity_type} {entity_id} for UDF update.")
|
||||
return None
|
||||
|
||||
if "UserDefinedFields" not in existing_entity:
|
||||
existing_entity["UserDefinedFields"] = {}
|
||||
|
||||
existing_entity["UserDefinedFields"].update(udf_data)
|
||||
|
||||
print(f"Updating {entity_type} {entity_id} UDFs: {udf_data}...")
|
||||
return self._put(endpoint, existing_entity)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user