Files
Brancheneinstufung2/lead-engine/trading_twins/manager.py
Floke 6d4a0564e6 feat(lead-engine): Implement Teams notification and email enhancements [31988f42]
- Enhanced Teams Adaptive Card with precise email send time and re-added emojis to action buttons (" JETZT Aussenden", " STOP Aussendung").
- Modified email sending logic to include HTML signature from `signature.html` and an inline banner image from `RoboPlanetBannerWebinarEinladung.png`.
- Documented future enhancements in `lead-engine/README.md`:
    - Race-condition protection for calendar bookings with a live calendar check.
    - Integration of booking confirmation pages into the WordPress website (iFrame first, then API integration).
2026-03-08 20:01:20 +00:00

237 lines
11 KiB
Python

# lead-engine/trading_twins/manager.py
from email.mime.text import MIMEText
import base64
import requests
import json
import os
import time
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from threading import Thread, Lock
import uvicorn
from fastapi import FastAPI, Response, BackgroundTasks
from sqlalchemy.orm import sessionmaker
import msal
from .models import init_db, ProposalJob, ProposedSlot
# --- Setup ---
# All scheduling in this module is done in German local time.
TZ_BERLIN = ZoneInfo("Europe/Berlin")
# The SQLite file lives in <parent of this package>/data/trading_twins.db.
DB_FILE_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "trading_twins.db")
# exist_ok=True replaces the former exists()/makedirs() pair, which could raise
# FileExistsError when two processes started at the same moment.
os.makedirs(os.path.dirname(DB_FILE_PATH), exist_ok=True)
# init_db returns the factory that is called as SessionLocal() to open a DB session.
SessionLocal = init_db(f"sqlite:///{DB_FILE_PATH}")
# --- Config ---
# Directory of this module; the mail assets below are stored next to it.
_PACKAGE_DIR = os.path.dirname(__file__)

# Teams approval card and the public base URL of this feedback server.
TEAMS_WEBHOOK_URL = os.environ.get("TEAMS_WEBHOOK_URL", "")
FEEDBACK_SERVER_BASE_URL = os.environ.get("FEEDBACK_SERVER_BASE_URL", "http://localhost:8004")

# Minutes to wait for a Teams decision before the mail goes out.
DEFAULT_WAIT_MINUTES = 5

# Mailbox used for sending, and the receiver used by the /test_lead endpoint.
SENDER_EMAIL = os.environ.get("SENDER_EMAIL", "info@robo-planet.de")
TEST_RECEIVER_EMAIL = "floke.com@gmail.com"

# HTML signature and inline banner appended to outgoing mails.
SIGNATURE_FILE_PATH = os.path.join(_PACKAGE_DIR, "signature.html")
BANNER_FILE_PATH = os.path.join(_PACKAGE_DIR, "RoboPlanetBannerWebinarEinladung.png")

# Credentials
# App registration used for sending mail and creating calendar events.
AZURE_CLIENT_ID = os.environ.get("INFO_Application_ID")
AZURE_CLIENT_SECRET = os.environ.get("INFO_Secret")
AZURE_TENANT_ID = os.environ.get("INFO_Tenant_ID")
# App registration used for the calendar availability lookup.
CAL_APPID = os.environ.get("CAL_APPID")
CAL_SECRET = os.environ.get("CAL_SECRET")
CAL_TENNANT_ID = os.environ.get("CAL_TENNANT_ID")

GRAPH_API_ENDPOINT = "https://graph.microsoft.com/v1.0"
# --- Auth & Calendar Logic (unchanged, proven) ---
def get_access_token(client_id, client_secret, tenant_id):
    """Obtain an app-only access token through MSAL.

    Returns the value of ``access_token`` from the MSAL result (``None`` when
    the result has no such key), or ``None`` straight away when any of the
    three credentials is missing or empty.
    """
    if not (client_id and client_secret and tenant_id):
        return None

    confidential_client = msal.ConfidentialClientApplication(
        client_id=client_id,
        authority=f"https://login.microsoftonline.com/{tenant_id}",
        client_credential=client_secret,
    )
    # Try the token cache first; fall back to a fresh client-credentials request.
    token_result = confidential_client.acquire_token_silent([".default"], account=None)
    if not token_result:
        token_result = confidential_client.acquire_token_for_client(scopes=[".default"])
    return token_result.get('access_token')
