83 lines
2.8 KiB
Python
83 lines
2.8 KiB
Python
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from unittest.mock import MagicMock, patch
|
|
import os
|
|
import sys
|
|
|
|
# Resolve paths: BASE_DIR is the directory one level above the directory that
# contains this file. It is appended to sys.path (once) so the
# ``trading_twins`` imports further down can be resolved.
_this_file = os.path.abspath(__file__)
BASE_DIR = os.path.dirname(os.path.dirname(_this_file))
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)
|
|
|
|
from trading_twins.manager import app, SessionLocal
|
|
from trading_twins.models import Base, ProposalJob
|
|
|
|
# --- Test Database Setup ---
# A file-backed SQLite database is used (instead of an in-memory one) so that
# data written through one session is visible to every other session.
TEST_DB_FILE = "test_trading_twins.db"
SQLALCHEMY_DATABASE_URL = f"sqlite:///./{TEST_DB_FILE}"

# check_same_thread=False is forwarded to the SQLite driver so a connection may
# be used from a thread other than the one that opened it.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
)

# Session factory bound to the test engine; handed to the app by the `client`
# fixture and used directly by the tests to seed and inspect rows.
TestingSessionLocal = sessionmaker(
    bind=engine,
    autoflush=False,
    autocommit=False,
)
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
def setup_database():
    """Provide a clean test schema for this module and remove it afterwards.

    Setup drops and recreates every table registered on ``Base.metadata`` in
    the test engine. Teardown drops the tables, releases the engine's pooled
    connections and deletes the SQLite file.
    """
    # Start from an empty schema even if an earlier, aborted run left the
    # database file behind; stale rows carrying the same fixed job UUIDs would
    # otherwise leak into this run's inserts and queries.
    Base.metadata.drop_all(bind=engine)
    Base.metadata.create_all(bind=engine)

    yield

    # Cleanup
    try:
        Base.metadata.drop_all(bind=engine)
    finally:
        # Close pooled connections first so nothing still holds the SQLite
        # file open when it is deleted (removing an open file fails on
        # Windows).
        engine.dispose()
        if os.path.exists(TEST_DB_FILE):
            os.remove(TEST_DB_FILE)
|
|
|
|
@pytest.fixture
def client():
    """Yield a ``TestClient`` for ``app`` while the manager's session factory is patched.

    ``trading_twins.manager.SessionLocal`` is replaced by
    ``TestingSessionLocal`` for the lifetime of the client, so lookups of that
    name in the manager module resolve to the test database.
    """
    session_patch = patch("trading_twins.manager.SessionLocal", TestingSessionLocal)
    # Entered left to right: the patch is active before the client starts and
    # is undone only after the client has been closed.
    with session_patch, TestClient(app) as test_client:
        yield test_client
|
|
|
|
def test_trigger_test_lead(client):
    """GET /test_lead answers 202 and calls the (mocked) ``process_lead``."""
    # process_lead is replaced by a mock, so only the fact that the endpoint
    # invokes it is checked here.
    with patch("trading_twins.manager.process_lead") as process_lead_mock:
        resp = client.get("/test_lead")

        assert resp.status_code == 202
        payload = resp.json()
        assert "Test lead triggered" in payload["status"]
        process_lead_mock.assert_called()
|
|
|
|
def test_stop_job(client):
    """GET /stop/<job_uuid> answers "Gestoppt." and sets the job status to "cancelled"."""
    # Arrange: store a pending job for the endpoint to act on. The session is
    # closed in ``finally`` so a failing commit cannot leave it open on the
    # shared SQLite file.
    db = TestingSessionLocal()
    try:
        job = ProposalJob(job_uuid="test_uuid_1", customer_email="test@example.com", status="pending")
        db.add(job)
        db.commit()
    finally:
        db.close()

    response = client.get("/stop/test_uuid_1")
    assert response.status_code == 200
    assert response.text == "Gestoppt."

    # Assert: read the row back through a fresh session so the status comes
    # from the database rather than from an object held by the first session.
    db = TestingSessionLocal()
    try:
        updated_job = db.query(ProposalJob).filter(ProposalJob.job_uuid == "test_uuid_1").first()
        # Fail with a clear message instead of an AttributeError on None.
        assert updated_job is not None, "job test_uuid_1 not found after /stop"
        assert updated_job.status == "cancelled"
    finally:
        db.close()
|
|
|
|
def test_send_now(client):
    """GET /send_now/<job_uuid> answers "Wird gesendet." and sets the job status to "send_now"."""
    # Arrange: store a pending job for the endpoint to act on. The session is
    # closed in ``finally`` so a failing commit cannot leave it open on the
    # shared SQLite file.
    db = TestingSessionLocal()
    try:
        job = ProposalJob(job_uuid="test_uuid_2", customer_email="test@example.com", status="pending")
        db.add(job)
        db.commit()
    finally:
        db.close()

    response = client.get("/send_now/test_uuid_2")
    assert response.status_code == 200
    assert response.text == "Wird gesendet."

    # Assert: read the row back through a fresh session so the status comes
    # from the database rather than from an object held by the first session.
    db = TestingSessionLocal()
    try:
        updated_job = db.query(ProposalJob).filter(ProposalJob.job_uuid == "test_uuid_2").first()
        # Fail with a clear message instead of an AttributeError on None.
        assert updated_job is not None, "job test_uuid_2 not found after /send_now"
        assert updated_job.status == "send_now"
    finally:
        db.close()
|
|
|
|
def test_book_slot_fail_no_job(client):
    """GET /book_slot/ with a job UUID that was never stored answers 400 and "Fehler."."""
    result = client.get("/book_slot/invalid_uuid/123456789")

    # Compare status and body together; both must match for the test to pass.
    observed = (result.status_code, result.text)
    assert observed == (400, "Fehler.")