From 6b89c68edc3513869bbb879a4712b8a3cb2cb5d2 Mon Sep 17 00:00:00 2001 From: Floke Date: Wed, 4 Mar 2026 08:22:28 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Implement=20Trading=20Twins=20Autopilot?= =?UTF-8?q?=20with=20Teams=20integration=20and=20=E0=A4=AB=E0=A5=88?= =?UTF-8?q?=E0=A4=95=E0=A5=8D=E0=A4=9F=E0=A4=B0-3=20overbooking=20logic=20?= =?UTF-8?q?[31988f42]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lead-engine/README.md | 62 +++++--- lead-engine/TRADING_TWINS_SETUP.md | 76 ++++++++++ lead-engine/__init__.py | 0 lead-engine/generate_reply.py | 20 ++- lead-engine/monitor.py | 29 +++- lead-engine/trading_twins/__init__.py | 0 lead-engine/trading_twins/api_server.py | 58 ++++++++ lead-engine/trading_twins/email_sender.py | 85 +++++++++++ lead-engine/trading_twins/manager.py | 133 +++++++++++++++++ lead-engine/trading_twins/models.py | 45 ++++++ lead-engine/trading_twins/orchestrator.py | 130 ++++++++++++++++ lead-engine/trading_twins/signature.html | 27 ++++ .../trading_twins/teams_notification.py | 70 +++++++++ lead-engine/trading_twins/test_dry_run.py | 139 ++++++++++++++++++ 14 files changed, 849 insertions(+), 25 deletions(-) create mode 100644 lead-engine/TRADING_TWINS_SETUP.md create mode 100644 lead-engine/__init__.py create mode 100644 lead-engine/trading_twins/__init__.py create mode 100644 lead-engine/trading_twins/api_server.py create mode 100644 lead-engine/trading_twins/email_sender.py create mode 100644 lead-engine/trading_twins/manager.py create mode 100644 lead-engine/trading_twins/models.py create mode 100644 lead-engine/trading_twins/orchestrator.py create mode 100644 lead-engine/trading_twins/signature.html create mode 100644 lead-engine/trading_twins/teams_notification.py create mode 100644 lead-engine/trading_twins/test_dry_run.py diff --git a/lead-engine/README.md b/lead-engine/README.md index c546d87f..4e8db849 100644 --- a/lead-engine/README.md +++ b/lead-engine/README.md @@ -1,4 +1,4 @@ -# Lead Engine: Multi-Source Automation v1.1 [31388f42] +# Lead Engine: Multi-Source Automation v1.2 [31988f42] ## 🚀 Übersicht Die **Lead Engine** ist ein spezialisiertes Modul zur autonomen Verarbeitung von B2B-Anfragen aus verschiedenen Quellen. Sie fungiert als Brücke zwischen dem E-Mail-Postfach und dem **Company Explorer**, um innerhalb von Minuten hochgradig personalisierte Antwort-Entwürfe auf "Human Expert Level" zu generieren. @@ -29,17 +29,34 @@ Die **Lead Engine** ist ein spezialisiertes Modul zur autonomen Verarbeitung von * **Status-Tracking:** Visueller Indikator (🆕/✅) für den Synchronisations-Status mit dem Company Explorer. * **Low-Quality-Warnung:** Visuelle Kennzeichnung (⚠️) von Leads mit Free-Mail-Adressen oder ohne Firmennamen direkt in der Übersicht. +### 6. Trading Twins Autopilot (NEU v2.0) +Der vollautomatische "Zero Touch" Workflow für Trading Twins Anfragen. + +* **Human-in-the-Loop:** Vor Versand erhält Elizabeta Melcer eine Teams-Nachricht ("Approve/Deny"). +* **5-Minuten-Timeout:** Erfolgt keine Reaktion, wird die E-Mail automatisch versendet. +* **Smart Calendar:** + * **Faktor-3-Überbuchung:** Termine werden bis zu 3x parallel angeboten, um den Kalender dicht zu füllen. + * **Soft-Blocking:** Interne Datenbank verhindert Doppelbuchungen über den Faktor 3 hinaus. +* **Technologie:** + * **Teams Webhook:** Für interaktive "Adaptive Cards". + * **Graph API:** Für sicheren E-Mail-Versand (statt SMTP). + * **Orchestrator:** Steuert den Ablauf (Lead -> CE -> Teams -> Timer -> Mail). + ## 🏗 Architektur ```text /app/lead-engine/ ├── app.py # Streamlit Web-Interface -├── trading_twins_ingest.py # E-Mail Importer (Graph API für alle Quellen) -├── ingest.py # Enthält alle spezifischen Parser -├── lookup_role.py # LinkedIn/Role Research (SerpAPI + Gemini) -├── generate_reply.py # Email Draft Generator (Gemini) -├── monitor.py # Asynchroner CE-Status Monitor -├── db.py # Lokale SQLite Lead-Datenbank +├── trading_twins_ingest.py # E-Mail Importer (Graph API) +├── monitor.py # Monitor + Trigger für Orchestrator +├── trading_twins/ # [NEU] Autopilot Modul +│ ├── orchestrator.py # Prozess-Steuerung (Timer, Logic) +│ ├── manager.py # Slot-Logik & DB-Zugriff +│ ├── teams_notification.py# Teams Webhook Integration +│ ├── email_sender.py # Graph API Mailer +│ ├── api_server.py # Feedback-Endpunkt (Port 8004) +│ └── models.py # SQLite DB für Jobs/Slots +├── db.py # Lokale Lead-Datenbank └── data/ # DB-Storage ``` @@ -53,23 +70,24 @@ docker-compose restart lead-engine ``` **Zugriff:** `https://floke-ai.duckdns.org/lead/` (Passwortgeschützt) +**API Feedback Loop:** Port 8004 (intern). -## 📝 Nutzungshinweise -1. **Ingest:** Klicke in der Web-App auf "2. Ingest Real Emails". Das System lädt alle neuen Leads, egal welcher Quelle. -2. **Sync:** Wähle einen Lead und klicke auf "Sync to Company Explorer". -3. **Wait:** Der Monitor erkennt automatisch, wenn die Analyse im CE fertig ist. -4. **Draft:** Klicke auf "Generate Expert Reply" für den fertigen Entwurf. +## 📝 ToDos & Integration (Status: Warten auf IT) -## 📋 Roadmap / Nächste Schritte +Die Logik ist implementiert und getestet ("Dry Run"). Für den Go-Live fehlen folgende Credentials in der `.env`: -- [ ] **Phase 2: Intelligente Antworten für Kontaktformulare:** Entwicklung einer kontextbezogenen Antwortlogik für Website-Formular-Leads. -- [ ] **IT-Klärung:** Microsoft Bookings Berechtigungen (`Bookings.Read.All`, `BookingsAppointment.ReadWrite.All`) für die Entra App anfragen. -- [ ] **Infrastruktur:** Korrekten Buchungslink (persönliches Konto) ermitteln und in der `.env` hinterlegen. -- [ ] **CRM-Integration:** Modul "Push to SuperOffice" entwickeln, um Personen und E-Mail-Entwürfe direkt im CRM anzulegen. -- [ ] **Daten-Synchronisation:** Notion-Produktdatenbank in die lokale DB spiegeln, um Produktauswahl und ROI-Berechnung zu dynamisieren. -- [ ] **Logik:** ROI-Kalkulation im E-Mail-Entwurf auf Basis von echten Leistungsdaten (m²/h) und Preisen schärfen. -- [ ] **UI:** "Copy to Clipboard" Funktion für den fertigen Entwurf in der Web-App finalisieren. +1. **Teams Webhook:** + * Benötigt: URL für den "Incoming Webhook" Connector. + * Env-Var: `TEAMS_WEBHOOK_URL` + +2. **Microsoft Graph API:** + * Benötigt: App Registration mit `Mail.Send` und `Calendars.Read`. + * Env-Vars: `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`, `AZURE_TENANT_ID`. + +3. **Assets:** + * [ ] Banner-Bild `RoboPlanetBannerWebinarEinladung.png` nach `/app/lead-engine/trading_twins/` hochladen. + * [ ] HTML-Signatur in `/app/lead-engine/trading_twins/signature.html` finalisieren. --- -*Dokumentationsstand: 2. März 2026* -*Task: [31388f42]* +*Dokumentationsstand: 4. März 2026* +*Task: [31988f42]* \ No newline at end of file diff --git a/lead-engine/TRADING_TWINS_SETUP.md b/lead-engine/TRADING_TWINS_SETUP.md new file mode 100644 index 00000000..bdc45304 --- /dev/null +++ b/lead-engine/TRADING_TWINS_SETUP.md @@ -0,0 +1,76 @@ +# Trading Twins Autopilot - Setup & Go-Live Checkliste + +Dieses Dokument beschreibt die Schritte zur finalen Inbetriebnahme des vollautomatischen Trading Twins E-Mail-Versands. + +--- + +## 1. IT-Voraussetzungen (Warten auf IT) + +Sobald die IT die Anfrage bearbeitet hat, benötigen wir folgende Informationen: + +* **Teams Webhook URL:** + * *Beispiel:* `https://outlook.office.com/webhook/xxxxx@yyyyy/IncomingWebhook/zzzzz` + * *Verwendung:* Zum Senden der "Approve/Deny"-Karte an Elizabeta. + +* **Azure App Registration (Graph API):** + * **Application (Client) ID:** (GUID) + * **Directory (Tenant) ID:** (GUID) + * **Client Secret:** (Geheimer String) + * **Berechtigungen:** `Mail.Send` (App) und `Calendars.Read` (Delegated/App) für `e.melcer@robo-planet.de`. + +--- + +## 2. Konfiguration (.env) + +Füge diese Werte in die zentrale `.env`-Datei des Projekts ein: + +```env +# Trading Twins Autopilot +TEAMS_WEBHOOK_URL="" +AZURE_CLIENT_ID="" +AZURE_CLIENT_SECRET="" +AZURE_TENANT_ID="" + +# API Erreichbarkeit (Damit die Buttons in Teams funktionieren) +API_BASE_URL="https://floke-ai.duckdns.org/api/tt" +# (Hinweis: Nginx-Proxy muss Port 8004 nach außen leiten oder intern erreichbar sein) +``` + +--- + +## 3. Assets prüfen + +Stelle sicher, dass diese Dateien im Ordner `/app/lead-engine/trading_twins/` vorhanden sind: + +1. **Banner-Bild:** `RoboPlanetBannerWebinarEinladung.png` + * *Check:* `ls -l /app/lead-engine/trading_twins/RoboPlanetBannerWebinarEinladung.png` + +2. **HTML-Signatur:** `signature.html` + * *Inhalt:* Prüfe, ob die Links und Telefonnummern korrekt sind. + * *Platzhalter:* Achte darauf, dass `cid:banner_image` im ``-Tag steht, damit das Bild inline angezeigt wird. + +--- + +## 4. Test-Modus deaktivieren + +Aktuell läuft das System im "Mock-Modus" für den Kalender (simuliert freie Termine). +Sobald der echte Zugriff besteht: + +1. Öffne `/app/lead-engine/trading_twins/manager.py`. +2. Ersetze `self._mock_calendar_availability()` durch den echten Graph-API-Aufruf (Code muss noch finalisiert werden, sobald `Calendars.Read` aktiv ist). + +--- + +## 5. Logs überwachen + +Nach dem Start (`docker-compose restart lead-engine`) kannst du den Prozess live verfolgen: + +```bash +docker logs -f lead-engine | grep "TradingTwins" +``` + +* **Erwarteter Output:** + * `[ACTION] Triggering Trading Twins Orchestrator...` + * `Job erstellt: ...` + * `Timer abgelaufen...` + * `🚀 E-MAIL WURDE VERSENDET...` diff --git a/lead-engine/__init__.py b/lead-engine/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lead-engine/generate_reply.py b/lead-engine/generate_reply.py index f290e4ee..de5caffc 100644 --- a/lead-engine/generate_reply.py +++ b/lead-engine/generate_reply.py @@ -79,6 +79,21 @@ def clean_company_name(name): cleaned = re.sub(r'\s+(GmbH|AG|GmbH\s+&\s+Co\.\s+KG|KG|e\.V\.|e\.K\.|Limited|Ltd|Inc)\.?(?:\s|$)', '', name, flags=re.IGNORECASE) return cleaned.strip() +def get_qualitative_area_description(area_str): + """Converts a string with area information into a qualitative description.""" + nums = re.findall(r'\d+', area_str.replace('.', '').replace(',', '')) + area_val = int(nums[0]) if nums else 0 + + if area_val >= 10000: + return "sehr große Flächen" + if area_val >= 5000: + return "große Flächen" + if area_val >= 1000: + return "mittlere Flächen" + if area_val > 0: + return "kleine bis mittlere Flächen" + return "Ihre Flächen" # Fallback + def get_multi_solution_recommendation(area_str, purpose_str): """ Selects a range of robots based on surface area AND requested purposes. @@ -146,6 +161,7 @@ def generate_email_draft(lead_data, company_data, booking_link="[IHR BUCHUNGSLIN # Multi-Solution Logic solution = get_multi_solution_recommendation(area, purpose) + qualitative_area = get_qualitative_area_description(area) suggested_date = get_suggested_date() # Fetch "Golden Records" from Matrix @@ -164,7 +180,7 @@ def generate_email_draft(lead_data, company_data, booking_link="[IHR BUCHUNGSLIN STRATEGIE: - STARTE DIREKT mit dem strategischen Aufhänger aus dem Company Explorer ({ce_opener}). Baue daraus den ersten Absatz. - KEIN "mit großem Interesse verfolge ich..." oder ähnliche Phrasen. Das wirkt unnatürlich. - - Deine Mail reagiert auf die Anfrage zu: {purpose} auf {area}. + - Deine Mail reagiert auf die Anfrage zu: {purpose} für {qualitative_area}. - Fasse die vorgeschlagene Lösung ({solution['solution_text']}) KOMPAKT zusammen. Wir bieten ein ganzheitliches Entlastungskonzept an, keine Detail-Auflistung von Datenblättern. KONTEXT: @@ -176,7 +192,7 @@ def generate_email_draft(lead_data, company_data, booking_link="[IHR BUCHUNGSLIN AUFGABE: 1. ANREDE: Persönlich. 2. EINSTIEG: Nutze den inhaltlichen Kern von: "{ce_opener}". - 3. DER ÜBERGANG: Verknüpfe dies mit der Anfrage zu {purpose}. Erkläre, dass manuelle Prozesse bei {area} angesichts der Dokumentationspflichten und des Fachkräftemangels zum Risiko werden. + 3. DER ÜBERGANG: Verknüpfe dies mit der Anfrage zu {purpose}. Erkläre, dass manuelle Prozesse bei {qualitative_area} angesichts der Dokumentationspflichten und des Fachkräftemangels zum Risiko werden. 4. DIE LÖSUNG: Schlage die Kombination aus {solution['solution_text']} als integriertes Konzept vor, um das Team in Reinigung, Service und Patientenansprache spürbar zu entlasten. 5. ROI: Sprich kurz die Amortisation (18-24 Monate) an – als Argument für den wirtschaftlichen Entscheider. 6. CTA: Schlag konkret den {suggested_date} vor. Alternativ: {booking_link} diff --git a/lead-engine/monitor.py b/lead-engine/monitor.py index 1aadf4bc..b2822e11 100644 --- a/lead-engine/monitor.py +++ b/lead-engine/monitor.py @@ -9,6 +9,13 @@ sys.path.append(os.path.dirname(__file__)) from db import get_leads from enrich import refresh_ce_data +# Import our new Trading Twins Orchestrator +try: + from trading_twins.orchestrator import TradingTwinsOrchestrator +except ImportError: + # Fallback for dev environment or missing dependencies + TradingTwinsOrchestrator = None + # Setup logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("lead-monitor") @@ -16,6 +23,9 @@ logger = logging.getLogger("lead-monitor") def run_monitor(): logger.info("Starting Lead Monitor (Polling CE for updates)...") + # Initialize Orchestrator once + orchestrator = TradingTwinsOrchestrator() if TradingTwinsOrchestrator else None + while True: try: leads = get_leads() @@ -44,7 +54,24 @@ def run_monitor(): new_vertical = new_data.get('industry_ai') or new_data.get('vertical') if new_vertical and new_vertical != 'None': logger.info(f" [SUCCESS] Analysis finished for {lead['company_name']}: {new_vertical}") - # Optional: Here we could trigger the Auto-Reply generation in the future + + # Trigger Trading Twins Process + if orchestrator: + logger.info(f" [ACTION] Triggering Trading Twins Orchestrator for {lead['company_name']}...") + try: + # Extract contact details safely + email = lead.get('email') + name = lead.get('contact_name', 'Interessent') + company = lead.get('company_name', 'Ihre Firma') + + if email: + orchestrator.process_lead(email, name, company) + else: + logger.warning(f" [SKIP] No email address found for lead {lead['id']}") + except Exception as e: + logger.error(f" [ERROR] Failed to trigger orchestrator: {e}") + else: + logger.warning(" [SKIP] Orchestrator not available (Import Error)") except Exception as e: logger.error(f"Monitor error: {e}") diff --git a/lead-engine/trading_twins/__init__.py b/lead-engine/trading_twins/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lead-engine/trading_twins/api_server.py b/lead-engine/trading_twins/api_server.py new file mode 100644 index 00000000..a68ef425 --- /dev/null +++ b/lead-engine/trading_twins/api_server.py @@ -0,0 +1,58 @@ +from flask import Flask, request, jsonify, render_template_string +from .manager import TradingTwinsManager + +app = Flask(__name__) +manager = TradingTwinsManager() + +# Einfaches HTML-Template für Feedback +HTML_TEMPLATE = """ + + + + Trading Twins Status + + + +

