Files
Brancheneinstufung2/lead-engine/trading_twins/manager.py

474 lines
21 KiB
Python

# lead-engine/trading_twins/manager.py
from email.mime.text import MIMEText
import base64
import requests
import json
import os
import time
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from threading import Thread, Lock
import uvicorn
import logging
from fastapi import FastAPI, Response, BackgroundTasks
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from sqlalchemy.orm import sessionmaker
# --- Setup Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
import msal
from .models import init_db, ProposalJob, ProposedSlot
class TestLeadPayload(BaseModel):
    """Request body accepted by the ``/test_specific_lead`` endpoint."""

    # Forwarded to process_lead as ``company``.
    company_name: str
    # Forwarded to process_lead as ``name`` (used in the email salutation).
    contact_name: str
    # Forwarded to process_lead as ``opener`` (intro text of the email).
    opener: str
def format_date_for_email(dt: datetime) -> str:
    """Format ``dt`` for the proposal email, relative to today's date in Berlin.

    Returns ``'Heute HH:MM Uhr'`` when ``dt`` falls on today's Berlin date,
    ``'Morgen HH:MM Uhr'`` for the following day and ``'DD.MM. HH:MM Uhr'``
    otherwise.

    Timezone-aware values are converted to Europe/Berlin first, so both the
    day label and the printed clock time refer to the same zone as "today".
    Naive values are used unchanged.
    """
    if dt.tzinfo is not None:
        # Without this, an aware datetime in another zone would be compared
        # against Berlin's calendar day using its own (foreign) date.
        dt = dt.astimezone(TZ_BERLIN)

    today = datetime.now(TZ_BERLIN).date()
    if dt.date() == today:
        return dt.strftime("Heute %H:%M Uhr")
    if dt.date() == today + timedelta(days=1):
        return dt.strftime("Morgen %H:%M Uhr")
    return dt.strftime("%d.%m. %H:%M Uhr")
# --- Setup ---
# All wall-clock calculations in this module are done in this zone.
TZ_BERLIN = ZoneInfo("Europe/Berlin")
# SQLite file lives in <parent of this package>/data/trading_twins.db
DB_FILE_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "trading_twins.db")
# exist_ok avoids the check-then-create race when two processes start together.
os.makedirs(os.path.dirname(DB_FILE_PATH), exist_ok=True)
SessionLocal = init_db(f"sqlite:///{DB_FILE_PATH}")
# --- Config ---
TEAMS_WEBHOOK_URL = os.getenv("TEAMS_WEBHOOK_URL", "")
# Base URL used to build the booking links placed in the outgoing email.
FEEDBACK_SERVER_BASE_URL = os.getenv("FEEDBACK_SERVER_BASE_URL", "http://localhost:8004")
# When set, booking links point to this URL instead of the /book_slot endpoint.
WORDPRESS_BOOKING_URL = os.getenv("WORDPRESS_BOOKING_URL", "")
# NOTE(review): the default still contains the placeholder "IHR_BOOKINGS_NAME";
# set MS_BOOKINGS_URL in the environment for a working fallback calendar.
MS_BOOKINGS_URL = os.getenv("MS_BOOKINGS_URL", "https://outlook.office365.com/owa/calendar/IHR_BOOKINGS_NAME@robo-planet.de/bookings/")
# Minutes process_lead waits for a Teams decision before sending the email.
DEFAULT_WAIT_MINUTES = 5
SENDER_EMAIL = os.getenv("SENDER_EMAIL", "info@robo-planet.de")
# Recipient forced by the /test_lead and /test_specific_lead endpoints.
TEST_RECEIVER_EMAIL = "floke.com@gmail.com"
SIGNATURE_FILE_PATH = os.path.join(os.path.dirname(__file__), "signature.html")
BANNER_FILE_PATH = os.path.join(os.path.dirname(__file__), "RoboPlanetBannerWebinarEinladung.png")
# Credentials
# App registration used for sending mail and creating calendar events.
AZURE_CLIENT_ID = os.getenv("INFO_Application_ID")
AZURE_CLIENT_SECRET = os.getenv("INFO_Secret")
AZURE_TENANT_ID = os.getenv("INFO_Tenant_ID")
# App registration used for reading calendar availability.
CAL_APPID = os.getenv("CAL_APPID")
CAL_SECRET = os.getenv("CAL_SECRET")
# The spelling "TENNANT" matches the environment variable name and must stay as is.
CAL_TENNANT_ID = os.getenv("CAL_TENNANT_ID")
GRAPH_API_ENDPOINT = "https://graph.microsoft.com/v1.0"
# --- Auth & Calendar Logic (unchanged, proven) ---
def get_access_token(client_id, client_secret, tenant_id):
    """Acquire an access token for the given app registration via MSAL.

    Returns None when any of the three credentials is missing/empty;
    otherwise returns the ``access_token`` entry of the MSAL result
    (None if the result carries no token).
    """
    credentials = (client_id, client_secret, tenant_id)
    if not all(credentials):
        return None

    msal_client = msal.ConfidentialClientApplication(
        client_id=client_id,
        authority=f"https://login.microsoftonline.com/{tenant_id}",
        client_credential=client_secret,
    )

    # Try the silent path first; fall back to the client-credential request.
    token_response = msal_client.acquire_token_silent([".default"], account=None)
    if not token_response:
        token_response = msal_client.acquire_token_for_client(scopes=[".default"])
    return token_response.get('access_token')