def get_availability(target_email, app_creds, timeout=30):
    """Fetch the free/busy view of a mailbox for today and the two following days.

    Args:
        target_email: Mailbox whose schedule is requested.
        app_creds: ``(client_id, client_secret, tenant_id)``; unpacked into
            ``get_access_token``.
        timeout: Seconds to wait for the Graph API. Without a timeout a
            stalled connection would block the calling worker forever.

    Returns:
        ``(start_time, availability_view, 15)`` on HTTP 200, where
        ``start_time`` is today 00:00 Berlin time and ``availability_view``
        holds one character per 15-minute interval. ``None`` when no token
        could be acquired, the API answered with another status, or any
        exception occurred (best effort; the problem is only printed).
    """
    print(f"DEBUG: Requesting availability for {target_email}")
    token = get_access_token(*app_creds)
    if not token:
        print("DEBUG: Failed to acquire access token.")
        return None

    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Prefer": 'outlook.timezone="Europe/Berlin"'}
    start_time = datetime.now(TZ_BERLIN).replace(hour=0, minute=0, second=0, microsecond=0)
    end_time = start_time + timedelta(days=3)
    # Use 15-minute intervals for finer granularity
    # NOTE(review): only "dateTime" (ISO string incl. UTC offset) is sent, no
    # "timeZone" member - confirm Graph interprets the window as intended.
    payload = {"schedules": [target_email], "startTime": {"dateTime": start_time.isoformat()}, "endTime": {"dateTime": end_time.isoformat()}, "availabilityViewInterval": 15}
    try:
        url = f"{GRAPH_API_ENDPOINT}/users/{target_email}/calendar/getSchedule"
        r = requests.post(url, headers=headers, json=payload, timeout=timeout)
        print(f"DEBUG: API Status Code: {r.status_code}")
        if r.status_code == 200:
            view = r.json()['value'][0].get('availabilityView', '')
            print(f"DEBUG: Availability View received (Length: {len(view)})")
            return start_time, view, 15
        print(f"DEBUG: API Error Response: {r.text}")
    except Exception as e:
        # Deliberately broad: the caller falls back to default slots on None.
        print(f"DEBUG: Exception during API call: {e}")
    return None
def find_slots(start, view, interval):
    """
    Pick up to two free meeting start times from an availability string.

    Each character of *view* covers *interval* minutes, the first one beginning
    at *start*. '0' means free; every other character is treated as not free.
    A slot qualifies when it
      1. falls on Mon-Fri,
      2. starts between 09:00 and 16:30 (both inclusive),
      3. lies in the future.
    The first qualifying slot is always taken; the second one is the earliest
    qualifying slot at least 3 hours after the first.

    Args:
        start: datetime of the first interval (aware or naive).
        view: availability string, one character per interval.
        interval: length of one interval in minutes.

    Returns:
        A list with 0, 1 or 2 datetimes in ascending order.
    """
    # Take "now" once so every slot is judged against the same instant, and in
    # the same tz-awareness as `start` so the comparison can never raise.
    now = datetime.now(start.tzinfo)
    min_gap = timedelta(hours=3)
    slots = []
    for index, status in enumerate(view):
        if status != '0':
            continue
        slot_time = start + timedelta(minutes=index * interval)
        if slot_time.weekday() >= 5:  # Sat/Sun
            continue
        if not (9, 0) <= (slot_time.hour, slot_time.minute) <= (16, 30):
            continue
        if slot_time <= now:
            continue
        if not slots:
            slots.append(slot_time)
        elif slot_time >= slots[0] + min_gap:
            slots.append(slot_time)
            break
    return slots
