# File: Brancheneinstufung2/lead-engine/trading_twins/test_dry_run.py
# (Python, 140 lines, 5.9 KiB)

import unittest
from unittest.mock import patch, MagicMock
import time
import os
import logging
from datetime import datetime, timedelta
# Importiere unsere Module
from trading_twins.orchestrator import TradingTwinsOrchestrator
from trading_twins.manager import TradingTwinsManager
from trading_twins.models import init_db, ProposalJob, ProposedSlot
# Configure root logging at INFO level for the test run.
logging.basicConfig(level=logging.INFO)
class TestTradingTwinsDryRun(unittest.TestCase):
    """Dry-run tests for the Trading Twins lead flow.

    All tests share one ``TradingTwinsManager`` that is bound to a throwaway
    SQLite database created in ``setUpClass``. Outgoing Teams cards and e-mails
    are replaced by mocks, so nothing leaves the process.
    """

    # Location of the throwaway SQLite file; the database URL handed to the
    # manager is derived from it so the two can never drift apart.
    _DB_FILE = "trading_twins/test_dry_run.db"

    @classmethod
    def setUpClass(cls):
        """Remove any stale test database and create the shared manager."""
        cls.test_db_path = f"sqlite:///{cls._DB_FILE}"
        if os.path.exists(cls._DB_FILE):
            os.remove(cls._DB_FILE)
        # Manager initialised against the test database.
        cls.manager = TradingTwinsManager(db_path=cls.test_db_path)

    def setUp(self):
        """Create a fresh orchestrator wired to the test manager with a 2s timeout."""
        self.orchestrator = TradingTwinsOrchestrator()
        self.orchestrator.manager = self.manager  # inject the test manager
        # Shorten the approval timeout drastically so the tests finish quickly.
        self.orchestrator_timeout_patch = patch(
            'trading_twins.orchestrator.TIMEOUT_SECONDS', 2
        )
        self.orchestrator_timeout_patch.start()
        # addCleanup guarantees the patch is undone after every test.
        self.addCleanup(self.orchestrator_timeout_patch.stop)

    @staticmethod
    def _slot(day, hour):
        """Return a 45-minute slot dict starting at ``hour``:00 on ``day``."""
        start = datetime.combine(day, datetime.min.time().replace(hour=hour))
        return {'start': start, 'end': start + timedelta(minutes=45)}

    @patch('trading_twins.orchestrator.send_email_via_graph')
    @patch('trading_twins.orchestrator.send_approval_card')
    def test_1_happy_path_timeout(self, mock_teams, mock_email):
        """The e-mail is sent automatically once the approval timeout expires."""
        print("\n--- TEST 1: Happy Path (Timeout -> Auto-Send) ---")
        mock_teams.return_value = True
        mock_email.return_value = True

        # Process the lead.
        job_uuid = self.orchestrator.process_lead("test1@example.com", "Kunde Eins", "Firma A")
        print(f"Job {job_uuid} gestartet. Warte auf Timeout (2s).")
        time.sleep(3)  # wait longer than the patched timeout

        # Checks: one approval card, one e-mail, job marked as sent.
        mock_teams.assert_called_once()
        mock_email.assert_called_once()
        status = self.manager.get_job_status(job_uuid)
        self.assertEqual(status, 'sent')
        print("✅ E-Mail wurde automatisch versendet.")

    @patch('trading_twins.orchestrator.send_email_via_graph')
    @patch('trading_twins.orchestrator.send_approval_card')
    def test_2_manual_cancel(self, mock_teams, mock_email):
        """A job cancelled before the timeout must not trigger an e-mail."""
        print("\n--- TEST 2: Manueller Abbruch (STOP) ---")
        mock_teams.return_value = True

        # Process the lead.
        job_uuid = self.orchestrator.process_lead("test2@example.com", "Kunde Zwei", "Firma B")

        # Simulate a click on "STOP" by updating the job status directly,
        # as the API would do.
        print("Simuliere Klick auf 'STOP'...")
        self.manager.update_job_status(job_uuid, 'cancelled')
        print("Warte auf Timeout (2s).")
        time.sleep(3)

        # Checks: card was sent, e-mail was NOT sent, job stays cancelled.
        mock_teams.assert_called_once()
        mock_email.assert_not_called()
        status = self.manager.get_job_status(job_uuid)
        self.assertEqual(status, 'cancelled')
        print("✅ Abbruch erfolgreich, keine E-Mail gesendet.")

    @patch('trading_twins.manager.TradingTwinsManager._mock_calendar_availability')
    def test_3_overbooking_factor_3(self, mock_calendar):
        """Check the factor-3 overbooking rule.

        The mocked calendar always offers the same three slots (A, B, C) and
        four leads are created. Lead 4 must start on a different slot than
        lead 1, because lead 1's first slot is expected to be fully booked
        by then.
        """
        print("\n--- TEST 3: Überbuchungs-Logik (Faktor 3) ---")

        # Mock calendar: always returns the same three slots for tomorrow.
        tomorrow = datetime.now().date() + timedelta(days=1)
        mock_calendar.return_value = [
            self._slot(tomorrow, 10),
            self._slot(tomorrow, 11),
            self._slot(tomorrow, 14),
        ]

        # Fire four leads and remember (job uuid, proposed slots) for each.
        jobs = []
        for i in range(1, 5):
            job_uuid, slots = self.manager.create_proposal_job(
                f"bulk{i}@test.com", f"Bulk {i}", "Bulk Corp"
            )
            jobs.append((job_uuid, slots))

        # Expected progression:
        #   Lead 1: gets A, B (A count 0 -> 1)
        #   Lead 2: gets A, B (A count 1 -> 2)
        #   Lead 3: gets A, B (A count 2 -> 3 -> full)
        #   Lead 4: should NOT get A, but B, C
        slots_lead_1 = jobs[0][1]
        slots_lead_4 = jobs[3][1]
        self.assertTrue(slots_lead_1, "Lead 1 received no slots")
        self.assertTrue(slots_lead_4, "Lead 4 received no slots")

        start_time_lead_1_first_slot = slots_lead_1[0]['start']
        start_time_lead_4_first_slot = slots_lead_4[0]['start']
        print(f"Lead 1 Slot 1: {start_time_lead_1_first_slot}")
        print(f"Lead 4 Slot 1: {start_time_lead_4_first_slot}")

        # Debug output first, so the count is visible even when the assertion
        # below fails. The session is always closed, also if the query raises.
        session = self.manager.Session()
        try:
            count = session.query(ProposedSlot).filter(
                ProposedSlot.start_time == start_time_lead_1_first_slot
            ).count()
            print(f"Total entries for Slot A: {count}")
        finally:
            session.close()

        # A real assertion instead of a printed verdict: the test now fails
        # when the overbooking rule does not kick in.
        self.assertNotEqual(
            start_time_lead_1_first_slot,
            start_time_lead_4_first_slot,
            "❌ Faktor-3 Logik fehlgeschlagen: Lead 4 hat denselben Slot bekommen.",
        )
        print("✅ Faktor-3 Logik greift: Lead 4 hat einen anderen Start-Slot bekommen!")
# Run the test suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()