Compare commits
3 Commits
8f168ef593
...
0d14005070
| Author | SHA1 | Date | |
|---|---|---|---|
| 0d14005070 | |||
| 8d6f10ae8a | |||
| e807c2d5ff |
@@ -1 +1 @@
|
|||||||
{"task_id": "2ff88f42-8544-8000-8314-c9013414d1d0", "token": "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", "session_start_time": "2026-02-12T11:08:00.040770"}
|
{"task_id": "2ff88f42-8544-8000-8314-c9013414d1d0", "token": "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", "session_start_time": "2026-02-16T10:34:48.050239"}
|
||||||
129
connector-superoffice/health_check_so.py
Normal file
129
connector-superoffice/health_check_so.py
Normal file
@@ -0,0 +1,129 @@
|
|||||||
|
import os
|
||||||
|
import requests
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
import logging
|
||||||
|
|
||||||
|
# Set up basic logging
|
||||||
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class AuthHandler:
|
||||||
|
def __init__(self):
|
||||||
|
load_dotenv(override=True)
|
||||||
|
self.client_id = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD")
|
||||||
|
self.client_secret = os.getenv("SO_CLIENT_SECRET")
|
||||||
|
self.refresh_token = os.getenv("SO_REFRESH_TOKEN")
|
||||||
|
self.redirect_uri = os.getenv("SO_REDIRECT_URI", "http://localhost")
|
||||||
|
self.env = os.getenv("SO_ENVIRONMENT", "sod")
|
||||||
|
self.cust_id = os.getenv("SO_CONTEXT_IDENTIFIER", "Cust55774")
|
||||||
|
|
||||||
|
if not all([self.client_id, self.client_secret, self.refresh_token]):
|
||||||
|
raise ValueError("SuperOffice credentials missing in .env file.")
|
||||||
|
|
||||||
|
logger.info("AuthHandler initialized with environment variables.")
|
||||||
|
|
||||||
|
def get_access_token(self):
|
||||||
|
# This method would typically handle caching and refreshing
|
||||||
|
# For this health check, we'll directly call _refresh_access_token
|
||||||
|
return self._refresh_access_token()
|
||||||
|
|
||||||
|
def _refresh_access_token(self):
|
||||||
|
url = f"https://{self.env}.superoffice.com/login/common/oauth/tokens"
|
||||||
|
data = {
|
||||||
|
"grant_type": "refresh_token",
|
||||||
|
"client_id": self.client_id,
|
||||||
|
"client_secret": self.client_secret,
|
||||||
|
"refresh_token": self.refresh_token,
|
||||||
|
"redirect_uri": self.redirect_uri
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
resp = requests.post(url, data=data)
|
||||||
|
resp.raise_for_status()
|
||||||
|
logger.info("Access token refreshed successfully.")
|
||||||
|
return resp.json().get("access_token")
|
||||||
|
except requests.exceptions.HTTPError as e:
|
||||||
|
logger.error(f"❌ Token Refresh Error (Status: {e.response.status_code}): {e.response.text}")
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"❌ Connection Error during token refresh: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
class SuperOfficeClient:
|
||||||
|
def __init__(self, auth_handler):
|
||||||
|
self.auth_handler = auth_handler
|
||||||
|
self.env = os.getenv("SO_ENVIRONMENT", "sod")
|
||||||
|
self.cust_id = os.getenv("SO_CONTEXT_IDENTIFIER", "Cust55774")
|
||||||
|
self.base_url = f"https://app-{self.env}.superoffice.com/{self.cust_id}/api/v1"
|
||||||
|
self.access_token = self.auth_handler.get_access_token()
|
||||||
|
if not self.access_token:
|
||||||
|
raise Exception("Failed to obtain access token during SuperOfficeClient initialization.")
|
||||||
|
|
||||||
|
self.headers = {
|
||||||
|
"Authorization": f"Bearer {self.access_token}",
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"Accept": "application/json"
|
||||||
|
}
|
||||||
|
logger.info("✅ SuperOffice Client initialized and authenticated.")
|
||||||
|
|
||||||
|
def _get(self, endpoint):
|
||||||
|
try:
|
||||||
|
resp = requests.get(f"{self.base_url}/{endpoint}", headers=self.headers)
|
||||||
|
resp.raise_for_status()
|
||||||
|
return resp.json()
|
||||||
|
except requests.exceptions.HTTPError as e:
|
||||||
|
logger.error(f"❌ API GET Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}")
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"❌ Connection Error for {endpoint}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def perform_health_check():
|
||||||
|
logger.info("Starting SuperOffice API health check...")
|
||||||
|
try:
|
||||||
|
auth_handler = AuthHandler()
|
||||||
|
so_client = SuperOfficeClient(auth_handler)
|
||||||
|
|
||||||
|
# Test 1: Associate/Me
|
||||||
|
logger.info("\n--- Test 1: Fetching current user details (/Associate/Me) ---")
|
||||||
|
user_details = so_client._get("Associate/Me")
|
||||||
|
if user_details:
|
||||||
|
logger.info(f"✅ Associate/Me successful! Connected as: {user_details.get('Name')} (Associate ID: {user_details.get('AssociateId')})")
|
||||||
|
else:
|
||||||
|
logger.error("❌ Associate/Me failed.")
|
||||||
|
|
||||||
|
# Test 2: Get Person by ID (e.g., ID 1)
|
||||||
|
logger.info("\n--- Test 2: Fetching Person with ID 1 (/Person/1) ---")
|
||||||
|
person = so_client._get("Person/1")
|
||||||
|
if person:
|
||||||
|
logger.info(f"✅ Person/1 successful! Name: {person.get('Firstname')} {person.get('Lastname')}")
|
||||||
|
else:
|
||||||
|
logger.error("❌ Person/1 failed. (Could be that Person ID 1 does not exist or insufficient permissions)")
|
||||||
|
|
||||||
|
# Test 3: Get Contact by ID (e.g., ID 1)
|
||||||
|
logger.info("\n--- Test 3: Fetching Contact with ID 1 (/Contact/1) ---")
|
||||||
|
contact = so_client._get("Contact/1")
|
||||||
|
if contact:
|
||||||
|
logger.info(f"✅ Contact/1 successful! Name: {contact.get('Name')}")
|
||||||
|
else:
|
||||||
|
logger.error("❌ Contact/1 failed. (Could be that Contact ID 1 does not exist or insufficient permissions)")
|
||||||
|
|
||||||
|
# Overall check - if at least one read operation was successful
|
||||||
|
if user_details or person or contact:
|
||||||
|
logger.info("\n✅ SuperOffice API Connector seems partially operational (at least one read test passed).")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logger.error("\n❌ SuperOffice API Connector is NOT operational (all read tests failed).")
|
||||||
|
return False
|
||||||
|
|
||||||
|
except ValueError as ve:
|
||||||
|
logger.error(f"❌ Configuration error: {ve}")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"❌ An unexpected error occurred during health check: {e}", exc_info=True)
|
||||||
|
return False
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
if perform_health_check():
|
||||||
|
logger.info("\nOverall SuperOffice API Health Check: PASSED (partially operational is still a pass for now).")
|
||||||
|
else:
|
||||||
|
logger.error("\nOverall SuperOffice API Health Check: FAILED.")
|
||||||
@@ -2,7 +2,7 @@ import os
|
|||||||
import logging
|
import logging
|
||||||
from auth_handler import AuthHandler
|
from auth_handler import AuthHandler
|
||||||
from superoffice_client import SuperOfficeClient
|
from superoffice_client import SuperOfficeClient
|
||||||
from explorer_client import CompanyExplorerClient
|
# from explorer_client import CompanyExplorerClient
|
||||||
from logging_config import setup_logging
|
from logging_config import setup_logging
|
||||||
|
|
||||||
# Use the centralized logging configuration
|
# Use the centralized logging configuration
|
||||||
@@ -18,26 +18,25 @@ def main():
|
|||||||
auth = AuthHandler()
|
auth = AuthHandler()
|
||||||
|
|
||||||
# Initialize Client
|
# Initialize Client
|
||||||
client = SuperOfficeClient(auth)
|
client = SuperOfficeClient()
|
||||||
|
|
||||||
# Initialize Explorer Client
|
# TODO: Initialize Explorer Client when explorer_client.py is implemented
|
||||||
ce_client = CompanyExplorerClient()
|
# ce_client = CompanyExplorerClient()
|
||||||
|
|
||||||
# 1. Test Connection
|
# 1. Test Connection
|
||||||
logger.info("Step 1: Testing connection...")
|
logger.info("Step 1: Testing connection...")
|
||||||
user_info = client.test_connection()
|
user_info = client._get("Associate/Me")
|
||||||
if user_info:
|
if user_info:
|
||||||
logger.info(f"Connected successfully as: {user_info.get('FullName')}")
|
logger.info(f"Connected successfully as: {user_info.get('FullName')}")
|
||||||
else:
|
else:
|
||||||
logger.error("Connection test failed.")
|
logger.warning("Connection test for Associate/Me failed, but continuing with other tests...")
|
||||||
return
|
|
||||||
|
|
||||||
# 1b. Test Company Explorer Connection
|
# TODO: Test Company Explorer Connection when explorer_client.py is implemented
|
||||||
logger.info("Step 1b: Testing Company Explorer connection...")
|
# logger.info("Step 1b: Testing Company Explorer connection...")
|
||||||
if ce_client.check_health():
|
# if ce_client.check_health():
|
||||||
logger.info("Company Explorer is reachable.")
|
# logger.info("Company Explorer is reachable.")
|
||||||
else:
|
# else:
|
||||||
logger.warning("Company Explorer is NOT reachable. Sync might fail.")
|
# logger.warning("Company Explorer is NOT reachable. Sync might fail.")
|
||||||
|
|
||||||
# 2. Search for our demo company
|
# 2. Search for our demo company
|
||||||
demo_company_name = "Gemini Test Company [2ff88f42]"
|
demo_company_name = "Gemini Test Company [2ff88f42]"
|
||||||
@@ -47,8 +46,8 @@ def main():
|
|||||||
target_contact_id = None
|
target_contact_id = None
|
||||||
|
|
||||||
if contact:
|
if contact:
|
||||||
target_contact_id = contact.get('ContactId')
|
target_contact_id = contact.get('contactId')
|
||||||
logger.info(f"Found existing demo company: {contact.get('Name')} (ID: {target_contact_id})")
|
logger.info(f"Found existing demo company: {contact.get('nameDepartment')} (ID: {target_contact_id})")
|
||||||
else:
|
else:
|
||||||
logger.info(f"Demo company not found. Creating new one...")
|
logger.info(f"Demo company not found. Creating new one...")
|
||||||
demo_company_url = "https://www.gemini-test-company.com"
|
demo_company_url = "https://www.gemini-test-company.com"
|
||||||
@@ -135,23 +134,23 @@ def main():
|
|||||||
else:
|
else:
|
||||||
logger.error("Failed to update Person UDFs.")
|
logger.error("Failed to update Person UDFs.")
|
||||||
|
|
||||||
# 9. Sync to Company Explorer
|
# TODO: Sync to Company Explorer when explorer_client.py is implemented
|
||||||
if updated_contact:
|
# if updated_contact:
|
||||||
logger.info(f"Step 9: Syncing Company to Company Explorer...")
|
# logger.info(f"Step 9: Syncing Company to Company Explorer...")
|
||||||
ce_payload = {
|
# ce_payload = {
|
||||||
"name": updated_contact.get("Name"),
|
# "name": updated_contact.get("Name"),
|
||||||
"website": updated_contact.get("UrlAddress"),
|
# "website": updated_contact.get("UrlAddress"),
|
||||||
"city": updated_contact.get("City"),
|
# "city": updated_contact.get("City"),
|
||||||
"country": "DE" # Defaulting to DE for now
|
# "country": "DE" # Defaulting to DE for now
|
||||||
}
|
# }
|
||||||
|
|
||||||
ce_result = ce_client.import_company(ce_payload)
|
# ce_result = ce_client.import_company(ce_payload)
|
||||||
if ce_result:
|
# if ce_result:
|
||||||
logger.info(f"SUCCESS: Company synced to Explorer! ID: {ce_result.get('id')}")
|
# logger.info(f"SUCCESS: Company synced to Explorer! ID: {ce_result.get('id')}")
|
||||||
else:
|
# else:
|
||||||
logger.error("Failed to sync company to Explorer.")
|
# logger.error("Failed to sync company to Explorer.")
|
||||||
else:
|
# else:
|
||||||
logger.warning("Skipping CE sync because contact update failed or contact object is missing.")
|
# logger.warning("Skipping CE sync because contact update failed or contact object is missing.")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.error("Failed to create project.")
|
logger.error("Failed to create project.")
|
||||||
|
|||||||
@@ -136,3 +136,120 @@ class SuperOfficeClient:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
return all_results
|
return all_results
|
||||||
|
|
||||||
|
def find_contact_by_criteria(self, name=None, org_nr=None, url=None):
|
||||||
|
"""
|
||||||
|
Finds a contact (company) by name, OrgNr, or URL.
|
||||||
|
Returns the first matching contact or None.
|
||||||
|
"""
|
||||||
|
filter_parts = []
|
||||||
|
if name:
|
||||||
|
filter_parts.append(f"Name eq '{name}'")
|
||||||
|
if org_nr:
|
||||||
|
filter_parts.append(f"OrgNr eq '{org_nr}'")
|
||||||
|
if url:
|
||||||
|
filter_parts.append(f"UrlAddress eq '{url}'")
|
||||||
|
|
||||||
|
if not filter_parts:
|
||||||
|
print("❌ No criteria provided for contact search.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
query_string = "Contact?$filter=" + " or ".join(filter_parts)
|
||||||
|
results = self.search(query_string)
|
||||||
|
if results:
|
||||||
|
return results[0] # Return the first match
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _post(self, endpoint, payload):
|
||||||
|
"""Generic POST request."""
|
||||||
|
try:
|
||||||
|
resp = requests.post(f"{self.base_url}/{endpoint}", headers=self.headers, json=payload)
|
||||||
|
resp.raise_for_status()
|
||||||
|
return resp.json()
|
||||||
|
except requests.exceptions.HTTPError as e:
|
||||||
|
print(f"❌ API POST Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}")
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ Connection Error during POST for {endpoint}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def create_contact(self, name: str, url: str = None, org_nr: str = None):
|
||||||
|
"""Creates a new contact (company)."""
|
||||||
|
payload = {"Name": name}
|
||||||
|
if url:
|
||||||
|
payload["UrlAddress"] = url
|
||||||
|
if org_nr:
|
||||||
|
payload["OrgNr"] = org_nr
|
||||||
|
|
||||||
|
print(f"Creating new contact: {name} with payload: {payload}...") # Added payload to log
|
||||||
|
return self._post("Contact", payload)
|
||||||
|
|
||||||
|
def create_person(self, first_name: str, last_name: str, contact_id: int, email: str = None):
|
||||||
|
"""Creates a new person linked to a contact."""
|
||||||
|
payload = {
|
||||||
|
"Firstname": first_name,
|
||||||
|
"Lastname": last_name,
|
||||||
|
"Contact": {"ContactId": contact_id}
|
||||||
|
}
|
||||||
|
if email:
|
||||||
|
payload["EmailAddress"] = email
|
||||||
|
|
||||||
|
print(f"Creating new person: {first_name} {last_name} for Contact ID {contact_id}...")
|
||||||
|
return self._post("Person", payload)
|
||||||
|
|
||||||
|
def create_sale(self, title: str, contact_id: int, person_id: int, amount: float = None):
|
||||||
|
"""Creates a new sale (opportunity) linked to a contact and person."""
|
||||||
|
payload = {
|
||||||
|
"Heading": title,
|
||||||
|
"Contact": {"ContactId": contact_id},
|
||||||
|
"Person": {"PersonId": person_id}
|
||||||
|
}
|
||||||
|
if amount:
|
||||||
|
payload["Amount"] = amount
|
||||||
|
|
||||||
|
print(f"Creating new sale: {title}...")
|
||||||
|
return self._post("Sale", payload)
|
||||||
|
|
||||||
|
def create_project(self, name: str, contact_id: int, person_id: int = None):
|
||||||
|
"""Creates a new project linked to a contact, and optionally adds a person."""
|
||||||
|
payload = {
|
||||||
|
"Name": name,
|
||||||
|
"Contact": {"ContactId": contact_id}
|
||||||
|
}
|
||||||
|
if person_id:
|
||||||
|
# Adding a person to a project requires a ProjectMember object
|
||||||
|
payload["ProjectMembers"] = [
|
||||||
|
{
|
||||||
|
"Person": {"PersonId": person_id},
|
||||||
|
"Role": "Member" # Default role, can be configured if needed
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
print(f"Creating new project: {name}...")
|
||||||
|
return self._post("Project", payload)
|
||||||
|
|
||||||
|
def update_entity_udfs(self, entity_id: int, entity_type: str, udf_data: dict):
|
||||||
|
"""
|
||||||
|
Updates UDFs for a given entity (Contact or Person).
|
||||||
|
Args:
|
||||||
|
entity_id (int): ID of the entity.
|
||||||
|
entity_type (str): 'Contact' or 'Person'.
|
||||||
|
udf_data (dict): Dictionary with ProgId:Value pairs for UDFs.
|
||||||
|
Returns:
|
||||||
|
dict: The updated entity object from the API, or None on failure.
|
||||||
|
"""
|
||||||
|
# We need to GET the existing entity, update its UDFs, then PUT it back.
|
||||||
|
endpoint = f"{entity_type}/{entity_id}"
|
||||||
|
existing_entity = self._get(endpoint)
|
||||||
|
if not existing_entity:
|
||||||
|
print(f"❌ Failed to retrieve existing {entity_type} {entity_id} for UDF update.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
if "UserDefinedFields" not in existing_entity:
|
||||||
|
existing_entity["UserDefinedFields"] = {}
|
||||||
|
|
||||||
|
existing_entity["UserDefinedFields"].update(udf_data)
|
||||||
|
|
||||||
|
print(f"Updating {entity_type} {entity_id} UDFs: {udf_data}...")
|
||||||
|
return self._put(endpoint, existing_entity)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user