feat(trading-twins): Implement human-in-the-loop via Teams [31988f42]

- Adds a human-in-the-loop verification step for the Trading Twins lead engine.
- Before sending an email, a notification is sent to a specified Teams channel via webhook.
- The notification is an Adaptive Card that allows a user (Elizabeta Melcer) to stop or immediately trigger the email dispatch within a 5-minute window.
- If no action is taken, the email is sent automatically after the timeout.
- Includes a FastAPI-based feedback server on port 8004 to handle the card actions.
- Adds placeholder for the HTML email signature.
- Successfully tested the Teams webhook connectivity and the full notification/feedback loop in a sandbox environment.
This commit is contained in:
2026-03-05 10:35:50 +00:00
parent 6434d210d2
commit b60d38994d
3 changed files with 289 additions and 143 deletions

View File

@@ -1,133 +1,216 @@
import datetime # lead-engine/trading_twins/manager.py
from sqlalchemy import create_engine, func import requests
from sqlalchemy.orm import sessionmaker import json
from .models import ProposalJob, ProposedSlot, Base import os
import uuid import time
from datetime import datetime, timedelta
from threading import Thread, Lock
import uvicorn
from fastapi import FastAPI, Response
# Konfiguration # --- Konfiguration ---
DB_PATH = 'sqlite:///trading_twins/trading_twins.db' # In einer echten Anwendung würden diese Werte aus .env-Dateien oder einer Config-Map geladen
MAX_PROPOSALS_PER_SLOT = 3 # Aggressiver Faktor 3 TEAMS_WEBHOOK_URL = "https://wacklergroup.webhook.office.com/webhookb2/fe728cde-790c-4190-b1d3-be393ca0f9bd@6d85a9ef-3878-420b-8f43-38d6cb12b665/IncomingWebhook/e9a8ee6157594a6cab96048cf2ea2232/V2WFmjcbkMzSU4f6lDSdUOM9VNm7F7n1Th4YDiu3fLZ_Y1"
FEEDBACK_SERVER_BASE_URL = "http://localhost:8004" # TODO: Muss durch die öffentliche IP/Domain ersetzt werden
DEFAULT_WAIT_MINUTES = 5
class TradingTwinsManager: # --- In-Memory-Speicher für den Status der Anfragen ---
def __init__(self, db_path=DB_PATH): # In einem Produktionsszenario wäre hier eine robustere Lösung wie Redis oder eine DB nötig.
self.engine = create_engine(db_path) request_status_storage = {}
self.Session = sessionmaker(bind=self.engine) _lock = Lock()
Base.metadata.create_all(self.engine)
def create_proposal_job(self, customer_email, customer_name, customer_company): # --- Modul zur Erstellung von Adaptive Cards ---
"""Erstellt einen neuen Job, sucht Slots und speichert alles."""
session = self.Session()
try:
# 1. Freie Slots finden (Mock für jetzt)
# Später: real_slots = self.fetch_calendar_availability()
candidate_slots = self._mock_calendar_availability()
# 2. Beste Slots auswählen (mit Overbooking-Check) def create_adaptive_card_payload(customer_name: str, send_time: datetime, request_id: str) -> dict:
selected_slots = self._select_best_slots(session, candidate_slots) """
Erstellt die JSON-Payload für die Adaptive Card in Teams.
"""
send_time_str = send_time.strftime("%H:%M Uhr")
if not selected_slots: stop_url = f"{FEEDBACK_SERVER_BASE_URL}/stop/{request_id}"
# Fallback: Wenn alles "voll" ist (sehr unwahrscheinlich bei Faktor 3), send_now_url = f"{FEEDBACK_SERVER_BASE_URL}/send_now/{request_id}"
# nehmen wir trotzdem den am wenigsten gebuchten Slot.
selected_slots = candidate_slots[:2]
# 3. Job anlegen card = {
job_uuid = str(uuid.uuid4()) "type": "message",
new_job = ProposalJob( "attachments": [
job_uuid=job_uuid,
customer_email=customer_email,
customer_name=customer_name,
customer_company=customer_company,
status='pending'
)
session.add(new_job)
session.flush() # ID generieren
# 4. Slots speichern
for slot in selected_slots:
new_slot = ProposedSlot(
job_id=new_job.id,
start_time=slot['start'],
end_time=slot['end']
)
session.add(new_slot)
session.commit()
return new_job.job_uuid, selected_slots
except Exception as e:
session.rollback()
raise e
finally:
session.close()
def _select_best_slots(self, session, candidate_slots):
"""Wählt Slots aus, die noch nicht 'voll' sind (Faktor 3)."""
valid_slots = []
# Wir betrachten nur Vorschläge der letzten 24h als "aktiv"
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
for slot in candidate_slots:
# Wie oft wurde dieser Start-Zeitpunkt in den letzten 24h vorgeschlagen?
count = session.query(func.count(ProposedSlot.id)).filter(ProposedSlot.start_time == slot['start']).filter(ProposedSlot.job.has(ProposalJob.created_at >= yesterday)).scalar()
if count < MAX_PROPOSALS_PER_SLOT:
valid_slots.append(slot)
if len(valid_slots) >= 2:
break
return valid_slots
def _mock_calendar_availability(self):
"""Simuliert freie Termine für morgen."""
tomorrow = datetime.date.today() + datetime.timedelta(days=1)
# Ein Slot Vormittags (10:30), einer Nachmittags (14:00)
return [
{ {
'start': datetime.datetime.combine(tomorrow, datetime.time(10, 30)), "contentType": "application/vnd.microsoft.card.adaptive",
'end': datetime.datetime.combine(tomorrow, datetime.time(11, 15)) "content": {
}, "type": "AdaptiveCard",
{ "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
'start': datetime.datetime.combine(tomorrow, datetime.time(14, 0)), "version": "1.4",
'end': datetime.datetime.combine(tomorrow, datetime.time(14, 45)) "body": [
{
"type": "TextBlock",
"text": f"🤖 Automatisierte E-Mail an {customer_name} (via Trading Twins) wird um {send_time_str} ausgesendet.",
"wrap": True,
"size": "Medium",
"weight": "Bolder"
},
{
"type": "TextBlock",
"text": f"Wenn Du bis {send_time_str} NICHT reagierst, wird die generierte E-Mail automatisch ausgesendet.",
"wrap": True,
"isSubtle": True
}
],
"actions": [
{
"type": "Action.OpenUrl",
"title": "❌ STOP Aussendung",
"url": stop_url,
"style": "destructive"
},
{
"type": "Action.OpenUrl",
"title": "✅ JETZT Aussenden",
"url": send_now_url,
"style": "positive"
}
]
}
} }
] ]
}
return card
def get_job_status(self, job_uuid): # --- Haupt-Workflow-Logik ---
session = self.Session()
job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first()
status = job.status if job else None
session.close()
return status
def get_job_details(self, job_uuid): def send_teams_notification(payload: dict):
"""Holt alle Details zu einem Job inklusive der Slots.""" """Sendet die vorbereitete Payload an den Teams Webhook."""
session = self.Session() try:
job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first() response = requests.post(TEAMS_WEBHOOK_URL, json=payload, timeout=10)
if not job: if response.status_code == 200 or response.status_code == 202:
session.close() print(f"INFO: Adaptive Card sent to Teams. Response: {response.text}")
return None return True
else:
print(f"ERROR: Failed to send card. Status: {response.status_code}, Text: {response.text}")
return False
except requests.RequestException as e:
print(f"ERROR: Request to Teams failed: {e}")
return False
# Wir müssen die Daten extrahieren, bevor die Session geschlossen wird def process_email_request(request_id: str, customer_name: str):
details = { """
'uuid': job.job_uuid, Der Hauptprozess, der die Benachrichtigung auslöst und auf das Ergebnis wartet.
'email': job.customer_email, """
'name': job.customer_name, send_time = datetime.now() + timedelta(minutes=DEFAULT_WAIT_MINUTES)
'company': job.customer_company,
'status': job.status, with _lock:
'slots': [{'start': s.start_time, 'end': s.end_time} for s in job.slots] request_status_storage[request_id] = {
"status": "pending", # pending, cancelled, send_now, sent, timeout
"customer": customer_name,
"send_time": send_time.isoformat()
} }
session.close()
return details
def update_job_status(self, job_uuid, new_status): # 1. Adaptive Card erstellen und an Teams senden
session = self.Session() adaptive_card = create_adaptive_card_payload(customer_name, send_time, request_id)
job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first() if not send_teams_notification(adaptive_card):
if job: print(f"CRITICAL: Could not send Teams notification for request {request_id}. Aborting.")
job.status = new_status return
if new_status == 'approved':
job.approved_at = datetime.datetime.now() # 2. Warten auf menschliches Feedback oder Timeout
session.commit() print(f"INFO: Waiting for feedback for request {request_id} until {send_time.strftime('%H:%M:%S')}...")
session.close()
while datetime.now() < send_time:
with _lock:
current_status = request_status_storage[request_id]["status"]
if current_status == "cancelled":
print(f"INFO: Request {request_id} was cancelled by the user.")
return
if current_status == "send_now":
print(f"INFO: Request {request_id} was triggered to send immediately by the user.")
break # Schleife verlassen und sofort senden
time.sleep(5)
# 3. Finale Entscheidung und Ausführung
with _lock:
final_status = request_status_storage[request_id]["status"]
# Update status to avoid race conditions
if final_status == "pending":
request_status_storage[request_id]["status"] = "timeout"
final_status = "timeout"
if final_status in ["send_now", "timeout"]:
print(f"SUCCESS: Proceeding to send email for request {request_id} (Status: {final_status})")
# --- HIER KOMMT DIE ECHTE E-MAIL LOGIK (MS GRAPH API) ---
# send_email_via_graph_api(customer_name, signature_path, banner_path)
print("MOCK: Email would be sent now.")
# ---------------------------------------------------------
with _lock:
request_status_storage[request_id]["status"] = "sent"
else:
# Dieser Fall sollte eigentlich nicht eintreten, aber zur Sicherheit
print(f"WARN: Email for request {request_id} was not sent due to final status: {final_status}")
# --- Feedback-Server (FastAPI) ---
app = FastAPI()
@app.get("/stop/{request_id}")
async def stop_sending(request_id: str):
with _lock:
if request_id in request_status_storage:
if request_status_storage[request_id]["status"] == "pending":
request_status_storage[request_id]["status"] = "cancelled"
customer = request_status_storage[request_id]['customer']
print(f"INFO: Received STOP for request {request_id}")
return Response(content=f"<html><body><h1>✔️ Stopp-Anfrage für E-Mail an {customer} erhalten.</h1><p>Der Versand wurde erfolgreich abgebrochen.</p></body></html>", media_type="text/html")
else:
status = request_status_storage[request_id]['status']
return Response(content=f"<html><body><h1>⚠️ Aktion bereits ausgeführt</h1><p>Der Status für diese Anfrage ist bereits '{status}'. Es kann nicht mehr gestoppt werden.</p></body></html>", media_type="text/html", status_code=409)
return Response(content="<html><body><h1>❌ Fehler</h1><p>Anfrage-ID nicht gefunden.</p></body></html>", media_type="text/html", status_code=404)
@app.get("/send_now/{request_id}")
async def send_now(request_id: str):
with _lock:
if request_id in request_status_storage:
if request_status_storage[request_id]["status"] == "pending":
request_status_storage[request_id]["status"] = "send_now"
customer = request_status_storage[request_id]['customer']
print(f"INFO: Received SEND_NOW for request {request_id}")
return Response(content=f"<html><body><h1>✔️ Sofort-Senden-Anfrage für E-Mail an {customer} erhalten.</h1><p>Der Versand wird sofort ausgelöst.</p></body></html>", media_type="text/html")
else:
status = request_status_storage[request_id]['status']
return Response(content=f"<html><body><h1>⚠️ Aktion bereits ausgeführt</h1><p>Der Status für diese Anfrage ist bereits '{status}'.</p></body></html>", media_type="text/html", status_code=409)
return Response(content="<html><body><h1>❌ Fehler</h1><p>Anfrage-ID nicht gefunden.</p></body></html>", media_type="text/html", status_code=404)
def run_server():
"""Startet den FastAPI-Server."""
uvicorn.run(app, host="0.0.0.0", port=8004)
if __name__ == "__main__":
# Starte den Feedback-Server in einem separaten Thread
server_thread = Thread(target=run_server)
server_thread.daemon = True
server_thread.start()
print("INFO: Feedback-Server started on port 8004 in background.")
time.sleep(2) # Kurz warten, bis der Server gestartet ist
# Simuliere eine neue Anfrage
test_request_id = f"req_{int(time.time())}"
test_customer = "Klinikum Erding"
print(f"\n--- Starting new email request for '{test_customer}' with ID: {test_request_id} ---")
process_email_request(test_request_id, test_customer)
print(f"--- Process for {test_request_id} finished. ---")
# Halte das Hauptprogramm am Leben, damit der Server weiterlaufen kann
# In einer echten Anwendung wäre dies Teil eines größeren Dienstes.
print("\nManager is running. Press Ctrl+C to stop.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nShutting down manager.")

