This commit addresses the issue of duplicate jobs being created by the SuperOffice connector. The root cause was identified as a race condition where SuperOffice would send multiple webhooks in quick succession for the same entity, leading to multiple identical jobs in the queue. The solution involves several layers of improvement: 1. **Ingress De-duplication:** The webhook endpoint now checks for existing pending jobs for the same entity *before* adding a new job to the queue. This is the primary fix and prevents duplicates at the source. 2. **DB Schema Enhancement:** The job table schema in the queue manager was extended with an additional column to allow for reliable and efficient checking of duplicate entities. 3. **Improved Logging:** The log messages for job retries (e.g., when waiting for the Company Explorer) have been made more descriptive to avoid confusion and false alarms.
284 lines
12 KiB
Python
284 lines
12 KiB
Python
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
|
|
from fastapi.responses import HTMLResponse
|
|
import logging
|
|
import os
|
|
import json
|
|
from queue_manager import JobQueue
|
|
|
|
# Logging Setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("connector-webhook")

# ASGI application object and the job queue instance used by the route handlers.
app = FastAPI(title="SuperOffice Connector Webhook", version="2.0")
queue = JobQueue()

# Shared secret compared against the ``token`` query parameter of /webhook.
# Falls back to the placeholder "changeme" when the variable is not set.
WEBHOOK_TOKEN = os.environ.get("WEBHOOK_TOKEN", "changeme")
|
|
|
|
@app.post("/webhook")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
    """
    Endpoint for SuperOffice Webhooks.

    The caller authenticates with a shared secret passed as the ``token``
    query parameter (``/webhook?token=...``). The JSON body is then queued as
    a job unless the queue reports an identical pending one.

    Returns:
        ``{"status": "queued"}`` when a job was added, or
        ``{"status": "skipped_duplicate"}`` when
        ``queue.is_duplicate_pending`` returned a truthy value.

    Raises:
        HTTPException(403): the token is missing or does not match.
        HTTPException(400): the body is not valid JSON or is not a JSON object.
        HTTPException(500): the queue operations raised an unexpected error.
    """
    # Local import keeps this handler self-contained; only the token check needs it.
    import hmac

    # 1. Verify Secret (Basic Security)
    # Compare as bytes in constant time so the secret cannot be probed through
    # response timing; bytes also avoids compare_digest's ASCII-only rule for str.
    token = request.query_params.get("token")
    token_ok = token is not None and hmac.compare_digest(
        token.encode("utf-8"), WEBHOOK_TOKEN.encode("utf-8")
    )
    if not token_ok:
        # Deliberately do not log the supplied value: a near-miss of the real
        # secret (or attacker-controlled text) must not end up in the logs.
        logger.warning("Invalid webhook token attempt")
        raise HTTPException(403, "Invalid Token")

    # 2. Parse and validate the body. Client mistakes are answered with 400
    # here, outside the broad handler below, so they are not reported as 500.
    try:
        payload = await request.json()
    except ValueError as exc:  # json.JSONDecodeError / UnicodeDecodeError
        logger.warning("Rejected webhook: request body is not valid JSON")
        raise HTTPException(400, "Invalid JSON payload") from exc

    if not isinstance(payload, dict):
        # payload.get(...) below requires a JSON object.
        logger.warning("Rejected webhook: JSON payload is not an object")
        raise HTTPException(400, "Payload must be a JSON object")

    # 3. De-duplicate and enqueue.
    try:
        logger.info(f"Received webhook payload: {payload}")

        event_type = payload.get("Event", "unknown")

        # --- DEDUPLICATION AT INGRESS (Added March 2026) ---
        # Before adding a job, check if an identical one is already pending.
        if queue.is_duplicate_pending(event_type, payload):
            return {"status": "skipped_duplicate"}

        # Add to local Queue
        queue.add_job(event_type, payload)

        return {"status": "queued"}

    except Exception as e:
        logger.error(f"Error processing webhook: {e}", exc_info=True)
        raise HTTPException(500, "Internal Server Error")
|
|
|
|
@app.get("/health")
def health():
    """Health-check route: unconditionally answers with status "ok"."""
    liveness = {"status": "ok"}
    return liveness
|
|
|
|
@app.get("/stats")
def stats():
    """Return whatever ``queue.get_stats()`` reports for the module-level queue."""
    queue_stats = queue.get_stats()
    return queue_stats
|
|
|
|
@app.get("/api/jobs")
def get_jobs():
    """Return the result of ``queue.get_recent_jobs`` with a limit of 100."""
    recent_jobs = queue.get_recent_jobs(limit=100)
    return recent_jobs
|
|
|
|
@app.get("/api/accounts")
def get_accounts():
    """Return the result of ``queue.get_account_summary`` with a limit of 500."""
    account_summary = queue.get_account_summary(limit=500)
    return account_summary
