Files
Brancheneinstufung2/lead-engine/trading_twins/manager.py
Floke a9b7dbaaca [31988f42] Lead-Engine: Produktivsetzung und Anfrage per Teams
Implementiert:
*   **End-to-End Test-Button pro Lead:** Ein neuer Button "🧪 Test-Versand (an floke.com@gmail.com)" wurde in der Lead-Detailansicht hinzugefügt, um spezifische Leads sicher zu testen.
*   **Verbesserte E-Mail-Generierung:**
    *   Der LLM-Prompt wurde optimiert, um redundante Termin-Vorschläge und Betreffzeilen im generierten E-Mail-Text zu vermeiden.
    *   Der E-Mail-Body wurde umstrukturiert für eine klarere und leserlichere Integration des LLM-generierten Textes und der dynamischen Terminvorschläge.
*   **HTML-Signatur mit Inline-Bildern:**
    *   Ein Skript zum Extrahieren von HTML-Signaturen und eingebetteten Bildern aus -Dateien wurde erstellt und ausgeführt.
    *   Die `send_email`-Funktion wurde überarbeitet, um die neue HTML-Signatur und alle zugehörigen Bilder dynamisch als Inline-Anhänge zu versenden.
*   **Bugfixes und verbesserte Diagnosefähigkeit:**
    *   Der `NameError` für `format_date_for_email` wurde durch Verschieben der Funktion in den globalen Bereich behoben.
    *   Der Zeitzonenfehler im Kalender-Abruf wurde durch die explizite Übergabe der Zeitzoneninformation an die Graph API korrigiert.
    *   Fehlende Uhrzeit in Teams-Nachrichten behoben.
    *   Umfassendes Logging wurde in kritischen Funktionen (`get_availability`, `send_email`, `process_lead`) implementiert, um die Diagnosefähigkeit bei zukünftigen Problemen zu verbessern.
2026-03-09 08:21:33 +00:00

336 lines
15 KiB
Python

# lead-engine/trading_twins/manager.py
from email.mime.text import MIMEText
import base64
import requests
import json
import os
import time
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from threading import Thread, Lock
import uvicorn
import logging
from fastapi import FastAPI, Response, BackgroundTasks
from pydantic import BaseModel
from sqlalchemy.orm import sessionmaker
# --- Setup Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
import msal
from .models import init_db, ProposalJob, ProposedSlot
class TestLeadPayload(BaseModel):
    """Request body accepted by the ``/test_specific_lead`` endpoint."""

    # Company of the lead; forwarded to process_lead as ``company``.
    company_name: str
    # Contact person; forwarded to process_lead as ``name``.
    contact_name: str
    # Opening text of the e-mail; forwarded to process_lead as ``opener``.
    opener: str
def format_date_for_email(dt: datetime) -> str:
    """Format a slot time for the proposal e-mail, relative to today in Berlin.

    Args:
        dt: The slot start. Timezone-aware values are converted to
            Europe/Berlin first; naive values are used as given.

    Returns:
        'Heute HH:MM Uhr' if the slot is today, 'Morgen HH:MM Uhr' if it is
        tomorrow, otherwise 'DD.MM. HH:MM Uhr'.
    """
    # Normalise to Berlin time so that the calendar-day comparison and the
    # printed clock time refer to the same zone as "today" below.
    if dt.tzinfo is not None:
        dt = dt.astimezone(TZ_BERLIN)
    today = datetime.now(TZ_BERLIN).date()
    slot_day = dt.date()
    if slot_day == today:
        return dt.strftime("Heute %H:%M Uhr")
    if slot_day == today + timedelta(days=1):
        return dt.strftime("Morgen %H:%M Uhr")
    return dt.strftime("%d.%m. %H:%M Uhr")