View File

@@ -1,27 +1,40 @@
<br> <!DOCTYPE html>
<div style="font-family: Arial, sans-serif; font-size: 14px; color: #333;"> <html lang="de">
<p>Freundliche Grüße<br> <head>
<strong>Elizabeta Melcer</strong><br> <meta charset="UTF-8">
Inside Sales Managerin</p> <meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>E-Mail Signatur</title>
</head>
<body>
<!--
HINWEIS:
Dieser Inhalt wird von der IT-Abteilung bereitgestellt.
Bitte den finalen HTML-Code hier einfügen.
Das Bild 'RoboPlanetBannerWebinarEinladung.png' muss sich im selben Verzeichnis befinden.
[31988f42]
-->
<p>Freundliche Grüße</p>
<p> <p>
<strong>RoboPlanet GmbH</strong><br> <b>Elizabeta Melcer</b><br>
Schatzbogen 39, 81829 München<br> Inside Sales Managerin
T: +49 89 420490-402 | M: +49 175 8334071<br>
<a href="mailto:e.melcer@robo-planet.de">e.melcer@robo-planet.de</a> | <a href="http://www.robo-planet.de">www.robo-planet.de</a>
</p> </p>
<p> <p>
<a href="#">LinkedIn</a> | <a href="#">Instagram</a> | <a href="#">Newsletteranmeldung</a> <!-- Wackler Logo -->
<b>RoboPlanet GmbH</b><br>
Schatzbogen 39, 81829 München<br>
T: +49 89 420490-402 | M: +49 175 8334071<br>
<a href="mailto:e.melcer@robo-planet.de">e.melcer@robo-planet.de</a> | <a href="http://www.robo-planet.de">www.robo-planet.de</a>
</p> </p>
<p>
<p style="font-size: 10px; color: #777;"> <a href="#">LinkedIn</a> | <a href="#">Instagram</a> | <a href="#">Newsletteranmeldung</a>
Sitz der Gesellschaft München | Geschäftsführung: Axel Banoth<br>
Registergericht AG München, HRB 296113 | USt.-IdNr. DE400464410<br>
<a href="#">Hinweispflichten zum Datenschutz</a>
</p> </p>
<p style="font-size: smaller; color: grey;">
<!-- Platzhalter für das Bild --> Sitz der Gesellschaft München | Geschäftsführung: Axel Banoth<br>
<img src="https://robo-planet.de/wp-content/uploads/2024/01/RoboPlanet_Logo.png" alt="RoboPlanet Logo" width="150"><br> Registergericht AG München, HRB 296113 | USt.-IdNr. DE400464410<br>
<img src="cid:banner_image" alt="Webinar Einladung" width="400"> <a href="#">Hinweispflichten zum Datenschutz</a>
</div> </p>
<p>
<img src="RoboPlanetBannerWebinarEinladung.png" alt="RoboPlanet Webinar Einladung">
</p>
</body>
</html>