|
|
|
|
@app.get("/dashboard", response_class=HTMLResponse)
def dashboard():
    """
    Serve the single-page connector dashboard.

    The page is static HTML/CSS/JS. Its script polls the relative URLs
    ``api/accounts`` and ``api/jobs`` every 5 seconds and renders the rows
    client-side; the page itself reloads every 30 seconds via a meta refresh.

    Every value taken from the API responses is passed through the script's
    ``escapeHtml`` helper before being placed into ``innerHTML``. Those values
    (names, error messages, webhook payloads) originate outside this service,
    so inserting them unescaped would allow stored XSS.

    Returns:
        HTMLResponse with status 200 containing the dashboard markup.
    """
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Connector Dashboard</title>
        <meta http-equiv="refresh" content="30">
        <style>
            body {
                font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
                padding: 20px;
                background: #0f172a;
                color: #f1f5f9;
            }
            .container {
                max-width: 1200px;
                margin: 0 auto;
                background: #1e293b;
                padding: 24px;
                border-radius: 12px;
                box-shadow: 0 10px 15px -3px rgba(0, 0, 0, 0.3);
                border: 1px solid #334155;
            }
            header { display: flex; justify-content: space-between; align-items: center; margin-bottom: 24px; }
            h1 { margin: 0; font-size: 24px; color: #f8fafc; }

            .tabs { display: flex; gap: 8px; margin-bottom: 20px; border-bottom: 1px solid #334155; padding-bottom: 10px; }
            .tab { padding: 8px 16px; cursor: pointer; border-radius: 6px; font-weight: 500; font-size: 14px; color: #94a3b8; transition: all 0.2s; }
            .tab:hover { background: #334155; color: #f8fafc; }
            .tab.active { background: #3b82f6; color: white; }

            table { width: 100%; border-collapse: collapse; }
            th, td { text-align: left; padding: 14px; border-bottom: 1px solid #334155; font-size: 14px; }
            th { background-color: #1e293b; color: #94a3b8; font-weight: 600; text-transform: uppercase; font-size: 12px; letter-spacing: 0.5px; }
            tr:hover { background-color: #334155; }

            .status { padding: 4px 8px; border-radius: 6px; font-size: 11px; font-weight: 700; text-transform: uppercase; }
            .status-PENDING { background: #334155; color: #cbd5e1; }
            .status-PROCESSING { background: #1e40af; color: #bfdbfe; }
            .status-COMPLETED { background: #064e3b; color: #a7f3d0; }
            .status-FAILED { background: #7f1d1d; color: #fecaca; }
            .status-SKIPPED { background: #475569; color: #cbd5e1; }
            .status-DELETED { background: #7c2d12; color: #fdba74; }

            .phases { display: flex; gap: 4px; align-items: center; }
            .phase { width: 12px; height: 12px; border-radius: 50%; background: #334155; border: 2px solid #1e293b; box-shadow: 0 0 0 1px #334155; }
            .phase.completed { background: #10b981; box-shadow: 0 0 0 1px #10b981; }
            .phase.processing { background: #f59e0b; box-shadow: 0 0 0 1px #f59e0b; animation: pulse 1.5s infinite; }
            .phase.failed { background: #ef4444; box-shadow: 0 0 0 1px #ef4444; }

            @keyframes pulse { 0% { opacity: 1; } 50% { opacity: 0.4; } 100% { opacity: 1; } }

            .meta { color: #94a3b8; font-size: 12px; display: block; margin-top: 4px; }
            pre {
                margin: 0;
                white-space: pre-wrap;
                word-break: break-word;
                color: #cbd5e1;
                font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, monospace;
                font-size: 11px;
                max-height: 80px;
                overflow-y: auto;
                background: #0f172a;
                padding: 10px;
                border-radius: 6px;
                border: 1px solid #334155;
            }

            .hidden { display: none; }
        </style>
    </head>
    <body>
        <div class="container">
            <header>
                <h1>🔌 SuperOffice Connector Dashboard</h1>
                <div id="stats"></div>
            </header>

            <div class="tabs">
                <div class="tab active" id="tab-accounts" onclick="switchTab('accounts')">Account View</div>
                <div class="tab" id="tab-events" onclick="switchTab('events')">Event Log</div>
            </div>

            <div id="view-accounts">
                <table>
                    <thead>
                        <tr>
                            <th>Account / Person</th>
                            <th width="100">Responsible</th>
                            <th width="120">ID</th>
                            <th width="150">Process Progress</th>
                            <th width="100">Duration</th>
                            <th width="120">Status</th>
                            <th width="150">Last Update</th>
                            <th>Details</th>
                        </tr>
                    </thead>
                    <tbody id="account-table">
                        <tr><td colspan="8" style="text-align:center;">Loading Accounts...</td></tr>
                    </tbody>
                </table>
            </div>

            <div id="view-events" class="hidden">
                <table>
                    <thead>
                        <tr>
                            <th width="50">ID</th>
                            <th width="120">Status</th>
                            <th width="150">Updated</th>
                            <th width="150">Event</th>
                            <th>Payload / Error</th>
                        </tr>
                    </thead>
                    <tbody id="event-table">
                        <tr><td colspan="5" style="text-align:center;">Loading Events...</td></tr>
                    </tbody>
                </table>
            </div>
        </div>

        <script>
            let currentTab = 'accounts';

            // Escape a value for safe interpolation into innerHTML (element
            // content and quoted attribute values). API data is untrusted.
            function escapeHtml(value) {
                return String(value)
                    .replace(/&/g, '&amp;')
                    .replace(/</g, '&lt;')
                    .replace(/>/g, '&gt;')
                    .replace(/"/g, '&quot;')
                    .replace(/'/g, '&#39;');
            }

            function switchTab(tab) {
                currentTab = tab;
                document.getElementById('tab-accounts').classList.toggle('active', tab === 'accounts');
                document.getElementById('tab-events').classList.toggle('active', tab === 'events');
                document.getElementById('view-accounts').classList.toggle('hidden', tab !== 'accounts');
                document.getElementById('view-events').classList.toggle('hidden', tab !== 'events');
                loadData();
            }

            async function loadData() {
                if (currentTab === 'accounts') await loadAccounts();
                else await loadEvents();
            }

            async function loadAccounts() {
                try {
                    const response = await fetch('api/accounts');
                    const accounts = await response.json();
                    const tbody = document.getElementById('account-table');
                    tbody.innerHTML = '';

                    if (accounts.length === 0) {
                        tbody.innerHTML = '<tr><td colspan="8" style="text-align:center;">No accounts in process</td></tr>';
                        return;
                    }

                    accounts.sort((a,b) => new Date(b.updated_at) - new Date(a.updated_at));

                    accounts.forEach(acc => {
                        const tr = document.createElement('tr');
                        const phasesHtml = `
                            <div class="phases">
                                <div class="phase ${escapeHtml(acc.phases.received)}" title="Received"></div>
                                <div class="phase ${escapeHtml(acc.phases.enriching)}" title="Enriching (CE)"></div>
                                <div class="phase ${escapeHtml(acc.phases.syncing)}" title="Syncing (SO)"></div>
                                <div class="phase ${escapeHtml(acc.phases.completed)}" title="Completed"></div>
                            </div>
                        `;

                        tr.innerHTML = `
                            <td>
                                <strong>${escapeHtml(acc.name)}</strong>
                                <span class="meta">${escapeHtml(acc.last_event)}</span>
                            </td>
                            <td><span class="status status-PENDING" style="font-size: 10px;">👤 ${escapeHtml(acc.associate || '---')}</span></td>
                            <td>${escapeHtml(acc.id)}</td>
                            <td>${phasesHtml}</td>
                            <td><span class="meta">${escapeHtml(acc.duration || '0s')}</span></td>
                            <td><span class="status status-${escapeHtml(acc.status)}">${escapeHtml(acc.status)}</span></td>
                            <td>${new Date(acc.updated_at + "Z").toLocaleString('de-DE', { day: '2-digit', month: '2-digit', hour: '2-digit', minute: '2-digit' })}</td>
                            <td><pre>${escapeHtml(acc.error_msg || 'No issues')}</pre></td>
                        `;
                        tbody.appendChild(tr);
                    });
                } catch (e) { console.error("Failed to load accounts", e); }
            }

            async function loadEvents() {
                try {
                    const response = await fetch('api/jobs');
                    const jobs = await response.json();
                    const tbody = document.getElementById('event-table');
                    tbody.innerHTML = '';

                    jobs.forEach(job => {
                        const tr = document.createElement('tr');
                        let details = JSON.stringify(job.payload, null, 2);
                        if (job.error_msg) details += "\\n\\n🔴 ERROR: " + job.error_msg;

                        tr.innerHTML = `
                            <td>#${escapeHtml(job.id)}</td>
                            <td><span class="status status-${escapeHtml(job.status)}">${escapeHtml(job.status)}</span></td>
                            <td>${new Date(job.updated_at + "Z").toLocaleString('de-DE', { day: '2-digit', month: '2-digit', hour: '2-digit', minute: '2-digit' })}</td>
                            <td>${escapeHtml(job.event_type)}</td>
                            <td><pre>${escapeHtml(details)}</pre></td>
                        `;
                        tbody.appendChild(tr);
                    });
                } catch (e) { console.error("Failed to load events", e); }
            }

            loadData();
            setInterval(loadData, 5000);
        </script>
    </body>
    </html>
    """
    return HTMLResponse(content=html_content, status_code=200)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: uvicorn is imported only when the module is executed
    # directly, then serves the "webhook_app:app" import string on 0.0.0.0:8000
    # with reload enabled.
    import uvicorn

    uvicorn.run(
        "webhook_app:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )
|