# --- Setup ---
# All scheduling logic in this module works in Berlin local time.
TZ_BERLIN = ZoneInfo("Europe/Berlin")
# The SQLite file lives in a "data" directory next to this package's directory.
DB_FILE_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "trading_twins.db")
# exist_ok=True replaces the former exists()/makedirs() pair, which could
# raise FileExistsError if two processes started at the same time.
os.makedirs(os.path.dirname(DB_FILE_PATH), exist_ok=True)
# init_db comes from .models; its result is called as SessionLocal() to open a DB session.
SessionLocal = init_db(f"sqlite:///{DB_FILE_PATH}")
# --- Config ---
# Webhook that receives the Teams approval card (passed to send_approval_card).
TEAMS_WEBHOOK_URL = os.getenv("TEAMS_WEBHOOK_URL", "")
# Public base URL of this server; used for the booking links in the e-mail and
# handed to the Teams card. The default port matches the uvicorn port below.
FEEDBACK_SERVER_BASE_URL = os.getenv("FEEDBACK_SERVER_BASE_URL", "http://localhost:8004")
# Minutes process_lead waits for a stop / send-now decision before sending.
DEFAULT_WAIT_MINUTES = 5
# Mailbox used for Graph sendMail and for creating calendar events.
SENDER_EMAIL = os.getenv("SENDER_EMAIL", "info@robo-planet.de")
# Recipient that the two test endpoints always send to.
TEST_RECEIVER_EMAIL = "floke.com@gmail.com"
# HTML signature appended to every mail; image*.png files in the same
# directory are attached inline by send_email.
SIGNATURE_FILE_PATH = os.path.join(os.path.dirname(__file__), "signature.html")
# NOTE(review): not referenced in the visible part of this file.
BANNER_FILE_PATH = os.path.join(os.path.dirname(__file__), "RoboPlanetBannerWebinarEinladung.png")
# Credentials
# App registration used for sending mail and creating calendar invites.
AZURE_CLIENT_ID = os.getenv("INFO_Application_ID")
AZURE_CLIENT_SECRET = os.getenv("INFO_Secret")
AZURE_TENANT_ID = os.getenv("INFO_Tenant_ID")
# App registration used for reading calendar availability (getSchedule).
CAL_APPID = os.getenv("CAL_APPID")
CAL_SECRET = os.getenv("CAL_SECRET")
# The spelling "TENNANT" matches the environment variable name; rename both or neither.
CAL_TENNANT_ID = os.getenv("CAL_TENNANT_ID")
# Base URL for all Microsoft Graph v1.0 calls in this module.
GRAPH_API_ENDPOINT = "https://graph.microsoft.com/v1.0"
# --- Auth & Calendar Logic (unchanged, proven) ---
def get_access_token(client_id, client_secret, tenant_id):
    """Acquire an app-only access token via MSAL.

    Returns None when any of the three credentials is missing/empty;
    otherwise returns the 'access_token' entry of the MSAL result (which is
    None if the result carries no token).
    """
    credentials = (client_id, client_secret, tenant_id)
    if not all(credentials):
        return None

    client = msal.ConfidentialClientApplication(
        client_id=client_id,
        authority=f"https://login.microsoftonline.com/{tenant_id}",
        client_credential=client_secret,
    )
    # Try the token cache first, then fall back to a fresh client-credentials request.
    token_result = client.acquire_token_silent([".default"], account=None)
    if not token_result:
        token_result = client.acquire_token_for_client(scopes=[".default"])
    return token_result.get('access_token')
