[30388f42] feat: Add automated test infrastructure for core services
This commit is contained in:
@@ -8,3 +8,5 @@ fastapi
|
||||
uvicorn
|
||||
schedule
|
||||
sqlalchemy
|
||||
pytest
|
||||
httpx
|
||||
|
||||
3
connector-superoffice/run_tests.sh
Normal file
3
connector-superoffice/run_tests.sh
Normal file
@@ -0,0 +1,3 @@
|
||||
#!/bin/bash
|
||||
export PYTHONPATH=$PYTHONPATH:/app
|
||||
python3 -m pytest -v /app/tests/test_webhook.py
|
||||
46
connector-superoffice/tests/test_webhook.py
Normal file
46
connector-superoffice/tests/test_webhook.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from unittest.mock import MagicMock, patch
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Resolve paths
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
# Mock environment before importing app
|
||||
with patch.dict(os.environ, {"WEBHOOK_TOKEN": "test_secret_token"}):
|
||||
from webhook_app import app, queue
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
def test_health_check():
|
||||
response = client.get("/health")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"status": "ok"}
|
||||
|
||||
def test_webhook_invalid_token():
|
||||
response = client.post("/webhook?token=wrong_token", json={"Event": "test"})
|
||||
assert response.status_code == 403
|
||||
|
||||
@patch("webhook_app.queue.add_job")
|
||||
def test_webhook_valid_token(mock_add_job):
|
||||
payload = {
|
||||
"Event": "contact.created",
|
||||
"PrimaryKey": 123,
|
||||
"Name": "Test Company"
|
||||
}
|
||||
|
||||
response = client.post("/webhook?token=test_secret_token", json=payload)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"status": "queued"}
|
||||
|
||||
# Verify job was added to queue
|
||||
mock_add_job.assert_called_once_with("contact.created", payload)
|
||||
|
||||
def test_stats_endpoint():
|
||||
# Mock get_stats result
|
||||
with patch.object(queue, 'get_stats', return_value={"PENDING": 5, "COMPLETED": 10}):
|
||||
response = client.get("/stats")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"PENDING": 5, "COMPLETED": 10}
|
||||
Reference in New Issue
Block a user