diff --git a/lead-engine/trading_twins/manager.py b/lead-engine/trading_twins/manager.py index 85b9ac22..a7ede8fd 100644 --- a/lead-engine/trading_twins/manager.py +++ b/lead-engine/trading_twins/manager.py @@ -1,133 +1,216 @@ -import datetime -from sqlalchemy import create_engine, func -from sqlalchemy.orm import sessionmaker -from .models import ProposalJob, ProposedSlot, Base -import uuid +# lead-engine/trading_twins/manager.py +import requests +import json +import os +import time +from datetime import datetime, timedelta +from threading import Thread, Lock +import uvicorn +from fastapi import FastAPI, Response -# Konfiguration -DB_PATH = 'sqlite:///trading_twins/trading_twins.db' -MAX_PROPOSALS_PER_SLOT = 3 # Aggressiver Faktor 3 +# --- Konfiguration --- +# In einer echten Anwendung würden diese Werte aus .env-Dateien oder einer Config-Map geladen +TEAMS_WEBHOOK_URL = "https://wacklergroup.webhook.office.com/webhookb2/fe728cde-790c-4190-b1d3-be393ca0f9bd@6d85a9ef-3878-420b-8f43-38d6cb12b665/IncomingWebhook/e9a8ee6157594a6cab96048cf2ea2232/V2WFmjcbkMzSU4f6lDSdUOM9VNm7F7n1Th4YDiu3fLZ_Y1" +FEEDBACK_SERVER_BASE_URL = "http://localhost:8004" # TODO: Muss durch die öffentliche IP/Domain ersetzt werden +DEFAULT_WAIT_MINUTES = 5 -class TradingTwinsManager: - def __init__(self, db_path=DB_PATH): - self.engine = create_engine(db_path) - self.Session = sessionmaker(bind=self.engine) - Base.metadata.create_all(self.engine) +# --- In-Memory-Speicher für den Status der Anfragen --- +# In einem Produktionsszenario wäre hier eine robustere Lösung wie Redis oder eine DB nötig. +request_status_storage = {} +_lock = Lock() - def create_proposal_job(self, customer_email, customer_name, customer_company): - """Erstellt einen neuen Job, sucht Slots und speichert alles.""" - session = self.Session() - try: - # 1. Freie Slots finden (Mock für jetzt) - # Später: real_slots = self.fetch_calendar_availability() - candidate_slots = self._mock_calendar_availability() - - # 2. Beste Slots auswählen (mit Overbooking-Check) - selected_slots = self._select_best_slots(session, candidate_slots) - - if not selected_slots: - # Fallback: Wenn alles "voll" ist (sehr unwahrscheinlich bei Faktor 3), - # nehmen wir trotzdem den am wenigsten gebuchten Slot. - selected_slots = candidate_slots[:2] +# --- Modul zur Erstellung von Adaptive Cards --- - # 3. Job anlegen - job_uuid = str(uuid.uuid4()) - new_job = ProposalJob( - job_uuid=job_uuid, - customer_email=customer_email, - customer_name=customer_name, - customer_company=customer_company, - status='pending' - ) - session.add(new_job) - session.flush() # ID generieren +def create_adaptive_card_payload(customer_name: str, send_time: datetime, request_id: str) -> dict: + """ + Erstellt die JSON-Payload für die Adaptive Card in Teams. + """ + send_time_str = send_time.strftime("%H:%M Uhr") + + stop_url = f"{FEEDBACK_SERVER_BASE_URL}/stop/{request_id}" + send_now_url = f"{FEEDBACK_SERVER_BASE_URL}/send_now/{request_id}" - # 4. Slots speichern - for slot in selected_slots: - new_slot = ProposedSlot( - job_id=new_job.id, - start_time=slot['start'], - end_time=slot['end'] - ) - session.add(new_slot) - - session.commit() - return new_job.job_uuid, selected_slots - - except Exception as e: - session.rollback() - raise e - finally: - session.close() - - def _select_best_slots(self, session, candidate_slots): - """Wählt Slots aus, die noch nicht 'voll' sind (Faktor 3).""" - valid_slots = [] - - # Wir betrachten nur Vorschläge der letzten 24h als "aktiv" - yesterday = datetime.datetime.now() - datetime.timedelta(days=1) - - for slot in candidate_slots: - # Wie oft wurde dieser Start-Zeitpunkt in den letzten 24h vorgeschlagen? - count = session.query(func.count(ProposedSlot.id)).filter(ProposedSlot.start_time == slot['start']).filter(ProposedSlot.job.has(ProposalJob.created_at >= yesterday)).scalar() - - if count < MAX_PROPOSALS_PER_SLOT: - valid_slots.append(slot) - - if len(valid_slots) >= 2: - break - - return valid_slots - - def _mock_calendar_availability(self): - """Simuliert freie Termine für morgen.""" - tomorrow = datetime.date.today() + datetime.timedelta(days=1) - - # Ein Slot Vormittags (10:30), einer Nachmittags (14:00) - return [ + card = { + "type": "message", + "attachments": [ { - 'start': datetime.datetime.combine(tomorrow, datetime.time(10, 30)), - 'end': datetime.datetime.combine(tomorrow, datetime.time(11, 15)) - }, - { - 'start': datetime.datetime.combine(tomorrow, datetime.time(14, 0)), - 'end': datetime.datetime.combine(tomorrow, datetime.time(14, 45)) + "contentType": "application/vnd.microsoft.card.adaptive", + "content": { + "type": "AdaptiveCard", + "$schema": "http://adaptivecards.io/schemas/adaptive-card.json", + "version": "1.4", + "body": [ + { + "type": "TextBlock", + "text": f"🤖 Automatisierte E-Mail an {customer_name} (via Trading Twins) wird um {send_time_str} ausgesendet.", + "wrap": True, + "size": "Medium", + "weight": "Bolder" + }, + { + "type": "TextBlock", + "text": f"Wenn Du bis {send_time_str} NICHT reagierst, wird die generierte E-Mail automatisch ausgesendet.", + "wrap": True, + "isSubtle": True + } + ], + "actions": [ + { + "type": "Action.OpenUrl", + "title": "❌ STOP Aussendung", + "url": stop_url, + "style": "destructive" + }, + { + "type": "Action.OpenUrl", + "title": "✅ JETZT Aussenden", + "url": send_now_url, + "style": "positive" + } + ] + } } ] + } + return card - def get_job_status(self, job_uuid): - session = self.Session() - job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first() - status = job.status if job else None - session.close() - return status +# --- Haupt-Workflow-Logik --- - def get_job_details(self, job_uuid): - """Holt alle Details zu einem Job inklusive der Slots.""" - session = self.Session() - job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first() - if not job: - session.close() - return None - - # Wir müssen die Daten extrahieren, bevor die Session geschlossen wird - details = { - 'uuid': job.job_uuid, - 'email': job.customer_email, - 'name': job.customer_name, - 'company': job.customer_company, - 'status': job.status, - 'slots': [{'start': s.start_time, 'end': s.end_time} for s in job.slots] +def send_teams_notification(payload: dict): + """Sendet die vorbereitete Payload an den Teams Webhook.""" + try: + response = requests.post(TEAMS_WEBHOOK_URL, json=payload, timeout=10) + if response.status_code == 200 or response.status_code == 202: + print(f"INFO: Adaptive Card sent to Teams. Response: {response.text}") + return True + else: + print(f"ERROR: Failed to send card. Status: {response.status_code}, Text: {response.text}") + return False + except requests.RequestException as e: + print(f"ERROR: Request to Teams failed: {e}") + return False + +def process_email_request(request_id: str, customer_name: str): + """ + Der Hauptprozess, der die Benachrichtigung auslöst und auf das Ergebnis wartet. + """ + send_time = datetime.now() + timedelta(minutes=DEFAULT_WAIT_MINUTES) + + with _lock: + request_status_storage[request_id] = { + "status": "pending", # pending, cancelled, send_now, sent, timeout + "customer": customer_name, + "send_time": send_time.isoformat() } - session.close() - return details - def update_job_status(self, job_uuid, new_status): - session = self.Session() - job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first() - if job: - job.status = new_status - if new_status == 'approved': - job.approved_at = datetime.datetime.now() - session.commit() - session.close() + # 1. Adaptive Card erstellen und an Teams senden + adaptive_card = create_adaptive_card_payload(customer_name, send_time, request_id) + if not send_teams_notification(adaptive_card): + print(f"CRITICAL: Could not send Teams notification for request {request_id}. Aborting.") + return + + # 2. Warten auf menschliches Feedback oder Timeout + print(f"INFO: Waiting for feedback for request {request_id} until {send_time.strftime('%H:%M:%S')}...") + + while datetime.now() < send_time: + with _lock: + current_status = request_status_storage[request_id]["status"] + + if current_status == "cancelled": + print(f"INFO: Request {request_id} was cancelled by the user.") + return + + if current_status == "send_now": + print(f"INFO: Request {request_id} was triggered to send immediately by the user.") + break # Schleife verlassen und sofort senden + + time.sleep(5) + + # 3. Finale Entscheidung und Ausführung + with _lock: + final_status = request_status_storage[request_id]["status"] + # Update status to avoid race conditions + if final_status == "pending": + request_status_storage[request_id]["status"] = "timeout" + final_status = "timeout" + + + if final_status in ["send_now", "timeout"]: + print(f"SUCCESS: Proceeding to send email for request {request_id} (Status: {final_status})") + # --- HIER KOMMT DIE ECHTE E-MAIL LOGIK (MS GRAPH API) --- + # send_email_via_graph_api(customer_name, signature_path, banner_path) + print("MOCK: Email would be sent now.") + # --------------------------------------------------------- + with _lock: + request_status_storage[request_id]["status"] = "sent" + + else: + # Dieser Fall sollte eigentlich nicht eintreten, aber zur Sicherheit + print(f"WARN: Email for request {request_id} was not sent due to final status: {final_status}") + + +# --- Feedback-Server (FastAPI) --- + +app = FastAPI() + +@app.get("/stop/{request_id}") +async def stop_sending(request_id: str): + with _lock: + if request_id in request_status_storage: + if request_status_storage[request_id]["status"] == "pending": + request_status_storage[request_id]["status"] = "cancelled" + customer = request_status_storage[request_id]['customer'] + print(f"INFO: Received STOP for request {request_id}") + return Response(content=f"

