feat: Implement Trading Twins Autopilot with Teams integration and फैक्टर-3 overbooking logic [31988f42]

This commit is contained in:
2026-03-04 08:22:28 +00:00
parent 16f80fc81b
commit 6b89c68edc
14 changed files with 849 additions and 25 deletions

View File

View File

@@ -0,0 +1,58 @@
from flask import Flask, request, jsonify, render_template_string
from .manager import TradingTwinsManager
app = Flask(__name__)
manager = TradingTwinsManager()
# Einfaches HTML-Template für Feedback
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<title>Trading Twins Status</title>
<style>
body { font-family: sans-serif; text-align: center; padding: 50px; }
.success { color: green; font-size: 24px; }
.cancelled { color: red; font-size: 24px; }
.info { color: gray; margin-top: 20px; }
</style>
</head>
<body>
<h1>{title}</h1>
<p class="{status_class}">{message}</p>
<p class="info">Job ID: {job_uuid}</p>
</body>
</html>
"""
@app.route('/action/approve/<job_uuid>', methods=['GET'])
def approve_job(job_uuid):
current_status = manager.get_job_status(job_uuid)
if not current_status:
return render_template_string(HTML_TEMPLATE, title="Fehler", status_class="cancelled", message="Job nicht gefunden.", job_uuid=job_uuid), 404
if current_status == 'pending':
manager.update_job_status(job_uuid, 'approved')
# TODO: Hier würde der E-Mail-Versand sofort getriggert werden (Phase 3)
return render_template_string(HTML_TEMPLATE, title="Erfolg", status_class="success", message="✅ E-Mail wird jetzt versendet!", job_uuid=job_uuid)
elif current_status == 'approved':
return render_template_string(HTML_TEMPLATE, title="Info", status_class="success", message="⚠️ Job wurde bereits genehmigt.", job_uuid=job_uuid)
else:
return render_template_string(HTML_TEMPLATE, title="Info", status_class="cancelled", message=f"Job-Status ist bereits: {current_status}", job_uuid=job_uuid)
@app.route('/action/cancel/<job_uuid>', methods=['GET'])
def cancel_job(job_uuid):
current_status = manager.get_job_status(job_uuid)
if not current_status:
return render_template_string(HTML_TEMPLATE, title="Fehler", status_class="cancelled", message="Job nicht gefunden.", job_uuid=job_uuid), 404
if current_status == 'pending':
manager.update_job_status(job_uuid, 'cancelled')
return render_template_string(HTML_TEMPLATE, title="Abbruch", status_class="cancelled", message="❌ E-Mail-Versand gestoppt.", job_uuid=job_uuid)
else:
return render_template_string(HTML_TEMPLATE, title="Info", status_class="info", message=f"Job konnte nicht gestoppt werden (Status: {current_status}).", job_uuid=job_uuid)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8004)

View File

@@ -0,0 +1,85 @@
import os
import msal
import requests
import base64
# Graph API Konfiguration (aus .env laden)
CLIENT_ID = os.getenv("AZURE_CLIENT_ID")
CLIENT_SECRET = os.getenv("AZURE_CLIENT_SECRET")
TENANT_ID = os.getenv("AZURE_TENANT_ID")
AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
SCOPE = ["https://graph.microsoft.com/.default"]
SENDER_EMAIL = "info@robo-planet.de"
def get_access_token():
"""Holt ein Token für die Graph API."""
app = msal.ConfidentialClientApplication(
CLIENT_ID, authority=AUTHORITY, client_credential=CLIENT_SECRET
)
result = app.acquire_token_for_client(scopes=SCOPE)
if "access_token" in result:
return result["access_token"]
else:
raise Exception(f"Fehler beim Abrufen des Tokens: {result.get('error_description')}")
def send_email_via_graph(to_email, subject, body_html, banner_path=None):
"""
Sendet eine E-Mail über die Microsoft Graph API.
"""
token = get_access_token()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
# E-Mail Struktur für Graph API
email_msg = {
"message": {
"subject": subject,
"body": {
"contentType": "HTML",
"content": body_html
},
"toRecipients": [
{
"emailAddress": {
"address": to_email
}
}
],
"from": {
"emailAddress": {
"address": SENDER_EMAIL
}
}
},
"saveToSentItems": "true"
}
# Optional: Banner-Bild als Inline-Attachment einfügen
if banner_path and os.path.exists(banner_path):
with open(banner_path, "rb") as f:
content_bytes = f.read()
content_b64 = base64.b64encode(content_bytes).decode("utf-8")
email_msg["message"]["attachments"] = [
{
"@odata.type": "#microsoft.graph.fileAttachment",
"name": "banner.png",
"contentBytes": content_b64,
"isInline": True,
"contentId": "banner_image"
}
]
endpoint = f"https://graph.microsoft.com/v1.0/users/{SENDER_EMAIL}/sendMail"
response = requests.post(endpoint, headers=headers, json=email_msg)
if response.status_code == 202:
print(f"E-Mail erfolgreich an {to_email} gesendet.")
return True
else:
print(f"Fehler beim Senden: {response.status_code} - {response.text}")
return False

View File

@@ -0,0 +1,133 @@
import datetime
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker
from .models import ProposalJob, ProposedSlot, Base
import uuid
# Konfiguration
DB_PATH = 'sqlite:///trading_twins/trading_twins.db'
MAX_PROPOSALS_PER_SLOT = 3 # Aggressiver Faktor 3
class TradingTwinsManager:
def __init__(self, db_path=DB_PATH):
self.engine = create_engine(db_path)
self.Session = sessionmaker(bind=self.engine)
Base.metadata.create_all(self.engine)
def create_proposal_job(self, customer_email, customer_name, customer_company):
"""Erstellt einen neuen Job, sucht Slots und speichert alles."""
session = self.Session()
try:
# 1. Freie Slots finden (Mock für jetzt)
# Später: real_slots = self.fetch_calendar_availability()
candidate_slots = self._mock_calendar_availability()
# 2. Beste Slots auswählen (mit Overbooking-Check)
selected_slots = self._select_best_slots(session, candidate_slots)
if not selected_slots:
# Fallback: Wenn alles "voll" ist (sehr unwahrscheinlich bei Faktor 3),
# nehmen wir trotzdem den am wenigsten gebuchten Slot.
selected_slots = candidate_slots[:2]
# 3. Job anlegen
job_uuid = str(uuid.uuid4())
new_job = ProposalJob(
job_uuid=job_uuid,
customer_email=customer_email,
customer_name=customer_name,
customer_company=customer_company,
status='pending'
)
session.add(new_job)
session.flush() # ID generieren
# 4. Slots speichern
for slot in selected_slots:
new_slot = ProposedSlot(
job_id=new_job.id,
start_time=slot['start'],
end_time=slot['end']
)
session.add(new_slot)
session.commit()
return new_job.job_uuid, selected_slots
except Exception as e:
session.rollback()
raise e
finally:
session.close()
def _select_best_slots(self, session, candidate_slots):
"""Wählt Slots aus, die noch nicht 'voll' sind (Faktor 3)."""
valid_slots = []
# Wir betrachten nur Vorschläge der letzten 24h als "aktiv"
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
for slot in candidate_slots:
# Wie oft wurde dieser Start-Zeitpunkt in den letzten 24h vorgeschlagen?
count = session.query(func.count(ProposedSlot.id)).filter(ProposedSlot.start_time == slot['start']).filter(ProposedSlot.job.has(ProposalJob.created_at >= yesterday)).scalar()
if count < MAX_PROPOSALS_PER_SLOT:
valid_slots.append(slot)
if len(valid_slots) >= 2:
break
return valid_slots
def _mock_calendar_availability(self):
"""Simuliert freie Termine für morgen."""
tomorrow = datetime.date.today() + datetime.timedelta(days=1)
# Ein Slot Vormittags (10:30), einer Nachmittags (14:00)
return [
{
'start': datetime.datetime.combine(tomorrow, datetime.time(10, 30)),
'end': datetime.datetime.combine(tomorrow, datetime.time(11, 15))
},
{
'start': datetime.datetime.combine(tomorrow, datetime.time(14, 0)),
'end': datetime.datetime.combine(tomorrow, datetime.time(14, 45))
}
]
def get_job_status(self, job_uuid):
session = self.Session()
job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first()
status = job.status if job else None
session.close()
return status
def get_job_details(self, job_uuid):
"""Holt alle Details zu einem Job inklusive der Slots."""
session = self.Session()
job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first()
if not job:
session.close()
return None
# Wir müssen die Daten extrahieren, bevor die Session geschlossen wird
details = {
'uuid': job.job_uuid,
'email': job.customer_email,
'name': job.customer_name,
'company': job.customer_company,
'status': job.status,
'slots': [{'start': s.start_time, 'end': s.end_time} for s in job.slots]
}
session.close()
return details
def update_job_status(self, job_uuid, new_status):
session = self.Session()
job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first()
if job:
job.status = new_status
if new_status == 'approved':
job.approved_at = datetime.datetime.now()
session.commit()
session.close()

View File

@@ -0,0 +1,45 @@
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from datetime import datetime
Base = declarative_base()
class ProposalJob(Base):
__tablename__ = 'proposal_jobs'
id = Column(Integer, primary_key=True)
job_uuid = Column(String, unique=True, nullable=False) # Für die API-Links
customer_email = Column(String, nullable=False)
customer_name = Column(String, nullable=True)
customer_company = Column(String, nullable=True)
# Status-Tracking
status = Column(String, default='pending') # pending, approved, rejected, sent, failed
# Audit-Trail
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
approved_at = Column(DateTime, nullable=True)
# Verknüpfung zu den vorgeschlagenen Slots
slots = relationship("ProposedSlot", back_populates="job")
class ProposedSlot(Base):
__tablename__ = 'proposed_slots'
id = Column(Integer, primary_key=True)
job_id = Column(Integer, ForeignKey('proposal_jobs.id'))
start_time = Column(DateTime, nullable=False)
end_time = Column(DateTime, nullable=False)
# Wir brauchen kein 'is_blocked' Flag mehr, da wir dynamisch zählen,
# wie oft 'start_time' in den letzten 24h verwendet wurde.
job = relationship("ProposalJob", back_populates="slots")
# DB Setup Helper
def init_db(db_path='sqlite:///trading_twins/trading_twins.db'):
engine = create_engine(db_path)
Base.metadata.create_all(engine)
return sessionmaker(bind=engine)

View File

@@ -0,0 +1,130 @@
import time
import threading
import logging
import datetime
from .manager import TradingTwinsManager
from .teams_notification import send_approval_card
from .email_sender import send_email_via_graph
import os
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("TradingTwinsOrchestrator")
TIMEOUT_SECONDS = 300 # 5 Minuten
SIGNATURE_FILE = "trading_twins/signature.html"
BANNER_IMAGE = "trading_twins/RoboPlanetBannerWebinarEinladung.png"
class TradingTwinsOrchestrator:
def __init__(self):
self.manager = TradingTwinsManager()
def process_lead(self, customer_email, customer_name, customer_company):
"""
Startet den gesamten Prozess für einen Lead.
"""
logger.info(f"Neuer Lead eingegangen: {customer_email}")
# 1. Job und Slots erstellen (mit Faktor-3 Logik)
job_uuid, slots = self.manager.create_proposal_job(
customer_email, customer_name, customer_company
)
logger.info(f"Job erstellt: {job_uuid}. Slots: {slots}")
# 2. Teams Benachrichtigung senden
# Formatieren der Uhrzeit für die Teams-Nachricht
send_time = (datetime.datetime.now() + datetime.timedelta(seconds=TIMEOUT_SECONDS)).strftime("%H:%M")
success = send_approval_card(job_uuid, customer_name, send_time)
if not success:
logger.error("Konnte Teams-Benachrichtigung nicht senden!")
# Fallback? Trotzdem Timer starten oder abbrechen?
# Wir machen weiter, da E-Mail-Versand Priorität hat.
# 3. Timer starten für automatischen Versand
timer = threading.Timer(TIMEOUT_SECONDS, self._check_timeout, args=[job_uuid])
timer.start()
return job_uuid
def _check_timeout(self, job_uuid):
"""
Wird nach Ablauf des Timers aufgerufen.
Prüft den Status und sendet ggf. automatisch.
"""
logger.info(f"Timer abgelaufen für Job {job_uuid}. Prüfe Status...")
current_status = self.manager.get_job_status(job_uuid)
if current_status == 'pending':
logger.info(f"Job {job_uuid} ist noch 'pending'. Löse automatischen Versand aus.")
self._trigger_email_send(job_uuid)
elif current_status == 'approved':
logger.info(f"Job {job_uuid} wurde bereits manuell genehmigt.")
elif current_status == 'cancelled':
logger.info(f"Job {job_uuid} wurde manuell abgebrochen.")
else:
logger.warning(f"Unbekannter Status für Job {job_uuid}: {current_status}")
def _trigger_email_send(self, job_uuid):
"""
Hier wird der tatsächliche E-Mail-Versand angestoßen.
"""
# Job Details laden
job_details = self.manager.get_job_details(job_uuid)
if not job_details:
logger.error(f"Konnte Job {job_uuid} nicht finden!")
return
# E-Mail Body zusammenbauen
try:
with open(SIGNATURE_FILE, "r") as f:
signature_html = f.read()
except FileNotFoundError:
logger.warning("Signatur-Datei nicht gefunden!")
signature_html = "<br>Viele Grüße<br>Ihr RoboPlanet Team"
# Dynamische Terminvorschläge formatieren
slots_text = ""
for slot in job_details['slots']:
# Format: "Morgen, 14:00 Uhr" oder Datum
start = slot['start']
slots_text += f"<li>{start.strftime('%d.%m.%Y um %H:%M Uhr')}</li>"
email_body = f"""
<html>
<body>
<p>Hallo {job_details['name']},</p>
<p>vielen Dank für Ihr Interesse an Trading Twins.</p>
<p>Gerne würde ich Ihnen in einem kurzen Gespräch (ca. 15-30 Min) zeigen, wie wir Sie unterstützen können.</p>
<p>Hätten Sie an einem dieser Termine Zeit?</p>
<ul>
{slots_text}
</ul>
<p>Ich freue mich auf Ihre Rückmeldung.</p>
{signature_html}
</body>
</html>
"""
# Banner Check
banner_path = BANNER_IMAGE if os.path.exists(BANNER_IMAGE) else None
# Senden
success = send_email_via_graph(
to_email=job_details['email'],
subject="Ihr Termin für Trading Twins",
body_html=email_body,
banner_path=banner_path
)
if success:
logger.info(f"🚀 E-MAIL WURDE VERSENDET für Job {job_uuid}")
self.manager.update_job_status(job_uuid, 'sent')
else:
logger.error(f"❌ Fehler beim E-Mail-Versand für Job {job_uuid}")
self.manager.update_job_status(job_uuid, 'failed')
if __name__ == "__main__":
# Test-Lauf
orchestrator = TradingTwinsOrchestrator()
orchestrator.process_lead("test@example.com", "Max Mustermann", "Musterfirma GmbH")

View File

@@ -0,0 +1,27 @@
<br>
<div style="font-family: Arial, sans-serif; font-size: 14px; color: #333;">
<p>Freundliche Grüße<br>
<strong>Elizabeta Melcer</strong><br>
Inside Sales Managerin</p>
<p>
<strong>RoboPlanet GmbH</strong><br>
Schatzbogen 39, 81829 München<br>
T: +49 89 420490-402 | M: +49 175 8334071<br>
<a href="mailto:e.melcer@robo-planet.de">e.melcer@robo-planet.de</a> | <a href="http://www.robo-planet.de">www.robo-planet.de</a>
</p>
<p>
<a href="#">LinkedIn</a> | <a href="#">Instagram</a> | <a href="#">Newsletteranmeldung</a>
</p>
<p style="font-size: 10px; color: #777;">
Sitz der Gesellschaft München | Geschäftsführung: Axel Banoth<br>
Registergericht AG München, HRB 296113 | USt.-IdNr. DE400464410<br>
<a href="#">Hinweispflichten zum Datenschutz</a>
</p>
<!-- Platzhalter für das Bild -->
<img src="https://robo-planet.de/wp-content/uploads/2024/01/RoboPlanet_Logo.png" alt="RoboPlanet Logo" width="150"><br>
<img src="cid:banner_image" alt="Webinar Einladung" width="400">
</div>

View File

@@ -0,0 +1,70 @@
import requests
import json
import os
from datetime import datetime
# Default-Webhook (Platzhalter) - sollte in .env stehen
DEFAULT_WEBHOOK_URL = os.getenv("TEAMS_WEBHOOK_URL", "")
def send_approval_card(job_uuid, customer_name, time_string, webhook_url=DEFAULT_WEBHOOK_URL):
"""
Sendet eine Adaptive Card an Teams mit Approve/Deny Buttons.
"""
# Die URL unserer API (muss von außen erreichbar sein, z.B. via ngrok oder Server-IP)
api_base_url = os.getenv("API_BASE_URL", "http://localhost:8004")
card_payload = {
"type": "message",
"attachments": [
{
"contentType": "application/vnd.microsoft.card.adaptive",
"contentUrl": None,
"content": {
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"type": "AdaptiveCard",
"version": "1.4",
"body": [
{
"type": "TextBlock",
"text": f"🤖 Automatisierte E-Mail an {customer_name}",
"weight": "Bolder",
"size": "Medium"
},
{
"type": "TextBlock",
"text": f"(via Trading Twins) wird um {time_string} Uhr ausgesendet.",
"isSubtle": True,
"wrap": True
},
{
"type": "TextBlock",
"text": "Wenn Du bis dahin NICHT reagierst, wird die E-Mail automatisch gesendet.",
"color": "Attention",
"wrap": True
}
],
"actions": [
{
"type": "Action.OpenUrl",
"title": "✅ JETZT Aussenden",
"url": f"{api_base_url}/action/approve/{job_uuid}"
},
{
"type": "Action.OpenUrl",
"title": "❌ STOP Aussendung",
"url": f"{api_base_url}/action/cancel/{job_uuid}"
}
]
}
}
]
}
try:
response = requests.post(webhook_url, json=card_payload)
response.raise_for_status()
return True
except Exception as e:
print(f"Fehler beim Senden an Teams: {e}")
return False

View File

@@ -0,0 +1,139 @@
import unittest
from unittest.mock import patch, MagicMock
import time
import os
import logging
from datetime import datetime, timedelta
# Importiere unsere Module
from trading_twins.orchestrator import TradingTwinsOrchestrator
from trading_twins.manager import TradingTwinsManager
from trading_twins.models import init_db, ProposalJob, ProposedSlot
# Logging reduzieren
logging.basicConfig(level=logging.INFO)
class TestTradingTwinsDryRun(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Wir nutzen eine temporäre Test-Datenbank
cls.test_db_path = 'sqlite:///trading_twins/test_dry_run.db'
if os.path.exists("trading_twins/test_dry_run.db"):
os.remove("trading_twins/test_dry_run.db")
# Manager mit Test-DB initialisieren
cls.manager = TradingTwinsManager(db_path=cls.test_db_path)
def setUp(self):
# Orchestrator neu erstellen
self.orchestrator = TradingTwinsOrchestrator()
self.orchestrator.manager = self.manager # Inject Test Manager
# Timer drastisch verkürzen für Tests
self.orchestrator_timeout_patch = patch('trading_twins.orchestrator.TIMEOUT_SECONDS', 2)
self.orchestrator_timeout_patch.start()
def tearDown(self):
self.orchestrator_timeout_patch.stop()
@patch('trading_twins.orchestrator.send_email_via_graph')
@patch('trading_twins.orchestrator.send_approval_card')
def test_1_happy_path_timeout(self, mock_teams, mock_email):
"""Testet den automatischen Versand nach Timeout."""
print("\n--- TEST 1: Happy Path (Timeout -> Auto-Send) ---")
mock_teams.return_value = True
mock_email.return_value = True
# Lead verarbeiten
job_uuid = self.orchestrator.process_lead("test1@example.com", "Kunde Eins", "Firma A")
print(f"Job {job_uuid} gestartet. Warte auf Timeout (2s).")
time.sleep(3) # Warte länger als Timeout
# Prüfungen
mock_teams.assert_called_once()
mock_email.assert_called_once() # E-Mail muss versendet worden sein
status = self.manager.get_job_status(job_uuid)
self.assertEqual(status, 'sent')
print("✅ E-Mail wurde automatisch versendet.")
@patch('trading_twins.orchestrator.send_email_via_graph')
@patch('trading_twins.orchestrator.send_approval_card')
def test_2_manual_cancel(self, mock_teams, mock_email):
"""Testet den manuellen Abbruch."""
print("\n--- TEST 2: Manueller Abbruch (STOP) ---")
mock_teams.return_value = True
# Lead verarbeiten
job_uuid = self.orchestrator.process_lead("test2@example.com", "Kunde Zwei", "Firma B")
# Simuliere Klick auf "STOP" (direkter DB-Update wie API es tun würde)
print("Simuliere Klick auf 'STOP'...")
self.manager.update_job_status(job_uuid, 'cancelled')
print("Warte auf Timeout (2s).")
time.sleep(3)
# Prüfungen
mock_teams.assert_called_once()
mock_email.assert_not_called() # E-Mail darf NICHT versendet werden
status = self.manager.get_job_status(job_uuid)
self.assertEqual(status, 'cancelled')
print("✅ Abbruch erfolgreich, keine E-Mail gesendet.")
@patch('trading_twins.manager.TradingTwinsManager._mock_calendar_availability')
def test_3_overbooking_factor_3(self, mock_calendar):
"""
Testet die Faktor-3 Logik.
Wir stellen 4 Slots zur Verfügung.
Wir erzeugen 4 Leads.
Die ersten 3 sollten Slot A bekommen.
Der 4. Lead sollte Slot A NICHT mehr bekommen, sondern Slot B (oder andere).
"""
print("\n--- TEST 3: Überbuchungs-Logik (Faktor 3) ---")
# Setup Mock Calendar: Gibt immer dieselben 4 Slots zurück
tomorrow = datetime.now().date() + timedelta(days=1)
slot_a = {'start': datetime.combine(tomorrow, datetime.min.time().replace(hour=10)), 'end': datetime.combine(tomorrow, datetime.min.time().replace(hour=10, minute=45))}
slot_b = {'start': datetime.combine(tomorrow, datetime.min.time().replace(hour=11)), 'end': datetime.combine(tomorrow, datetime.min.time().replace(hour=11, minute=45))}
slot_c = {'start': datetime.combine(tomorrow, datetime.min.time().replace(hour=14)), 'end': datetime.combine(tomorrow, datetime.min.time().replace(hour=14, minute=45))}
# Mock gibt diese Liste zurück
mock_calendar.return_value = [slot_a, slot_b, slot_c]
# Wir feuern 4 Leads ab
uuids = []
for i in range(1, 5):
uuid, slots = self.manager.create_proposal_job(f"bulk{i}@test.com", f"Bulk {i}", "Bulk Corp")
uuids.append((uuid, slots))
# print(f"Lead {i} bekam Slots: {[s['start'].strftime('%H:%M') for s in slots]}")
# Analyse
# Lead 1: Bekommt A, B (A hat Count 0 -> 1)
# Lead 2: Bekommt A, B (A hat Count 1 -> 2)
# Lead 3: Bekommt A, B (A hat Count 2 -> 3 -> VOLL)
# Lead 4: Sollte A NICHT bekommen, sondern B, C
slots_lead_1 = uuids[0][1]
slots_lead_4 = uuids[3][1]
start_time_lead_1_first_slot = slots_lead_1[0]['start']
start_time_lead_4_first_slot = slots_lead_4[0]['start']
print(f"Lead 1 Slot 1: {start_time_lead_1_first_slot}")
print(f"Lead 4 Slot 1: {start_time_lead_4_first_slot}")
if start_time_lead_1_first_slot != start_time_lead_4_first_slot:
print("✅ Faktor-3 Logik greift: Lead 4 hat einen anderen Start-Slot bekommen!")
else:
print("❌ Faktor-3 Logik fehlgeschlagen: Lead 4 hat denselben Slot bekommen.")
# Debug
session = self.manager.Session()
count = session.query(ProposedSlot).filter(ProposedSlot.start_time == start_time_lead_1_first_slot).count()
print(f"Total entries for Slot A: {count}")
session.close()
if __name__ == '__main__':
unittest.main()