def create_calendar_invite(lead_email, company, start_time, timeout=30):
    """Create a 15-minute Teams meeting in the SENDER_EMAIL calendar.

    Args:
        lead_email: Address of the lead to invite. Replaced by the value of
            the EMAIL_CATCHALL environment variable when that is set.
        company: Company name used in the meeting subject.
        start_time: datetime at which the meeting starts.
        timeout: Seconds to wait for the Graph API before giving up.

    Returns:
        True when Graph answered 200 or 201; False when no token could be
        acquired, the request failed, or another status came back.
    """
    lead_email = os.getenv("EMAIL_CATCHALL") or lead_email
    token = get_access_token(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
    if not token:
        return False

    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    end_time = start_time + timedelta(minutes=15)
    payload = {
        "subject": f"Kennenlerngespräch RoboPlanet <> {company}",
        "body": {"contentType": "HTML", "content": "Vielen Dank für die Terminbuchung."},
        "start": {"dateTime": start_time.isoformat(), "timeZone": "Europe/Berlin"},
        "end": {"dateTime": end_time.isoformat(), "timeZone": "Europe/Berlin"},
        "attendees": [{"emailAddress": {"address": lead_email}}, {"emailAddress": {"address": "e.melcer@robo-planet.de"}}],
        "isOnlineMeeting": True, "onlineMeetingProvider": "teamsForBusiness"
    }
    try:
        r = requests.post(f"{GRAPH_API_ENDPOINT}/users/{SENDER_EMAIL}/calendar/events", headers=headers, json=payload, timeout=timeout)
    except requests.RequestException as e:
        # Keep the boolean contract: a network failure is a failed booking,
        # not an exception for the HTTP handler to deal with.
        print(f"DEBUG: Calendar invite request failed: {e}")
        return False
    return r.status_code in (200, 201)
# --- FastAPI Server ---
# Application object on which the route decorators in this module register
# their endpoints; it is handed to uvicorn when the module runs as a script.
app = FastAPI()
@app.get("/test_lead", status_code=202)
def trigger_test_lead(background_tasks: BackgroundTasks):
    """Queue a synthetic lead addressed to TEST_RECEIVER_EMAIL.

    The id is "test_" plus the current Unix time in whole seconds; the lead is
    processed in the background and the id is returned right away.
    """
    req_id = "test_" + str(int(time.time()))
    lead_args = (
        req_id,
        "Testfirma GmbH",
        "Wir haben Ihre Anfrage erhalten.",
        TEST_RECEIVER_EMAIL,
        "Max Mustermann",
    )
    background_tasks.add_task(process_lead, *lead_args)
    return {"status": "Test lead triggered", "id": req_id}
@app.get("/stop/{job_uuid}")
def stop(job_uuid: str):
    """Set the job's status to "cancelled".

    Responds with "Gestoppt." when the job exists and with 404 "Not Found"
    otherwise.
    """
    db = SessionLocal()
    try:
        job = db.query(ProposalJob).filter(ProposalJob.job_uuid == job_uuid).first()
        if not job:
            return Response("Not Found", 404)
        job.status = "cancelled"
        db.commit()
        return Response("Gestoppt.")
    finally:
        # Release the session even when the query or the commit raises.
        db.close()
@app.get("/send_now/{job_uuid}")
def send_now(job_uuid: str):
    """Set the job's status to "send_now".

    Responds with "Wird gesendet." when the job exists and with 404
    "Not Found" otherwise.
    """
    db = SessionLocal()
    try:
        job = db.query(ProposalJob).filter(ProposalJob.job_uuid == job_uuid).first()
        if not job:
            return Response("Not Found", 404)
        job.status = "send_now"
        db.commit()
        return Response("Wird gesendet.")
    finally:
        # Release the session even when the query or the commit raises.
        db.close()
@app.get("/book_slot/{job_uuid}/{ts}")
def book_slot(job_uuid: str, ts: int):
    """Book the slot starting at Unix timestamp *ts* for the given job.

    Responses:
        "Gebucht!" - the calendar invite was created; status is now "booked".
        400 "Fehler." - unknown job, job already booked, or unusable timestamp.
        500 "Fehler bei Kalender." - the calendar invite could not be created.
    """
    try:
        slot_time = datetime.fromtimestamp(ts, tz=TZ_BERLIN)
    except (OverflowError, OSError, ValueError):
        # An out-of-range timestamp in the link is a client error, not a crash.
        return Response("Fehler.", 400)

    db = SessionLocal()
    try:
        job = db.query(ProposalJob).filter(ProposalJob.job_uuid == job_uuid).first()
        if not job or job.status == "booked":
            return Response("Fehler.", 400)
        # NOTE(review): ts is not compared with the ProposedSlot rows of the
        # job, so any timestamp in the link is accepted - confirm this is intended.
        if create_calendar_invite(job.customer_email, job.customer_company, slot_time):
            job.status = "booked"
            db.commit()
            return Response("Gebucht!")
        return Response("Fehler bei Kalender.", 500)
    finally:
        # Release the session on every path, including exceptions.
        db.close()
# --- Workflow Logic ---
def send_email(subject, body, to_email, signature, banner_path=None, timeout=30):
    """Send an HTML mail from SENDER_EMAIL through the Graph sendMail endpoint.

    Args:
        subject: Mail subject.
        body: HTML body; *signature* is appended to it unchanged.
        to_email: Recipient address. Replaced by the value of the
            EMAIL_CATCHALL environment variable when that is set.
        signature: HTML signature appended after *body*.
        banner_path: Optional image file attached inline with content id
            "banner_image". An unreadable or missing file is skipped.
        timeout: Seconds to wait for the Graph API before giving up.

    Returns:
        True when Graph accepted the request, False when no token could be
        acquired, the request failed, or an error status came back.
    """
    to_email = os.getenv("EMAIL_CATCHALL") or to_email
    token = get_access_token(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
    if not token:
        return False

    attachments = []
    if banner_path:
        # Open directly instead of exists()+open(): no window for the file to
        # disappear in between, and the mail still goes out without the banner.
        try:
            with open(banner_path, "rb") as f:
                content_b64 = base64.b64encode(f.read()).decode("utf-8")
        except OSError as e:
            print(f"DEBUG: Banner not attached: {e}")
        else:
            attachments.append({
                "@odata.type": "#microsoft.graph.fileAttachment",
                "name": os.path.basename(banner_path),
                "contentBytes": content_b64,
                "isInline": True,
                "contentId": "banner_image"
            })

    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    payload = {"message": {"subject": subject, "body": {"contentType": "HTML", "content": body + signature}, "toRecipients": [{"emailAddress": {"address": to_email}}]}, "saveToSentItems": "true"}
    if attachments:
        payload["message"]["attachments"] = attachments

    try:
        r = requests.post(f"{GRAPH_API_ENDPOINT}/users/{SENDER_EMAIL}/sendMail", headers=headers, json=payload, timeout=timeout)
    except requests.RequestException as e:
        print(f"DEBUG: sendMail request failed: {e}")
        return False
    if not r.ok:
        # Make rejected mails visible instead of dropping the response.
        print(f"DEBUG: sendMail rejected ({r.status_code}): {r.text}")
        return False
    return True
def _next_business_day(day):
    """Return *day* shifted forward to the next Mon-Fri calendar day."""
    day += timedelta(days=1)
    while day.weekday() >= 5:
        day += timedelta(days=1)
    return day


def _fallback_slots(now):
    """Return two default proposals: next business day 10:00, the one after 14:00."""
    first_day = _next_business_day(now)
    second_day = _next_business_day(first_day)
    return [
        first_day.replace(hour=10, minute=0, second=0, microsecond=0),
        second_day.replace(hour=14, minute=0, second=0, microsecond=0),
    ]


def process_lead(request_id, company, opener, receiver, name):
    """Run the full proposal workflow for one lead.

    Steps:
        1. Store a ProposalJob with status "pending".
        2. Look up free calendar slots; use weekday fallback slots if none.
        3. Store the proposed slots and post the Teams approval card.
        4. Wait up to DEFAULT_WAIT_MINUTES, polling the job status every 5
           seconds; "send_now" ends the wait early, "cancelled" aborts.
        5. Send the mail with one booking link per slot; set status "sent".

    Args:
        request_id: Unique id stored as the job's job_uuid and used in links.
        company: Company name of the lead.
        opener: Opening paragraph of the mail; inserted as-is.
        receiver: Mail address of the lead.
        name: Name used in the salutation.
    """
    from html import escape  # only needed for the salutation below

    db = SessionLocal()
    try:
        job = ProposalJob(job_uuid=request_id, customer_email=receiver, customer_company=company, customer_name=name, status="pending")
        db.add(job)
        db.commit()

        cal_data = get_availability("e.melcer@robo-planet.de", (CAL_APPID, CAL_SECRET, CAL_TENNANT_ID))
        suggestions = find_slots(*cal_data) if cal_data else []

        # --- FALLBACK LOGIC ---
        if not suggestions:
            print("WARNING: No slots found via API. Creating fallback slots.")
            # Skips Sat/Sun so the fallback obeys the same rule as find_slots.
            suggestions = _fallback_slots(datetime.now(TZ_BERLIN))
        # --------------------

        for s in suggestions:
            db.add(ProposedSlot(job_id=job.id, start_time=s, end_time=s + timedelta(minutes=15)))
        db.commit()

        # Computed once: the time shown on the Teams card is the real deadline.
        send_time = datetime.now(TZ_BERLIN) + timedelta(minutes=DEFAULT_WAIT_MINUTES)

        # Using the more detailed card from teams_notification.py
        from .teams_notification import send_approval_card
        send_approval_card(job_uuid=request_id, customer_name=company, time_string=send_time.strftime("%H:%M"), webhook_url=TEAMS_WEBHOOK_URL, api_base_url=FEEDBACK_SERVER_BASE_URL)

        while datetime.now(TZ_BERLIN) < send_time:
            db.refresh(job)
            if job.status in ("cancelled", "send_now"):
                break
            time.sleep(5)

        # Re-read once more: a cancel that arrived during the last sleep would
        # otherwise be missed and the mail sent anyway.
        db.refresh(job)
        if job.status == "cancelled":
            return

        slot_items = "".join(
            f'<li><a href="{FEEDBACK_SERVER_BASE_URL}/book_slot/{request_id}/{int(s.timestamp())}">{s.strftime("%d.%m %H:%M")}</a></li>'
            for s in suggestions
        )
        booking_html = f"<ul>{slot_items}</ul>"

        try:
            with open(SIGNATURE_FILE_PATH, "r", encoding="utf-8") as f:
                sig = f.read()
        except (OSError, UnicodeDecodeError) as e:
            # The signature is optional; send the mail without it.
            print(f"WARNING: Could not read signature file: {e}")
            sig = ""

        # The name comes from lead data, so it is escaped before it is put
        # into HTML. NOTE(review): opener is inserted unescaped because it may
        # carry intentional markup - confirm its source is trusted.
        email_body = (
            f"\n<p>Hallo {escape(str(name))},</p>\n"
            f"<p>{opener}</p>\n"
            "<p>Hätten Sie an einem dieser Termine Zeit für ein kurzes Gespräch?</p>\n"
            f"{booking_html}\n"
        )

        send_email(f"Ihr Kontakt mit RoboPlanet - {company}", email_body, receiver, sig, BANNER_FILE_PATH)
        job.status = "sent"
        db.commit()
    finally:
        # Release the session on every path, including exceptions.
        db.close()
if __name__ == "__main__":
    # Script entry point: serve the FastAPI app on all interfaces, port 8004.
    server_options = {"host": "0.0.0.0", "port": 8004}
    uvicorn.run(app, **server_options)