[30e88f42] ✦ In dieser Sitzung haben wir den End-to-End-Test der SuperOffice-Schnittstelle erfolgreich von der automatisierten Simulation bis zum produktiven Live-Lauf
✦ In dieser Sitzung haben wir den End-to-End-Test der SuperOffice-Schnittstelle erfolgreich von der automatisierten Simulation bis zum produktiven Live-Lauf mit Echtdaten abgeschlossen.
This commit is contained in:
@@ -32,6 +32,8 @@ class Settings:
|
||||
self.UDF_INTRO = os.getenv("UDF_INTRO", "SuperOffice:6")
|
||||
self.UDF_SOCIAL_PROOF = os.getenv("UDF_SOCIAL_PROOF", "SuperOffice:7")
|
||||
self.UDF_VERTICAL = os.getenv("UDF_VERTICAL", "SuperOffice:5")
|
||||
self.UDF_OPENER = os.getenv("UDF_OPENER", "SuperOffice:6")
|
||||
self.UDF_OPENER_SECONDARY = os.getenv("UDF_OPENER_SECONDARY", "SuperOffice:7")
|
||||
|
||||
# Global instance
|
||||
settings = Settings()
|
||||
@@ -104,3 +104,24 @@ class JobQueue:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
|
||||
return dict(cursor.fetchall())
|
||||
|
||||
def get_recent_jobs(self, limit=50):
|
||||
with sqlite3.connect(DB_PATH) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
SELECT id, event_type, status, created_at, updated_at, error_msg, payload
|
||||
FROM jobs
|
||||
ORDER BY updated_at DESC, created_at DESC
|
||||
LIMIT ?
|
||||
""", (limit,))
|
||||
rows = cursor.fetchall()
|
||||
results = []
|
||||
for row in rows:
|
||||
r = dict(row)
|
||||
try:
|
||||
r['payload'] = json.loads(r['payload'])
|
||||
except:
|
||||
pass
|
||||
results.append(r)
|
||||
return results
|
||||
|
||||
@@ -198,6 +198,8 @@ class TestE2EFlow(unittest.TestCase):
|
||||
mock_settings.UDF_SUBJECT = "SuperOffice:Subject"
|
||||
mock_settings.UDF_INTRO = "SuperOffice:Intro"
|
||||
mock_settings.UDF_SOCIAL_PROOF = "SuperOffice:SocialProof"
|
||||
mock_settings.UDF_OPENER = "SuperOffice:Opener"
|
||||
mock_settings.UDF_OPENER_SECONDARY = "SuperOffice:OpenerSecondary"
|
||||
mock_settings.VERTICAL_MAP_JSON = '{"Logistics - Warehouse": 23, "Healthcare - Hospital": 24}'
|
||||
mock_settings.PERSONA_MAP_JSON = '{"Operativer Entscheider": 99}'
|
||||
mock_settings.ENABLE_WEBSITE_SYNC = True
|
||||
@@ -213,11 +215,22 @@ class TestE2EFlow(unittest.TestCase):
|
||||
company = db.query(Company).filter(Company.crm_id == "100").first()
|
||||
company.status = "ENRICHED"
|
||||
company.industry_ai = "Logistics - Warehouse"
|
||||
company.city = "Koeln"
|
||||
company.crm_vat = "DE813016729"
|
||||
company.ai_opener = "Positive observation about Silly Billy"
|
||||
company.ai_opener_secondary = "Secondary observation"
|
||||
db.commit()
|
||||
db.close()
|
||||
|
||||
process_job(job, self.mock_so_client) # SUCCESS
|
||||
assert self.mock_so_client.contacts[100]["UserDefinedFields"]["SuperOffice:Vertical"] == "23"
|
||||
|
||||
# Verify Contact Updates (Standard Fields & UDFs)
|
||||
contact = self.mock_so_client.contacts[100]
|
||||
self.assertEqual(contact["UserDefinedFields"]["SuperOffice:Vertical"], "23")
|
||||
self.assertEqual(contact["UserDefinedFields"]["SuperOffice:Opener"], "Positive observation about Silly Billy")
|
||||
self.assertEqual(contact["UserDefinedFields"]["SuperOffice:OpenerSecondary"], "Secondary observation")
|
||||
self.assertEqual(contact.get("PostalAddress", {}).get("City"), "Koeln")
|
||||
self.assertEqual(contact.get("OrgNumber"), "DE813016729")
|
||||
|
||||
# --- Step 2: Person Created (Get Logistics Texts) ---
|
||||
print("[TEST] Step 2: Create Person...")
|
||||
|
||||
56
connector-superoffice/verify_superoffice_data.py
Normal file
56
connector-superoffice/verify_superoffice_data.py
Normal file
@@ -0,0 +1,56 @@
|
||||
from superoffice_client import SuperOfficeClient
|
||||
import json
|
||||
import logging
|
||||
|
||||
# Setup minimal logging
|
||||
logging.basicConfig(level=logging.ERROR)
|
||||
|
||||
def verify_contact(contact_id):
|
||||
print(f"🔍 Verifying REAL SuperOffice Data for Contact {contact_id}...")
|
||||
|
||||
client = SuperOfficeClient()
|
||||
if not client.access_token:
|
||||
print("❌ Auth failed.")
|
||||
return
|
||||
|
||||
contact = client.get_contact(contact_id)
|
||||
if not contact:
|
||||
print("❌ Contact not found.")
|
||||
return
|
||||
|
||||
# 1. Standard Fields
|
||||
print("\n--- Standard Fields ---")
|
||||
print(f"Name: {contact.get('Name')}")
|
||||
print(f"OrgNr: {contact.get('OrgNr')}") # Changed from OrgNumber
|
||||
|
||||
addr = contact.get("Address", {}) # Changed from PostalAddress
|
||||
print(f"Raw Address JSON: {json.dumps(addr, indent=2)}")
|
||||
|
||||
if addr:
|
||||
postal = addr.get("Postal", {})
|
||||
street = addr.get("Street", {})
|
||||
print(f"Postal City: {postal.get('City')}")
|
||||
print(f"Street City: {street.get('City')}")
|
||||
else:
|
||||
print("Address: (Empty)")
|
||||
print("Address: (Empty)")
|
||||
|
||||
# 2. UDFs
|
||||
print("\n--- User Defined Fields (UDFs) ---")
|
||||
udfs = contact.get("UserDefinedFields", {})
|
||||
if not udfs:
|
||||
print("(No UDFs found)")
|
||||
else:
|
||||
for k, v in udfs.items():
|
||||
# Filter relevant UDFs if possible, or show all
|
||||
if "SuperOffice:" in k:
|
||||
# Try to decode value if it's a list item like [I:26]
|
||||
val_str = str(v)
|
||||
print(f"{k}: {val_str}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
c_id = 6
|
||||
if len(sys.argv) > 1:
|
||||
c_id = int(sys.argv[1])
|
||||
verify_contact(c_id)
|
||||
@@ -1,4 +1,5 @@
|
||||
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
|
||||
from fastapi.responses import HTMLResponse
|
||||
import logging
|
||||
import os
|
||||
import json
|
||||
@@ -51,6 +52,104 @@ def health():
|
||||
def stats():
|
||||
return queue.get_stats()
|
||||
|
||||
@app.get("/api/jobs")
|
||||
def get_jobs():
|
||||
return queue.get_recent_jobs(limit=100)
|
||||
|
||||
@app.get("/dashboard", response_class=HTMLResponse)
|
||||
def dashboard():
|
||||
html_content = """
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Connector Dashboard</title>
|
||||
<meta http-equiv="refresh" content="5">
|
||||
<style>
|
||||
body { font-family: sans-serif; padding: 20px; background: #f0f2f5; }
|
||||
.container { max-width: 1200px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
|
||||
h1 { color: #333; }
|
||||
table { width: 100%; border-collapse: collapse; margin-top: 20px; }
|
||||
th, td { text-align: left; padding: 12px; border-bottom: 1px solid #ddd; font-size: 14px; }
|
||||
th { background-color: #f8f9fa; color: #666; font-weight: 600; }
|
||||
tr:hover { background-color: #f8f9fa; }
|
||||
.status { padding: 4px 8px; border-radius: 4px; font-size: 12px; font-weight: bold; text-transform: uppercase; }
|
||||
.status-PENDING { background: #e2e8f0; color: #475569; }
|
||||
.status-PROCESSING { background: #dbeafe; color: #1e40af; }
|
||||
.status-COMPLETED { background: #dcfce7; color: #166534; }
|
||||
.status-FAILED { background: #fee2e2; color: #991b1b; }
|
||||
.status-RETRY { background: #fef9c3; color: #854d0e; }
|
||||
.meta { color: #888; font-size: 12px; }
|
||||
pre { margin: 0; white-space: pre-wrap; word-break: break-word; color: #444; font-family: monospace; font-size: 11px; max-height: 60px; overflow-y: auto; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="container">
|
||||
<div style="display: flex; justify-content: space-between; align-items: center;">
|
||||
<h1>🔌 SuperOffice Connector Dashboard</h1>
|
||||
<div id="stats"></div>
|
||||
</div>
|
||||
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th width="50">ID</th>
|
||||
<th width="120">Status</th>
|
||||
<th width="150">Updated</th>
|
||||
<th width="150">Event</th>
|
||||
<th>Payload / Error</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody id="job-table">
|
||||
<tr><td colspan="5" style="text-align:center;">Loading...</td></tr>
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
|
||||
<script>
|
||||
async function loadData() {
|
||||
try {
|
||||
// Use relative path to work behind Nginx /connector/ prefix
|
||||
const response = await fetch('api/jobs');
|
||||
const jobs = await response.json();
|
||||
|
||||
const tbody = document.getElementById('job-table');
|
||||
tbody.innerHTML = '';
|
||||
|
||||
if (jobs.length === 0) {
|
||||
tbody.innerHTML = '<tr><td colspan="5" style="text-align:center;">No jobs found</td></tr>';
|
||||
return;
|
||||
}
|
||||
|
||||
jobs.forEach(job => {
|
||||
const tr = document.createElement('tr');
|
||||
|
||||
let details = JSON.stringify(job.payload, null, 2);
|
||||
if (job.error_msg) {
|
||||
details += "\\n\\n🔴 ERROR: " + job.error_msg;
|
||||
}
|
||||
|
||||
tr.innerHTML = `
|
||||
<td>#${job.id}</td>
|
||||
<td><span class="status status-${job.status}">${job.status}</span></td>
|
||||
<td>${new Date(job.updated_at + "Z").toLocaleTimeString()}</td>
|
||||
<td>${job.event_type}</td>
|
||||
<td><pre>${details}</pre></td>
|
||||
`;
|
||||
tbody.appendChild(tr);
|
||||
});
|
||||
} catch (e) {
|
||||
console.error("Failed to load jobs", e);
|
||||
}
|
||||
}
|
||||
|
||||
loadData();
|
||||
// Also handled by meta refresh, but JS refresh is smoother if we want to remove meta refresh
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
return HTMLResponse(content=html_content, status_code=200)
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run("webhook_app:app", host="0.0.0.0", port=8000, reload=True)
|
||||
|
||||
@@ -141,7 +141,10 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch contact details for {contact_id}: {e}")
|
||||
|
||||
# 2. Call Company Explorer Provisioning API
|
||||
# --- 2. PREPARE UPDATES (Atomic Strategy) ---
|
||||
# We will fetch the contact ONCE, calculate all needed changes (Standard + UDFs),
|
||||
# and push them in a single operation if possible to avoid race conditions.
|
||||
|
||||
ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
|
||||
ce_req = {
|
||||
"so_contact_id": contact_id,
|
||||
@@ -152,7 +155,6 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
"crm_industry_name": crm_industry_name
|
||||
}
|
||||
|
||||
# Simple Basic Auth for internal API
|
||||
ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
|
||||
|
||||
try:
|
||||
@@ -173,82 +175,116 @@ def process_job(job, so_client: SuperOfficeClient):
|
||||
|
||||
logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")
|
||||
|
||||
# 2b. Sync Vertical to SuperOffice (Company Level)
|
||||
vertical_name = provisioning_data.get("vertical_name")
|
||||
# Fetch fresh Contact Data for comparison
|
||||
try:
|
||||
contact_data = so_client.get_contact(contact_id)
|
||||
if not contact_data:
|
||||
logger.error(f"Contact {contact_id} not found in SuperOffice.")
|
||||
return "FAILED"
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch contact {contact_id}: {e}")
|
||||
return "RETRY"
|
||||
|
||||
dirty_standard = False
|
||||
dirty_udfs = False
|
||||
|
||||
# Ensure nested dicts exist
|
||||
if "UserDefinedFields" not in contact_data: contact_data["UserDefinedFields"] = {}
|
||||
if "PostalAddress" not in contact_data or contact_data["PostalAddress"] is None: contact_data["PostalAddress"] = {}
|
||||
|
||||
# --- A. Vertical Sync ---
|
||||
vertical_name = provisioning_data.get("vertical_name")
|
||||
if vertical_name:
|
||||
try:
|
||||
vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
|
||||
except:
|
||||
vertical_map = {}
|
||||
logger.error("Failed to parse VERTICAL_MAP_JSON from config.")
|
||||
|
||||
vertical_id = vertical_map.get(vertical_name)
|
||||
|
||||
if vertical_id:
|
||||
logger.info(f"Identified Vertical '{vertical_name}' -> ID {vertical_id}")
|
||||
try:
|
||||
# Check current value to avoid loops
|
||||
current_contact = so_client.get_contact(contact_id)
|
||||
current_udfs = current_contact.get("UserDefinedFields", {})
|
||||
|
||||
# Use Config UDF key
|
||||
vertical_id = vertical_map.get(vertical_name)
|
||||
if vertical_id:
|
||||
udf_key = settings.UDF_VERTICAL
|
||||
current_val = current_udfs.get(udf_key, "")
|
||||
|
||||
# Normalize SO list ID format (e.g., "[I:26]" -> "26")
|
||||
if current_val and current_val.startswith("[I:"):
|
||||
current_val = current_val.split(":")[1].strip("]")
|
||||
current_val = contact_data["UserDefinedFields"].get(udf_key, "")
|
||||
# Normalize "[I:26]" -> "26"
|
||||
if current_val and str(current_val).startswith("[I:"):
|
||||
current_val = str(current_val).split(":")[1].strip("]")
|
||||
|
||||
if str(current_val) != str(vertical_id):
|
||||
logger.info(f"Updating Contact {contact_id} Vertical: {current_val} -> {vertical_id}")
|
||||
so_client.update_entity_udfs(contact_id, "Contact", {udf_key: str(vertical_id)})
|
||||
else:
|
||||
logger.info(f"Vertical for Contact {contact_id} already in sync ({vertical_id}).")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to sync vertical for Contact {contact_id}: {e}")
|
||||
else:
|
||||
logger.warning(f"Vertical '{vertical_name}' not found in internal mapping.")
|
||||
logger.info(f"Change detected: Vertical {current_val} -> {vertical_id}")
|
||||
contact_data["UserDefinedFields"][udf_key] = str(vertical_id)
|
||||
dirty_udfs = True
|
||||
except Exception as e:
|
||||
logger.error(f"Vertical sync error: {e}")
|
||||
|
||||
# 2b.2 Sync Address & VAT (Standard Fields)
|
||||
# Check if we have address data to sync
|
||||
# --- B. Address & VAT Sync ---
|
||||
ce_city = provisioning_data.get("address_city")
|
||||
ce_country = provisioning_data.get("address_country") # Assuming 'DE' code or similar
|
||||
ce_street = provisioning_data.get("address_street")
|
||||
ce_zip = provisioning_data.get("address_zip")
|
||||
ce_vat = provisioning_data.get("vat_id")
|
||||
|
||||
if ce_city or ce_vat:
|
||||
# Check if ANY address component is present
|
||||
if ce_city or ce_street or ce_zip:
|
||||
# Initialize Address object if missing
|
||||
if "Address" not in contact_data or contact_data["Address"] is None:
|
||||
contact_data["Address"] = {"Street": {}, "Postal": {}}
|
||||
|
||||
addr_obj = contact_data["Address"]
|
||||
if "Postal" not in addr_obj or addr_obj["Postal"] is None: addr_obj["Postal"] = {}
|
||||
if "Street" not in addr_obj or addr_obj["Street"] is None: addr_obj["Street"] = {}
|
||||
|
||||
# Update Helper
|
||||
def update_addr_field(field_name, new_val, log_name):
|
||||
nonlocal dirty_standard
|
||||
if new_val:
|
||||
# Sync to both Postal and Street for best visibility
|
||||
for type_key in ["Postal", "Street"]:
|
||||
cur = addr_obj[type_key].get(field_name, "")
|
||||
if cur != new_val:
|
||||
logger.info(f"Change detected: {type_key} {log_name} '{cur}' -> '{new_val}'")
|
||||
addr_obj[type_key][field_name] = new_val
|
||||
dirty_standard = True
|
||||
|
||||
update_addr_field("City", ce_city, "City")
|
||||
update_addr_field("Address1", ce_street, "Street")
|
||||
update_addr_field("Zipcode", ce_zip, "Zip")
|
||||
|
||||
if ce_vat:
|
||||
# Field is 'OrgNr' in WebAPI, not 'OrgNumber'
|
||||
current_vat = contact_data.get("OrgNr", "")
|
||||
if current_vat != ce_vat:
|
||||
logger.info(f"Change detected: VAT '{current_vat}' -> '{ce_vat}'")
|
||||
contact_data["OrgNr"] = ce_vat
|
||||
dirty_standard = True
|
||||
|
||||
# --- C. AI Openers Sync ---
|
||||
ce_opener = provisioning_data.get("opener")
|
||||
ce_opener_secondary = provisioning_data.get("opener_secondary")
|
||||
|
||||
if ce_opener and ce_opener != "null":
|
||||
current_opener = contact_data["UserDefinedFields"].get(settings.UDF_OPENER, "")
|
||||
if current_opener != ce_opener:
|
||||
logger.info("Change detected: Primary Opener")
|
||||
contact_data["UserDefinedFields"][settings.UDF_OPENER] = ce_opener
|
||||
dirty_udfs = True
|
||||
|
||||
if ce_opener_secondary and ce_opener_secondary != "null":
|
||||
current_opener_sec = contact_data["UserDefinedFields"].get(settings.UDF_OPENER_SECONDARY, "")
|
||||
if current_opener_sec != ce_opener_secondary:
|
||||
logger.info("Change detected: Secondary Opener")
|
||||
contact_data["UserDefinedFields"][settings.UDF_OPENER_SECONDARY] = ce_opener_secondary
|
||||
dirty_udfs = True
|
||||
|
||||
# --- D. Apply Updates (Single Transaction) ---
|
||||
if dirty_standard or dirty_udfs:
|
||||
logger.info(f"Pushing combined updates for Contact {contact_id} (Standard={dirty_standard}, UDFs={dirty_udfs})...")
|
||||
try:
|
||||
# Re-fetch contact to be safe (or use cached if optimal)
|
||||
contact_data = so_client.get_contact(contact_id)
|
||||
changed = False
|
||||
|
||||
# City (PostalAddress)
|
||||
if ce_city:
|
||||
# SuperOffice Address structure is complex. Simplified check on PostalAddress.
|
||||
# Address: { "PostalAddress": { "City": "..." } }
|
||||
current_city = contact_data.get("PostalAddress", {}).get("City", "")
|
||||
if current_city != ce_city:
|
||||
if "PostalAddress" not in contact_data: contact_data["PostalAddress"] = {}
|
||||
contact_data["PostalAddress"]["City"] = ce_city
|
||||
changed = True
|
||||
logger.info(f"Updating City: {current_city} -> {ce_city}")
|
||||
|
||||
# VAT (OrgNumber)
|
||||
if ce_vat:
|
||||
current_vat = contact_data.get("OrgNumber", "")
|
||||
if current_vat != ce_vat:
|
||||
contact_data["OrgNumber"] = ce_vat
|
||||
changed = True
|
||||
logger.info(f"Updating VAT: {current_vat} -> {ce_vat}")
|
||||
|
||||
if changed:
|
||||
logger.info(f"Pushing standard field updates for Contact {contact_id}...")
|
||||
so_client._put(f"Contact/{contact_id}", contact_data)
|
||||
|
||||
# We PUT the whole modified contact object back
|
||||
# This handles both standard fields and UDFs in one atomic-ish go
|
||||
so_client._put(f"Contact/{contact_id}", contact_data)
|
||||
logger.info("✅ Contact Update Successful.")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to sync Address/VAT for Contact {contact_id}: {e}")
|
||||
logger.error(f"Failed to update Contact {contact_id}: {e}")
|
||||
raise
|
||||
else:
|
||||
logger.info("No changes needed for Contact.")
|
||||
|
||||
# 2c. Sync Website (Company Level)
|
||||
# 2d. Sync Person Position (Role) - if Person exists
|
||||
# TEMPORARILY DISABLED TO PREVENT LOOP (SO API Read-after-Write latency or field mapping issue)
|
||||
# Re-enable via config if needed
|
||||
if settings.ENABLE_WEBSITE_SYNC:
|
||||
|
||||
Reference in New Issue
Block a user