{title}

+

{message}

+

Job ID: {job_uuid}

+ + +""" + +@app.route('/action/approve/', methods=['GET']) +def approve_job(job_uuid): + current_status = manager.get_job_status(job_uuid) + + if not current_status: + return render_template_string(HTML_TEMPLATE, title="Fehler", status_class="cancelled", message="Job nicht gefunden.", job_uuid=job_uuid), 404 + + if current_status == 'pending': + manager.update_job_status(job_uuid, 'approved') + # TODO: Hier würde der E-Mail-Versand sofort getriggert werden (Phase 3) + return render_template_string(HTML_TEMPLATE, title="Erfolg", status_class="success", message="✅ E-Mail wird jetzt versendet!", job_uuid=job_uuid) + elif current_status == 'approved': + return render_template_string(HTML_TEMPLATE, title="Info", status_class="success", message="⚠️ Job wurde bereits genehmigt.", job_uuid=job_uuid) + else: + return render_template_string(HTML_TEMPLATE, title="Info", status_class="cancelled", message=f"Job-Status ist bereits: {current_status}", job_uuid=job_uuid) + +@app.route('/action/cancel/', methods=['GET']) +def cancel_job(job_uuid): + current_status = manager.get_job_status(job_uuid) + + if not current_status: + return render_template_string(HTML_TEMPLATE, title="Fehler", status_class="cancelled", message="Job nicht gefunden.", job_uuid=job_uuid), 404 + + if current_status == 'pending': + manager.update_job_status(job_uuid, 'cancelled') + return render_template_string(HTML_TEMPLATE, title="Abbruch", status_class="cancelled", message="❌ E-Mail-Versand gestoppt.", job_uuid=job_uuid) + else: + return render_template_string(HTML_TEMPLATE, title="Info", status_class="info", message=f"Job konnte nicht gestoppt werden (Status: {current_status}).", job_uuid=job_uuid) + +if __name__ == '__main__': + app.run(host='0.0.0.0', port=8004) diff --git a/lead-engine/trading_twins/email_sender.py b/lead-engine/trading_twins/email_sender.py new file mode 100644 index 00000000..40ee20f6 --- /dev/null +++ b/lead-engine/trading_twins/email_sender.py @@ -0,0 +1,85 @@ +import os +import msal +import requests +import base64 + +# Graph API Konfiguration (aus .env laden) +CLIENT_ID = os.getenv("AZURE_CLIENT_ID") +CLIENT_SECRET = os.getenv("AZURE_CLIENT_SECRET") +TENANT_ID = os.getenv("AZURE_TENANT_ID") +AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}" +SCOPE = ["https://graph.microsoft.com/.default"] + +SENDER_EMAIL = "info@robo-planet.de" + +def get_access_token(): + """Holt ein Token für die Graph API.""" + app = msal.ConfidentialClientApplication( + CLIENT_ID, authority=AUTHORITY, client_credential=CLIENT_SECRET + ) + result = app.acquire_token_for_client(scopes=SCOPE) + if "access_token" in result: + return result["access_token"] + else: + raise Exception(f"Fehler beim Abrufen des Tokens: {result.get('error_description')}") + +def send_email_via_graph(to_email, subject, body_html, banner_path=None): + """ + Sendet eine E-Mail über die Microsoft Graph API. + """ + token = get_access_token() + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json" + } + + # E-Mail Struktur für Graph API + email_msg = { + "message": { + "subject": subject, + "body": { + "contentType": "HTML", + "content": body_html + }, + "toRecipients": [ + { + "emailAddress": { + "address": to_email + } + } + ], + "from": { + "emailAddress": { + "address": SENDER_EMAIL + } + } + }, + "saveToSentItems": "true" + } + + # Optional: Banner-Bild als Inline-Attachment einfügen + if banner_path and os.path.exists(banner_path): + with open(banner_path, "rb") as f: + content_bytes = f.read() + content_b64 = base64.b64encode(content_bytes).decode("utf-8") + + email_msg["message"]["attachments"] = [ + { + "@odata.type": "#microsoft.graph.fileAttachment", + "name": "banner.png", + "contentBytes": content_b64, + "isInline": True, + "contentId": "banner_image" + } + ] + + endpoint = f"https://graph.microsoft.com/v1.0/users/{SENDER_EMAIL}/sendMail" + + response = requests.post(endpoint, headers=headers, json=email_msg) + + if response.status_code == 202: + print(f"E-Mail erfolgreich an {to_email} gesendet.") + return True + else: + print(f"Fehler beim Senden: {response.status_code} - {response.text}") + return False diff --git a/lead-engine/trading_twins/manager.py b/lead-engine/trading_twins/manager.py new file mode 100644 index 00000000..85b9ac22 --- /dev/null +++ b/lead-engine/trading_twins/manager.py @@ -0,0 +1,133 @@ +import datetime +from sqlalchemy import create_engine, func +from sqlalchemy.orm import sessionmaker +from .models import ProposalJob, ProposedSlot, Base +import uuid + +# Konfiguration +DB_PATH = 'sqlite:///trading_twins/trading_twins.db' +MAX_PROPOSALS_PER_SLOT = 3 # Aggressiver Faktor 3 + +class TradingTwinsManager: + def __init__(self, db_path=DB_PATH): + self.engine = create_engine(db_path) + self.Session = sessionmaker(bind=self.engine) + Base.metadata.create_all(self.engine) + + def create_proposal_job(self, customer_email, customer_name, customer_company): + """Erstellt einen neuen Job, sucht Slots und speichert alles.""" + session = self.Session() + try: + # 1. Freie Slots finden (Mock für jetzt) + # Später: real_slots = self.fetch_calendar_availability() + candidate_slots = self._mock_calendar_availability() + + # 2. Beste Slots auswählen (mit Overbooking-Check) + selected_slots = self._select_best_slots(session, candidate_slots) + + if not selected_slots: + # Fallback: Wenn alles "voll" ist (sehr unwahrscheinlich bei Faktor 3), + # nehmen wir trotzdem den am wenigsten gebuchten Slot. + selected_slots = candidate_slots[:2] + + # 3. Job anlegen + job_uuid = str(uuid.uuid4()) + new_job = ProposalJob( + job_uuid=job_uuid, + customer_email=customer_email, + customer_name=customer_name, + customer_company=customer_company, + status='pending' + ) + session.add(new_job) + session.flush() # ID generieren + + # 4. Slots speichern + for slot in selected_slots: + new_slot = ProposedSlot( + job_id=new_job.id, + start_time=slot['start'], + end_time=slot['end'] + ) + session.add(new_slot) + + session.commit() + return new_job.job_uuid, selected_slots + + except Exception as e: + session.rollback() + raise e + finally: + session.close() + + def _select_best_slots(self, session, candidate_slots): + """Wählt Slots aus, die noch nicht 'voll' sind (Faktor 3).""" + valid_slots = [] + + # Wir betrachten nur Vorschläge der letzten 24h als "aktiv" + yesterday = datetime.datetime.now() - datetime.timedelta(days=1) + + for slot in candidate_slots: + # Wie oft wurde dieser Start-Zeitpunkt in den letzten 24h vorgeschlagen? + count = session.query(func.count(ProposedSlot.id)).filter(ProposedSlot.start_time == slot['start']).filter(ProposedSlot.job.has(ProposalJob.created_at >= yesterday)).scalar() + + if count < MAX_PROPOSALS_PER_SLOT: + valid_slots.append(slot) + + if len(valid_slots) >= 2: + break + + return valid_slots + + def _mock_calendar_availability(self): + """Simuliert freie Termine für morgen.""" + tomorrow = datetime.date.today() + datetime.timedelta(days=1) + + # Ein Slot Vormittags (10:30), einer Nachmittags (14:00) + return [ + { + 'start': datetime.datetime.combine(tomorrow, datetime.time(10, 30)), + 'end': datetime.datetime.combine(tomorrow, datetime.time(11, 15)) + }, + { + 'start': datetime.datetime.combine(tomorrow, datetime.time(14, 0)), + 'end': datetime.datetime.combine(tomorrow, datetime.time(14, 45)) + } + ] + + def get_job_status(self, job_uuid): + session = self.Session() + job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first() + status = job.status if job else None + session.close() + return status + + def get_job_details(self, job_uuid): + """Holt alle Details zu einem Job inklusive der Slots.""" + session = self.Session() + job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first() + if not job: + session.close() + return None + + # Wir müssen die Daten extrahieren, bevor die Session geschlossen wird + details = { + 'uuid': job.job_uuid, + 'email': job.customer_email, + 'name': job.customer_name, + 'company': job.customer_company, + 'status': job.status, + 'slots': [{'start': s.start_time, 'end': s.end_time} for s in job.slots] + } + session.close() + return details + + def update_job_status(self, job_uuid, new_status): + session = self.Session() + job = session.query(ProposalJob).filter_by(job_uuid=job_uuid).first() + if job: + job.status = new_status + if new_status == 'approved': + job.approved_at = datetime.datetime.now() + session.commit() + session.close() diff --git a/lead-engine/trading_twins/models.py b/lead-engine/trading_twins/models.py new file mode 100644 index 00000000..58953f36 --- /dev/null +++ b/lead-engine/trading_twins/models.py @@ -0,0 +1,45 @@ +from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Boolean +from sqlalchemy.orm import declarative_base, relationship, sessionmaker +from datetime import datetime + +Base = declarative_base() + +class ProposalJob(Base): + __tablename__ = 'proposal_jobs' + + id = Column(Integer, primary_key=True) + job_uuid = Column(String, unique=True, nullable=False) # Für die API-Links + customer_email = Column(String, nullable=False) + customer_name = Column(String, nullable=True) + customer_company = Column(String, nullable=True) + + # Status-Tracking + status = Column(String, default='pending') # pending, approved, rejected, sent, failed + + # Audit-Trail + created_at = Column(DateTime, default=datetime.now) + updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) + approved_at = Column(DateTime, nullable=True) + + # Verknüpfung zu den vorgeschlagenen Slots + slots = relationship("ProposedSlot", back_populates="job") + +class ProposedSlot(Base): + __tablename__ = 'proposed_slots' + + id = Column(Integer, primary_key=True) + job_id = Column(Integer, ForeignKey('proposal_jobs.id')) + + start_time = Column(DateTime, nullable=False) + end_time = Column(DateTime, nullable=False) + + # Wir brauchen kein 'is_blocked' Flag mehr, da wir dynamisch zählen, + # wie oft 'start_time' in den letzten 24h verwendet wurde. + + job = relationship("ProposalJob", back_populates="slots") + +# DB Setup Helper +def init_db(db_path='sqlite:///trading_twins/trading_twins.db'): + engine = create_engine(db_path) + Base.metadata.create_all(engine) + return sessionmaker(bind=engine) diff --git a/lead-engine/trading_twins/orchestrator.py b/lead-engine/trading_twins/orchestrator.py new file mode 100644 index 00000000..8f8f231b --- /dev/null +++ b/lead-engine/trading_twins/orchestrator.py @@ -0,0 +1,130 @@ +import time +import threading +import logging +import datetime +from .manager import TradingTwinsManager +from .teams_notification import send_approval_card +from .email_sender import send_email_via_graph +import os + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("TradingTwinsOrchestrator") + +TIMEOUT_SECONDS = 300 # 5 Minuten +SIGNATURE_FILE = "trading_twins/signature.html" +BANNER_IMAGE = "trading_twins/RoboPlanetBannerWebinarEinladung.png" + +class TradingTwinsOrchestrator: + def __init__(self): + self.manager = TradingTwinsManager() + + def process_lead(self, customer_email, customer_name, customer_company): + """ + Startet den gesamten Prozess für einen Lead. + """ + logger.info(f"Neuer Lead eingegangen: {customer_email}") + + # 1. Job und Slots erstellen (mit Faktor-3 Logik) + job_uuid, slots = self.manager.create_proposal_job( + customer_email, customer_name, customer_company + ) + logger.info(f"Job erstellt: {job_uuid}. Slots: {slots}") + + # 2. Teams Benachrichtigung senden + # Formatieren der Uhrzeit für die Teams-Nachricht + send_time = (datetime.datetime.now() + datetime.timedelta(seconds=TIMEOUT_SECONDS)).strftime("%H:%M") + + success = send_approval_card(job_uuid, customer_name, send_time) + if not success: + logger.error("Konnte Teams-Benachrichtigung nicht senden!") + # Fallback? Trotzdem Timer starten oder abbrechen? + # Wir machen weiter, da E-Mail-Versand Priorität hat. + + # 3. Timer starten für automatischen Versand + timer = threading.Timer(TIMEOUT_SECONDS, self._check_timeout, args=[job_uuid]) + timer.start() + + return job_uuid + + def _check_timeout(self, job_uuid): + """ + Wird nach Ablauf des Timers aufgerufen. + Prüft den Status und sendet ggf. automatisch. + """ + logger.info(f"Timer abgelaufen für Job {job_uuid}. Prüfe Status...") + + current_status = self.manager.get_job_status(job_uuid) + + if current_status == 'pending': + logger.info(f"Job {job_uuid} ist noch 'pending'. Löse automatischen Versand aus.") + self._trigger_email_send(job_uuid) + elif current_status == 'approved': + logger.info(f"Job {job_uuid} wurde bereits manuell genehmigt.") + elif current_status == 'cancelled': + logger.info(f"Job {job_uuid} wurde manuell abgebrochen.") + else: + logger.warning(f"Unbekannter Status für Job {job_uuid}: {current_status}") + + def _trigger_email_send(self, job_uuid): + """ + Hier wird der tatsächliche E-Mail-Versand angestoßen. + """ + # Job Details laden + job_details = self.manager.get_job_details(job_uuid) + if not job_details: + logger.error(f"Konnte Job {job_uuid} nicht finden!") + return + + # E-Mail Body zusammenbauen + try: + with open(SIGNATURE_FILE, "r") as f: + signature_html = f.read() + except FileNotFoundError: + logger.warning("Signatur-Datei nicht gefunden!") + signature_html = "
Viele GrĂĽĂźe
Ihr RoboPlanet Team" + + # Dynamische Terminvorschläge formatieren + slots_text = "" + for slot in job_details['slots']: + # Format: "Morgen, 14:00 Uhr" oder Datum + start = slot['start'] + slots_text += f"
  • {start.strftime('%d.%m.%Y um %H:%M Uhr')}
  • " + + email_body = f""" + + +

    Hallo {job_details['name']},

    +

    vielen Dank fĂĽr Ihr Interesse an Trading Twins.

    +

    Gerne würde ich Ihnen in einem kurzen Gespräch (ca. 15-30 Min) zeigen, wie wir Sie unterstützen können.

    +

    Hätten Sie an einem dieser Termine Zeit?

    +
      + {slots_text} +
    +

    Ich freue mich auf Ihre RĂĽckmeldung.

    + {signature_html} + + + """ + + # Banner Check + banner_path = BANNER_IMAGE if os.path.exists(BANNER_IMAGE) else None + + # Senden + success = send_email_via_graph( + to_email=job_details['email'], + subject="Ihr Termin für Trading Twins", + body_html=email_body, + banner_path=banner_path + ) + + if success: + logger.info(f"🚀 E-MAIL WURDE VERSENDET für Job {job_uuid}") + self.manager.update_job_status(job_uuid, 'sent') + else: + logger.error(f"❌ Fehler beim E-Mail-Versand für Job {job_uuid}") + self.manager.update_job_status(job_uuid, 'failed') + +if __name__ == "__main__": + # Test-Lauf + orchestrator = TradingTwinsOrchestrator() + orchestrator.process_lead("test@example.com", "Max Mustermann", "Musterfirma GmbH") diff --git a/lead-engine/trading_twins/signature.html b/lead-engine/trading_twins/signature.html new file mode 100644 index 00000000..760ed388 --- /dev/null +++ b/lead-engine/trading_twins/signature.html @@ -0,0 +1,27 @@ +
    +
    +

    Freundliche GrĂĽĂźe
    + Elizabeta Melcer
    + Inside Sales Managerin

    + +

    + RoboPlanet GmbH
    + Schatzbogen 39, 81829 MĂĽnchen
    + T: +49 89 420490-402 | M: +49 175 8334071
    + e.melcer@robo-planet.de | www.robo-planet.de +

    + +

    + LinkedIn | Instagram | Newsletteranmeldung +

    + +

    + Sitz der Gesellschaft München | Geschäftsführung: Axel Banoth
    + Registergericht AG MĂĽnchen, HRB 296113 | USt.-IdNr. DE400464410
    + Hinweispflichten zum Datenschutz +

    + + + RoboPlanet Logo
    + Webinar Einladung +
    diff --git a/lead-engine/trading_twins/teams_notification.py b/lead-engine/trading_twins/teams_notification.py new file mode 100644 index 00000000..0529e579 --- /dev/null +++ b/lead-engine/trading_twins/teams_notification.py @@ -0,0 +1,70 @@ +import requests +import json +import os +from datetime import datetime + +# Default-Webhook (Platzhalter) - sollte in .env stehen +DEFAULT_WEBHOOK_URL = os.getenv("TEAMS_WEBHOOK_URL", "") + +def send_approval_card(job_uuid, customer_name, time_string, webhook_url=DEFAULT_WEBHOOK_URL): + """ + Sendet eine Adaptive Card an Teams mit Approve/Deny Buttons. + """ + + # Die URL unserer API (muss von außen erreichbar sein, z.B. via ngrok oder Server-IP) + api_base_url = os.getenv("API_BASE_URL", "http://localhost:8004") + + card_payload = { + "type": "message", + "attachments": [ + { + "contentType": "application/vnd.microsoft.card.adaptive", + "contentUrl": None, + "content": { + "$schema": "http://adaptivecards.io/schemas/adaptive-card.json", + "type": "AdaptiveCard", + "version": "1.4", + "body": [ + { + "type": "TextBlock", + "text": f"🤖 Automatisierte E-Mail an {customer_name}", + "weight": "Bolder", + "size": "Medium" + }, + { + "type": "TextBlock", + "text": f"(via Trading Twins) wird um {time_string} Uhr ausgesendet.", + "isSubtle": True, + "wrap": True + }, + { + "type": "TextBlock", + "text": "Wenn Du bis dahin NICHT reagierst, wird die E-Mail automatisch gesendet.", + "color": "Attention", + "wrap": True + } + ], + "actions": [ + { + "type": "Action.OpenUrl", + "title": "✅ JETZT Aussenden", + "url": f"{api_base_url}/action/approve/{job_uuid}" + }, + { + "type": "Action.OpenUrl", + "title": "❌ STOP Aussendung", + "url": f"{api_base_url}/action/cancel/{job_uuid}" + } + ] + } + } + ] + } + + try: + response = requests.post(webhook_url, json=card_payload) + response.raise_for_status() + return True + except Exception as e: + print(f"Fehler beim Senden an Teams: {e}") + return False diff --git a/lead-engine/trading_twins/test_dry_run.py b/lead-engine/trading_twins/test_dry_run.py new file mode 100644 index 00000000..5393684e --- /dev/null +++ b/lead-engine/trading_twins/test_dry_run.py @@ -0,0 +1,139 @@ +import unittest +from unittest.mock import patch, MagicMock +import time +import os +import logging +from datetime import datetime, timedelta + +# Importiere unsere Module +from trading_twins.orchestrator import TradingTwinsOrchestrator +from trading_twins.manager import TradingTwinsManager +from trading_twins.models import init_db, ProposalJob, ProposedSlot + +# Logging reduzieren +logging.basicConfig(level=logging.INFO) + +class TestTradingTwinsDryRun(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Wir nutzen eine temporäre Test-Datenbank + cls.test_db_path = 'sqlite:///trading_twins/test_dry_run.db' + if os.path.exists("trading_twins/test_dry_run.db"): + os.remove("trading_twins/test_dry_run.db") + + # Manager mit Test-DB initialisieren + cls.manager = TradingTwinsManager(db_path=cls.test_db_path) + + def setUp(self): + # Orchestrator neu erstellen + self.orchestrator = TradingTwinsOrchestrator() + self.orchestrator.manager = self.manager # Inject Test Manager + + # Timer drastisch verkürzen für Tests + self.orchestrator_timeout_patch = patch('trading_twins.orchestrator.TIMEOUT_SECONDS', 2) + self.orchestrator_timeout_patch.start() + + def tearDown(self): + self.orchestrator_timeout_patch.stop() + + @patch('trading_twins.orchestrator.send_email_via_graph') + @patch('trading_twins.orchestrator.send_approval_card') + def test_1_happy_path_timeout(self, mock_teams, mock_email): + """Testet den automatischen Versand nach Timeout.""" + print("\n--- TEST 1: Happy Path (Timeout -> Auto-Send) ---") + mock_teams.return_value = True + mock_email.return_value = True + + # Lead verarbeiten + job_uuid = self.orchestrator.process_lead("test1@example.com", "Kunde Eins", "Firma A") + + print(f"Job {job_uuid} gestartet. Warte auf Timeout (2s).") + time.sleep(3) # Warte länger als Timeout + + # Prüfungen + mock_teams.assert_called_once() + mock_email.assert_called_once() # E-Mail muss versendet worden sein + + status = self.manager.get_job_status(job_uuid) + self.assertEqual(status, 'sent') + print("✅ E-Mail wurde automatisch versendet.") + + @patch('trading_twins.orchestrator.send_email_via_graph') + @patch('trading_twins.orchestrator.send_approval_card') + def test_2_manual_cancel(self, mock_teams, mock_email): + """Testet den manuellen Abbruch.""" + print("\n--- TEST 2: Manueller Abbruch (STOP) ---") + mock_teams.return_value = True + + # Lead verarbeiten + job_uuid = self.orchestrator.process_lead("test2@example.com", "Kunde Zwei", "Firma B") + + # Simuliere Klick auf "STOP" (direkter DB-Update wie API es tun würde) + print("Simuliere Klick auf 'STOP'...") + self.manager.update_job_status(job_uuid, 'cancelled') + + print("Warte auf Timeout (2s).") + time.sleep(3) + + # Prüfungen + mock_teams.assert_called_once() + mock_email.assert_not_called() # E-Mail darf NICHT versendet werden + + status = self.manager.get_job_status(job_uuid) + self.assertEqual(status, 'cancelled') + print("✅ Abbruch erfolgreich, keine E-Mail gesendet.") + + @patch('trading_twins.manager.TradingTwinsManager._mock_calendar_availability') + def test_3_overbooking_factor_3(self, mock_calendar): + """ + Testet die Faktor-3 Logik. + Wir stellen 4 Slots zur Verfügung. + Wir erzeugen 4 Leads. + Die ersten 3 sollten Slot A bekommen. + Der 4. Lead sollte Slot A NICHT mehr bekommen, sondern Slot B (oder andere). + """ + print("\n--- TEST 3: Überbuchungs-Logik (Faktor 3) ---") + + # Setup Mock Calendar: Gibt immer dieselben 4 Slots zurück + tomorrow = datetime.now().date() + timedelta(days=1) + slot_a = {'start': datetime.combine(tomorrow, datetime.min.time().replace(hour=10)), 'end': datetime.combine(tomorrow, datetime.min.time().replace(hour=10, minute=45))} + slot_b = {'start': datetime.combine(tomorrow, datetime.min.time().replace(hour=11)), 'end': datetime.combine(tomorrow, datetime.min.time().replace(hour=11, minute=45))} + slot_c = {'start': datetime.combine(tomorrow, datetime.min.time().replace(hour=14)), 'end': datetime.combine(tomorrow, datetime.min.time().replace(hour=14, minute=45))} + + # Mock gibt diese Liste zurück + mock_calendar.return_value = [slot_a, slot_b, slot_c] + + # Wir feuern 4 Leads ab + uuids = [] + for i in range(1, 5): + uuid, slots = self.manager.create_proposal_job(f"bulk{i}@test.com", f"Bulk {i}", "Bulk Corp") + uuids.append((uuid, slots)) + # print(f"Lead {i} bekam Slots: {[s['start'].strftime('%H:%M') for s in slots]}") + + # Analyse + # Lead 1: Bekommt A, B (A hat Count 0 -> 1) + # Lead 2: Bekommt A, B (A hat Count 1 -> 2) + # Lead 3: Bekommt A, B (A hat Count 2 -> 3 -> VOLL) + # Lead 4: Sollte A NICHT bekommen, sondern B, C + + slots_lead_1 = uuids[0][1] + slots_lead_4 = uuids[3][1] + + start_time_lead_1_first_slot = slots_lead_1[0]['start'] + start_time_lead_4_first_slot = slots_lead_4[0]['start'] + + print(f"Lead 1 Slot 1: {start_time_lead_1_first_slot}") + print(f"Lead 4 Slot 1: {start_time_lead_4_first_slot}") + + if start_time_lead_1_first_slot != start_time_lead_4_first_slot: + print("✅ Faktor-3 Logik greift: Lead 4 hat einen anderen Start-Slot bekommen!") + else: + print("❌ Faktor-3 Logik fehlgeschlagen: Lead 4 hat denselben Slot bekommen.") + # Debug + session = self.manager.Session() + count = session.query(ProposedSlot).filter(ProposedSlot.start_time == start_time_lead_1_first_slot).count() + print(f"Total entries for Slot A: {count}") + session.close() + +if __name__ == '__main__': + unittest.main()