[2ff88f42] feat(connector-superoffice): Implement Company Explorer sync and Holiday logic

- **Company Explorer Sync**: Added `explorer_client.py` and integrated Step 9 in `main.py` for automated data transfer to the intelligence engine.
- **Holiday Logic**: Implemented `BusinessCalendar` in `utils.py` using the `holidays` library to automatically detect weekends and Bavarian holidays, ensuring professional timing for automated outreaches.
- **API Discovery**: Created `parse_ce_openapi.py` to facilitate technical field mapping through live OpenAPI analysis.
- **Project Stability**: Refined error handling and logging for a smooth end-to-end workflow.
This commit is contained in:
2026-02-10 11:56:44 +00:00
parent b236bbe29c
commit 0821437407
6 changed files with 223 additions and 20 deletions

View File

@@ -14,6 +14,7 @@ Der **Proof of Concept (POC)** ist erfolgreich abgeschlossen.
- **Verkauf (Sale):** Erfolgreich angelegt.
- **Projekt (Project):** Erfolgreich angelegt und Person als Mitglied hinzugefügt.
- **UDF-Felder:** Erfolgreich aktualisiert für `Contact` und `Person`.
- **Sync to CE:** Erfolgreiche Übertragung der Firmendaten an den Company Explorer.
## 2. Einrichtung & Installation
@@ -42,6 +43,11 @@ SO_REFRESH_TOKEN="<Dein langlebiger Refresh Token>"
# (Optional / Legacy S2S Configuration - aktuell nicht genutzt)
# SO_PRIVATE_KEY="..."
# SO_SYSTEM_USER_TOKEN="..."
# Company Explorer Configuration (Optional Override)
# CE_API_URL="http://company-explorer:8000"
# CE_API_USER="admin"
# CE_API_PASSWORD="gemini"
```
## 3. Nutzung

View File

@@ -1,23 +1,75 @@
"""
This module will be responsible for communicating with the Company Explorer API.
- Fetching companies that need enrichment.
- Pushing enriched data back.
"""
import requests
import logging
import os
import base64
class ExplorerClient:
def __init__(self, api_base_url, api_user, api_password):
self.api_base_url = api_base_url
self.auth = (api_user, api_password)
# TODO: Initialize requests session
logger = logging.getLogger(__name__)
def get_companies_to_sync(self):
class CompanyExplorerClient:
"""
Fetches a list of companies from the Company Explorer that are ready to be synced to SuperOffice.
Client for the Company Explorer API (Intelligence Engine).
Handles authentication and data synchronization.
"""
pass
def __init__(self, base_url=None, api_user=None, api_password=None):
# Default to Docker bridge IP for testing from session container
# In production, this can be overridden via CE_API_URL env var
self.base_url = base_url or os.getenv("CE_API_URL", "http://172.17.0.1:8000")
self.api_user = api_user or os.getenv("CE_API_USER", "admin")
self.api_password = api_password or os.getenv("CE_API_PASSWORD", "gemini")
self.session = requests.Session()
def get_company_details(self, company_id):
# Setup Basic Auth
if self.api_user and self.api_password:
self.session.auth = (self.api_user, self.api_password)
def _get_url(self, path):
"""Constructs the full URL."""
base = self.base_url.rstrip('/')
p = path.lstrip('/')
return f"{base}/{p}"
def check_health(self):
"""Checks if the Company Explorer API is reachable."""
# Assuming a standard health check endpoint or root endpoint
# Trying root first as it's often a dashboard or redirect
url = self._get_url("/")
try:
resp = self.session.get(url, timeout=5)
# We accept 200 (OK) or 401 (Unauthorized - meaning it's there but needs auth)
# Since we provide auth, 200 is expected.
if resp.status_code in [200, 401, 403]:
logger.info(f"Company Explorer is reachable at {self.base_url} (Status: {resp.status_code})")
return True
else:
logger.warning(f"Company Explorer returned unexpected status: {resp.status_code}")
return False
except Exception as e:
logger.error(f"Company Explorer health check failed: {e}")
return False
def import_company(self, company_data):
"""
Fetches detailed information for a single company.
Imports a single company into the Company Explorer.
Args:
company_data (dict): Dictionary containing company details.
Must include 'name' and 'external_crm_id'.
"""
pass
# Endpoint based on plan: POST /api/companies (assuming standard REST)
# Or bulk endpoint if specified. Let's try single create first.
url = self._get_url("api/companies")
try:
logger.info(f"Pushing company to Explorer: {company_data.get('name')} (CRM ID: {company_data.get('external_crm_id')})")
resp = self.session.post(url, json=company_data, timeout=10)
if resp.status_code in [200, 201]:
logger.info("Successfully imported company into Explorer.")
return resp.json()
else:
logger.error(f"Failed to import company. Status: {resp.status_code}, Response: {resp.text}")
return None
except Exception as e:
logger.error(f"Error importing company: {e}")
return None

View File

@@ -3,6 +3,7 @@ import logging
from dotenv import load_dotenv
from auth_handler import AuthHandler
from superoffice_client import SuperOfficeClient
from explorer_client import CompanyExplorerClient
# Setup logging
logging.basicConfig(
@@ -29,6 +30,9 @@ def main():
# Initialize Client
client = SuperOfficeClient(auth)
# Initialize Explorer Client
ce_client = CompanyExplorerClient()
# 1. Test Connection
logger.info("Step 1: Testing connection...")
user_info = client.test_connection()
@@ -38,15 +42,24 @@ def main():
logger.error("Connection test failed.")
return
# 1b. Test Company Explorer Connection
logger.info("Step 1b: Testing Company Explorer connection...")
if ce_client.check_health():
logger.info("Company Explorer is reachable.")
else:
logger.warning("Company Explorer is NOT reachable. Sync might fail.")
# 2. Search for our demo company
demo_company_name = "Gemini Test Company [2ff88f42]"
logger.info(f"Step 2: Searching for company '{demo_company_name}'...")
contact = client.find_contact_by_criteria(name=demo_company_name)
target_contact_id = None
contact_obj = None # Store the full contact object
if contact:
target_contact_id = contact.get('ContactId')
contact_obj = contact
logger.info(f"Found existing demo company: {contact.get('Name')} (ID: {target_contact_id})")
else:
logger.info(f"Demo company not found. Creating new one...")
@@ -60,8 +73,12 @@ def main():
)
if new_contact:
target_contact_id = new_contact.get('ContactId')
contact_obj = new_contact
logger.info(f"Created new demo company with ID: {target_contact_id}")
# ... (Steps 3-7 remain the same, I will insert the sync step at the end) ...
# 3. Create a Person linked to this company
if target_contact_id:
logger.info(f"Step 3: Creating Person for Contact ID {target_contact_id}...")
@@ -134,6 +151,24 @@ def main():
else:
logger.error("Failed to update Person UDFs.")
# 9. Sync to Company Explorer
if updated_contact:
logger.info(f"Step 9: Syncing Company to Company Explorer...")
ce_payload = {
"name": updated_contact.get("Name"),
"website": updated_contact.get("UrlAddress"),
"city": updated_contact.get("City"),
"country": "DE" # Defaulting to DE for now
}
ce_result = ce_client.import_company(ce_payload)
if ce_result:
logger.info(f"SUCCESS: Company synced to Explorer! ID: {ce_result.get('id')}")
else:
logger.error("Failed to sync company to Explorer.")
else:
logger.warning("Skipping CE sync because contact update failed or contact object is missing.")
else:
logger.error("Failed to create project.")

View File

@@ -0,0 +1,31 @@
import requests
import json
import os
def parse_openapi():
url = "http://172.17.0.1:8000/openapi.json"
auth = ("admin", "gemini")
try:
resp = requests.get(url, auth=auth, timeout=5)
resp.raise_for_status()
spec = resp.json()
schemas = spec.get("components", {}).get("schemas", {})
target_schemas = ["CompanyCreate", "BulkImportRequest", "CompanyUpdate"]
print("--- API SCHEMAS FOUND ---")
for schema_name in target_schemas:
if schema_name in schemas:
print(f"\nSchema: {schema_name}")
print(json.dumps(schemas[schema_name], indent=2))
else:
print(f"\nSchema {schema_name} not found.")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
parse_openapi()

View File

@@ -3,3 +3,4 @@ python-dotenv
cryptography
pyjwt
xmltodict
holidays

View File

@@ -0,0 +1,78 @@
import holidays
from datetime import date, timedelta, datetime
class BusinessCalendar:
"""
Handles business day calculations, considering weekends and holidays
(specifically for Bavaria/Germany).
"""
def __init__(self, country='DE', state='BY'):
# Initialize holidays for Germany, Bavaria
self.holidays = holidays.country_holidays(country, subdiv=state)
def is_business_day(self, check_date: date) -> bool:
"""
Checks if a given date is a business day (Mon-Fri) and not a holiday.
"""
# Check for weekend (Saturday=5, Sunday=6)
if check_date.weekday() >= 5:
return False
# Check for holiday
if check_date in self.holidays:
return False
return True
def get_next_business_day(self, start_date: date) -> date:
"""
Returns the next valid business day starting from (and including) start_date.
If start_date is a business day, it is returned.
Otherwise, it searches forward.
"""
current_date = start_date
# Safety limit to prevent infinite loops in case of misconfiguration
# (though 365 days of holidays is unlikely)
for _ in range(365):
if self.is_business_day(current_date):
return current_date
current_date += timedelta(days=1)
return current_date
def get_next_send_time(self, scheduled_time: datetime) -> datetime:
"""
Calculates the next valid timestamp for sending emails.
If scheduled_time falls on a holiday or weekend, it moves to the
next business day at the same time.
"""
original_date = scheduled_time.date()
next_date = self.get_next_business_day(original_date)
if next_date == original_date:
return scheduled_time
# Combine the new date with the original time
return datetime.combine(next_date, scheduled_time.time())
# Example usage for testing
if __name__ == "__main__":
calendar = BusinessCalendar()
# Test dates
dates_to_test = [
date(2026, 5, 1), # Holiday (Labor Day)
date(2026, 12, 25), # Holiday (Christmas)
date(2026, 4, 6), # Holiday (Easter Monday 2026)
date(2026, 2, 10), # Likely a Tuesday (Business Day)
date(2026, 2, 14) # Saturday
]
print("--- Business Day Check (Bayern 2026) ---")
for d in dates_to_test:
is_biz = calendar.is_business_day(d)
next_biz = calendar.get_next_business_day(d)
holiday_name = calendar.holidays.get(d) if d in calendar.holidays else ""
status = "✅ Business Day" if is_biz else f"❌ Blocked ({holiday_name if holiday_name else 'Weekend'})"
print(f"Date: {d} | {status} -> Next: {next_biz}")