def get_availability(target_email, app_creds):
    """Fetch the free/busy view of ``target_email`` for the next three days.

    Args:
        target_email: Mailbox whose schedule is requested.
        app_creds: ``(client_id, client_secret, tenant_id)`` passed on to
            ``get_access_token``.

    Returns:
        ``(start_time, view, 15)`` on success, where ``start_time`` is today
        00:00 in Europe/Berlin, ``view`` is the ``availabilityView`` string
        returned by Graph and ``15`` is the slot width in minutes.
        ``None`` if no token could be acquired, the request failed or timed
        out, or the response could not be parsed.
    """
    logging.info(f"Requesting availability for {target_email}")
    token = get_access_token(*app_creds)
    if not token:
        logging.error("Failed to acquire access token for calendar.")
        return None

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Prefer": 'outlook.timezone="Europe/Berlin"',
    }
    start_time = datetime.now(TZ_BERLIN).replace(hour=0, minute=0, second=0, microsecond=0)
    end_time = start_time + timedelta(days=3)
    # Use 15-minute intervals for finer granularity
    payload = {
        "schedules": [target_email],
        "startTime": {"dateTime": start_time.isoformat(), "timeZone": str(TZ_BERLIN)},
        "endTime": {"dateTime": end_time.isoformat(), "timeZone": str(TZ_BERLIN)},
        "availabilityViewInterval": 15,
    }
    url = f"{GRAPH_API_ENDPOINT}/users/{target_email}/calendar/getSchedule"
    try:
        # requests has no default timeout; without one a stalled connection
        # would block the background worker forever.
        r = requests.post(url, headers=headers, json=payload, timeout=30)
        logging.info(f"Graph API getSchedule status code: {r.status_code}")
        if r.status_code == 200:
            view = r.json()['value'][0].get('availabilityView', '')
            logging.info(f"Availability View received (Length: {len(view)})")
            return start_time, view, 15
        logging.error(f"Graph API Error Response: {r.text}")
    except Exception as e:
        # Deliberately broad: any transport/parsing problem means "no data".
        logging.error(f"Exception during Graph API call: {e}")
    return None