def get_availability(target_email, app_creds):
    """Fetch the free/busy view of a mailbox for the next three days.

    Args:
        target_email: Mailbox whose schedule is requested.
        app_creds: (client_id, client_secret, tenant_id) tuple passed on to
            get_access_token.

    Returns:
        A tuple ``(start_time, availability_view, interval_minutes)`` where
        ``start_time`` is today 00:00 Berlin time, or None if no token could
        be acquired, the request failed, or Graph answered with a non-200 status.
    """
    logging.info(f"Requesting availability for {target_email}")
    token = get_access_token(*app_creds)
    if not token:
        logging.error("Failed to acquire access token for calendar.")
        return None

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Prefer": 'outlook.timezone="Europe/Berlin"',
    }
    start_time = datetime.now(TZ_BERLIN).replace(hour=0, minute=0, second=0, microsecond=0)
    end_time = start_time + timedelta(days=3)
    # Use 15-minute intervals for finer granularity
    interval_minutes = 15
    payload = {
        "schedules": [target_email],
        "startTime": {"dateTime": start_time.isoformat(), "timeZone": str(TZ_BERLIN)},
        "endTime": {"dateTime": end_time.isoformat(), "timeZone": str(TZ_BERLIN)},
        "availabilityViewInterval": interval_minutes,
    }
    url = f"{GRAPH_API_ENDPOINT}/users/{target_email}/calendar/getSchedule"
    try:
        # A timeout keeps a stalled Graph call from blocking the background
        # worker forever; the resulting exception is logged below.
        r = requests.post(url, headers=headers, json=payload, timeout=30)
        logging.info(f"Graph API getSchedule status code: {r.status_code}")
        if r.status_code == 200:
            view = r.json()['value'][0].get('availabilityView', '')
            logging.info(f"Availability View received (Length: {len(view)})")
            return start_time, view, interval_minutes
        logging.error(f"Graph API Error Response: {r.text}")
    except Exception as e:
        # Best-effort: the caller falls back to default slots when this returns None.
        logging.error(f"Exception during Graph API call: {e}")
    return None
def find_slots(start, view, interval, now=None):
    """Pick up to two free meeting start times from an availability string.

    Args:
        start: Datetime corresponding to the first character of ``view``.
        view: Availability string with one character per interval;
            '0' means free, anything else is treated as not free.
        interval: Length of one interval in minutes.
        now: Reference time for the "future only" rule. Defaults to the
            current Berlin time; pass a fixed value for deterministic results.

    Returns:
        A list with zero, one or two start times. Slots are Mon-Fri only,
        start between 09:00 and 16:30 (inclusive) and lie after ``now``.
        The second slot starts at least 3 hours after the first.
    """
    if now is None:
        # Taken once so every interval is compared against the same instant.
        now = datetime.now(TZ_BERLIN)

    slots = []
    first_slot = None
    for i, status in enumerate(view):
        if status != '0':  # only '0' means free
            continue
        slot_time = start + timedelta(minutes=i * interval)
        if slot_time.weekday() >= 5:  # Saturday / Sunday
            continue
        if not (9 <= slot_time.hour < 17):  # outside business hours
            continue
        if slot_time.hour == 16 and slot_time.minute > 30:  # latest start is 16:30
            continue
        if slot_time <= now:  # future only
            continue

        if first_slot is None:
            first_slot = slot_time
            slots.append(first_slot)
        elif slot_time >= first_slot + timedelta(hours=3):
            # Second slot should be at least 3 hours after the first
            slots.append(slot_time)
            break
    return slots