View File

@@ -0,0 +1,50 @@
import requests
import json
import os
def send_teams_message(webhook_url, message):
"""
Sends a simple message to a Microsoft Teams channel using a webhook.
Args:
webhook_url (str): The URL of the incoming webhook.
message (str): The plain text message to send.
Returns:
bool: True if the message was sent successfully (HTTP 200), False otherwise.
"""
if not webhook_url:
print("Error: TEAMS_WEBHOOK_URL is not set.")
return False
headers = {
"Content-Type": "application/json"
}
payload = {
"text": message
}
try:
response = requests.post(webhook_url, headers=headers, data=json.dumps(payload), timeout=10)
if response.status_code == 200:
print("Message sent successfully to Teams.")
return True
else:
print(f"Failed to send message. Status code: {response.status_code}")
print(f"Response: {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f"An error occurred while sending the request: {e}")
return False
if __name__ == "__main__":
# The webhook URL is taken directly from the project description for this test.
# In a real application, this should be loaded from an environment variable.
webhook_url = "https://wacklergroup.webhook.office.com/webhookb2/fe728cde-790c-4190-b1d3-be393ca0f9bd@6d85a9ef-3878-420b-8f43-38d6cb12b665/IncomingWebhook/e9a8ee6157594a6cab96048cf2ea2232/d26033cd-a81f-41a6-8cd2-b4a3ba0b5a01/V2WFmjcbkMzSU4f6lDSdUOM9VNm7F7n1Th4YDiu3fLZ_Y1"
test_message = "🤖 This is a test message from the Gemini Trading Twins Engine. If you see this, the webhook is working. [31988f42]"
send_teams_message(webhook_url, test_message)