# lead-engine/trading_twins/manager.py from email.mime.text import MIMEText import base64 import requests import json import os import time from datetime import datetime, timedelta from zoneinfo import ZoneInfo from threading import Thread, Lock import uvicorn from fastapi import FastAPI, Response import msal # --- Zeitzonen-Konfiguration --- TZ_BERLIN = ZoneInfo("Europe/Berlin") # --- Konfiguration --- TEAMS_WEBHOOK_URL = os.getenv("TEAMS_WEBHOOK_URL", "https://wacklergroup.webhook.office.com/webhookb2/fe728cde-790c-4190-b1d3-be393ca0f9bd@6d85a9ef-3878-420b-8f43-38d6cb12b665/IncomingWebhook/e9a8ee6157594a6cab96048cf2ea2232/d26033cd-a81f-41a6-8cd2-b4a3ba0b5a01/V2WFmjcbkMzSU4f6lDSdUOM9VNm7F7n1Th4YDiu3fLZ_Y1") # Öffentliche URL für Feedback-Links FEEDBACK_SERVER_BASE_URL = os.getenv("FEEDBACK_SERVER_BASE_URL", "https://floke-ai.duckdns.org/feedback") DEFAULT_WAIT_MINUTES = 5 SENDER_EMAIL = os.getenv("SENDER_EMAIL", "info@robo-planet.de") TEST_RECEIVER_EMAIL = "floke.com@gmail.com" # Für E2E Tests SIGNATURE_FILE_PATH = "/app/trading_twins/signature.html" # Credentials für die Haupt-App (E-Mail & Kalender info@) AZURE_CLIENT_ID = os.getenv("INFO_Application_ID") AZURE_CLIENT_SECRET = os.getenv("INFO_Secret") AZURE_TENANT_ID = os.getenv("INFO_Tenant_ID") # Credentials für die Kalender-Lese-App (e.melcer) CAL_APPID = os.getenv("CAL_APPID") CAL_SECRET = os.getenv("CAL_SECRET") CAL_TENNANT_ID = os.getenv("CAL_TENNANT_ID") GRAPH_API_ENDPOINT = "https://graph.microsoft.com/v1.0" # --- In-Memory-Speicher --- # Wir speichern hier Details zu jeder Anfrage, um beim Klick auf den Slot reagieren zu können. 
request_status_storage = {}
_lock = Lock()

# Mailbox whose calendar is checked for free slots and who attends every meeting.
CALENDAR_OWNER_EMAIL = "e.melcer@robo-planet.de"

# Upper bound (seconds) for every outbound HTTP call; without it a stalled
# connection would block the calling thread indefinitely.
HTTP_TIMEOUT_SECONDS = 15

_GRAPH_SCOPES = ["https://graph.microsoft.com/.default"]

# One MSAL client per (client_id, tenant_id). Reusing the client keeps its
# in-memory token cache alive so acquire_token_silent() can actually hit it.
_msal_apps = {}


# --- Auth helper ---
def get_access_token(client_id, client_secret, tenant_id):
    """Return a Microsoft Graph access token for the given app credentials.

    Uses the client-credentials flow. Returns ``None`` when any credential is
    missing or when the token endpoint does not hand out a token.
    """
    if not all([client_id, client_secret, tenant_id]):
        return None

    cache_key = (client_id, tenant_id)
    msal_app = _msal_apps.get(cache_key)
    if msal_app is None:
        msal_app = msal.ConfidentialClientApplication(
            client_id=client_id,
            authority=f"https://login.microsoftonline.com/{tenant_id}",
            client_credential=client_secret,
        )
        _msal_apps[cache_key] = msal_app

    # Cache lookup first, network round trip only when nothing usable is cached.
    result = msal_app.acquire_token_silent(_GRAPH_SCOPES, account=None)
    if not result:
        result = msal_app.acquire_token_for_client(scopes=_GRAPH_SCOPES)

    token = result.get("access_token")
    if not token:
        print(
            "ERROR: Token acquisition failed: "
            f"{result.get('error')}: {result.get('error_description')}"
        )
    return token