def _easter_sunday(year):
    """Return Easter Sunday of ``year`` (Gregorian calendar) as a naive datetime."""
    # Anonymous Gregorian ("Meeus/Jones/Butcher") computus.
    golden = year % 19
    century, year_in_century = divmod(year, 100)
    leap_centuries, century_rem = divmod(century, 4)
    correction = (century - (century + 8) // 25 + 1) // 3
    epact = (19 * golden + century - leap_centuries - correction + 15) % 30
    leap_years, year_rem = divmod(year_in_century, 4)
    weekday_shift = (32 + 2 * century_rem + 2 * leap_years - epact - year_rem) % 7
    adjust = (golden + 11 * epact + 22 * weekday_shift) // 451
    month, day_index = divmod(epact + weekday_shift - 7 * adjust + 114, 31)
    return datetime(year, month, day_index + 1)


def _bavarian_holidays(year):
    """Return the ``(month, day)`` pairs of the holidays blocked for ``year``."""
    fixed = {
        (1, 1),    # Neujahr
        (1, 6),    # Heilige Drei Könige
        (5, 1),    # Maifeiertag
        (8, 15),   # Mariä Himmelfahrt
        (10, 3),   # Tag der Deutschen Einheit
        (11, 1),   # Allerheiligen
        (12, 25),  # 1. Weihnachtstag
        (12, 26),  # 2. Weihnachtstag
    }
    easter = _easter_sunday(year)
    # Karfreitag, Ostermontag, Christi Himmelfahrt, Pfingstmontag, Fronleichnam
    movable_days = (easter + timedelta(days=offset) for offset in (-2, 1, 39, 50, 60))
    return fixed | {(d.month, d.day) for d in movable_days}


def find_slots(start, view, interval, now=None):
    """Pick up to two bookable slot start times from an availability string.

    ``view`` is read character by character; character ``i`` describes the
    slot starting at ``start + i * interval`` minutes. Only ``'0'`` (free)
    is considered; every other status is treated as not bookable.

    A free slot qualifies when it
      * is on Monday-Friday,
      * is not on one of the holidays of its year (fixed dates plus the
        Easter-dependent ones, computed per year instead of the former
        2026-only table),
      * is outside the 12:00-12:30 lunch break,
      * starts between 09:00 and 16:30 (inclusive), and
      * lies strictly after ``now``.

    The first qualifying slot is always returned; the second returned slot
    is the first qualifying one at least three hours after the first.

    Args:
        start: Datetime of the first character of ``view``.
        view: Availability string as returned by Graph ``getSchedule``.
        interval: Width of one ``view`` character in minutes.
        now: Reference "current time"; defaults to the current time in
            Europe/Berlin. Evaluated once per call.

    Returns:
        A list with zero, one or two datetimes in ascending order.
    """
    if now is None:
        now = datetime.now(TZ_BERLIN)

    holidays_by_year = {}
    slots = []
    first_slot = None

    for i, status in enumerate(view):
        if status != '0':  # '0' means Free
            continue
        slot_time = start + timedelta(minutes=i * interval)

        if slot_time.weekday() >= 5:  # Sat/Sun
            continue

        holidays = holidays_by_year.get(slot_time.year)
        if holidays is None:
            holidays = holidays_by_year[slot_time.year] = _bavarian_holidays(slot_time.year)
        if (slot_time.month, slot_time.day) in holidays:
            continue

        if slot_time.hour == 12 and slot_time.minute < 30:  # lunch break
            continue
        if not 9 <= slot_time.hour < 17:
            continue
        if slot_time.hour == 16 and slot_time.minute > 30:  # max start time 16:30
            continue
        if slot_time <= now:  # future only
            continue

        if first_slot is None:
            first_slot = slot_time
            slots.append(first_slot)
        elif slot_time >= first_slot + timedelta(hours=3):
            # Second slot should be at least 3 hours after the first
            slots.append(slot_time)
            break
    return slots
def is_slot_free(target_email: str, app_creds: tuple, target_time: datetime) -> bool:
    """Check via MS Graph whether the 15-minute slot at ``target_time`` is free.

    Args:
        target_email: Mailbox whose schedule is queried.
        app_creds: ``(client_id, client_secret, tenant_id)`` passed on to
            ``get_access_token``.
        target_time: Start of the slot to check.

    Returns:
        ``False`` only when Graph answers with an availability view whose
        first character is not ``'0'``. In every other case (no token,
        non-200 answer, empty view, timeout or other exception) the function
        falls back to ``True`` so that a failing check never blocks a booking.
    """
    logging.info(f"Live-Checking slot availability for {target_email} at {target_time}")
    token = get_access_token(*app_creds)
    if not token:
        logging.warning("Failed to get token for live-check. Proceeding anyway.")
        return True  # Fallback: assume free to not block booking

    end_time = target_time + timedelta(minutes=15)
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Prefer": 'outlook.timezone="Europe/Berlin"',
    }
    payload = {
        "schedules": [target_email],
        "startTime": {"dateTime": target_time.isoformat(), "timeZone": "Europe/Berlin"},
        "endTime": {"dateTime": end_time.isoformat(), "timeZone": "Europe/Berlin"},
        "availabilityViewInterval": 15,
    }
    url = f"{GRAPH_API_ENDPOINT}/users/{target_email}/calendar/getSchedule"
    try:
        # This runs inside the customer's booking request; bound the wait so
        # a stalled Graph call cannot hang the page indefinitely.
        r = requests.post(url, headers=headers, json=payload, timeout=30)
        if r.status_code == 200:
            view = r.json().get('value', [{}])[0].get('availabilityView', '')
            if view:
                logging.info(f"Live-Check view for {target_time}: '{view}'")
                return view[0] == '0'  # '0' means free
        else:
            logging.error(f"Live-check failed with status {r.status_code}: {r.text}")
    except Exception as e:
        # Deliberately broad: the live check is best-effort.
        logging.error(f"Live-check threw an exception: {e}")
    return True  # Fallback