def create_calendar_invite(lead_email, company, start_time):
    """Create a 15-minute Teams meeting in the sender's calendar.

    If the EMAIL_CATCHALL environment variable is set, the invite goes to
    that address instead of ``lead_email``.

    Args:
        lead_email: Address of the lead to invite.
        company: Company name, used in the meeting subject.
        start_time: Meeting start as a datetime.

    Returns:
        True if Graph answered with 200/201; False if no token could be
        acquired, the request failed, or any other status was returned.
    """
    catchall = os.getenv("EMAIL_CATCHALL")
    if catchall:
        lead_email = catchall

    token = get_access_token(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
    if not token:
        return False

    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    end_time = start_time + timedelta(minutes=15)
    payload = {
        "subject": f"Kennenlerngespräch RoboPlanet <> {company}",
        "body": {"contentType": "HTML", "content": "Vielen Dank für die Terminbuchung."},
        "start": {"dateTime": start_time.isoformat(), "timeZone": "Europe/Berlin"},
        "end": {"dateTime": end_time.isoformat(), "timeZone": "Europe/Berlin"},
        "attendees": [
            {"emailAddress": {"address": lead_email}},
            {"emailAddress": {"address": "e.melcer@robo-planet.de"}},
        ],
        "isOnlineMeeting": True,
        "onlineMeetingProvider": "teamsForBusiness",
    }
    try:
        # The timeout keeps the booking endpoint from hanging on a stalled Graph call.
        r = requests.post(
            f"{GRAPH_API_ENDPOINT}/users/{SENDER_EMAIL}/calendar/events",
            headers=headers,
            json=payload,
            timeout=30,
        )
    except requests.RequestException as e:
        # Report network failures through the boolean result, like HTTP errors.
        logging.error(f"Exception while creating calendar invite: {e}")
        return False
    return r.status_code in [200, 201]
# --- FastAPI Server ---
# FastAPI application; the route decorators below register their endpoints on it,
# and it is served by uvicorn when this module is run as a script.
app = FastAPI()
@app.get("/test_lead", status_code=202)
def trigger_test_lead(background_tasks: BackgroundTasks):
    """Queue a canned demo lead that is mailed to TEST_RECEIVER_EMAIL.

    The lead is processed in the background; the response only confirms
    that it was queued and returns the generated request id.
    """
    req_id = "test_" + str(int(time.time()))
    background_tasks.add_task(
        process_lead,
        request_id=req_id,
        company="Testfirma GmbH",
        opener="Wir haben Ihre Anfrage erhalten.",
        receiver=TEST_RECEIVER_EMAIL,
        name="Max Mustermann",
    )
    return {"status": "Test lead triggered", "id": req_id}
@app.post("/test_specific_lead", status_code=202)
def trigger_specific_test_lead(payload: TestLeadPayload, background_tasks: BackgroundTasks):
    """Queue a lead built from the request payload, mailed to TEST_RECEIVER_EMAIL.

    Company, contact name and opener come from the payload; the recipient
    is always the test address, never a real lead.
    """
    req_id = "test_specific_{}".format(int(time.time()))
    lead_kwargs = {
        "request_id": req_id,
        "company": payload.company_name,
        "opener": payload.opener,
        # The recipient is deliberately not taken from the payload.
        "receiver": TEST_RECEIVER_EMAIL,
        "name": payload.contact_name,
    }
    background_tasks.add_task(process_lead, **lead_kwargs)
    return {"status": "Specific test lead triggered", "id": req_id}
@app.get("/stop/{job_uuid}")
def stop(job_uuid: str):
    """Mark a job as cancelled so that process_lead does not send its e-mail.

    Returns "Gestoppt." on success or a 404 response if the job is unknown.
    """
    db = SessionLocal()
    try:
        job = db.query(ProposalJob).filter(ProposalJob.job_uuid == job_uuid).first()
        if not job:
            return Response("Not Found", 404)
        job.status = "cancelled"
        db.commit()
        return Response("Gestoppt.")
    finally:
        # Close the session even if the query or commit raises.
        db.close()
@app.get("/send_now/{job_uuid}")
def send_now(job_uuid: str):
    """Mark a job as "send_now" so that process_lead stops waiting and sends.

    Returns "Wird gesendet." on success or a 404 response if the job is unknown.
    """
    db = SessionLocal()
    try:
        job = db.query(ProposalJob).filter(ProposalJob.job_uuid == job_uuid).first()
        if not job:
            return Response("Not Found", 404)
        job.status = "send_now"
        db.commit()
        return Response("Wird gesendet.")
    finally:
        # Close the session even if the query or commit raises.
        db.close()
@app.get("/book_slot/{job_uuid}/{ts}")
def book_slot(job_uuid: str, ts: int):
    """Book the slot starting at Unix timestamp ``ts`` for the given job.

    Returns:
        "Gebucht!" once the calendar invite was created and the job is
        marked "booked"; a 400 response if the timestamp is not
        representable, the job is unknown or already booked; a 500
        response if the calendar invite could not be created.
    """
    try:
        slot_time = datetime.fromtimestamp(ts, tz=TZ_BERLIN)
    except (OverflowError, OSError, ValueError):
        # An absurd timestamp in the URL is a client error, not a server error.
        return Response("Fehler.", 400)

    db = SessionLocal()
    try:
        job = db.query(ProposalJob).filter(ProposalJob.job_uuid == job_uuid).first()
        if not job or job.status == "booked":
            return Response("Fehler.", 400)
        if create_calendar_invite(job.customer_email, job.customer_company, slot_time):
            job.status = "booked"
            db.commit()
            return Response("Gebucht!")
        return Response("Fehler bei Kalender.", 500)
    finally:
        # Close the session even if the query, invite or commit raises.
        db.close()
# --- Workflow Logic ---
def send_email(subject, body, to_email):
    """Send an HTML e-mail through Microsoft Graph from SENDER_EMAIL.

    The contents of the signature file are appended to ``body`` and every
    ``image*.png`` file next to it is attached inline, with the file name
    as content id. If the EMAIL_CATCHALL environment variable is set, the
    mail goes to that address instead of ``to_email``.

    Args:
        subject: Subject line.
        body: HTML body without signature.
        to_email: Intended recipient address.

    Returns:
        None. A missing token and non-200/202 responses are only logged;
        network errors from ``requests`` propagate to the caller.
    """
    logging.info(f"Preparing to send email to {to_email} with subject: '{subject}'")

    # 1. Read the signature file
    try:
        with open(SIGNATURE_FILE_PATH, 'r', encoding='utf-8') as f:
            signature_html = f.read()
    except Exception as e:
        logging.error(f"Could not read signature file: {e}")
        signature_html = ""  # Fallback to no signature

    # 2. Find and prepare all signature images as attachments
    attachments = []
    image_dir = os.path.dirname(SIGNATURE_FILE_PATH)
    # sorted() makes the attachment order independent of the directory listing order.
    image_files = sorted(
        f for f in os.listdir(image_dir) if f.startswith('image') and f.endswith('.png')
    )
    for filename in image_files:
        try:
            with open(os.path.join(image_dir, filename), "rb") as f:
                content_b64 = base64.b64encode(f.read()).decode("utf-8")
            attachments.append({
                "@odata.type": "#microsoft.graph.fileAttachment",
                "name": filename,
                "contentBytes": content_b64,
                "isInline": True,
                "contentId": filename,
            })
        except Exception as e:
            # A broken image must not prevent the mail from going out.
            logging.error(f"Could not process image {filename}: {e}")

    # 3. Get access token
    catchall = os.getenv("EMAIL_CATCHALL")
    if catchall:
        to_email = catchall
    token = get_access_token(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
    if not token:
        logging.error("Failed to get access token for sending email.")
        return

    # 4. Construct and send the email
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    payload = {
        "message": {
            "subject": subject,
            "body": {"contentType": "HTML", "content": body + signature_html},
            "toRecipients": [{"emailAddress": {"address": to_email}}],
        },
        "saveToSentItems": "true",
    }
    if attachments:
        payload["message"]["attachments"] = attachments

    # The timeout keeps a stalled Graph call from blocking the worker forever.
    response = requests.post(
        f"{GRAPH_API_ENDPOINT}/users/{SENDER_EMAIL}/sendMail",
        headers=headers,
        json=payload,
        timeout=60,
    )
    logging.info(f"Send mail API response status: {response.status_code}")
    if response.status_code not in [200, 202]:
        logging.error(f"Error sending mail: {response.text}")
def process_lead(request_id, company, opener, receiver, name):
    """Run the full proposal workflow for one lead.

    Steps: store the job, determine two slot suggestions (from the calendar
    or a fallback), post a Teams approval card, wait up to
    DEFAULT_WAIT_MINUTES for a stop / send-now decision, then send the
    proposal e-mail and mark the job as "sent".

    Args:
        request_id: Unique job id; also used in the booking links.
        company: Company name of the lead.
        opener: HTML fragment inserted unchanged after the salutation.
        receiver: E-mail address of the lead.
        name: Contact name used in the salutation.

    Returns:
        None. All exceptions are logged and swallowed, because this runs as
        a background task with no caller to report to.
    """
    logging.info(f"--- Starting process_lead for request_id: {request_id} ---")
    db = SessionLocal()
    try:
        job = ProposalJob(job_uuid=request_id, customer_email=receiver, customer_company=company, customer_name=name, status="pending")
        db.add(job)
        db.commit()
        logging.info(f"Job {request_id} created and saved to DB.")

        cal_data = get_availability("e.melcer@robo-planet.de", (CAL_APPID, CAL_SECRET, CAL_TENNANT_ID))
        suggestions = find_slots(*cal_data) if cal_data else []
        if not suggestions:
            logging.warning(f"No slots found via API for job {request_id}. Creating fallback slots.")
            suggestions = _fallback_slots(datetime.now(TZ_BERLIN))
        logging.info(f"Found/created {len(suggestions)} slot suggestions for job {request_id}.")

        for s in suggestions:
            db.add(ProposedSlot(job_id=job.id, start_time=s, end_time=s + timedelta(minutes=15)))
        db.commit()

        # One deadline for both the time shown in Teams and the wait loop,
        # so the card never announces a different time than the one used.
        wait_until = datetime.now(TZ_BERLIN) + timedelta(minutes=DEFAULT_WAIT_MINUTES)
        logging.info(f"Sending Teams approval card for job {request_id}.")
        from .teams_notification import send_approval_card
        send_approval_card(job_uuid=request_id, customer_name=company, time_string=wait_until.strftime("%H:%M"), webhook_url=TEAMS_WEBHOOK_URL, api_base_url=FEEDBACK_SERVER_BASE_URL)

        logging.info(f"Waiting for response or timeout until {wait_until.strftime('%H:%M:%S')} for job {request_id}")
        while datetime.now(TZ_BERLIN) < wait_until:
            # The /stop and /send_now endpoints change the status in the DB.
            db.refresh(job)
            if job.status in ["cancelled", "send_now"]:
                logging.info(f"Status for job {request_id} changed to '{job.status}'. Exiting wait loop.")
                break
            time.sleep(5)

        db.refresh(job)
        if job.status == "cancelled":
            logging.info(f"Job {request_id} was cancelled. No email will be sent.")
            return

        logging.info(f"Timeout reached or 'Send Now' clicked for job {request_id}. Proceeding to send email.")
        # Only the <li> items are built here; the single surrounding <ul>
        # lives in the body template (previously the list was wrapped twice).
        booking_items = "".join(
            f'<li><a href="{FEEDBACK_SERVER_BASE_URL}/book_slot/{request_id}/{int(s.timestamp())}">{format_date_for_email(s)}</a></li>'
            for s in suggestions
        )
        # NOTE(review): name and opener are inserted without HTML escaping;
        # confirm that callers only pass trusted / pre-sanitised values.
        email_body = (
            f"\n<p>Hallo {name},</p>\n"
            f"{opener}\n"
            "<p>Ich freue mich auf den Austausch und schlage Ihnen hierfür konkrete Termine vor:</p>\n"
            "<ul>\n"
            f"{booking_items}\n"
            "</ul>\n"
            "<p>Mit freundlichen Grüßen,</p>\n"
        )
        # send_email appends the HTML signature itself, so it is not read here.
        send_email(f"Ihr Kontakt mit RoboPlanet - {company}", email_body, receiver)
        job.status = "sent"
        db.commit()
        logging.info(f"--- Finished process_lead for request_id: {request_id} ---")
    except Exception as e:
        logging.error(f"FATAL error in process_lead for request_id {request_id}: {e}", exc_info=True)
    finally:
        db.close()


def _fallback_slots(now):
    """Return two default proposals: 10:00 and 14:00 on the next two business days after ``now``."""
    slots = []
    day = now
    for hour in (10, 14):
        day = day + timedelta(days=1)
        # Never propose Saturday or Sunday, matching the rule in find_slots.
        while day.weekday() >= 5:
            day = day + timedelta(days=1)
        slots.append(day.replace(hour=hour, minute=0, second=0, microsecond=0))
    return slots
if __name__ == "__main__":
    # Script entry point: serve the FastAPI app on all interfaces, port 8004.
    uvicorn.run(app, host="0.0.0.0", port=8004)