feat(trading-twins): Implement human-in-the-loop via Teams [31988f42]

- Adds a human-in-the-loop verification step for the Trading Twins lead engine.
- Before sending an email, a notification is sent to a specified Teams channel via webhook.
- The notification is an Adaptive Card that allows a user (Elizabeta Melcer) to stop or immediately trigger the email dispatch within a 5-minute window.
- If no action is taken, the email is sent automatically after the timeout.
- Includes a FastAPI-based feedback server on port 8004 to handle the card actions.
- Adds placeholder for the HTML email signature.
- Successfully tested the Teams webhook connectivity and the full notification/feedback loop in a sandbox environment.
This commit is contained in:
2026-03-05 10:35:50 +00:00
parent 97e86af056
commit 402c11ed5f
3 changed files with 289 additions and 143 deletions

View File

@@ -1,133 +1,216 @@
import datetime # lead-engine/trading_twins/manager.py
from sqlalchemy import create_engine, func import requests
from sqlalchemy.orm import sessionmaker import json
from .models import ProposalJob, ProposedSlot, Base import os
import uuid import time
from datetime import datetime, timedelta
from threading import Thread, Lock
import uvicorn
from fastapi import FastAPI, Response
# Konfiguration # --- Konfiguration ---
DB_PATH = 'sqlite:///trading_twins/trading_twins.db' # In einer echten Anwendung würden diese Werte aus .env-Dateien oder einer Config-Map geladen
MAX_PROPOSALS_PER_SLOT = 3 # Aggressiver Faktor 3 TEAMS_WEBHOOK_URL = "https://wacklergroup.webhook.office.com/webhookb2/fe728cde-790c-4190-b1d3-be393ca0f9bd@6d85a9ef-3878-420b-8f43-38d6cb12b665/IncomingWebhook/e9a8ee6157594a6cab96048cf2ea2232/V2WFmjcbkMzSU4f6lDSdUOM9VNm7F7n1Th4YDiu3fLZ_Y1"
FEEDBACK_SERVER_BASE_URL = "http://localhost:8004" # TODO: Muss durch die öffentliche IP/Domain ersetzt werden
DEFAULT_WAIT_MINUTES = 5
class TradingTwinsManager: # --- In-Memory-Speicher für den Status der Anfragen ---
def __init__(self, db_path=DB_PATH): # In einem Produktionsszenario wäre hier eine robustere Lösung wie Redis oder eine DB nötig.
self.engine = create_engine(db_path) request_status_storage = {}
self.Session = sessionmaker(bind=self.engine) _lock = Lock()
Base.metadata.create_all(self.engine)
def create_proposal_job(self, customer_email, customer_name, customer_company): # --- Modul zur Erstellung von Adaptive Cards ---
"""Erstellt einen neuen Job, sucht Slots und speichert alles."""
session = self.Session()
try:
# 1. Freie Slots finden (Mock für jetzt)
# Später: real_slots = self.fetch_calendar_availability()
candidate_slots = self._mock_calendar_availability()
# 2. Beste Slots auswählen (mit Overbooking-Check) def create_adaptive_card_payload(customer_name: str, send_time: datetime, request_id: str) -> dict:
selected_slots = self._select_best_slots(session, candidate_slots) """
Erstellt die JSON-Payload für die Adaptive Card in Teams.
"""
send_time_str = send_time.strftime("%H:%M Uhr")
if not selected_slots: stop_url = f"{FEEDBACK_SERVER_BASE_URL}/stop/{request_id}"
# Fallback: Wenn alles "voll" ist (sehr unwahrscheinlich bei Faktor 3), send_now_url = f"{FEEDBACK_SERVER_BASE_URL}/send_now/{request_id}"
# nehmen wir trotzdem den am wenigsten gebuchten Slot.
selected_slots = candidate_slots[:2]
# 3. Job anlegen card = {
job_uuid = str(uuid.uuid4()) "type": "message",
new_job = ProposalJob( "attachments": [
job_uuid=job_uuid,
customer_email=customer_email,
customer_name=customer_name,
customer_company=customer_company,
status='pending'
)
session.add(new_job)
session.flush() # ID generieren
# 4. Slots speichern
for slot in selected_slots:
new_slot = ProposedSlot(
job_id=new_job.id,
start_time=slot['start'],
end_time=slot['end']
)
session.add(new_slot)
session.commit()
return new_job.job_uuid, selected_slots
except Exception as e:
session.rollback()
raise e
finally:
session.close()
def _select_best_slots(self, session, candidate_slots):
"""Wählt Slots aus, die noch nicht 'voll' sind (Faktor 3)."""
valid_slots = []
# Wir betrachten nur Vorschläge der letzten 24h als "aktiv"
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
for slot in candidate_slots:
# Wie oft wurde dieser Start-Zeitpunkt in den letzten 24h vorgeschlagen?
count = session.query(func.count(ProposedSlot.id)).filter(ProposedSlot.start_time == slot['start']).filter(ProposedSlot.job.has(ProposalJob.created_at >= yesterday)).scalar()
if count < MAX_PROPOSALS_PER_SLOT:
valid_slots.append(slot)
if len(valid_slots) >= 2:
break
return valid_slots
def _mock_calendar_availability(self):
"""Simuliert freie Termine für morgen."""
tomorrow = datetime.date.today() + datetime.timedelta(days=1)
# Ein Slot Vormittags (10:30), einer Nachmittags (14:00)
return [
{ {
'start': datetime.datetime.combine(tomorrow, datetime.time(10, 30)), "contentType": "application/vnd.microsoft.card.adaptive",
'end': datetime.datetime.combine(tomorrow, datetime.time(11, 15)) "content": {
"type": "AdaptiveCard",
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"version": "1.4",
"body": [
{
"type": "TextBlock",
"text": f"🤖 Automatisierte E-Mail an {customer_name} (via Trading Twins) wird um {send_time_str} ausgesendet.",
"wrap": True,
"size": "Medium",
"weight": "Bolder"
}, },
{ {
'start': datetime.datetime.combine(tomorrow, datetime.time(14, 0)), "type": "TextBlock",
'end': datetime.datetime.combine(tomorrow, datetime.time(14, 45)) "text": f"Wenn Du bis {send_time_str} NICHT reagierst, wird die generierte E-Mail automatisch ausgesendet.",
"wrap": True,
"isSubtle": True
}
],
"actions": [
{
"type": "Action.OpenUrl",
"title": "❌ STOP Aussendung",
"url": stop_url,
"style": "destructive"
},
{
"type": "Action.OpenUrl",
"title": "✅ JETZT Aussenden",
"url": send_now_url,
"style": "positive"
} }
] ]
def get_job_status(self, job_uuid):
session = self.Session()
job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first()
status = job.status if job else None
session.close()
return status
def get_job_details(self, job_uuid):
"""Holt alle Details zu einem Job inklusive der Slots."""
session = self.Session()
job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first()
if not job:
session.close()
return None
# Wir müssen die Daten extrahieren, bevor die Session geschlossen wird
details = {
'uuid': job.job_uuid,
'email': job.customer_email,
'name': job.customer_name,
'company': job.customer_company,
'status': job.status,
'slots': [{'start': s.start_time, 'end': s.end_time} for s in job.slots]
} }
session.close() }
return details ]
}
return card
def update_job_status(self, job_uuid, new_status): # --- Haupt-Workflow-Logik ---
session = self.Session()
job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first() def send_teams_notification(payload: dict):
if job: """Sendet die vorbereitete Payload an den Teams Webhook."""
job.status = new_status try:
if new_status == 'approved': response = requests.post(TEAMS_WEBHOOK_URL, json=payload, timeout=10)
job.approved_at = datetime.datetime.now() if response.status_code == 200 or response.status_code == 202:
session.commit() print(f"INFO: Adaptive Card sent to Teams. Response: {response.text}")
session.close() return True
else:
print(f"ERROR: Failed to send card. Status: {response.status_code}, Text: {response.text}")
return False
except requests.RequestException as e:
print(f"ERROR: Request to Teams failed: {e}")
return False
def process_email_request(request_id: str, customer_name: str):
"""
Der Hauptprozess, der die Benachrichtigung auslöst und auf das Ergebnis wartet.
"""
send_time = datetime.now() + timedelta(minutes=DEFAULT_WAIT_MINUTES)
with _lock:
request_status_storage[request_id] = {
"status": "pending", # pending, cancelled, send_now, sent, timeout
"customer": customer_name,
"send_time": send_time.isoformat()
}
# 1. Adaptive Card erstellen und an Teams senden
adaptive_card = create_adaptive_card_payload(customer_name, send_time, request_id)
if not send_teams_notification(adaptive_card):
print(f"CRITICAL: Could not send Teams notification for request {request_id}. Aborting.")
return
# 2. Warten auf menschliches Feedback oder Timeout
print(f"INFO: Waiting for feedback for request {request_id} until {send_time.strftime('%H:%M:%S')}...")
while datetime.now() < send_time:
with _lock:
current_status = request_status_storage[request_id]["status"]
if current_status == "cancelled":
print(f"INFO: Request {request_id} was cancelled by the user.")
return
if current_status == "send_now":
print(f"INFO: Request {request_id} was triggered to send immediately by the user.")
break # Schleife verlassen und sofort senden
time.sleep(5)
# 3. Finale Entscheidung und Ausführung
with _lock:
final_status = request_status_storage[request_id]["status"]
# Update status to avoid race conditions
if final_status == "pending":
request_status_storage[request_id]["status"] = "timeout"
final_status = "timeout"
if final_status in ["send_now", "timeout"]:
print(f"SUCCESS: Proceeding to send email for request {request_id} (Status: {final_status})")
# --- HIER KOMMT DIE ECHTE E-MAIL LOGIK (MS GRAPH API) ---
# send_email_via_graph_api(customer_name, signature_path, banner_path)
print("MOCK: Email would be sent now.")
# ---------------------------------------------------------
with _lock:
request_status_storage[request_id]["status"] = "sent"
else:
# Dieser Fall sollte eigentlich nicht eintreten, aber zur Sicherheit
print(f"WARN: Email for request {request_id} was not sent due to final status: {final_status}")
# --- Feedback-Server (FastAPI) ---
app = FastAPI()
@app.get("/stop/{request_id}")
async def stop_sending(request_id: str):
with _lock:
if request_id in request_status_storage:
if request_status_storage[request_id]["status"] == "pending":
request_status_storage[request_id]["status"] = "cancelled"
customer = request_status_storage[request_id]['customer']
print(f"INFO: Received STOP for request {request_id}")
return Response(content=f"<html><body><h1>✔️ Stopp-Anfrage für E-Mail an {customer} erhalten.</h1><p>Der Versand wurde erfolgreich abgebrochen.</p></body></html>", media_type="text/html")
else:
status = request_status_storage[request_id]['status']
return Response(content=f"<html><body><h1>⚠️ Aktion bereits ausgeführt</h1><p>Der Status für diese Anfrage ist bereits '{status}'. Es kann nicht mehr gestoppt werden.</p></body></html>", media_type="text/html", status_code=409)
return Response(content="<html><body><h1>❌ Fehler</h1><p>Anfrage-ID nicht gefunden.</p></body></html>", media_type="text/html", status_code=404)
@app.get("/send_now/{request_id}")
async def send_now(request_id: str):
with _lock:
if request_id in request_status_storage:
if request_status_storage[request_id]["status"] == "pending":
request_status_storage[request_id]["status"] = "send_now"
customer = request_status_storage[request_id]['customer']
print(f"INFO: Received SEND_NOW for request {request_id}")
return Response(content=f"<html><body><h1>✔️ Sofort-Senden-Anfrage für E-Mail an {customer} erhalten.</h1><p>Der Versand wird sofort ausgelöst.</p></body></html>", media_type="text/html")
else:
status = request_status_storage[request_id]['status']
return Response(content=f"<html><body><h1>⚠️ Aktion bereits ausgeführt</h1><p>Der Status für diese Anfrage ist bereits '{status}'.</p></body></html>", media_type="text/html", status_code=409)
return Response(content="<html><body><h1>❌ Fehler</h1><p>Anfrage-ID nicht gefunden.</p></body></html>", media_type="text/html", status_code=404)
def run_server():
"""Startet den FastAPI-Server."""
uvicorn.run(app, host="0.0.0.0", port=8004)
if __name__ == "__main__":
# Starte den Feedback-Server in einem separaten Thread
server_thread = Thread(target=run_server)
server_thread.daemon = True
server_thread.start()
print("INFO: Feedback-Server started on port 8004 in background.")
time.sleep(2) # Kurz warten, bis der Server gestartet ist
# Simuliere eine neue Anfrage
test_request_id = f"req_{int(time.time())}"
test_customer = "Klinikum Erding"
print(f"\n--- Starting new email request for '{test_customer}' with ID: {test_request_id} ---")
process_email_request(test_request_id, test_customer)
print(f"--- Process for {test_request_id} finished. ---")
# Halte das Hauptprogramm am Leben, damit der Server weiterlaufen kann
# In einer echten Anwendung wäre dies Teil eines größeren Dienstes.
print("\nManager is running. Press Ctrl+C to stop.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nShutting down manager.")