def create_calendar_invite(lead_email, company, start_time):
    """Create a 15-minute Teams meeting in the sender's calendar.

    Args:
        lead_email: Address of the customer to invite. Replaced by the
            ``EMAIL_CATCHALL`` environment variable when that is set.
        company: Customer company name, used in the event subject.
        start_time: Start of the meeting.

    Returns:
        ``True`` if Graph answered with 200 or 201; ``False`` if no token
        could be acquired, the request failed or timed out, or Graph
        returned any other status.
    """
    catchall = os.getenv("EMAIL_CATCHALL")
    if catchall:
        lead_email = catchall

    token = get_access_token(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
    if not token:
        return False

    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    end_time = start_time + timedelta(minutes=15)
    payload = {
        "subject": f"Kennenlerngespräch RoboPlanet <> {company}",
        "body": {"contentType": "HTML", "content": "Vielen Dank für die Terminbuchung."},
        "start": {"dateTime": start_time.isoformat(), "timeZone": "Europe/Berlin"},
        "end": {"dateTime": end_time.isoformat(), "timeZone": "Europe/Berlin"},
        "attendees": [
            {"emailAddress": {"address": lead_email}},
            {"emailAddress": {"address": "e.melcer@robo-planet.de"}},
        ],
        "isOnlineMeeting": True,
        "onlineMeetingProvider": "teamsForBusiness",
    }
    try:
        # Bounded wait; a transport failure is reported as "not booked"
        # instead of escaping into the HTTP handler.
        r = requests.post(
            f"{GRAPH_API_ENDPOINT}/users/{SENDER_EMAIL}/calendar/events",
            headers=headers,
            json=payload,
            timeout=30,
        )
    except requests.RequestException as e:
        logging.error(f"Calendar invite request failed: {e}")
        return False

    created = r.status_code in (200, 201)
    if not created:
        logging.error(f"Calendar invite rejected with status {r.status_code}: {r.text}")
    return created
# --- FastAPI Server ---
# ASGI application object; the route decorators below register their handlers
# on it, and it is passed to uvicorn when this module is run as a script.
app = FastAPI()
@app.get("/test_lead", status_code=202)
def trigger_test_lead(background_tasks: BackgroundTasks):
    """Queue a canned test lead that is mailed to ``TEST_RECEIVER_EMAIL``.

    The lead is processed in the background; the response only echoes the
    generated request id.
    """
    timestamp = int(time.time())
    req_id = f"test_{timestamp}"
    background_tasks.add_task(
        process_lead,
        req_id,
        "Testfirma GmbH",
        "Wir haben Ihre Anfrage erhalten.",
        TEST_RECEIVER_EMAIL,
        "Max Mustermann",
    )
    response = {"status": "Test lead triggered", "id": req_id}
    return response
@app.post("/test_specific_lead", status_code=202)
def trigger_specific_test_lead(payload: TestLeadPayload, background_tasks: BackgroundTasks):
    """Queue a lead built from ``payload``; the email always goes to TEST_RECEIVER_EMAIL."""
    timestamp = int(time.time())
    req_id = f"test_specific_{timestamp}"

    # Company, opener and contact come from the payload; the receiver is
    # forced to the test mailbox regardless of what the caller sends.
    lead_kwargs = {
        "request_id": req_id,
        "company": payload.company_name,
        "opener": payload.opener,
        "receiver": TEST_RECEIVER_EMAIL,
        "name": payload.contact_name,
    }
    background_tasks.add_task(process_lead, **lead_kwargs)

    response = {"status": "Specific test lead triggered", "id": req_id}
    return response
@app.get("/stop/{job_uuid}")
def stop(job_uuid: str):
    """Set the status of the job identified by ``job_uuid`` to ``"cancelled"``.

    Returns a plain "Gestoppt." response on success and a 404 response when
    no job with that uuid exists. The DB session is closed on every path,
    including when the query or commit raises.
    """
    db = SessionLocal()
    try:
        job = db.query(ProposalJob).filter(ProposalJob.job_uuid == job_uuid).first()
        if not job:
            return Response("Not Found", 404)
        job.status = "cancelled"
        db.commit()
        return Response("Gestoppt.")
    finally:
        db.close()
@app.get("/send_now/{job_uuid}")
def send_now(job_uuid: str):
    """Set the status of the job identified by ``job_uuid`` to ``"send_now"``.

    Returns a plain "Wird gesendet." response on success and a 404 response
    when no job with that uuid exists. The DB session is closed on every
    path, including when the query or commit raises.
    """
    db = SessionLocal()
    try:
        job = db.query(ProposalJob).filter(ProposalJob.job_uuid == job_uuid).first()
        if not job:
            return Response("Not Found", 404)
        job.status = "send_now"
        db.commit()
        return Response("Wird gesendet.")
    finally:
        db.close()
# Confirmation page returned by book_slot after a successful booking.
# It is rendered with str.format(), so literal CSS braces are doubled
# ("{{" / "}}"); the only placeholders are {date} and {time}.
SUCCESS_HTML = """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif; text-align: center; color: #333; padding: 40px 20px; background: transparent; }}
.icon {{ font-size: 48px; margin-bottom: 20px; }}
h2 {{ margin-bottom: 10px; color: #10b981; }}
p {{ font-size: 16px; line-height: 1.5; color: #4b5563; }}
.details {{ font-weight: bold; margin-top: 15px; color: #111827; }}
</style>
</head>
<body>
<div class="icon">✅</div>
<h2>Termin erfolgreich gebucht!</h2>
<p>Vielen Dank für die Terminbuchung.</p>
<p class="details">Wir bestätigen Ihren Termin am {date} um {time} Uhr.</p>
<p>Sie erhalten in Kürze eine separate Kalendereinladung inkl. Microsoft Teams-Link an Ihre E-Mail-Adresse.</p>
</body>
</html>
"""
# Page returned by book_slot when the job is unknown/already booked or the
# chosen slot is no longer free. It shows a notice and embeds the bookings
# calendar in an iframe. Rendered with str.format(), so literal CSS braces
# are doubled; the only placeholder is {ms_bookings_url}.
FALLBACK_HTML = """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif; text-align: center; color: #333; margin: 0; padding: 0; background: transparent; }}
.message-box {{ padding: 20px; background-color: #fef3c7; color: #92400e; border-bottom: 1px solid #fcd34d; margin-bottom: 20px; }}
h3 {{ margin: 0 0 10px 0; font-size: 18px; }}
p {{ margin: 0; font-size: 14px; }}
.iframe-container {{ width: 100%; height: 800px; border: none; }}
</style>
</head>
<body>
<div class="message-box">
<h3>⚠️ Termin leider nicht mehr verfügbar</h3>
<p>Der von Ihnen gewählte Termin wurde in der Zwischenzeit leider anderweitig vergeben.<br>
Bitte wählen Sie direkt hier einen neuen, passenden Termin aus unserem Kalender:</p>
</div>
<!-- BITTE DEN ECHTEN MS BOOKINGS LINK EINTRAGEN -->
<iframe class="iframe-container" src="{ms_bookings_url}" scrolling="yes"></iframe>
</body>
</html>
"""
@app.get("/book_slot/{job_uuid}/{ts}")
def book_slot(job_uuid: str, ts: int):
    """Book the slot starting at Unix timestamp ``ts`` for job ``job_uuid``.

    Flow:
      1. Unknown job, already-booked job or an unusable timestamp -> the
         fallback page with the embedded bookings calendar.
      2. Live check against the calendar; if the slot is taken -> fallback
         page.
      3. Create the calendar invite; on success mark the job ``"booked"``
         and return the confirmation page, otherwise a 500 error page.

    The DB session is closed on every path, including exceptions raised by
    the calendar helpers.
    """
    fallback_html = FALLBACK_HTML.format(ms_bookings_url=MS_BOOKINGS_URL)

    try:
        slot_time = datetime.fromtimestamp(ts, tz=TZ_BERLIN)
    except (OverflowError, OSError, ValueError):
        # Timestamps outside the platform's supported range come from
        # malformed links; offer the regular calendar instead of a crash.
        logging.warning(f"Invalid slot timestamp {ts} for job {job_uuid}.")
        return HTMLResponse(content=fallback_html)

    db = SessionLocal()
    try:
        job = db.query(ProposalJob).filter(ProposalJob.job_uuid == job_uuid).first()
        if not job or job.status == "booked":
            # If job doesn't exist or is already booked, show the fallback
            # calendar to allow a new booking safely
            return HTMLResponse(content=fallback_html)

        # LIVE CHECK: Is the slot still free in the calendar?
        app_creds = (CAL_APPID, CAL_SECRET, CAL_TENNANT_ID)
        if not is_slot_free("e.melcer@robo-planet.de", app_creds, slot_time):
            logging.warning(f"RACE CONDITION PREVENTED: Slot {slot_time} for job {job_uuid} is taken!")
            return HTMLResponse(content=fallback_html)

        if create_calendar_invite(job.customer_email, job.customer_company, slot_time):
            job.status = "booked"
            db.commit()
            html_content = SUCCESS_HTML.format(
                date=slot_time.strftime('%d.%m.%Y'),
                time=slot_time.strftime('%H:%M'),
            )
            return HTMLResponse(content=html_content)

        return HTMLResponse(
            content="<p>Es gab einen internen Fehler bei der Kalenderbuchung. Bitte versuchen Sie es später erneut.</p>",
            status_code=500,
        )
    finally:
        db.close()
# --- Workflow Logic ---
def send_email(subject, body, to_email):
    """Send an HTML email through the Microsoft Graph ``sendMail`` endpoint.

    The content of ``signature.html`` is appended to ``body``, and every
    ``image*.png`` file located next to the signature file is attached
    inline with its file name as content id.

    Args:
        subject: Subject line.
        body: HTML body (without signature).
        to_email: Recipient address. Replaced by the ``EMAIL_CATCHALL``
            environment variable when that is set.

    Returns:
        None. Failures to read the signature or an image, to obtain a token,
        or a non-200/202 answer from Graph are logged. Transport errors
        (including the request timeout) propagate to the caller.
    """
    logging.info(f"Preparing to send email to {to_email} with subject: '{subject}'")

    # 1. Read the signature file
    try:
        with open(SIGNATURE_FILE_PATH, 'r', encoding='utf-8') as f:
            signature_html = f.read()
    except (OSError, UnicodeError) as e:
        logging.error(f"Could not read signature file: {e}")
        signature_html = ""  # Fallback to no signature

    # 2. Find and prepare all signature images as attachments
    attachments = []
    image_dir = os.path.dirname(SIGNATURE_FILE_PATH)
    image_files = [f for f in os.listdir(image_dir) if f.startswith('image') and f.endswith('.png')]
    for filename in image_files:
        try:
            with open(os.path.join(image_dir, filename), "rb") as f:
                content_b64 = base64.b64encode(f.read()).decode("utf-8")
        except OSError as e:
            logging.error(f"Could not process image {filename}: {e}")
            continue
        attachments.append({
            "@odata.type": "#microsoft.graph.fileAttachment",
            "name": filename,
            "contentBytes": content_b64,
            "isInline": True,
            "contentId": filename,
        })

    # 3. Get access token
    catchall = os.getenv("EMAIL_CATCHALL")
    if catchall:
        to_email = catchall
    token = get_access_token(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
    if not token:
        logging.error("Failed to get access token for sending email.")
        return

    # 4. Construct and send the email
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    payload = {
        "message": {
            "subject": subject,
            "body": {"contentType": "HTML", "content": body + signature_html},
            "toRecipients": [{"emailAddress": {"address": to_email}}],
        },
        "saveToSentItems": "true",
    }
    if attachments:
        payload["message"]["attachments"] = attachments

    # requests has no default timeout; bound the wait so the background
    # worker cannot hang forever on a stalled connection.
    response = requests.post(
        f"{GRAPH_API_ENDPOINT}/users/{SENDER_EMAIL}/sendMail",
        headers=headers,
        json=payload,
        timeout=30,
    )
    logging.info(f"Send mail API response status: {response.status_code}")
    if response.status_code not in [200, 202]:
        logging.error(f"Error sending mail: {response.text}")
def _next_weekday(moment):
    """Return ``moment`` moved forward to the next Monday-Friday (at least one day)."""
    candidate = moment + timedelta(days=1)
    while candidate.weekday() >= 5:
        candidate += timedelta(days=1)
    return candidate


def _fallback_slots(now):
    """Return two default slots: next weekday 10:00 and the weekday after at 14:00."""
    first = _next_weekday(now).replace(hour=10, minute=0, second=0, microsecond=0)
    second = _next_weekday(first).replace(hour=14, minute=0, second=0, microsecond=0)
    return [first, second]


def process_lead(request_id, company, opener, receiver, name):
    """Run the full proposal workflow for one lead.

    Steps:
      1. Persist a ``ProposalJob`` with status ``"pending"``.
      2. Determine slot suggestions from the calendar; if none are found,
         use two default weekday slots.
      3. Persist the suggestions and post a Teams approval card.
      4. Poll the job status every 5 seconds until it becomes
         ``"cancelled"`` / ``"send_now"`` or ``DEFAULT_WAIT_MINUTES`` have
         elapsed.
      5. Unless cancelled, send the proposal email and mark the job
         ``"sent"``.

    Args:
        request_id: Unique id stored as ``job_uuid`` and used in the links.
        company: Customer company name.
        opener: Plain-text intro; each non-blank line becomes a paragraph.
        receiver: Recipient email address.
        name: Contact name used in the salutation.

    Returns:
        None. Any exception is logged and swallowed so that a failing
        background task does not crash the server; the DB session is always
        closed.
    """
    logging.info(f"--- Starting process_lead for request_id: {request_id} ---")
    db = SessionLocal()
    try:
        job = ProposalJob(job_uuid=request_id, customer_email=receiver, customer_company=company, customer_name=name, status="pending")
        db.add(job)
        db.commit()
        logging.info(f"Job {request_id} created and saved to DB.")

        cal_data = get_availability("e.melcer@robo-planet.de", (CAL_APPID, CAL_SECRET, CAL_TENNANT_ID))
        suggestions = find_slots(*cal_data) if cal_data else []
        if not suggestions:
            logging.warning(f"No slots found via API for job {request_id}. Creating fallback slots.")
            # Weekends are skipped so the fallback never proposes Sat/Sun.
            suggestions = _fallback_slots(datetime.now(TZ_BERLIN))
        logging.info(f"Found/created {len(suggestions)} slot suggestions for job {request_id}.")

        for s in suggestions:
            db.add(ProposedSlot(job_id=job.id, start_time=s, end_time=s + timedelta(minutes=15)))
        db.commit()

        # One deadline for both the time shown on the Teams card and the
        # wait loop, so the email goes out at the announced time.
        send_time = datetime.now(TZ_BERLIN) + timedelta(minutes=DEFAULT_WAIT_MINUTES)
        logging.info(f"Sending Teams approval card for job {request_id}.")
        from .teams_notification import send_approval_card
        send_approval_card(job_uuid=request_id, customer_name=company, time_string=send_time.strftime("%H:%M"), webhook_url=TEAMS_WEBHOOK_URL, api_base_url=FEEDBACK_SERVER_BASE_URL)

        logging.info(f"Waiting for response or timeout until {send_time.strftime('%H:%M:%S')} for job {request_id}")
        while datetime.now(TZ_BERLIN) < send_time:
            # The /stop and /send_now endpoints change the status in their
            # own session; refresh to pick the change up.
            db.refresh(job)
            if job.status in ["cancelled", "send_now"]:
                logging.info(f"Status for job {request_id} changed to '{job.status}'. Exiting wait loop.")
                break
            time.sleep(5)

        db.refresh(job)
        if job.status == "cancelled":
            logging.info(f"Job {request_id} was cancelled. No email will be sent.")
            return

        logging.info(f"Timeout reached or 'Send Now' clicked for job {request_id}. Proceeding to send email.")

        list_items = []
        for s in suggestions:
            if WORDPRESS_BOOKING_URL:
                link = f"{WORDPRESS_BOOKING_URL}?job_uuid={request_id}&ts={int(s.timestamp())}"
            else:
                link = f"{FEEDBACK_SERVER_BASE_URL}/book_slot/{request_id}/{int(s.timestamp())}"
            list_items.append(f'<li><a href="{link}">{format_date_for_email(s)}</a></li>')
        fallback_url = WORDPRESS_BOOKING_URL if WORDPRESS_BOOKING_URL else MS_BOOKINGS_URL
        booking_html = (
            "<ul>" + "".join(list_items) + "</ul>"
            + f'<br><p>Alternativ buchen Sie direkt einen passenden Termin in <a href="{fallback_url}">meinem Kalender</a>.</p>'
        )

        # Format the opener text into proper HTML paragraphs
        opener_html = "".join([f"<p>{line}</p>" for line in opener.split('\n') if line.strip()])

        # booking_html already carries its own <ul>; it must not be wrapped
        # in a second list (that produced <ul><ul>…</ul><p>…</p></ul>).
        # The signature is appended by send_email.
        email_body = (
            f"\n<p>Hallo {name},</p>\n"
            f"{opener_html}\n"
            "<p>Ich freue mich auf den Austausch und schlage Ihnen hierfür konkrete Termine vor:</p>\n"
            f"{booking_html}\n"
            "<p>Mit freundlichen Grüßen,</p>\n"
        )

        send_email(f"Ihr Kontakt mit RoboPlanet - {company}", email_body, receiver)
        job.status = "sent"
        db.commit()
        logging.info(f"--- Finished process_lead for request_id: {request_id} ---")
    except Exception as e:
        logging.error(f"FATAL error in process_lead for request_id {request_id}: {e}", exc_info=True)
    finally:
        db.close()
if __name__ == "__main__":
    # Script entry point: serve the FastAPI app with uvicorn on all
    # interfaces, port 8004.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8004,
    )