# --- CALENDAR LOGIC ---
def get_availability(target_email: str, app_creds: tuple) -> "tuple | None":
    """Fetch the availability of ``target_email`` through the given app.

    Args:
        target_email: Mailbox whose schedule is requested.
        app_creds: ``(client_id, client_secret, tenant_id)`` passed on to
            ``get_access_token``.

    Returns:
        ``(start_time, view, interval)`` where ``start_time`` is today 00:00
        Berlin time, ``view`` is the ``availabilityView`` string returned by
        Graph and ``interval`` is the number of minutes per character of
        ``view``. ``None`` if no token could be obtained, the request failed
        or the response could not be interpreted.
    """
    token = get_access_token(*app_creds)
    if not token:
        return None

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Prefer": 'outlook.timezone="Europe/Berlin"',
    }
    # Base: today 00:00; find_slots() needs it to map times to view indices.
    start_time = datetime.now(TZ_BERLIN).replace(hour=0, minute=0, second=0, microsecond=0)
    end_time = start_time + timedelta(days=3)  # 3-day preview
    interval = 60
    payload = {
        "schedules": [target_email],
        "startTime": {"dateTime": start_time.strftime("%Y-%m-%dT%H:%M:%S"), "timeZone": "Europe/Berlin"},
        "endTime": {"dateTime": end_time.strftime("%Y-%m-%dT%H:%M:%S"), "timeZone": "Europe/Berlin"},
        "availabilityViewInterval": interval,
    }
    url = f"{GRAPH_API_ENDPOINT}/users/{target_email}/calendar/getSchedule"

    try:
        response = requests.post(url, headers=headers, json=payload, timeout=HTTP_TIMEOUT_SECONDS)
    except requests.RequestException as exc:
        print(f"ERROR: getSchedule request failed: {exc}")
        return None

    if response.status_code != 200:
        print(f"ERROR: getSchedule returned HTTP {response.status_code}: {response.text}")
        return None

    try:
        view = response.json()["value"][0].get("availabilityView", "")
    except (ValueError, KeyError, IndexError, TypeError, AttributeError) as exc:
        print(f"ERROR: Unexpected getSchedule response: {exc!r}")
        return None

    return start_time, view, interval