View File

@@ -1,27 +1,40 @@
<br> <!DOCTYPE html>
<div style="font-family: Arial, sans-serif; font-size: 14px; color: #333;"> <html lang="de">
<p>Freundliche Grüße<br> <head>
<strong>Elizabeta Melcer</strong><br> <meta charset="UTF-8">
Inside Sales Managerin</p> <meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>E-Mail Signatur</title>
</head>
<body>
<!--
HINWEIS:
Dieser Inhalt wird von der IT-Abteilung bereitgestellt.
Bitte den finalen HTML-Code hier einfügen.
Das Bild 'RoboPlanetBannerWebinarEinladung.png' muss sich im selben Verzeichnis befinden.
[31988f42]
-->
<p>Freundliche Grüße</p>
<p> <p>
<strong>RoboPlanet GmbH</strong><br> <b>Elizabeta Melcer</b><br>
Inside Sales Managerin
</p>
<p>
<!-- Wackler Logo -->
<b>RoboPlanet GmbH</b><br>
Schatzbogen 39, 81829 München<br> Schatzbogen 39, 81829 München<br>
T: +49 89 420490-402 | M: +49 175 8334071<br> T: +49 89 420490-402 | M: +49 175 8334071<br>
<a href="mailto:e.melcer@robo-planet.de">e.melcer@robo-planet.de</a> | <a href="http://www.robo-planet.de">www.robo-planet.de</a> <a href="mailto:e.melcer@robo-planet.de">e.melcer@robo-planet.de</a> | <a href="http://www.robo-planet.de">www.robo-planet.de</a>
</p> </p>
<p> <p>
<a href="#">LinkedIn</a> | <a href="#">Instagram</a> | <a href="#">Newsletteranmeldung</a> <a href="#">LinkedIn</a> | <a href="#">Instagram</a> | <a href="#">Newsletteranmeldung</a>
</p> </p>
<p style="font-size: smaller; color: grey;">
<p style="font-size: 10px; color: #777;">
Sitz der Gesellschaft München | Geschäftsführung: Axel Banoth<br> Sitz der Gesellschaft München | Geschäftsführung: Axel Banoth<br>
Registergericht AG München, HRB 296113 | USt.-IdNr. DE400464410<br> Registergericht AG München, HRB 296113 | USt.-IdNr. DE400464410<br>
<a href="#">Hinweispflichten zum Datenschutz</a> <a href="#">Hinweispflichten zum Datenschutz</a>
</p> </p>
<p>
<!-- Platzhalter für das Bild --> <img src="RoboPlanetBannerWebinarEinladung.png" alt="RoboPlanet Webinar Einladung">
<img src="https://robo-planet.de/wp-content/uploads/2024/01/RoboPlanet_Logo.png" alt="RoboPlanet Logo" width="150"><br> </p>
<img src="cid:banner_image" alt="Webinar Einladung" width="400"> </body>
</div> </html>

