140 lines
5.9 KiB
Python
140 lines
5.9 KiB
Python
import unittest
|
|
from unittest.mock import patch, MagicMock
|
|
import time
|
|
import os
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
# Importiere unsere Module
|
|
from trading_twins.orchestrator import TradingTwinsOrchestrator
|
|
from trading_twins.manager import TradingTwinsManager
|
|
from trading_twins.models import init_db, ProposalJob, ProposedSlot
|
|
|
|
# Logging reduzieren
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
class TestTradingTwinsDryRun(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
# Wir nutzen eine temporäre Test-Datenbank
|
|
cls.test_db_path = 'sqlite:///trading_twins/test_dry_run.db'
|
|
if os.path.exists("trading_twins/test_dry_run.db"):
|
|
os.remove("trading_twins/test_dry_run.db")
|
|
|
|
# Manager mit Test-DB initialisieren
|
|
cls.manager = TradingTwinsManager(db_path=cls.test_db_path)
|
|
|
|
def setUp(self):
|
|
# Orchestrator neu erstellen
|
|
self.orchestrator = TradingTwinsOrchestrator()
|
|
self.orchestrator.manager = self.manager # Inject Test Manager
|
|
|
|
# Timer drastisch verkürzen für Tests
|
|
self.orchestrator_timeout_patch = patch('trading_twins.orchestrator.TIMEOUT_SECONDS', 2)
|
|
self.orchestrator_timeout_patch.start()
|
|
|
|
def tearDown(self):
|
|
self.orchestrator_timeout_patch.stop()
|
|
|
|
@patch('trading_twins.orchestrator.send_email_via_graph')
|
|
@patch('trading_twins.orchestrator.send_approval_card')
|
|
def test_1_happy_path_timeout(self, mock_teams, mock_email):
|
|
"""Testet den automatischen Versand nach Timeout."""
|
|
print("\n--- TEST 1: Happy Path (Timeout -> Auto-Send) ---")
|
|
mock_teams.return_value = True
|
|
mock_email.return_value = True
|
|
|
|
# Lead verarbeiten
|
|
job_uuid = self.orchestrator.process_lead("test1@example.com", "Kunde Eins", "Firma A")
|
|
|
|
print(f"Job {job_uuid} gestartet. Warte auf Timeout (2s).")
|
|
time.sleep(3) # Warte länger als Timeout
|
|
|
|
# Prüfungen
|
|
mock_teams.assert_called_once()
|
|
mock_email.assert_called_once() # E-Mail muss versendet worden sein
|
|
|
|
status = self.manager.get_job_status(job_uuid)
|
|
self.assertEqual(status, 'sent')
|
|
print("✅ E-Mail wurde automatisch versendet.")
|
|
|
|
@patch('trading_twins.orchestrator.send_email_via_graph')
|
|
@patch('trading_twins.orchestrator.send_approval_card')
|
|
def test_2_manual_cancel(self, mock_teams, mock_email):
|
|
"""Testet den manuellen Abbruch."""
|
|
print("\n--- TEST 2: Manueller Abbruch (STOP) ---")
|
|
mock_teams.return_value = True
|
|
|
|
# Lead verarbeiten
|
|
job_uuid = self.orchestrator.process_lead("test2@example.com", "Kunde Zwei", "Firma B")
|
|
|
|
# Simuliere Klick auf "STOP" (direkter DB-Update wie API es tun würde)
|
|
print("Simuliere Klick auf 'STOP'...")
|
|
self.manager.update_job_status(job_uuid, 'cancelled')
|
|
|
|
print("Warte auf Timeout (2s).")
|
|
time.sleep(3)
|
|
|
|
# Prüfungen
|
|
mock_teams.assert_called_once()
|
|
mock_email.assert_not_called() # E-Mail darf NICHT versendet werden
|
|
|
|
status = self.manager.get_job_status(job_uuid)
|
|
self.assertEqual(status, 'cancelled')
|
|
print("✅ Abbruch erfolgreich, keine E-Mail gesendet.")
|
|
|
|
@patch('trading_twins.manager.TradingTwinsManager._mock_calendar_availability')
|
|
def test_3_overbooking_factor_3(self, mock_calendar):
|
|
"""
|
|
Testet die Faktor-3 Logik.
|
|
Wir stellen 4 Slots zur Verfügung.
|
|
Wir erzeugen 4 Leads.
|
|
Die ersten 3 sollten Slot A bekommen.
|
|
Der 4. Lead sollte Slot A NICHT mehr bekommen, sondern Slot B (oder andere).
|
|
"""
|
|
print("\n--- TEST 3: Überbuchungs-Logik (Faktor 3) ---")
|
|
|
|
# Setup Mock Calendar: Gibt immer dieselben 4 Slots zurück
|
|
tomorrow = datetime.now().date() + timedelta(days=1)
|
|
slot_a = {'start': datetime.combine(tomorrow, datetime.min.time().replace(hour=10)), 'end': datetime.combine(tomorrow, datetime.min.time().replace(hour=10, minute=45))}
|
|
slot_b = {'start': datetime.combine(tomorrow, datetime.min.time().replace(hour=11)), 'end': datetime.combine(tomorrow, datetime.min.time().replace(hour=11, minute=45))}
|
|
slot_c = {'start': datetime.combine(tomorrow, datetime.min.time().replace(hour=14)), 'end': datetime.combine(tomorrow, datetime.min.time().replace(hour=14, minute=45))}
|
|
|
|
# Mock gibt diese Liste zurück
|
|
mock_calendar.return_value = [slot_a, slot_b, slot_c]
|
|
|
|
# Wir feuern 4 Leads ab
|
|
uuids = []
|
|
for i in range(1, 5):
|
|
uuid, slots = self.manager.create_proposal_job(f"bulk{i}@test.com", f"Bulk {i}", "Bulk Corp")
|
|
uuids.append((uuid, slots))
|
|
# print(f"Lead {i} bekam Slots: {[s['start'].strftime('%H:%M') for s in slots]}")
|
|
|
|
# Analyse
|
|
# Lead 1: Bekommt A, B (A hat Count 0 -> 1)
|
|
# Lead 2: Bekommt A, B (A hat Count 1 -> 2)
|
|
# Lead 3: Bekommt A, B (A hat Count 2 -> 3 -> VOLL)
|
|
# Lead 4: Sollte A NICHT bekommen, sondern B, C
|
|
|
|
slots_lead_1 = uuids[0][1]
|
|
slots_lead_4 = uuids[3][1]
|
|
|
|
start_time_lead_1_first_slot = slots_lead_1[0]['start']
|
|
start_time_lead_4_first_slot = slots_lead_4[0]['start']
|
|
|
|
print(f"Lead 1 Slot 1: {start_time_lead_1_first_slot}")
|
|
print(f"Lead 4 Slot 1: {start_time_lead_4_first_slot}")
|
|
|
|
if start_time_lead_1_first_slot != start_time_lead_4_first_slot:
|
|
print("✅ Faktor-3 Logik greift: Lead 4 hat einen anderen Start-Slot bekommen!")
|
|
else:
|
|
print("❌ Faktor-3 Logik fehlgeschlagen: Lead 4 hat denselben Slot bekommen.")
|
|
# Debug
|
|
session = self.manager.Session()
|
|
count = session.query(ProposedSlot).filter(ProposedSlot.start_time == start_time_lead_1_first_slot).count()
|
|
print(f"Total entries for Slot A: {count}")
|
|
session.close()
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|