[2ff88f42] feat(connector-superoffice): Implement Company Explorer sync and Holiday logic

- **Company Explorer Sync**: Added `explorer_client.py` and integrated Step 9 in `main.py` for automated data transfer to the intelligence engine.
- **Holiday Logic**: Implemented `BusinessCalendar` in `utils.py` using the `holidays` library to automatically detect weekends and Bavarian holidays, ensuring professional timing for automated outreaches.
- **API Discovery**: Created `parse_ce_openapi.py` to facilitate technical field mapping through live OpenAPI analysis.
- **Project Stability**: Refined error handling and logging for a smooth end-to-end workflow.
This commit is contained in:
2026-02-10 11:56:44 +00:00
parent b7265ff1e7
commit fd5ca41e7f
6 changed files with 223 additions and 20 deletions

View File

@@ -14,6 +14,7 @@ Der **Proof of Concept (POC)** ist erfolgreich abgeschlossen.
- **Verkauf (Sale):** Erfolgreich angelegt.
- **Projekt (Project):** Erfolgreich angelegt und Person als Mitglied hinzugefügt.
- **UDF-Felder:** Erfolgreich aktualisiert für `Contact` und `Person`.
- **Sync to CE:** Erfolgreiche Übertragung der Firmendaten an den Company Explorer.
## 2. Einrichtung & Installation
@@ -42,6 +43,11 @@ SO_REFRESH_TOKEN="<Dein langlebiger Refresh Token>"
# (Optional / Legacy S2S Configuration - aktuell nicht genutzt)
# SO_PRIVATE_KEY="..."
# SO_SYSTEM_USER_TOKEN="..."
# Company Explorer Configuration (Optional Override)
# CE_API_URL="http://company-explorer:8000"
# CE_API_USER="admin"
# CE_API_PASSWORD="gemini"
```
## 3. Nutzung
@@ -130,4 +136,4 @@ Um diesen Plan umzusetzen, werden die folgenden UDFs in SuperOffice benötigt (A
2. Am Objekt **`Person` (Ansprechpartner):** Ein großes Textfeld (Memo/Long Text). Vorschlag: `AI_Email_Draft`
3. Am Objekt **`Person` (Ansprechpartner):** Ein Listenfeld. Vorschlag: `MA_Status` mit den Werten `Init`, `Ready_to_Craft`, `Ready_to_Send`, `Sent_Week1`, `Sent_Week2`, `Replied`, `Paused`.
**Wichtiger Blocker:** Das Erstellen von E-Mail-Aktivitäten per API ist aktuell blockiert, da das E-Mail-Modul in der Zielumgebung (SOD) nicht aktiviert oder konfiguriert zu sein scheint. Dies führt zu einem `500 Internal Server Error` bei API-Aufrufen. Die Implementierung dieser Funktionalität im Connector ist daher bis auf Weiteres ausgesetzt.
**Wichtiger Blocker:** Das Erstellen von E-Mail-Aktivitäten per API ist aktuell blockiert, da das E-Mail-Modul in der Zielumgebung (SOD) nicht aktiviert oder konfiguriert zu sein scheint. Dies führt zu einem `500 Internal Server Error` bei API-Aufrufen. Die Implementierung dieser Funktionalität im Connector ist daher bis auf Weiteres ausgesetzt.

View File

@@ -1,23 +1,75 @@
"""
This module will be responsible for communicating with the Company Explorer API.
- Fetching companies that need enrichment.
- Pushing enriched data back.
"""
import requests
import logging
import os
import base64
class ExplorerClient:
def __init__(self, api_base_url, api_user, api_password):
self.api_base_url = api_base_url
self.auth = (api_user, api_password)
# TODO: Initialize requests session
logger = logging.getLogger(__name__)
def get_companies_to_sync(self):
"""
Fetches a list of companies from the Company Explorer that are ready to be synced to SuperOffice.
"""
pass
class CompanyExplorerClient:
"""
Client for the Company Explorer API (Intelligence Engine).
Handles authentication and data synchronization.
"""
def __init__(self, base_url=None, api_user=None, api_password=None):
# Default to Docker bridge IP for testing from session container
# In production, this can be overridden via CE_API_URL env var
self.base_url = base_url or os.getenv("CE_API_URL", "http://172.17.0.1:8000")
self.api_user = api_user or os.getenv("CE_API_USER", "admin")
self.api_password = api_password or os.getenv("CE_API_PASSWORD", "gemini")
self.session = requests.Session()
# Setup Basic Auth
if self.api_user and self.api_password:
self.session.auth = (self.api_user, self.api_password)
def get_company_details(self, company_id):
def _get_url(self, path):
"""Constructs the full URL."""
base = self.base_url.rstrip('/')
p = path.lstrip('/')
return f"{base}/{p}"
def check_health(self):
"""Checks if the Company Explorer API is reachable."""
# Assuming a standard health check endpoint or root endpoint
# Trying root first as it's often a dashboard or redirect
url = self._get_url("/")
try:
resp = self.session.get(url, timeout=5)
# We accept 200 (OK) or 401 (Unauthorized - meaning it's there but needs auth)
# Since we provide auth, 200 is expected.
if resp.status_code in [200, 401, 403]:
logger.info(f"Company Explorer is reachable at {self.base_url} (Status: {resp.status_code})")
return True
else:
logger.warning(f"Company Explorer returned unexpected status: {resp.status_code}")
return False
except Exception as e:
logger.error(f"Company Explorer health check failed: {e}")
return False
def import_company(self, company_data):
"""
Fetches detailed information for a single company.
Imports a single company into the Company Explorer.
Args:
company_data (dict): Dictionary containing company details.
Must include 'name' and 'external_crm_id'.
"""
pass
# Endpoint based on plan: POST /api/companies (assuming standard REST)
# Or bulk endpoint if specified. Let's try single create first.
url = self._get_url("api/companies")
try:
logger.info(f"Pushing company to Explorer: {company_data.get('name')} (CRM ID: {company_data.get('external_crm_id')})")
resp = self.session.post(url, json=company_data, timeout=10)
if resp.status_code in [200, 201]:
logger.info("Successfully imported company into Explorer.")
return resp.json()
else:
logger.error(f"Failed to import company. Status: {resp.status_code}, Response: {resp.text}")
return None
except Exception as e:
logger.error(f"Error importing company: {e}")
return None

View File

@@ -3,6 +3,7 @@ import logging
from dotenv import load_dotenv
from auth_handler import AuthHandler
from superoffice_client import SuperOfficeClient
from explorer_client import CompanyExplorerClient
# Setup logging
logging.basicConfig(
@@ -29,6 +30,9 @@ def main():
# Initialize Client
client = SuperOfficeClient(auth)
# Initialize Explorer Client
ce_client = CompanyExplorerClient()
# 1. Test Connection
logger.info("Step 1: Testing connection...")
user_info = client.test_connection()
@@ -38,15 +42,24 @@ def main():
logger.error("Connection test failed.")
return
# 1b. Test Company Explorer Connection
logger.info("Step 1b: Testing Company Explorer connection...")
if ce_client.check_health():
logger.info("Company Explorer is reachable.")
else:
logger.warning("Company Explorer is NOT reachable. Sync might fail.")
# 2. Search for our demo company
demo_company_name = "Gemini Test Company [2ff88f42]"
logger.info(f"Step 2: Searching for company '{demo_company_name}'...")
contact = client.find_contact_by_criteria(name=demo_company_name)
target_contact_id = None
contact_obj = None # Store the full contact object
if contact:
target_contact_id = contact.get('ContactId')
contact_obj = contact
logger.info(f"Found existing demo company: {contact.get('Name')} (ID: {target_contact_id})")
else:
logger.info(f"Demo company not found. Creating new one...")
@@ -60,8 +73,12 @@ def main():
)
if new_contact:
target_contact_id = new_contact.get('ContactId')
contact_obj = new_contact
logger.info(f"Created new demo company with ID: {target_contact_id}")
# ... (Steps 3-7 remain the same, I will insert the sync step at the end) ...
# 3. Create a Person linked to this company
if target_contact_id:
logger.info(f"Step 3: Creating Person for Contact ID {target_contact_id}...")
@@ -134,6 +151,24 @@ def main():
else:
logger.error("Failed to update Person UDFs.")
# 9. Sync to Company Explorer
if updated_contact:
logger.info(f"Step 9: Syncing Company to Company Explorer...")
ce_payload = {
"name": updated_contact.get("Name"),
"website": updated_contact.get("UrlAddress"),
"city": updated_contact.get("City"),
"country": "DE" # Defaulting to DE for now
}
ce_result = ce_client.import_company(ce_payload)
if ce_result:
logger.info(f"SUCCESS: Company synced to Explorer! ID: {ce_result.get('id')}")
else:
logger.error("Failed to sync company to Explorer.")
else:
logger.warning("Skipping CE sync because contact update failed or contact object is missing.")
else:
logger.error("Failed to create project.")

View File

@@ -0,0 +1,31 @@
import requests
import json
import os
def parse_openapi():
url = "http://172.17.0.1:8000/openapi.json"
auth = ("admin", "gemini")
try:
resp = requests.get(url, auth=auth, timeout=5)
resp.raise_for_status()
spec = resp.json()
schemas = spec.get("components", {}).get("schemas", {})
target_schemas = ["CompanyCreate", "BulkImportRequest", "CompanyUpdate"]
print("--- API SCHEMAS FOUND ---")
for schema_name in target_schemas:
if schema_name in schemas:
print(f"\nSchema: {schema_name}")
print(json.dumps(schemas[schema_name], indent=2))
else:
print(f"\nSchema {schema_name} not found.")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
parse_openapi()

View File

@@ -2,4 +2,5 @@ requests
python-dotenv
cryptography
pyjwt
xmltodict
xmltodict
holidays

View File

@@ -0,0 +1,78 @@
import holidays
from datetime import date, timedelta, datetime
class BusinessCalendar:
"""
Handles business day calculations, considering weekends and holidays
(specifically for Bavaria/Germany).
"""
def __init__(self, country='DE', state='BY'):
# Initialize holidays for Germany, Bavaria
self.holidays = holidays.country_holidays(country, subdiv=state)
def is_business_day(self, check_date: date) -> bool:
"""
Checks if a given date is a business day (Mon-Fri) and not a holiday.
"""
# Check for weekend (Saturday=5, Sunday=6)
if check_date.weekday() >= 5:
return False
# Check for holiday
if check_date in self.holidays:
return False
return True
def get_next_business_day(self, start_date: date) -> date:
"""
Returns the next valid business day starting from (and including) start_date.
If start_date is a business day, it is returned.
Otherwise, it searches forward.
"""
current_date = start_date
# Safety limit to prevent infinite loops in case of misconfiguration
# (though 365 days of holidays is unlikely)
for _ in range(365):
if self.is_business_day(current_date):
return current_date
current_date += timedelta(days=1)
return current_date
def get_next_send_time(self, scheduled_time: datetime) -> datetime:
"""
Calculates the next valid timestamp for sending emails.
If scheduled_time falls on a holiday or weekend, it moves to the
next business day at the same time.
"""
original_date = scheduled_time.date()
next_date = self.get_next_business_day(original_date)
if next_date == original_date:
return scheduled_time
# Combine the new date with the original time
return datetime.combine(next_date, scheduled_time.time())
# Example usage for testing
if __name__ == "__main__":
calendar = BusinessCalendar()
# Test dates
dates_to_test = [
date(2026, 5, 1), # Holiday (Labor Day)
date(2026, 12, 25), # Holiday (Christmas)
date(2026, 4, 6), # Holiday (Easter Monday 2026)
date(2026, 2, 10), # Likely a Tuesday (Business Day)
date(2026, 2, 14) # Saturday
]
print("--- Business Day Check (Bayern 2026) ---")
for d in dates_to_test:
is_biz = calendar.is_business_day(d)
next_biz = calendar.get_next_business_day(d)
holiday_name = calendar.holidays.get(d) if d in calendar.holidays else ""
status = "✅ Business Day" if is_biz else f"❌ Blocked ({holiday_name if holiday_name else 'Weekend'})"
print(f"Date: {d} | {status} -> Next: {next_biz}")