✔️ Stopp-Anfrage für E-Mail an {customer} erhalten.

Der Versand wurde erfolgreich abgebrochen.

", media_type="text/html") + else: + status = request_status_storage[request_id]['status'] + return Response(content=f"

⚠️ Aktion bereits ausgeführt

Der Status für diese Anfrage ist bereits '{status}'. Es kann nicht mehr gestoppt werden.

", media_type="text/html", status_code=409) + + return Response(content="

❌ Fehler

Anfrage-ID nicht gefunden.

", media_type="text/html", status_code=404) + + +@app.get("/send_now/{request_id}") +async def send_now(request_id: str): + with _lock: + if request_id in request_status_storage: + if request_status_storage[request_id]["status"] == "pending": + request_status_storage[request_id]["status"] = "send_now" + customer = request_status_storage[request_id]['customer'] + print(f"INFO: Received SEND_NOW for request {request_id}") + return Response(content=f"

✔️ Sofort-Senden-Anfrage für E-Mail an {customer} erhalten.

Der Versand wird sofort ausgelöst.

", media_type="text/html") + else: + status = request_status_storage[request_id]['status'] + return Response(content=f"

⚠️ Aktion bereits ausgeführt

Der Status für diese Anfrage ist bereits '{status}'.

", media_type="text/html", status_code=409) + + return Response(content="