def round_to_next_quarter_hour(dt: datetime) -> datetime:
    """Round ``dt`` up to the next quarter hour.

    A time that already lies exactly on a quarter hour is moved to the
    following one (10:00 -> 10:15); seconds and microseconds are dropped.
    """
    minutes = (dt.minute // 15 + 1) * 15
    return dt.replace(minute=0, second=0, microsecond=0) + timedelta(minutes=minutes)


def _next_candidate(dt: datetime) -> datetime:
    """Advance a search cursor by 15 minutes, skipping to 09:00 of the next day after 17:00."""
    dt += timedelta(minutes=15)
    if dt.hour >= 17:
        dt = (dt + timedelta(days=1)).replace(hour=9, minute=0)
    return dt


def find_slots(start_time_base: datetime, view: str, interval: int) -> list:
    """Pick up to two meeting suggestions from an availability view.

    Args:
        start_time_base: Start of the availability view (today 00:00 when it
            comes from ``get_availability``).
        view: Availability string; the character ``'0'`` marks a free block.
        interval: Minutes covered by one character of ``view``.

    Returns:
        A list with zero, one or two timezone-aware datetimes. The first is
        the earliest free quarter hour on a weekday between 09:00 and 17:00,
        at least 15 minutes from now. The second starts at least 2.5 hours
        after the first: either the same day from 13:00, or a later weekday
        within working hours.
    """
    if interval <= 0:
        return []

    now = datetime.now(TZ_BERLIN)
    block_seconds = interval * 60

    def is_slot_free(dt: datetime) -> bool:
        """Return True if the view block containing ``dt`` is marked free."""
        index = int((dt - start_time_base).total_seconds() // block_seconds)
        if 0 <= index < len(view):
            return view[index] == "0"  # '0' means free
        return False

    # 1. Slot 1: earliest possible free time (now + 15 min buffer, rounded up).
    first_slot = None
    candidate = round_to_next_quarter_hour(now + timedelta(minutes=15))
    while (candidate - now).days < 3:
        # Weekdays only (Mon-Fri), between 09:00 and 17:00.
        if candidate.weekday() < 5 and 9 <= candidate.hour < 17 and is_slot_free(candidate):
            first_slot = candidate
            break
        candidate = _next_candidate(candidate)

    if first_slot is None:
        return []
    suggestions = [first_slot]

    # 2. Slot 2: alternative (afternoon or a following day), 2-3 hours later.
    candidate = round_to_next_quarter_hour(first_slot + timedelta(hours=2.5))
    while (candidate - now).days < 4:
        same_day_afternoon = candidate.date() == first_slot.date() and 13 <= candidate.hour < 17
        later_day_working = candidate.date() > first_slot.date() and 9 <= candidate.hour < 17
        if candidate.weekday() < 5 and (same_day_afternoon or later_day_working) and is_slot_free(candidate):
            suggestions.append(candidate)
            break
        candidate = _next_candidate(candidate)

    return suggestions


def create_calendar_invite(lead_email: str, company_name: str, start_time: datetime):
    """Create a 15-minute Teams meeting in the sender mailbox and invite the lead.

    The event is created in the calendar of ``SENDER_EMAIL``; the lead and the
    calendar owner are added as required attendees.

    Returns:
        ``True`` if Graph answered with HTTP 200/201, otherwise ``False``.
    """
    # The event is created for info@ (SENDER_EMAIL), where write access is expected.
    target_organizer = SENDER_EMAIL
    print(f"INFO: Creating calendar invite for {lead_email} in {target_organizer}'s calendar")

    token = get_access_token(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
    if not token:
        return False

    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    end_time = start_time + timedelta(minutes=15)
    event_payload = {
        "subject": f"Kennenlerngespräch RoboPlanet <> {company_name}",
        "body": {
            "contentType": "HTML",
            # Explicit <br> tags: plain newlines collapse when rendered as HTML.
            "content": (
                "Hallo,<br><br>"
                "vielen Dank für die Terminbuchung über unsere Lead-Engine. "
                "Wir freuen uns auf das Gespräch!<br><br>"
                "Beste Grüße,<br>RoboPlanet Team"
            ),
        },
        "start": {"dateTime": start_time.strftime("%Y-%m-%dT%H:%M:%S"), "timeZone": "Europe/Berlin"},
        "end": {"dateTime": end_time.strftime("%Y-%m-%dT%H:%M:%S"), "timeZone": "Europe/Berlin"},
        "location": {"displayName": "Microsoft Teams Meeting"},
        "attendees": [
            {"emailAddress": {"address": lead_email, "name": "Interessent"}, "type": "required"},
            {"emailAddress": {"address": CALENDAR_OWNER_EMAIL, "name": "Elizabeta Melcer"}, "type": "required"},
        ],
        "isOnlineMeeting": True,
        "onlineMeetingProvider": "teamsForBusiness",
    }

    # URL points at the info@ calendar.
    url = f"{GRAPH_API_ENDPOINT}/users/{target_organizer}/calendar/events"
    try:
        resp = requests.post(url, headers=headers, json=event_payload, timeout=HTTP_TIMEOUT_SECONDS)
    except requests.RequestException as e:
        print(f"EXCEPTION during event creation: {e}")
        return False

    if resp.status_code in (200, 201):
        print(f"SUCCESS: Calendar event created for {target_organizer}.")
        return True
    print(f"ERROR: Failed to create event. HTTP {resp.status_code}: {resp.text}")
    return False


# --- E-MAIL & WEB LOGIC ---
def generate_booking_html(request_id: str, suggestions: list) -> str:
    """Build the HTML fragment that offers the suggested slots as booking links.

    Each slot links to ``/book_slot/{request_id}/{unix_timestamp}`` below
    ``FEEDBACK_SERVER_BASE_URL``. Returns an empty string when there are no
    suggestions, so the e-mail does not ask the reader to pick from nothing.
    """
    if not suggestions:
        return ""

    slot_items = "".join(
        f'<li><a href="{FEEDBACK_SERVER_BASE_URL}/book_slot/{request_id}/{int(slot.timestamp())}">'
        f"{slot.strftime('%d.%m. um %H:%M')} Uhr</a></li>"
        for slot in suggestions
    )
    return (
        "<p>Bitte wählen Sie einen passenden Termin für ein 15-minütiges Kennenlerngespräch:</p>"
        f"<ul>{slot_items}</ul>"
        "<p>Mit Klick auf einen Termin wird automatisch eine Kalendereinladung an Sie versendet.</p>"
    )


# --- Server & API ---
app = FastAPI()


@app.get("/stop/{request_id}")
async def stop(request_id: str):
    """Mark a pending request as cancelled so that no e-mail is sent."""
    with _lock:
        if request_id in request_status_storage:
            request_status_storage[request_id]["status"] = "cancelled"
            return Response("<p>Versand gestoppt.</p>", media_type="text/html")
    return Response("Ungültig.", status_code=404)


@app.get("/send_now/{request_id}")
async def send_now(request_id: str):
    """Ask the waiting worker to send the e-mail immediately."""
    with _lock:
        if request_id in request_status_storage:
            request_status_storage[request_id]["status"] = "send_now"
            return Response("<p>E-Mail wird sofort versendet.</p>", media_type="text/html")
    return Response("Ungültig.", status_code=404)


@app.get("/book_slot/{request_id}/{ts}")
async def book_slot(request_id: str, ts: int):
    """Book one of the offered slots and send the calendar invitation.

    ``ts`` is the Unix timestamp of the chosen slot. Only timestamps that were
    actually offered for this request are accepted, and each request can be
    booked once. If the invitation cannot be created the booking flag is
    released again so the link can be retried.
    """
    with _lock:
        data = request_status_storage.get(request_id)
        if not data:
            return Response("Anfrage nicht gefunden.", status_code=404)
        if data.get("booked"):
            return Response("<p>Termin wurde bereits bestätigt.</p>", media_type="text/html")
        offered = {int(slot.timestamp()) for slot in data.get("slots") or []}
        if ts not in offered:
            return Response("Ungültiger Termin.", status_code=400)
        # Reserve under the lock so a double click cannot create two events.
        data["booked"] = True
        receiver = data["receiver"]
        company = data["company"]

    slot_time = datetime.fromtimestamp(ts, tz=TZ_BERLIN)

    # Send the invitation. NOTE: this is a blocking HTTP call.
    success = create_calendar_invite(receiver, company, slot_time)
    if success:
        return Response(
            "<p>Vielen Dank!</p>"
            f"<p>Die Einladung für den {slot_time.strftime('%d.%m. um %H:%M')} "
            f"wurde an {receiver} versendet.</p>",
            media_type="text/html",
        )

    with _lock:
        data["booked"] = False
    return Response("Fehler beim Erstellen des Termins.", status_code=500)


# --- Main workflow ---
def send_email(subject, body, to_email, signature):
    """Send an HTML e-mail from ``SENDER_EMAIL`` via Graph ``sendMail``.

    ``signature`` is appended to ``body``. Returns ``True`` when Graph accepted
    the message, ``False`` when no token was available or the request failed.
    """
    token = get_access_token(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
    if not token:
        print("ERROR: No access token available; e-mail not sent.")
        return False

    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    payload = {
        "message": {
            "subject": subject,
            "body": {"contentType": "HTML", "content": body + signature},
            "toRecipients": [{"emailAddress": {"address": to_email}}],
        },
        "saveToSentItems": "true",
    }
    try:
        resp = requests.post(
            f"{GRAPH_API_ENDPOINT}/users/{SENDER_EMAIL}/sendMail",
            headers=headers,
            json=payload,
            timeout=HTTP_TIMEOUT_SECONDS,
        )
    except requests.RequestException as exc:
        print(f"ERROR: sendMail request failed: {exc}")
        return False

    if not resp.ok:
        print(f"ERROR: sendMail returned HTTP {resp.status_code}: {resp.text}")
        return False
    return True


def process_lead(request_id: str, company: str, opener: str, receiver: str):
    """Run the full workflow for one lead.

    Steps: look up free slots, register the request in
    ``request_status_storage``, announce the planned e-mail in Teams with
    STOP / SEND-NOW links, wait up to ``DEFAULT_WAIT_MINUTES`` for feedback
    and then send the e-mail unless the request was cancelled.
    """
    # 1. Find free slots; the calendar owner's schedule is the reference.
    print(f"INFO: Searching slots for {company}...")
    cal_data = get_availability(CALENDAR_OWNER_EMAIL, (CAL_APPID, CAL_SECRET, CAL_TENNANT_ID))
    suggestions = find_slots(*cal_data) if cal_data else []

    with _lock:
        request_status_storage[request_id] = {
            "status": "pending",
            "company": company,
            "receiver": receiver,
            "slots": suggestions,
        }

    # 2. Teams notification
    send_time = datetime.now(TZ_BERLIN) + timedelta(minutes=DEFAULT_WAIT_MINUTES)
    card = {
        "type": "message",
        "attachments": [{
            "contentType": "application/vnd.microsoft.card.adaptive",
            "content": {
                "type": "AdaptiveCard",
                "version": "1.4",
                "body": [
                    {"type": "TextBlock", "text": f"🤖 E-Mail an {company} ({receiver}) geplant für {send_time.strftime('%H:%M')}", "weight": "Bolder"},
                    {"type": "TextBlock", "text": f"Vorgeschlagene Slots: {', '.join([s.strftime('%H:%M') for s in suggestions])}", "isSubtle": True},
                ],
                "actions": [
                    {"type": "Action.OpenUrl", "title": "❌ STOP", "url": f"{FEEDBACK_SERVER_BASE_URL}/stop/{request_id}"},
                    {"type": "Action.OpenUrl", "title": "✅ JETZT", "url": f"{FEEDBACK_SERVER_BASE_URL}/send_now/{request_id}"},
                ],
            },
        }],
    }
    try:
        requests.post(TEAMS_WEBHOOK_URL, json=card, timeout=HTTP_TIMEOUT_SECONDS)
    except requests.RequestException as exc:
        # Same outcome as an HTTP error status from the webhook: the workflow goes on.
        print(f"WARNING: Teams notification failed: {exc}")

    # 3. Wait for feedback or until the scheduled send time.
    while datetime.now(TZ_BERLIN) < send_time:
        with _lock:
            if request_status_storage[request_id]["status"] in ["cancelled", "send_now"]:
                break
        time.sleep(5)

    # 4. Send
    with _lock:
        if request_status_storage[request_id]["status"] == "cancelled":
            return

    print(f"INFO: Sending lead email to {receiver}...")
    booking_html = generate_booking_html(request_id, suggestions)
    try:
        with open(SIGNATURE_FILE_PATH, "r", encoding="utf-8") as f:
            sig = f.read()
    except OSError as exc:
        print(f"WARNING: Could not read signature file {SIGNATURE_FILE_PATH}: {exc}")
        sig = ""

    body = f"<p>Sehr geehrte Damen und Herren,</p><p>{opener}</p>{booking_html}"
    if not send_email(f"Ihr Kontakt mit RoboPlanet - {company}", body, receiver, sig):
        print(f"ERROR: Lead email to {receiver} could not be sent.")


if __name__ == "__main__":
    # Start the API server in the background.
    Thread(target=lambda: uvicorn.run(app, host="0.0.0.0", port=8004), daemon=True).start()
    print("INFO: Trading Twins Feedback Server started on port 8004.")
    time.sleep(2)

    # Optional: trigger an E2E test lead.
    if os.getenv("RUN_TEST_LEAD") == "true":
        print("\n--- Running E2E Test Lead ---")
        process_lead(f"req_{int(time.time())}", "Testfirma GmbH", "Wir haben Ihre Anfrage erhalten.", TEST_RECEIVER_EMAIL)

    print("\n[PROD] Manager is active and waiting for leads via import or API.")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Shutting down.")