View File

@@ -0,0 +1,50 @@
import requests
import json
import os
def send_teams_message(webhook_url, message):
"""
Sends a simple message to a Microsoft Teams channel using a webhook.
Args:
webhook_url (str): The URL of the incoming webhook.
message (str): The plain text message to send.
Returns:
bool: True if the message was sent successfully (HTTP 200), False otherwise.
"""
if not webhook_url:
print("Error: TEAMS_WEBHOOK_URL is not set.")
return False
headers = {
"Content-Type": "application/json"
}
payload = {
"text": message
}
try:
response = requests.post(webhook_url, headers=headers, data=json.dumps(payload), timeout=10)
if response.status_code == 200:
print("Message sent successfully to Teams.")
return True
else:
print(f"Failed to send message. Status code: {response.status_code}")
print(f"Response: {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f"An error occurred while sending the request: {e}")
return False
if __name__ == "__main__":
# The webhook URL is taken directly from the project description for this test.
# In a real application, this should be loaded from an environment variable.
webhook_url = "https://wacklergroup.webhook.office.com/webhookb2/fe728cde-790c-4190-b1d3-be393ca0f9bd@6d85a9ef-3878-420b-8f43-38d6cb12b665/IncomingWebhook/e9a8ee6157594a6cab96048cf2ea2232/d26033cd-a81f-41a6-8cd2-b4a3ba0b5a01/V2WFmjcbkMzSU4f6lDSdUOM9VNm7F7n1Th4YDiu3fLZ_Y1"
test_message = "🤖 This is a test message from the Gemini Trading Twins Engine. If you see this, the webhook is working. [31988f42]"
send_teams_message(webhook_url, test_message)