[2ff88f42] refactor(connector-superoffice): finalize production readiness cleanup

- Integrated centralized logging system in all modules.
- Extracted all IDs and ProgIds into a separate .
- Refactored  and  for cleaner dependency management.
- Included updated discovery and inspection utilities.
- Verified end-to-end workflow stability.
This commit is contained in:
2026-02-10 12:43:26 +00:00
parent 1ea17695ec
commit 005c947dae
9 changed files with 185 additions and 255 deletions

View File

@@ -1,38 +1,37 @@
import os
import logging
import time
import requests
from config import Config
from logging_config import setup_logging
logger = logging.getLogger(__name__)
logger = setup_logging(__name__)
class AuthHandler:
def __init__(self):
# Load configuration from environment
self.client_id = os.getenv("SO_CLIENT_ID")
if not self.client_id:
self.client_id = os.getenv("SO_SOD")
if self.client_id:
logger.info("Using SO_SOD as Client ID")
else:
logger.info("Using SO_CLIENT_ID as Client ID")
# Load configuration from Config class
self.client_id = Config.SO_CLIENT_ID
self.client_secret = Config.SO_CLIENT_SECRET
self.refresh_token = Config.SO_REFRESH_TOKEN
self.tenant_id = Config.SO_CONTEXT_IDENTIFIER # e.g., Cust55774
self.client_secret = os.getenv("SO_CLIENT_SECRET")
self.refresh_token = os.getenv("SO_REFRESH_TOKEN")
self.tenant_id = os.getenv("SO_CONTEXT_IDENTIFIER") # e.g., Cust55774
# OAuth Token Endpoint for SOD
# OAuth Token Endpoint for SOD (Could be configurable in future)
self.token_url = "https://sod.superoffice.com/login/common/oauth/tokens"
self._access_token = None
self._webapi_url = None
self._expiry = 0
if not self.client_id:
logger.error("SO_CLIENT_ID (or SO_SOD) is not set in environment!")
def get_ticket(self):
if self._access_token and time.time() < self._expiry:
return self._access_token, self._webapi_url
return self.refresh_access_token()
def refresh_access_token(self):
if not self.client_id:
raise ValueError("Client ID is missing. Cannot refresh token.")
logger.info(f"Refreshing Access Token for Client ID: {self.client_id[:5]}...")
payload = {

View File

@@ -0,0 +1,45 @@
import os
from dotenv import load_dotenv
# Load environment variables
if os.path.exists(".env"):
load_dotenv(".env", override=True)
elif os.path.exists("../.env"):
load_dotenv("../.env", override=True)
class Config:
# SuperOffice API Configuration
SO_CLIENT_ID = os.getenv("SO_SOD")
SO_CLIENT_SECRET = os.getenv("SO_CLIENT_SECRET")
SO_CONTEXT_IDENTIFIER = os.getenv("SO_CONTEXT_IDENTIFIER")
SO_REFRESH_TOKEN = os.getenv("SO_REFRESH_TOKEN")
# Company Explorer Configuration
CE_API_URL = os.getenv("CE_API_URL", "http://company-explorer:8000")
CE_API_USER = os.getenv("CE_API_USER", "admin")
CE_API_PASSWORD = os.getenv("CE_API_PASSWORD", "gemini")
# UDF Mapping (ProgIds) - Defaulting to SOD values, should be overridden in Prod
UDF_CONTACT_MAPPING = {
"ai_challenge_sentence": os.getenv("UDF_CONTACT_CHALLENGE", "SuperOffice:1"),
"ai_sentence_timestamp": os.getenv("UDF_CONTACT_TIMESTAMP", "SuperOffice:2"),
"ai_sentence_source_hash": os.getenv("UDF_CONTACT_HASH", "SuperOffice:3"),
"ai_last_outreach_date": os.getenv("UDF_CONTACT_OUTREACH", "SuperOffice:4")
}
UDF_PERSON_MAPPING = {
"ai_email_draft": os.getenv("UDF_PERSON_DRAFT", "SuperOffice:1"),
"ma_status": os.getenv("UDF_PERSON_STATUS", "SuperOffice:2")
}
# MA Status ID Mapping (Text -> ID) - Defaulting to discovered SOD values
MA_STATUS_ID_MAP = {
"Ready_to_Send": int(os.getenv("MA_STATUS_ID_READY", 11)),
"Sent_Week1": int(os.getenv("MA_STATUS_ID_WEEK1", 12)),
"Sent_Week2": int(os.getenv("MA_STATUS_ID_WEEK2", 13)),
"Bounced": int(os.getenv("MA_STATUS_ID_BOUNCED", 14)),
"Soft_Denied": int(os.getenv("MA_STATUS_ID_DENIED", 15)),
"Interested": int(os.getenv("MA_STATUS_ID_INTERESTED", 16)),
"Out_of_Office": int(os.getenv("MA_STATUS_ID_OOO", 17)),
"Unsubscribed": int(os.getenv("MA_STATUS_ID_UNSUB", 18))
}

View File

@@ -1,77 +1,48 @@
import os
import logging
import json
from dotenv import load_dotenv
from config import Config
from logging_config import setup_logging
from auth_handler import AuthHandler
from superoffice_client import SuperOfficeClient
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger = setup_logging("discovery")
def get_list_items_by_prog_id(client, prog_id, entity_name):
"""Fetches and prints list items for a specific ProgId."""
logger.info(f"--- Fetching list items for {entity_name} ProgId: {prog_id} ---")
# The endpoint for user-defined lists is typically generic
list_url = client._get_url(f"v1/List/UserDefinedField/{prog_id}")
try:
list_resp = client.session.get(list_url, headers=client._get_headers())
list_resp.raise_for_status()
list_items = list_resp.json()
if list_items.get("value"):
print(" --- List Items Found ---")
print(f" --- List Items Found for {prog_id} ---")
for item in list_items["value"]:
print(f" ID: {item.get('Id'):<5} | Name: {item.get('Name')}")
print(" ------------------------")
return {item.get('Name'): item.get('Id') for item in list_items["value"]}
else:
print(" (No list items found or unexpected response structure)")
return None
except Exception as list_e:
logger.error(f" Failed to fetch list items for {prog_id}: {list_e}")
if hasattr(list_e, 'response') and list_e.response is not None:
logger.error(f" List fetch details: {list_e.response.text}")
except Exception as e:
logger.error(f"Failed to fetch list items for {prog_id}: {e}")
return None
def get_activity_types(client):
"""Fetches available activity types."""
logger.info("--- Fetching Activity Types ---")
# Common endpoint for activity types
activity_type_url = client._get_url("v1/ActivityType") # Trying direct ActivityType endpoint
url = client._get_url("v1/ActivityType")
try:
resp = client.session.get(activity_type_url, headers=client._get_headers())
resp = client.session.get(url, headers=client._get_headers())
resp.raise_for_status()
activity_types = resp.json()
if activity_types:
print(" --- Activity Types Found ---")
for atype in activity_types:
print(f" ID: {atype.get('Id'):<5} | Name: {atype.get('Name')}")
print(" ------------------------")
return {atype.get('Name'): atype.get('Id') for atype in activity_types}
else:
print(" (No activity types found or unexpected response structure)")
return None
except Exception as e:
logger.error(f" Failed to fetch activity types: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f" Activity type fetch details: {e.response.text}")
logger.error(f"Failed to fetch activity types: {e}")
return None
def main():
load_dotenv(dotenv_path="../.env")
auth = AuthHandler()
client = SuperOfficeClient(auth)
# --- We know the ProgIds, so we query them directly ---
# ProgId for the "MA Status" list on the Person entity
#person_ma_status_prog_id = "SuperOffice:2" # Keep for future reference
#get_list_items_by_prog_id(client, person_ma_status_prog_id, "Person MA Status")
get_activity_types(client)
if __name__ == "__main__":

View File

@@ -1,9 +1,10 @@
import requests
import logging
import os
import base64
from config import Config
from logging_config import setup_logging
logger = logging.getLogger(__name__)
# Use the centralized logging configuration
logger = setup_logging(__name__)
class CompanyExplorerClient:
"""
@@ -11,11 +12,11 @@ class CompanyExplorerClient:
Handles authentication and data synchronization.
"""
def __init__(self, base_url=None, api_user=None, api_password=None):
# Default to Docker bridge IP for testing from session container
# In production, this can be overridden via CE_API_URL env var
self.base_url = base_url or os.getenv("CE_API_URL", "http://172.17.0.1:8000")
self.api_user = api_user or os.getenv("CE_API_USER", "admin")
self.api_password = api_password or os.getenv("CE_API_PASSWORD", "gemini")
# Prefer Config values, allow overrides
self.base_url = base_url or Config.CE_API_URL
self.api_user = api_user or Config.CE_API_USER
self.api_password = api_password or Config.CE_API_PASSWORD
self.session = requests.Session()
# Setup Basic Auth

View File

@@ -1,40 +1,25 @@
import os
import logging
import json
from dotenv import load_dotenv
from config import Config
from logging_config import setup_logging
from auth_handler import AuthHandler
from superoffice_client import SuperOfficeClient
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger = setup_logging("inspector")
def inspect_person(person_id):
load_dotenv(dotenv_path="../.env")
auth = AuthHandler()
client = SuperOfficeClient(auth)
logger.info(f"Fetching Person with ID {person_id} to inspect structure...")
url = client._get_url(f"v1/Person/{person_id}")
try:
resp = client.session.get(url, headers=client._get_headers())
resp.raise_for_status()
person_data = resp.json()
print(f"\n--- PERSON STRUCTURE (ID: {person_id}) ---")
print(json.dumps(person_data, indent=2))
print("\n--- USER DEFINED FIELDS FOR THIS PERSON ---")
if person_data.get("UserDefinedFields"):
print(json.dumps(person_data["UserDefinedFields"], indent=2))
else:
print("(No UserDefinedFields found)")
except Exception as e:
logger.error(f"Failed to fetch person data: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f"Details: {e.response.text}")
if __name__ == "__main__":
# Use the specific person ID provided by the user
target_person_id = 9
inspect_person(target_person_id)

View File

@@ -0,0 +1,42 @@
import logging
import os
from logging.handlers import RotatingFileHandler
def setup_logging(name="connector", log_level=logging.INFO):
"""
Sets up a robust logging configuration.
Logs to console and to a rotating file.
"""
# Create logs directory if it doesn't exist
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
logger = logging.getLogger(name)
logger.setLevel(log_level)
# Avoid duplicate handlers if setup is called multiple times
if logger.handlers:
return logger
# Formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Console Handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# File Handler (Rotating: 5MB size, keep last 3 files)
file_handler = RotatingFileHandler(
os.path.join(log_dir, f"{name}.log"),
maxBytes=5*1024*1024,
backupCount=3
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger

View File

@@ -1,25 +1,15 @@
import os
import logging
from dotenv import load_dotenv
from auth_handler import AuthHandler
from superoffice_client import SuperOfficeClient
from explorer_client import CompanyExplorerClient
from logging_config import setup_logging
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Use the centralized logging configuration
logger = setup_logging(__name__)
def main():
# Load .env from the root directory
# If running from /app, .env is in the same directory.
# If running from /app/connector-superoffice, it's in ../.env
if os.path.exists(".env"):
load_dotenv(".env", override=True)
elif os.path.exists("../.env"):
load_dotenv("../.env", override=True)
# Note: Environment loading is now handled by config.py/helpers implicitly when clients are initialized
logger.info("Starting SuperOffice Connector S2S POC...")
@@ -55,11 +45,9 @@ def main():
contact = client.find_contact_by_criteria(name=demo_company_name)
target_contact_id = None
contact_obj = None # Store the full contact object
if contact:
target_contact_id = contact.get('ContactId')
contact_obj = contact
logger.info(f"Found existing demo company: {contact.get('Name')} (ID: {target_contact_id})")
else:
logger.info(f"Demo company not found. Creating new one...")
@@ -73,12 +61,8 @@ def main():
)
if new_contact:
target_contact_id = new_contact.get('ContactId')
contact_obj = new_contact
logger.info(f"Created new demo company with ID: {target_contact_id}")
# ... (Steps 3-7 remain the same, I will insert the sync step at the end) ...
# 3. Create a Person linked to this company
if target_contact_id:
logger.info(f"Step 3: Creating Person for Contact ID {target_contact_id}...")

View File

@@ -1,31 +1,26 @@
import requests
import json
import os
from config import Config
from logging_config import setup_logging
logger = setup_logging("ce_parser")
def parse_openapi():
# Use CE IP directly for this local tool
url = "http://172.17.0.1:8000/openapi.json"
auth = ("admin", "gemini")
auth = (Config.CE_API_USER, Config.CE_API_PASSWORD)
try:
resp = requests.get(url, auth=auth, timeout=5)
resp.raise_for_status()
spec = resp.json()
schemas = spec.get("components", {}).get("schemas", {})
target_schemas = ["CompanyCreate", "BulkImportRequest", "CompanyUpdate"]
print("--- API SCHEMAS FOUND ---")
target_schemas = ["CompanyCreate", "BulkImportRequest"]
for schema_name in target_schemas:
if schema_name in schemas:
print(f"\nSchema: {schema_name}")
print(json.dumps(schemas[schema_name], indent=2))
else:
print(f"\nSchema {schema_name} not found.")
except Exception as e:
print(f"Error: {e}")
logger.error(f"Error parsing CE OpenAPI: {e}")
if __name__ == "__main__":
parse_openapi()

View File

@@ -1,8 +1,11 @@
import requests
import logging
from auth_handler import AuthHandler
from config import Config
from logging_config import setup_logging
logger = logging.getLogger(__name__)
# Use the centralized logging configuration
logger = setup_logging(__name__)
class SuperOfficeClient:
"""
@@ -12,31 +15,10 @@ class SuperOfficeClient:
self.auth_handler = auth_handler
self.session = requests.Session()
# Mapping for UDF fields for Contact entity
self.udf_contact_mapping = {
"ai_challenge_sentence": "SuperOffice:1",
"ai_sentence_timestamp": "SuperOffice:2",
"ai_sentence_source_hash": "SuperOffice:3",
"ai_last_outreach_date": "SuperOffice:4"
}
# Mapping for UDF fields for Person entity
self.udf_person_mapping = {
"ai_email_draft": "SuperOffice:1", # NOTE: This is currently a Date field in SO and needs to be changed to Text (Long/Memo)
"ma_status": "SuperOffice:2"
}
# Mapping for MA Status list values (Text Label -> SO ID)
self.ma_status_id_map = {
"Ready_to_Send": 11,
"Sent_Week1": 12,
"Sent_Week2": 13,
"Bounced": 14,
"Soft_Denied": 15,
"Interested": 16,
"Out_of_Office": 17,
"Unsubscribed": 18
}
# Load mappings from Config
self.udf_contact_mapping = Config.UDF_CONTACT_MAPPING
self.udf_person_mapping = Config.UDF_PERSON_MAPPING
self.ma_status_id_map = Config.MA_STATUS_ID_MAP
def _get_headers(self):
"""Returns the authorization headers with Bearer token."""
@@ -293,112 +275,38 @@ class SuperOfficeClient:
return None
# NOTE: The create_email_activity method is currently blocked due to SuperOffice environment limitations.
# Attempting to create an Email Activity via API results in a 500 Internal Server Error,
# likely because the email module is not licensed or configured in the SOD environment.
# This method is temporarily commented out.
#
# def create_email_activity(self, person_id, contact_id, subject, body):
# """Creates an Email Activity linked to a person and contact."""
# url = self._get_url("v1/Activity")
#
# payload = {
# "Type": { # Assuming ID 2 for "Email" ActivityType
# "Id": 2
# },
# "Title": subject,
# "Details": body,
# "Person": {
# "PersonId": person_id
# },
# "Contact": {
# "ContactId": contact_id
# }
# }
#
# try:
# logger.info(f"Attempting to create Email Activity with subject '{subject}' for Person ID {person_id} and Contact ID {contact_id}")
# resp = self.session.post(url, headers=self._get_headers(), json=payload)
# resp.raise_for_status()
# created_activity = resp.json()
# logger.info(f"Successfully created Email Activity: '{created_activity.get('Title')}' (ID: {created_activity.get('ActivityId')})")
# return created_activity
# except Exception as e:
# logger.error(f"Error creating Email Activity: {e}")
# if hasattr(e, 'response') and e.response is not None:
# logger.error(f"Response: {e.response.text}")
# return None