❌ Fehler

Anfrage-ID nicht gefunden.

", media_type="text/html", status_code=404) + + +def run_server(): + """Startet den FastAPI-Server.""" + uvicorn.run(app, host="0.0.0.0", port=8004) + + +if __name__ == "__main__": + # Starte den Feedback-Server in einem separaten Thread + server_thread = Thread(target=run_server) + server_thread.daemon = True + server_thread.start() + print("INFO: Feedback-Server started on port 8004 in background.") + time.sleep(2) # Kurz warten, bis der Server gestartet ist + + # Simuliere eine neue Anfrage + test_request_id = f"req_{int(time.time())}" + test_customer = "Klinikum Erding" + + print(f"\n--- Starting new email request for '{test_customer}' with ID: {test_request_id} ---") + process_email_request(test_request_id, test_customer) + print(f"--- Process for {test_request_id} finished. ---") + + # Halte das Hauptprogramm am Leben, damit der Server weiterlaufen kann + # In einer echten Anwendung wäre dies Teil eines größeren Dienstes. + print("\nManager is running. Press Ctrl+C to stop.") + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nShutting down manager.") \ No newline at end of file diff --git a/lead-engine/trading_twins/signature.html b/lead-engine/trading_twins/signature.html index 760ed388..41a8cb2b 100644 --- a/lead-engine/trading_twins/signature.html +++ b/lead-engine/trading_twins/signature.html @@ -1,27 +1,40 @@ -
-
-

Freundliche Grüße
- Elizabeta Melcer
- Inside Sales Managerin

- + + + + + + E-Mail Signatur + + + +

Freundliche Grüße

- RoboPlanet GmbH
- Schatzbogen 39, 81829 München
- T: +49 89 420490-402 | M: +49 175 8334071
- e.melcer@robo-planet.de | www.robo-planet.de + Elizabeta Melcer
+ Inside Sales Managerin

-

- LinkedIn | Instagram | Newsletteranmeldung + + RoboPlanet GmbH
+ Schatzbogen 39, 81829 München
+ T: +49 89 420490-402 | M: +49 175 8334071
+ e.melcer@robo-planet.de | www.robo-planet.de

- -

- Sitz der Gesellschaft München | Geschäftsführung: Axel Banoth
- Registergericht AG München, HRB 296113 | USt.-IdNr. DE400464410
- Hinweispflichten zum Datenschutz +

+ LinkedIn | Instagram | Newsletteranmeldung

- - - RoboPlanet Logo
- Webinar Einladung -
+

+ Sitz der Gesellschaft München | Geschäftsführung: Axel Banoth
+ Registergericht AG München, HRB 296113 | USt.-IdNr. DE400464410
+ Hinweispflichten zum Datenschutz +

+

+ RoboPlanet Webinar Einladung +

+ + \ No newline at end of file diff --git a/lead-engine/trading_twins/test_teams_webhook.py b/lead-engine/trading_twins/test_teams_webhook.py new file mode 100644 index 00000000..b9c8b16e --- /dev/null +++ b/lead-engine/trading_twins/test_teams_webhook.py @@ -0,0 +1,50 @@ +import requests +import json +import os + +def send_teams_message(webhook_url, message): + """ + Sends a simple message to a Microsoft Teams channel using a webhook. + + Args: + webhook_url (str): The URL of the incoming webhook. + message (str): The plain text message to send. + + Returns: + bool: True if the message was sent successfully (HTTP 200), False otherwise. + """ + if not webhook_url: + print("Error: TEAMS_WEBHOOK_URL is not set.") + return False + + headers = { + "Content-Type": "application/json" + } + + payload = { + "text": message + } + + try: + response = requests.post(webhook_url, headers=headers, data=json.dumps(payload), timeout=10) + + if response.status_code == 200: + print("Message sent successfully to Teams.") + return True + else: + print(f"Failed to send message. Status code: {response.status_code}") + print(f"Response: {response.text}") + return False + + except requests.exceptions.RequestException as e: + print(f"An error occurred while sending the request: {e}") + return False + +if __name__ == "__main__": + # The webhook URL is taken directly from the project description for this test. + # In a real application, this should be loaded from an environment variable. + webhook_url = "https://wacklergroup.webhook.office.com/webhookb2/fe728cde-790c-4190-b1d3-be393ca0f9bd@6d85a9ef-3878-420b-8f43-38d6cb12b665/IncomingWebhook/e9a8ee6157594a6cab96048cf2ea2232/d26033cd-a81f-41a6-8cd2-b4a3ba0b5a01/V2WFmjcbkMzSU4f6lDSdUOM9VNm7F7n1Th4YDiu3fLZ_Y1" + + test_message = "🤖 This is a test message from the Gemini Trading Twins Engine. If you see this, the webhook is working. [31988f42]" + + send_teams_message(webhook_url, test_message)