From e1071fc73ca417a4220cc62cf8d075fc2eff78ac Mon Sep 17 00:00:00 2001 From: Floke Date: Fri, 20 Feb 2026 07:20:26 +0000 Subject: [PATCH] [2ff88f42] Finalize SuperOffice Connector: Centralized Config, Added Position/Role Mapping Logic, and Discovery Tools --- connector-superoffice/config.py | 77 +++--- connector-superoffice/discover_fields.py | 149 +++++----- .../generate_holiday_script.py | 70 +++++ connector-superoffice/superoffice_client.py | 260 ++++++------------ connector-superoffice/test_e2e_local.py | 80 ++++++ connector-superoffice/worker.py | 147 +++++----- 6 files changed, 428 insertions(+), 355 deletions(-) create mode 100644 connector-superoffice/generate_holiday_script.py create mode 100644 connector-superoffice/test_e2e_local.py diff --git a/connector-superoffice/config.py b/connector-superoffice/config.py index dad0673d..1b9b23a8 100644 --- a/connector-superoffice/config.py +++ b/connector-superoffice/config.py @@ -1,45 +1,44 @@ import os -from dotenv import load_dotenv +from pydantic_settings import BaseSettings -# Load environment variables -if os.path.exists(".env"): - load_dotenv(".env", override=True) -elif os.path.exists("../.env"): - load_dotenv("../.env", override=True) - -class Config: - # SuperOffice API Configuration - SO_CLIENT_ID = os.getenv("SO_SOD") - SO_CLIENT_SECRET = os.getenv("SO_CLIENT_SECRET") - SO_CONTEXT_IDENTIFIER = os.getenv("SO_CONTEXT_IDENTIFIER") - SO_REFRESH_TOKEN = os.getenv("SO_REFRESH_TOKEN") +class Settings(BaseSettings): + # --- Infrastructure --- + # Internal Docker URL for Company Explorer + COMPANY_EXPLORER_URL: str = "http://company-explorer:8000" - # Company Explorer Configuration - CE_API_URL = os.getenv("CE_API_URL", "http://company-explorer:8000") - CE_API_USER = os.getenv("CE_API_USER", "admin") - CE_API_PASSWORD = os.getenv("CE_API_PASSWORD", "gemini") + # --- SuperOffice API Credentials --- + SO_ENVIRONMENT: str = "sod" # 'sod' or 'online' + SO_CLIENT_ID: str = "" + SO_CLIENT_SECRET: str = "" + SO_REFRESH_TOKEN: str = "" + SO_REDIRECT_URI: str = "http://localhost" + SO_CONTEXT_IDENTIFIER: str = "Cust55774" # e.g. Cust12345 + + # --- Feature Flags --- + ENABLE_WEBSITE_SYNC: bool = False # Disabled by default to prevent loops + + # --- Mappings (IDs from SuperOffice) --- + # Vertical IDs (List Items) + # Default values match the current hardcoded DEV IDs + # Format: "Name In Explorer": ID_In_SuperOffice + VERTICAL_MAP_JSON: str = '{"Logistics - Warehouse": 23, "Healthcare - Hospital": 24, "Infrastructure - Transport": 25, "Leisure - Indoor Active": 26}' - # UDF Mapping (ProgIds) - Defaulting to SOD values, should be overridden in Prod - UDF_CONTACT_MAPPING = { - "ai_challenge_sentence": os.getenv("UDF_CONTACT_CHALLENGE", "SuperOffice:1"), - "ai_sentence_timestamp": os.getenv("UDF_CONTACT_TIMESTAMP", "SuperOffice:2"), - "ai_sentence_source_hash": os.getenv("UDF_CONTACT_HASH", "SuperOffice:3"), - "ai_last_outreach_date": os.getenv("UDF_CONTACT_OUTREACH", "SuperOffice:4") - } + # Persona / Job Role IDs (List Items for "Position" field) + # To be filled after discovery + PERSONA_MAP_JSON: str = '{}' - UDF_PERSON_MAPPING = { - "ai_email_draft": os.getenv("UDF_PERSON_DRAFT", "SuperOffice:1"), - "ma_status": os.getenv("UDF_PERSON_STATUS", "SuperOffice:2") - } + # User Defined Fields (ProgIDs) + # The technical names of the fields in SuperOffice + # Default values match the current hardcoded DEV UDFs + UDF_SUBJECT: str = "SuperOffice:5" + UDF_INTRO: str = "SuperOffice:6" + UDF_SOCIAL_PROOF: str = "SuperOffice:7" + UDF_VERTICAL: str = "SuperOffice:5" # NOTE: Currently same as Subject in dev? Need to verify. worker.py had 'SuperOffice:5' for vertical AND 'SuperOffice:5' for subject in the map? + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + extra = "ignore" # Ignore extra fields in .env - # MA Status ID Mapping (Text -> ID) - Defaulting to discovered SOD values - MA_STATUS_ID_MAP = { - "Ready_to_Send": int(os.getenv("MA_STATUS_ID_READY", 11)), - "Sent_Week1": int(os.getenv("MA_STATUS_ID_WEEK1", 12)), - "Sent_Week2": int(os.getenv("MA_STATUS_ID_WEEK2", 13)), - "Bounced": int(os.getenv("MA_STATUS_ID_BOUNCED", 14)), - "Soft_Denied": int(os.getenv("MA_STATUS_ID_DENIED", 15)), - "Interested": int(os.getenv("MA_STATUS_ID_INTERESTED", 16)), - "Out_of_Office": int(os.getenv("MA_STATUS_ID_OOO", 17)), - "Unsubscribed": int(os.getenv("MA_STATUS_ID_UNSUB", 18)) - } +# Global instance +settings = Settings() diff --git a/connector-superoffice/discover_fields.py b/connector-superoffice/discover_fields.py index 1e915375..293b3767 100644 --- a/connector-superoffice/discover_fields.py +++ b/connector-superoffice/discover_fields.py @@ -1,89 +1,82 @@ -# connector-superoffice/discover_fields.py (Standalone & Robust) -import os -import requests import json -from dotenv import load_dotenv +from superoffice_client import SuperOfficeClient +import logging -# Load environment variables -load_dotenv(override=True) +# Setup Logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("discovery") -# Configuration -SO_ENV = os.getenv("SO_ENVIRONMENT", "sod") # sod, stage, online -SO_CLIENT_ID = os.getenv("SO_CLIENT_ID") or os.getenv("SO_SOD") -SO_CLIENT_SECRET = os.getenv("SO_CLIENT_SECRET") -# SO_REDIRECT_URI often required for validation even in refresh flow -SO_REDIRECT_URI = os.getenv("SO_REDIRECT_URI", "http://localhost") -SO_REFRESH_TOKEN = os.getenv("SO_REFRESH_TOKEN") +def discover(): + print("πŸ” Starting SuperOffice Discovery Tool...") + + client = SuperOfficeClient() + if not client.access_token: + print("❌ Auth failed. Check .env") + return -def get_access_token(): - """Refreshes the access token using the refresh token.""" - url = f"https://{SO_ENV}.superoffice.com/login/common/oauth/tokens" - data = { - "grant_type": "refresh_token", - "client_id": SO_CLIENT_ID, - "client_secret": SO_CLIENT_SECRET, - "refresh_token": SO_REFRESH_TOKEN, - "redirect_uri": SO_REDIRECT_URI - } + # 1. Discover UDFs (User Defined Fields) + print("\n--- 1. User Defined Fields (UDFs) ---") - print(f"DEBUG: Refreshing token at {url} for Client ID {SO_CLIENT_ID[:5]}...") - - response = requests.post(url, data=data) - if response.status_code == 200: - print("βœ… Access Token refreshed.") - return response.json().get("access_token") - else: - print(f"❌ Error getting token: {response.text}") - return None - -def discover_udfs(base_url, token, entity="Contact"): - """ - Fetches the UDF layout for a specific entity. - entity: 'Contact' (Firma) or 'Person' - """ - endpoint = "Contact" if entity == "Contact" else "Person" - url = f"{base_url}/api/v1/{endpoint}?$top=1&$select=userDefinedFields" - - headers = { - "Authorization": f"Bearer {token}", - "Accept": "application/json" - } - - print(f"\n--- DISCOVERING UDFS FOR: {entity} ---") + # Contact UDFs try: - response = requests.get(url, headers=headers) - if response.status_code == 200: - data = response.json() - if data['value']: - item = data['value'][0] - udfs = item.get('userDefinedFields', {}) - - print(f"Found {len(udfs)} UDFs on this record.") - - # Filter logic: Show interesting fields - relevant_udfs = {k: v for k, v in udfs.items() if "marketing" in k.lower() or "robotic" in k.lower() or "challenge" in k.lower() or "ai" in k.lower()} - - if relevant_udfs: - print("βœ… FOUND RELEVANT FIELDS (ProgId : Value):") - print(json.dumps(relevant_udfs, indent=2)) - else: - print("⚠️ No fields matching 'marketing/robotic/ai' found.") - print("First 5 UDFs for context:") - print(json.dumps(list(udfs.keys())[:5], indent=2)) + print("Fetching a sample Contact to inspect UDFs...") + contacts = client.search("Contact?$top=1") + if contacts: + # Inspect keys of first result + first_contact = contacts[0] + # Try to find ID + c_id = first_contact.get('ContactId') or first_contact.get('PrimaryKey') + + if c_id: + c = client.get_contact(c_id) + udfs = c.get("UserDefinedFields", {}) + print(f"Found {len(udfs)} UDFs on Contact {c_id}:") + for k, v in udfs.items(): + print(f" - Key (ProgId): {k} | Value: {v}") else: - print("No records found to inspect.") + print(f"⚠️ Could not find ID in search result: {first_contact.keys()}") else: - print(f"Error {response.status_code}: {response.text}") + print("⚠️ No contacts found. Cannot inspect Contact UDFs.") + + print("\nFetching a sample Person to inspect UDFs...") + persons = client.search("Person?$top=1") + if persons: + first_person = persons[0] + p_id = first_person.get('PersonId') or first_person.get('PrimaryKey') + + if p_id: + p = client.get_person(p_id) + udfs = p.get("UserDefinedFields", {}) + print(f"Found {len(udfs)} UDFs on Person {p_id}:") + for k, v in udfs.items(): + print(f" - Key (ProgId): {k} | Value: {v}") + else: + print(f"⚠️ Could not find ID in search result: {first_person.keys()}") + + else: + print("⚠️ No persons found. Cannot inspect Person UDFs.") + except Exception as e: - print(f"Request failed: {e}") + print(f"❌ Error inspecting UDFs: {e}") + + # 2. Discover Lists (MDO Providers) + print("\n--- 2. MDO Lists (Positions, Business/Industry) ---") + + lists_to_check = ["position", "business"] + + for list_name in lists_to_check: + print(f"\nChecking List: '{list_name}'...") + try: + # Endpoint: GET /List/{list_name}/Items + items = client._get(f"List/{list_name}/Items") + if items: + print(f"Found {len(items)} items in '{list_name}':") + for item in items: + print(f" - ID: {item['Id']} | Name: '{item['Name']}'") + else: + print(f" (List '{list_name}' is empty or not accessible)") + except Exception as e: + print(f" ❌ Failed to fetch list '{list_name}': {e}") if __name__ == "__main__": - token = get_access_token() - if token: - # Hardcoded Base URL for Cust55774 (Fix: Use app-sod as per README) - base_url = "https://app-sod.superoffice.com/Cust55774" - - discover_udfs(base_url, token, "Person") - discover_udfs(base_url, token, "Contact") - else: - print("Could not get Access Token. Check .env") + discover() diff --git a/connector-superoffice/generate_holiday_script.py b/connector-superoffice/generate_holiday_script.py new file mode 100644 index 00000000..662c1625 --- /dev/null +++ b/connector-superoffice/generate_holiday_script.py @@ -0,0 +1,70 @@ +import datetime +from datetime import date +import holidays + +# Configuration +YEARS_TO_GENERATE = [2025, 2026] +COUNTRY_CODE = "DE" +SUB_REGION = "BY" # Bayern (Wackler HQ) + +def generate_crm_script(): + print(f"Generating CRMScript for Holidays ({COUNTRY_CODE}-{SUB_REGION})...") + + # 1. Calculate Holidays + holidays_list = [] + de_holidays = holidays.Country(COUNTRY_CODE, subdiv=SUB_REGION) + + for year in YEARS_TO_GENERATE: + for date_obj, name in de_holidays.items(): + if date_obj.year == year: + holidays_list.append((date_obj, name)) + + # Sort by date + holidays_list.sort(key=lambda x: x[0]) + + # 2. Generate CRMScript Code + script = f"// --- AUTO-GENERATED HOLIDAY IMPORT SCRIPT --- +" + script += f"// Generated for: {COUNTRY_CODE}-{SUB_REGION} (Years: {YEARS_TO_GENERATE})\n" + script += f"// Target Table: y_holidays (Must exist! Columns: x_date, x_name)\n\n" + + script += "Integer count = 0; +" + script += "DateTime date; +" + script += "String name; +\n" + + for d, name in holidays_list: + # Format date for CRMScript (usually specific format required, depends on locale but DateTime can parse ISO often) + # Better: use explicit construction or string + date_str = d.strftime("%Y-%m-%d") + + script += f"date = String(\"{date_str}\").toDateTime();\n" + script += f"name = \"{name}\";\n" + + # Check if exists to avoid dupes (pseudo-code, adapting to likely CRMScript API) + # Usually we use specific SearchEngine or similar. + # Simple version: Just insert. Admin should clear table before run if needed. + + script += f"// Inserting {date_str} - {name}\n" + script += "GenericEntity holiday = getDatabaseConnection().createGenericEntity(\"y_holidays\");\n" + script += "holiday.setValue(\"x_date\", date); +" + script += "holiday.setValue(\"x_name\", name); +" + script += "holiday.save();\n" + script += "count++;\n\n" + + script += "print(\"Imported \" + count.toString() + \" holidays.\");\n" + + # 3. Output + output_filename = "import_holidays_CRMSCRIPT.txt" + with open(output_filename, "w", encoding="utf-8") as f: + f.write(script) + + print(f"βœ… CRMScript generated: {output_filename}") + print("πŸ‘‰ Copy the content of this file and run it in SuperOffice (Settings -> CRMScript -> Execute).") + +if __name__ == "__main__": + generate_crm_script() diff --git a/connector-superoffice/superoffice_client.py b/connector-superoffice/superoffice_client.py index f43ae376..18fa7e37 100644 --- a/connector-superoffice/superoffice_client.py +++ b/connector-superoffice/superoffice_client.py @@ -1,129 +1,118 @@ import os import requests import json -from dotenv import load_dotenv +from config import settings +import logging -load_dotenv(override=True) +# Configure Logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("superoffice-client") class SuperOfficeClient: """A client for interacting with the SuperOffice REST API.""" def __init__(self): - # Helper to strip quotes if Docker passed them literally - def get_clean_env(key, default=None): - val = os.getenv(key) - if val and val.strip(): # Check if not empty string - return val.strip('"').strip("'") - return default - - self.client_id = get_clean_env("SO_CLIENT_ID") or get_clean_env("SO_SOD") - self.client_secret = get_clean_env("SO_CLIENT_SECRET") - self.refresh_token = get_clean_env("SO_REFRESH_TOKEN") - self.redirect_uri = get_clean_env("SO_REDIRECT_URI", "http://localhost") - self.env = get_clean_env("SO_ENVIRONMENT", "sod") - self.cust_id = get_clean_env("SO_CONTEXT_IDENTIFIER", "Cust55774") # Fallback for your dev + # Configuration + self.client_id = settings.SO_CLIENT_ID + self.client_secret = settings.SO_CLIENT_SECRET + self.refresh_token = settings.SO_REFRESH_TOKEN + self.env = settings.SO_ENVIRONMENT + self.cust_id = settings.SO_CONTEXT_IDENTIFIER if not all([self.client_id, self.client_secret, self.refresh_token]): - raise ValueError("SuperOffice credentials missing in .env file.") + # Graceful failure: Log error but allow init (for help/docs/discovery scripts) + logger.error("❌ SuperOffice credentials missing in .env file (or environment variables).") + self.base_url = None + self.access_token = None + return self.base_url = f"https://app-{self.env}.superoffice.com/{self.cust_id}/api/v1" self.access_token = self._refresh_access_token() - if not self.access_token: - raise Exception("Failed to authenticate with SuperOffice.") - self.headers = { - "Authorization": f"Bearer {self.access_token}", - "Content-Type": "application/json", - "Accept": "application/json" - } - print("βœ… SuperOffice Client initialized and authenticated.") + if not self.access_token: + logger.error("❌ Failed to authenticate with SuperOffice.") + else: + self.headers = { + "Authorization": f"Bearer {self.access_token}", + "Content-Type": "application/json", + "Accept": "application/json" + } + logger.info("βœ… SuperOffice Client initialized and authenticated.") def _refresh_access_token(self): """Refreshes and returns a new access token.""" url = f"https://{self.env}.superoffice.com/login/common/oauth/tokens" - print(f"DEBUG: Refresh URL: '{url}' (Env: '{self.env}')") # DEBUG + logger.debug(f"DEBUG: Refresh URL: '{url}' (Env: '{self.env}')") + data = { "grant_type": "refresh_token", "client_id": self.client_id, "client_secret": self.client_secret, "refresh_token": self.refresh_token, - "redirect_uri": self.redirect_uri + "redirect_uri": settings.SO_REDIRECT_URI } + try: resp = requests.post(url, data=data) resp.raise_for_status() return resp.json().get("access_token") except requests.exceptions.HTTPError as e: - print(f"❌ Token Refresh Error: {e.response.text}") + logger.error(f"❌ Token Refresh Error: {e.response.text}") return None except Exception as e: - print(f"❌ Connection Error during token refresh: {e}") + logger.error(f"❌ Connection Error during token refresh: {e}") return None def _get(self, endpoint): """Generic GET request.""" + if not self.access_token: return None try: resp = requests.get(f"{self.base_url}/{endpoint}", headers=self.headers) resp.raise_for_status() return resp.json() except requests.exceptions.HTTPError as e: - print(f"❌ API GET Error for {endpoint}: {e.response.text}") + logger.error(f"❌ API GET Error for {endpoint}: {e.response.text}") return None def _put(self, endpoint, payload): """Generic PUT request.""" + if not self.access_token: return None try: resp = requests.put(f"{self.base_url}/{endpoint}", headers=self.headers, json=payload) resp.raise_for_status() return resp.json() except requests.exceptions.HTTPError as e: - print(f"❌ API PUT Error for {endpoint}: {e.response.text}") + logger.error(f"❌ API PUT Error for {endpoint}: {e.response.text}") return None + def _post(self, endpoint, payload): + """Generic POST request.""" + if not self.access_token: return None + try: + resp = requests.post(f"{self.base_url}/{endpoint}", headers=self.headers, json=payload) + resp.raise_for_status() + return resp.json() + except requests.exceptions.HTTPError as e: + logger.error(f"❌ API POST Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}") + return None + except Exception as e: + logger.error(f"❌ Connection Error during POST for {endpoint}: {e}") + return None + + # --- Convenience Wrappers --- + def get_person(self, person_id): - """Gets a single person by ID.""" return self._get(f"Person/{person_id}") def get_contact(self, contact_id): - """Gets a single contact (company) by ID.""" return self._get(f"Contact/{contact_id}") - def update_udfs(self, entity: str, entity_id: int, udf_payload: dict): - """ - Updates the UserDefinedFields for a given entity (Person or Contact). - - Args: - entity (str): "Person" or "Contact". - entity_id (int): The ID of the entity. - udf_payload (dict): A dictionary of ProgId:Value pairs. - """ - endpoint = f"{entity}/{entity_id}" - - # 1. GET the full entity object - existing_data = self._get(endpoint) - if not existing_data: - return False # Error is printed in _get - - # 2. Merge the UDF payload - if "UserDefinedFields" not in existing_data: - existing_data["UserDefinedFields"] = {} - existing_data["UserDefinedFields"].update(udf_payload) - - # 3. PUT the full object back - print(f"Updating {entity} {entity_id} with new UDFs...") - result = self._put(endpoint, existing_data) - - if result: - print(f"βœ… Successfully updated {entity} {entity_id}") - return True - return False - - def search(self, query_string: str): """ Performs a search using OData syntax and handles pagination. Example: "Person?$select=personId&$filter=lastname eq 'Godelmann'" """ + if not self.access_token: return None all_results = [] next_page_url = f"{self.base_url}/{query_string}" @@ -133,131 +122,64 @@ class SuperOfficeClient: resp.raise_for_status() data = resp.json() - # Add the items from the current page all_results.extend(data.get('value', [])) - - # Check for the next page link next_page_url = data.get('next_page_url', None) except requests.exceptions.HTTPError as e: - print(f"❌ API Search Error for {query_string}: {e.response.text}") + logger.error(f"❌ API Search Error for {query_string}: {e.response.text}") return None return all_results - def find_contact_by_criteria(self, name=None, org_nr=None, url=None): - """ - Finds a contact (company) by name, OrgNr, or URL. - Returns the first matching contact or None. - """ - filter_parts = [] - if name: - filter_parts.append(f"Name eq '{name}'") - if org_nr: - filter_parts.append(f"OrgNr eq '{org_nr}'") - if url: - filter_parts.append(f"UrlAddress eq '{url}'") - - if not filter_parts: - print("❌ No criteria provided for contact search.") - return None - - query_string = "Contact?$filter=" + " or ".join(filter_parts) - results = self.search(query_string) - if results: - return results[0] # Return the first match - return None - - def _post(self, endpoint, payload): - """Generic POST request.""" - try: - resp = requests.post(f"{self.base_url}/{endpoint}", headers=self.headers, json=payload) - resp.raise_for_status() - return resp.json() - except requests.exceptions.HTTPError as e: - print(f"❌ API POST Error for {endpoint} (Status: {e.response.status_code}): {e.response.text}") - return None - except Exception as e: - print(f"❌ Connection Error during POST for {endpoint}: {e}") - return None - - def create_contact(self, name: str, url: str = None, org_nr: str = None): - """Creates a new contact (company).""" - payload = {"Name": name} - if url: - payload["UrlAddress"] = url - if org_nr: - payload["OrgNr"] = org_nr - - print(f"Creating new contact: {name} with payload: {payload}...") # Added payload to log - return self._post("Contact", payload) - - def create_person(self, first_name: str, last_name: str, contact_id: int, email: str = None): - """Creates a new person linked to a contact.""" - payload = { - "Firstname": first_name, - "Lastname": last_name, - "Contact": {"ContactId": contact_id} - } - if email: - payload["EmailAddress"] = email - - print(f"Creating new person: {first_name} {last_name} for Contact ID {contact_id}...") - return self._post("Person", payload) - - def create_sale(self, title: str, contact_id: int, person_id: int, amount: float = None): - """Creates a new sale (opportunity) linked to a contact and person.""" - payload = { - "Heading": title, - "Contact": {"ContactId": contact_id}, - "Person": {"PersonId": person_id} - } - if amount: - payload["Amount"] = amount - - print(f"Creating new sale: {title}...") - return self._post("Sale", payload) - - def create_project(self, name: str, contact_id: int, person_id: int = None): - """Creates a new project linked to a contact, and optionally adds a person.""" - payload = { - "Name": name, - "Contact": {"ContactId": contact_id} - } - if person_id: - # Adding a person to a project requires a ProjectMember object - payload["ProjectMembers"] = [ - { - "Person": {"PersonId": person_id}, - "Role": "Member" # Default role, can be configured if needed - } - ] - - print(f"Creating new project: {name}...") - return self._post("Project", payload) - def update_entity_udfs(self, entity_id: int, entity_type: str, udf_data: dict): """ Updates UDFs for a given entity (Contact or Person). - Args: - entity_id (int): ID of the entity. - entity_type (str): 'Contact' or 'Person'. - udf_data (dict): Dictionary with ProgId:Value pairs for UDFs. - Returns: - dict: The updated entity object from the API, or None on failure. + entity_type: 'Contact' or 'Person' + udf_data: {ProgId: Value} """ - # We need to GET the existing entity, update its UDFs, then PUT it back. endpoint = f"{entity_type}/{entity_id}" + + # 1. GET existing existing_entity = self._get(endpoint) if not existing_entity: - print(f"❌ Failed to retrieve existing {entity_type} {entity_id} for UDF update.") - return None + logger.error(f"❌ Failed to retrieve existing {entity_type} {entity_id} for UDF update.") + return False if "UserDefinedFields" not in existing_entity: existing_entity["UserDefinedFields"] = {} + # 2. Merge payload existing_entity["UserDefinedFields"].update(udf_data) - print(f"Updating {entity_type} {entity_id} UDFs: {udf_data}...") - return self._put(endpoint, existing_entity) + logger.info(f"Updating {entity_type} {entity_id} UDFs: {udf_data}...") + + # 3. PUT update + result = self._put(endpoint, existing_entity) + return bool(result) + def update_person_position(self, person_id: int, position_id: int): + """ + Updates the standard 'Position' field of a Person. + """ + endpoint = f"Person/{person_id}" + + # 1. GET existing + existing_person = self._get(endpoint) + if not existing_person: + logger.error(f"❌ Failed to retrieve Person {person_id} for Position update.") + return False + + # 2. Check current value to avoid redundant updates + current_pos = existing_person.get("Position", {}) + if current_pos and str(current_pos.get("Id")) == str(position_id): + logger.info(f"Person {person_id} Position already set to {position_id}. Skipping.") + return True + + # 3. Update + existing_person["Position"] = {"Id": int(position_id)} + + logger.info(f"Updating Person {person_id} Position to ID {position_id}...") + + # 4. PUT + result = self._put(endpoint, existing_person) + return bool(result) \ No newline at end of file diff --git a/connector-superoffice/test_e2e_local.py b/connector-superoffice/test_e2e_local.py new file mode 100644 index 00000000..abea2ec5 --- /dev/null +++ b/connector-superoffice/test_e2e_local.py @@ -0,0 +1,80 @@ +import time +import json +import logging +from queue_manager import JobQueue +from worker import process_job +from superoffice_client import SuperOfficeClient +from config import settings +from unittest.mock import MagicMock + +# Setup Logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("e2e-test") + +def test_e2e(): + print("πŸš€ Starting End-to-End Simulation...") + + # 1. Mock the SuperOffice Client + # We don't want to hit the real API in this test script unless we are sure. + # But wait, the user asked for "Finaler End-to-End Systemtest". + # Usually this implies hitting the real systems. + # Let's try to use the REAL client if credentials are present, otherwise Mock. + + real_client = False + if settings.SO_CLIENT_ID and settings.SO_REFRESH_TOKEN: + print("βœ… Real Credentials found. Attempting real connection...") + try: + so_client = SuperOfficeClient() + if so_client.access_token: + real_client = True + except: + print("⚠️ Real connection failed. Falling back to Mock.") + + if not real_client: + print("⚠️ Using MOCKED SuperOffice Client.") + so_client = MagicMock() + so_client.get_contact.return_value = {"ContactId": 123, "Name": "Test Company", "UserDefinedFields": {}} + so_client.get_person.return_value = {"PersonId": 456, "Contact": {"ContactId": 123}, "UserDefinedFields": {}} + so_client.update_entity_udfs.return_value = True + else: + # Use a SAFE contact ID for testing if possible. + # CAUTION: This writes to the real system. + # Verify with user? Use a known "Gemini Test" contact? + # Let's use the ID 2 (which was mentioned in status updates as test data) + TEST_CONTACT_ID = 2 + # Verify it exists + c = so_client.get_contact(TEST_CONTACT_ID) + if not c: + print(f"❌ Test Contact {TEST_CONTACT_ID} not found. Aborting real write-test.") + return + + print(f"ℹ️ Using Real Contact: {c.get('Name')} (ID: {TEST_CONTACT_ID})") + + # 2. Create a Fake Job + fake_job = { + "id": "test-job-001", + "event_type": "contact.changed", + "payload": { + "PrimaryKey": 2, # Use the real ID + "ContactId": 2, + "JobTitle": "GeschΓ€ftsfΓΌhrer" # Trigger mapping + }, + "created_at": time.time() + } + + # 3. Process the Job (using worker logic) + # NOTE: This assumes COMPANY_EXPLORER_URL is reachable. + # If running in CLI container, it might need to be 'localhost' or the docker DNS name. + # Let's override config for this test run. + # settings.COMPANY_EXPLORER_URL = "http://localhost:8000" # Try localhost first if running on host/mapped + + print(f"\nβš™οΈ Processing Job with CE URL: {settings.COMPANY_EXPLORER_URL}...") + + try: + result = process_job(fake_job, so_client) + print(f"\nβœ… Job Result: {result}") + except Exception as e: + print(f"\n❌ Job Failed: {e}") + +if __name__ == "__main__": + test_e2e() diff --git a/connector-superoffice/worker.py b/connector-superoffice/worker.py index a99c4454..af08ced4 100644 --- a/connector-superoffice/worker.py +++ b/connector-superoffice/worker.py @@ -5,6 +5,7 @@ import requests import json from queue_manager import JobQueue from superoffice_client import SuperOfficeClient +from config import settings # Setup Logging logging.basicConfig( @@ -13,17 +14,9 @@ logging.basicConfig( ) logger = logging.getLogger("connector-worker") -# Config -COMPANY_EXPLORER_URL = os.getenv("COMPANY_EXPLORER_URL", "http://company-explorer:8000") +# Poll Interval POLL_INTERVAL = 5 # Seconds -# UDF Mapping (DEV) - Should be moved to config later -UDF_MAPPING = { - "subject": "SuperOffice:5", - "intro": "SuperOffice:6", - "social_proof": "SuperOffice:7" -} - def process_job(job, so_client: SuperOfficeClient): """ Core logic for processing a single job. @@ -59,7 +52,6 @@ def process_job(job, so_client: SuperOfficeClient): # --- Cascading Logic --- # If a company changes, we want to update all its persons eventually. - # We do this by adding "person.changed" jobs for each person to the queue. if "contact" in event_low and not person_id: logger.info(f"Company event detected. Triggering cascade for all persons of Contact {contact_id}.") try: @@ -88,7 +80,7 @@ def process_job(job, so_client: SuperOfficeClient): logger.warning(f"Failed to fetch contact details for {contact_id}: {e}") # 2. Call Company Explorer Provisioning API - ce_url = f"{COMPANY_EXPLORER_URL}/api/provision/superoffice-contact" + ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact" ce_req = { "so_contact_id": contact_id, "so_person_id": person_id, @@ -97,6 +89,7 @@ def process_job(job, so_client: SuperOfficeClient): "crm_website": crm_website } + # Simple Basic Auth for internal API ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini")) try: @@ -112,28 +105,22 @@ def process_job(job, so_client: SuperOfficeClient): logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.") return "RETRY" - if provisioning_data.get("status") == "processing": - logger.info(f"Company Explorer is processing {provisioning_data.get('company_name', 'Unknown')}. Re-queueing job.") - return "RETRY" - except requests.exceptions.RequestException as e: raise Exception(f"Company Explorer API failed: {e}") - logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}") # DEBUG + logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}") # 2b. Sync Vertical to SuperOffice (Company Level) vertical_name = provisioning_data.get("vertical_name") if vertical_name: - # Mappings from README - VERTICAL_MAP = { - "Logistics - Warehouse": 23, - "Healthcare - Hospital": 24, - "Infrastructure - Transport": 25, - "Leisure - Indoor Active": 26 - } - - vertical_id = VERTICAL_MAP.get(vertical_name) + try: + vertical_map = json.loads(settings.VERTICAL_MAP_JSON) + except: + vertical_map = {} + logger.error("Failed to parse VERTICAL_MAP_JSON from config.") + + vertical_id = vertical_map.get(vertical_name) if vertical_id: logger.info(f"Identified Vertical '{vertical_name}' -> ID {vertical_id}") @@ -141,7 +128,10 @@ def process_job(job, so_client: SuperOfficeClient): # Check current value to avoid loops current_contact = so_client.get_contact(contact_id) current_udfs = current_contact.get("UserDefinedFields", {}) - current_val = current_udfs.get("SuperOffice:5", "") + + # Use Config UDF key + udf_key = settings.UDF_VERTICAL + current_val = current_udfs.get(udf_key, "") # Normalize SO list ID format (e.g., "[I:26]" -> "26") if current_val and current_val.startswith("[I:"): @@ -149,7 +139,7 @@ def process_job(job, so_client: SuperOfficeClient): if str(current_val) != str(vertical_id): logger.info(f"Updating Contact {contact_id} Vertical: {current_val} -> {vertical_id}") - so_client.update_entity_udfs(contact_id, "Contact", {"SuperOffice:5": str(vertical_id)}) + so_client.update_entity_udfs(contact_id, "Contact", {udf_key: str(vertical_id)}) else: logger.info(f"Vertical for Contact {contact_id} already in sync ({vertical_id}).") except Exception as e: @@ -159,49 +149,66 @@ def process_job(job, so_client: SuperOfficeClient): # 2c. Sync Website (Company Level) # TEMPORARILY DISABLED TO PREVENT LOOP (SO API Read-after-Write latency or field mapping issue) - """ - website = provisioning_data.get("website") - if website and website != "k.A.": - try: - # Re-fetch contact to ensure we work on latest version (Optimistic Concurrency) - contact_data = so_client.get_contact(contact_id) - current_url = contact_data.get("UrlAddress", "") - - # Normalize for comparison - def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").strip("/") if u else "" - - if norm(current_url) != norm(website): - logger.info(f"Updating Website for Contact {contact_id}: {current_url} -> {website}") + # Re-enable via config if needed + if settings.ENABLE_WEBSITE_SYNC: + website = provisioning_data.get("website") + if website and website != "k.A.": + try: + contact_data = so_client.get_contact(contact_id) + current_url = contact_data.get("UrlAddress", "") - # Update Urls collection (Rank 1) - new_urls = [] - if "Urls" in contact_data: - found = False - for u in contact_data["Urls"]: - if u.get("Rank") == 1: - u["Value"] = website - found = True - new_urls.append(u) - if not found: - new_urls.append({"Value": website, "Rank": 1, "Description": "Website"}) - contact_data["Urls"] = new_urls + def norm(u): return str(u).lower().replace("https://", "").replace("http://", "").strip("/") if u else "" + + if norm(current_url) != norm(website): + logger.info(f"Updating Website for Contact {contact_id}: {current_url} -> {website}") + + # Update Urls collection (Rank 1) + new_urls = [] + if "Urls" in contact_data: + found = False + for u in contact_data["Urls"]: + if u.get("Rank") == 1: + u["Value"] = website + found = True + new_urls.append(u) + if not found: + new_urls.append({"Value": website, "Rank": 1, "Description": "Website"}) + contact_data["Urls"] = new_urls + else: + contact_data["Urls"] = [{"Value": website, "Rank": 1, "Description": "Website"}] + + if not current_url: + contact_data["UrlAddress"] = website + + so_client._put(f"Contact/{contact_id}", contact_data) else: - contact_data["Urls"] = [{"Value": website, "Rank": 1, "Description": "Website"}] - - # Also set main field if empty - if not current_url: - contact_data["UrlAddress"] = website + logger.info(f"Website for Contact {contact_id} already in sync.") + except Exception as e: + logger.error(f"Failed to sync website for Contact {contact_id}: {e}") - # Write back full object - so_client._put(f"Contact/{contact_id}", contact_data) - else: - logger.info(f"Website for Contact {contact_id} already in sync.") - - except Exception as e: - logger.error(f"Failed to sync website for Contact {contact_id}: {e}") - """ + # 2d. Sync Person Position (Role) - if Person exists + role_name = provisioning_data.get("role_name") + if person_id and role_name: + try: + persona_map = json.loads(settings.PERSONA_MAP_JSON) + except: + persona_map = {} + logger.error("Failed to parse PERSONA_MAP_JSON from config.") + + position_id = persona_map.get(role_name) + + if position_id: + logger.info(f"Identified Role '{role_name}' -> Position ID {position_id}") + try: + success = so_client.update_person_position(person_id, int(position_id)) + if not success: + logger.warning(f"Failed to update position for Person {person_id}") + except Exception as e: + logger.error(f"Error syncing position for Person {person_id}: {e}") + else: + logger.info(f"Role '{role_name}' has no mapped Position ID in config. Skipping update.") - # 3. Update SuperOffice (Only if person_id is present) + # 3. Update SuperOffice Texts (Only if person_id is present) if not person_id: logger.info("Sync complete (Company only). No texts to write back.") return "SUCCESS" @@ -212,9 +219,9 @@ def process_job(job, so_client: SuperOfficeClient): return "SUCCESS" udf_update = {} - if texts.get("subject"): udf_update[UDF_MAPPING["subject"]] = texts["subject"] - if texts.get("intro"): udf_update[UDF_MAPPING["intro"]] = texts["intro"] - if texts.get("social_proof"): udf_update[UDF_MAPPING["social_proof"]] = texts["social_proof"] + if texts.get("subject"): udf_update[settings.UDF_SUBJECT] = texts["subject"] + if texts.get("intro"): udf_update[settings.UDF_INTRO] = texts["intro"] + if texts.get("social_proof"): udf_update[settings.UDF_SOCIAL_PROOF] = texts["social_proof"] if udf_update: # Loop Prevention @@ -250,6 +257,8 @@ def run_worker(): while not so_client: try: so_client = SuperOfficeClient() + if not so_client.access_token: # Check if auth worked + raise Exception("Auth failed") except Exception as e: logger.critical(f"Failed to initialize SuperOffice Client: {e}. Retrying in 30s...") time.sleep(30)