feat(trading-twins): Finalize Booking Engine & Infrastructure [31988f42]

- Implemented 'Direct Calendar Booking' logic replacing MS Bookings API.
- Integrated Dual-App architecture for Graph API (Sender vs. Reader permissions).
- Added FastAPI feedback server for Teams and Email interactions.
- Configured Nginx proxy for public feedback URL access.
- Updated Docker configuration (ports, env vars, dependencies).
- Finalized documentation in lead-engine/README.md.
This commit is contained in:
2026-03-05 13:52:16 +00:00
parent 402c11ed5f
commit f7083e079f
5 changed files with 268 additions and 203 deletions

View File

@@ -1,216 +1,241 @@
# lead-engine/trading_twins/manager.py
from email.mime.text import MIMEText
import base64
import requests
import json
import os
import time
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from threading import Thread, Lock
import uvicorn
from fastapi import FastAPI, Response
import msal
# --- Zeitzonen-Konfiguration ---
TZ_BERLIN = ZoneInfo("Europe/Berlin")
# --- Konfiguration ---
# In einer echten Anwendung würden diese Werte aus .env-Dateien oder einer Config-Map geladen
TEAMS_WEBHOOK_URL = "https://wacklergroup.webhook.office.com/webhookb2/fe728cde-790c-4190-b1d3-be393ca0f9bd@6d85a9ef-3878-420b-8f43-38d6cb12b665/IncomingWebhook/e9a8ee6157594a6cab96048cf2ea2232/V2WFmjcbkMzSU4f6lDSdUOM9VNm7F7n1Th4YDiu3fLZ_Y1"
FEEDBACK_SERVER_BASE_URL = "http://localhost:8004" # TODO: Muss durch die öffentliche IP/Domain ersetzt werden
TEAMS_WEBHOOK_URL = os.getenv("TEAMS_WEBHOOK_URL", "https://wacklergroup.webhook.office.com/webhookb2/fe728cde-790c-4190-b1d3-be393ca0f9bd@6d85a9ef-3878-420b-8f43-38d6cb12b665/IncomingWebhook/e9a8ee6157594a6cab96048cf2ea2232/d26033cd-a81f-41a6-8cd2-b4a3ba0b5a01/V2WFmjcbkMzSU4f6lDSdUOM9VNm7F7n1Th4YDiu3fLZ_Y1")
# Öffentliche URL für Feedback-Links
FEEDBACK_SERVER_BASE_URL = os.getenv("FEEDBACK_SERVER_BASE_URL", "https://floke-ai.duckdns.org/feedback")
DEFAULT_WAIT_MINUTES = 5
SENDER_EMAIL = os.getenv("SENDER_EMAIL", "info@robo-planet.de")
TEST_RECEIVER_EMAIL = "floke.com@gmail.com" # Für E2E Tests
SIGNATURE_FILE_PATH = "/app/trading_twins/signature.html"
# --- In-Memory-Speicher für den Status der Anfragen ---
# In einem Produktionsszenario wäre hier eine robustere Lösung wie Redis oder eine DB nötig.
# Credentials für die Haupt-App (E-Mail & Kalender info@)
AZURE_CLIENT_ID = os.getenv("INFO_Application_ID")
AZURE_CLIENT_SECRET = os.getenv("INFO_Secret")
AZURE_TENANT_ID = os.getenv("INFO_Tenant_ID")
# Credentials für die Kalender-Lese-App (e.melcer)
CAL_APPID = os.getenv("CAL_APPID")
CAL_SECRET = os.getenv("CAL_SECRET")
CAL_TENNANT_ID = os.getenv("CAL_TENNANT_ID")
GRAPH_API_ENDPOINT = "https://graph.microsoft.com/v1.0"
# --- In-Memory-Speicher ---
# Wir speichern hier Details zu jeder Anfrage, um beim Klick auf den Slot reagieren zu können.
request_status_storage = {}
_lock = Lock()
# --- Modul zur Erstellung von Adaptive Cards ---
# --- Auth Helper ---
def get_access_token(client_id, client_secret, tenant_id):
if not all([client_id, client_secret, tenant_id]):
return None
authority = f"https://login.microsoftonline.com/{tenant_id}"
app = msal.ConfidentialClientApplication(client_id=client_id, authority=authority, client_credential=client_secret)
result = app.acquire_token_silent(["https://graph.microsoft.com/.default"], account=None)
if not result:
result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
return result.get('access_token')
def create_adaptive_card_payload(customer_name: str, send_time: datetime, request_id: str) -> dict:
"""
Erstellt die JSON-Payload für die Adaptive Card in Teams.
"""
send_time_str = send_time.strftime("%H:%M Uhr")
# --- KALENDER LOGIK ---
def get_availability(target_email: str, app_creds: tuple) -> tuple:
"""Holt die Verfügbarkeit für eine E-Mail über die angegebene App."""
token = get_access_token(*app_creds)
if not token: return None
stop_url = f"{FEEDBACK_SERVER_BASE_URL}/stop/{request_id}"
send_now_url = f"{FEEDBACK_SERVER_BASE_URL}/send_now/{request_id}"
card = {
"type": "message",
"attachments": [
{
"contentType": "application/vnd.microsoft.card.adaptive",
"content": {
"type": "AdaptiveCard",
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"version": "1.4",
"body": [
{
"type": "TextBlock",
"text": f"🤖 Automatisierte E-Mail an {customer_name} (via Trading Twins) wird um {send_time_str} ausgesendet.",
"wrap": True,
"size": "Medium",
"weight": "Bolder"
},
{
"type": "TextBlock",
"text": f"Wenn Du bis {send_time_str} NICHT reagierst, wird die generierte E-Mail automatisch ausgesendet.",
"wrap": True,
"isSubtle": True
}
],
"actions": [
{
"type": "Action.OpenUrl",
"title": "❌ STOP Aussendung",
"url": stop_url,
"style": "destructive"
},
{
"type": "Action.OpenUrl",
"title": "✅ JETZT Aussenden",
"url": send_now_url,
"style": "positive"
}
]
}
}
]
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Prefer": 'outlook.timezone="Europe/Berlin"'}
start_time = datetime.now(TZ_BERLIN).replace(minute=0, second=0, microsecond=0)
if start_time.hour >= 17: start_time += timedelta(days=1); start_time = start_time.replace(hour=8)
end_time = start_time + timedelta(days=3)
payload = {
"schedules": [target_email],
"startTime": {"dateTime": start_time.strftime("%Y-%m-%dT%H:%M:%S"), "timeZone": "Europe/Berlin"},
"endTime": {"dateTime": end_time.strftime("%Y-%m-%dT%H:%M:%S"), "timeZone": "Europe/Berlin"},
"availabilityViewInterval": 60
}
return card
# --- Haupt-Workflow-Logik ---
def send_teams_notification(payload: dict):
"""Sendet die vorbereitete Payload an den Teams Webhook."""
try:
response = requests.post(TEAMS_WEBHOOK_URL, json=payload, timeout=10)
if response.status_code == 200 or response.status_code == 202:
print(f"INFO: Adaptive Card sent to Teams. Response: {response.text}")
return True
else:
print(f"ERROR: Failed to send card. Status: {response.status_code}, Text: {response.text}")
return False
except requests.RequestException as e:
print(f"ERROR: Request to Teams failed: {e}")
response = requests.post(f"{GRAPH_API_ENDPOINT}/users/{target_email}/calendar/getSchedule", headers=headers, json=payload)
if response.status_code == 200:
view = response.json()['value'][0].get('availabilityView', '')
return start_time, view, 60
except: pass
return None
def find_slots(start_time, view, interval) -> list:
"""Findet zwei freie Slots (Vormittag, Nachmittag)."""
slots = []
# 1. Zeitnah
for i, char in enumerate(view):
t = start_time + timedelta(minutes=i * interval)
if 9 <= t.hour < 12 and char == '0' and t.weekday() < 5:
slots.append(t); break
# 2. Nachmittag
for i, char in enumerate(view):
t = start_time + timedelta(minutes=i * interval)
if 14 <= t.hour <= 16 and char == '0' and t.weekday() < 5:
if not slots or t.day != slots[0].day or t.hour != slots[0].hour:
slots.append(t); break
return slots
def create_calendar_invite(lead_email: str, company_name: str, start_time: datetime):
"""Sendet eine echte Outlook-Kalendereinladung von info@ an den Lead."""
print(f"INFO: Creating calendar invite for {lead_email} at {start_time}")
if not AZURE_CLIENT_ID:
print("CRITICAL: AZURE_CLIENT_ID not set.")
return False
def process_email_request(request_id: str, customer_name: str):
"""
Der Hauptprozess, der die Benachrichtigung auslöst und auf das Ergebnis wartet.
"""
send_time = datetime.now() + timedelta(minutes=DEFAULT_WAIT_MINUTES)
token = get_access_token(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
if not token:
print("CRITICAL: Could not get token for calendar invite.")
return False
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
end_time = start_time + timedelta(minutes=15)
with _lock:
request_status_storage[request_id] = {
"status": "pending", # pending, cancelled, send_now, sent, timeout
"customer": customer_name,
"send_time": send_time.isoformat()
}
# 1. Adaptive Card erstellen und an Teams senden
adaptive_card = create_adaptive_card_payload(customer_name, send_time, request_id)
if not send_teams_notification(adaptive_card):
print(f"CRITICAL: Could not send Teams notification for request {request_id}. Aborting.")
return
# 2. Warten auf menschliches Feedback oder Timeout
print(f"INFO: Waiting for feedback for request {request_id} until {send_time.strftime('%H:%M:%S')}...")
event_payload = {
"subject": f"Kennenlerngespräch RoboPlanet <> {company_name}",
"body": {"contentType": "HTML", "content": "Vielen Dank für die Terminbuchung. Wir freuen uns auf das Gespräch!"},
"start": {"dateTime": start_time.strftime("%Y-%m-%dT%H:%M:%S"), "timeZone": "Europe/Berlin"},
"end": {"dateTime": end_time.strftime("%Y-%m-%dT%H:%M:%S"), "timeZone": "Europe/Berlin"},
"location": {"displayName": "Microsoft Teams / Telefon"},
"attendees": [{"emailAddress": {"address": lead_email, "name": "Interessent"}, "type": "required"}]
}
while datetime.now() < send_time:
with _lock:
current_status = request_status_storage[request_id]["status"]
if current_status == "cancelled":
print(f"INFO: Request {request_id} was cancelled by the user.")
return
if current_status == "send_now":
print(f"INFO: Request {request_id} was triggered to send immediately by the user.")
break # Schleife verlassen und sofort senden
time.sleep(5)
url = f"{GRAPH_API_ENDPOINT}/users/{SENDER_EMAIL}/calendar/events"
try:
resp = requests.post(url, headers=headers, json=event_payload)
if resp.status_code in [200, 201]:
print("SUCCESS: Calendar event created.")
return True
else:
print(f"ERROR: Failed to create event. HTTP {resp.status_code}")
print(f"Response: {resp.text}")
return False
except Exception as e:
print(f"EXCEPTION during event creation: {e}")
return False
# 3. Finale Entscheidung und Ausführung
with _lock:
final_status = request_status_storage[request_id]["status"]
# Update status to avoid race conditions
if final_status == "pending":
request_status_storage[request_id]["status"] = "timeout"
final_status = "timeout"
# --- E-MAIL & WEB LOGIK ---
def generate_booking_html(request_id: str, suggestions: list) -> str:
html = "<p>Bitte wählen Sie einen passenden Termin für ein 15-minütiges Kennenlerngespräch:</p><ul>"
for slot in suggestions:
ts = int(slot.timestamp())
# Link zu unserem eigenen Bestätigungs-Endpunkt
link = f"{FEEDBACK_SERVER_BASE_URL}/book_slot/{request_id}/{ts}"
html += f'<li><a href="{link}" style="font-weight: bold; color: #0078d4;">{slot.strftime("%d.%m. um %H:%M Uhr")}</a></li>'
html += "</ul><p>Mit Klick auf einen Termin wird automatisch eine Kalendereinladung an Sie versendet.</p>"
return html
if final_status in ["send_now", "timeout"]:
print(f"SUCCESS: Proceeding to send email for request {request_id} (Status: {final_status})")
# --- HIER KOMMT DIE ECHTE E-MAIL LOGIK (MS GRAPH API) ---
# send_email_via_graph_api(customer_name, signature_path, banner_path)
print("MOCK: Email would be sent now.")
# ---------------------------------------------------------
with _lock:
request_status_storage[request_id]["status"] = "sent"
else:
# Dieser Fall sollte eigentlich nicht eintreten, aber zur Sicherheit
print(f"WARN: Email for request {request_id} was not sent due to final status: {final_status}")
# --- Feedback-Server (FastAPI) ---
# --- Server & API ---
app = FastAPI()
@app.get("/stop/{request_id}")
async def stop_sending(request_id: str):
async def stop(request_id: str):
with _lock:
if request_id in request_status_storage:
if request_status_storage[request_id]["status"] == "pending":
request_status_storage[request_id]["status"] = "cancelled"
customer = request_status_storage[request_id]['customer']
print(f"INFO: Received STOP for request {request_id}")
return Response(content=f"<html><body><h1>✔️ Stopp-Anfrage für E-Mail an {customer} erhalten.</h1><p>Der Versand wurde erfolgreich abgebrochen.</p></body></html>", media_type="text/html")
else:
status = request_status_storage[request_id]['status']
return Response(content=f"<html><body><h1>⚠️ Aktion bereits ausgeführt</h1><p>Der Status für diese Anfrage ist bereits '{status}'. Es kann nicht mehr gestoppt werden.</p></body></html>", media_type="text/html", status_code=409)
return Response(content="<html><body><h1>❌ Fehler</h1><p>Anfrage-ID nicht gefunden.</p></body></html>", media_type="text/html", status_code=404)
request_status_storage[request_id]["status"] = "cancelled"
return Response("<html><body><h1>Versand gestoppt.</h1></body></html>", media_type="text/html")
return Response("Ungültig.", status_code=404)
@app.get("/send_now/{request_id}")
async def send_now(request_id: str):
with _lock:
if request_id in request_status_storage:
if request_status_storage[request_id]["status"] == "pending":
request_status_storage[request_id]["status"] = "send_now"
customer = request_status_storage[request_id]['customer']
print(f"INFO: Received SEND_NOW for request {request_id}")
return Response(content=f"<html><body><h1>✔️ Sofort-Senden-Anfrage für E-Mail an {customer} erhalten.</h1><p>Der Versand wird sofort ausgelöst.</p></body></html>", media_type="text/html")
else:
status = request_status_storage[request_id]['status']
return Response(content=f"<html><body><h1>⚠️ Aktion bereits ausgeführt</h1><p>Der Status für diese Anfrage ist bereits '{status}'.</p></body></html>", media_type="text/html", status_code=409)
request_status_storage[request_id]["status"] = "send_now"
return Response("<html><body><h1>E-Mail wird sofort versendet.</h1></body></html>", media_type="text/html")
return Response("Ungültig.", status_code=404)
return Response(content="<html><body><h1>❌ Fehler</h1><p>Anfrage-ID nicht gefunden.</p></body></html>", media_type="text/html", status_code=404)
@app.get("/book_slot/{request_id}/{ts}")
async def book_slot(request_id: str, ts: int):
slot_time = datetime.fromtimestamp(ts, tz=TZ_BERLIN)
with _lock:
data = request_status_storage.get(request_id)
if not data: return Response("Anfrage nicht gefunden.", status_code=404)
if data.get("booked"): return Response("<html><body><h1>Termin wurde bereits bestätigt.</h1></body></html>", media_type="text/html")
data["booked"] = True
# Einladung senden
success = create_calendar_invite(data['receiver'], data['company'], slot_time)
if success:
return Response(f"<html><body><h1>Vielen Dank!</h1><p>Die Einladung für den <b>{slot_time.strftime('%d.%m. um %H:%M')}</b> wurde an {data['receiver']} versendet.</p></body></html>", media_type="text/html")
return Response("Fehler beim Erstellen des Termins.", status_code=500)
# --- Haupt Workflow ---
def run_server():
"""Startet den FastAPI-Server."""
uvicorn.run(app, host="0.0.0.0", port=8004)
def send_email(subject, body, to_email, signature):
token = get_access_token(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
payload = {"message": {"subject": subject, "body": {"contentType": "HTML", "content": body + signature}, "toRecipients": [{"emailAddress": {"address": to_email}}]}, "saveToSentItems": "true"}
requests.post(f"{GRAPH_API_ENDPOINT}/users/{SENDER_EMAIL}/sendMail", headers=headers, json=payload)
def process_lead(request_id: str, company: str, opener: str, receiver: str):
# 1. Freie Slots finden (Check bei e.melcer UND info)
print(f"INFO: Searching slots for {company}...")
# Wir nehmen hier e.melcer als Referenz für die Zeit
cal_data = get_availability("e.melcer@robo-planet.de", (CAL_APPID, CAL_SECRET, CAL_TENNANT_ID))
suggestions = find_slots(*cal_data) if cal_data else []
with _lock:
request_status_storage[request_id] = {"status": "pending", "company": company, "receiver": receiver, "slots": suggestions}
# 2. Teams Notification
send_time = datetime.now(TZ_BERLIN) + timedelta(minutes=DEFAULT_WAIT_MINUTES)
card = {
"type": "message", "attachments": [{"contentType": "application/vnd.microsoft.card.adaptive", "content": {
"type": "AdaptiveCard", "version": "1.4", "body": [
{"type": "TextBlock", "text": f"🤖 E-Mail an {company} ({receiver}) geplant für {send_time.strftime('%H:%M')}", "weight": "Bolder"},
{"type": "TextBlock", "text": f"Vorgeschlagene Slots: {', '.join([s.strftime('%H:%M') for s in suggestions])}", "isSubtle": True}
],
"actions": [
{"type": "Action.OpenUrl", "title": "❌ STOP", "url": f"{FEEDBACK_SERVER_BASE_URL}/stop/{request_id}"},
{"type": "Action.OpenUrl", "title": "✅ JETZT", "url": f"{FEEDBACK_SERVER_BASE_URL}/send_now/{request_id}"}
]
}}]
}
requests.post(TEAMS_WEBHOOK_URL, json=card)
# 3. Warten
while datetime.now(TZ_BERLIN) < send_time:
with _lock:
if request_status_storage[request_id]["status"] in ["cancelled", "send_now"]:
break
time.sleep(5)
# 4. Senden
with _lock:
if request_status_storage[request_id]["status"] == "cancelled": return
print(f"INFO: Sending lead email to {receiver}...")
booking_html = generate_booking_html(request_id, suggestions)
with open(SIGNATURE_FILE_PATH, 'r') as f: sig = f.read()
body = f"<p>Sehr geehrte Damen und Herren,</p><p>{opener}</p>{booking_html}"
send_email(f"Ihr Kontakt mit RoboPlanet - {company}", body, receiver, sig)
if __name__ == "__main__":
# Starte den Feedback-Server in einem separaten Thread
server_thread = Thread(target=run_server)
server_thread.daemon = True
server_thread.start()
print("INFO: Feedback-Server started on port 8004 in background.")
time.sleep(2) # Kurz warten, bis der Server gestartet ist
# Simuliere eine neue Anfrage
test_request_id = f"req_{int(time.time())}"
test_customer = "Klinikum Erding"
print(f"\n--- Starting new email request for '{test_customer}' with ID: {test_request_id} ---")
process_email_request(test_request_id, test_customer)
print(f"--- Process for {test_request_id} finished. ---")
# Halte das Hauptprogramm am Leben, damit der Server weiterlaufen kann
# In einer echten Anwendung wäre dies Teil eines größeren Dienstes.
print("\nManager is running. Press Ctrl+C to stop.")
Thread(target=lambda: uvicorn.run(app, host="0.0.0.0", port=8004), daemon=True).start()
time.sleep(2)
# E2E Test
process_lead(f"req_{int(time.time())}", "Testfirma GmbH", "Wir haben Ihre Anfrage erhalten.", TEST_RECEIVER_EMAIL)
print("\nIdle. Press Ctrl+C.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nShutting down manager.")
while True: time.sleep(1)
except: pass