# lead-engine/trading_twins/manager.py
"""Trading Twins lead manager.

FastAPI service that logs Smartlead webhooks, looks up free meeting slots in a
Microsoft 365 calendar via Microsoft Graph, asks for approval through a Teams
card, e-mails the slot proposal to the lead and books the chosen slot.
"""
from email.mime.text import MIMEText
import base64
import html
import json
import logging
import os
import sys
import time
from datetime import date, datetime, timedelta
from functools import lru_cache
from threading import Thread, Lock
from zoneinfo import ZoneInfo

import msal
import requests
import uvicorn
from fastapi import FastAPI, Request, Response, BackgroundTasks
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from sqlalchemy.orm import sessionmaker

# --- Setup Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Import centralized matching service
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
try:
    from company_explorer_connector import match_company
except ImportError:
    logging.warning("Could not import company_explorer_connector. Matching logic might be disabled.")

    def match_company(name, **kwargs):
        """Stand-in used when the connector is unavailable: never reports a match."""
        return {"match_found": False}

# --- Database Setup (SQLite) ---
# NOTE(review): `init_db`, `ProposalJob` and `ProposedSlot` are used below but
# no import for them is visible in this file - confirm where they come from.


class TestLeadPayload(BaseModel):
    """Request body of the `/test_specific_lead` endpoint."""

    company_name: str
    contact_name: str
    opener: str


# --- Setup ---
TZ_BERLIN = ZoneInfo("Europe/Berlin")
DB_FILE_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "trading_twins.db")
# exist_ok avoids the check-then-create race of os.path.exists + os.makedirs.
os.makedirs(os.path.dirname(DB_FILE_PATH), exist_ok=True)
SessionLocal = init_db(f"sqlite:///{DB_FILE_PATH}")

# --- Config ---
TEAMS_WEBHOOK_URL = os.getenv("TEAMS_WEBHOOK_URL", "")
FEEDBACK_SERVER_BASE_URL = os.getenv("FEEDBACK_SERVER_BASE_URL", "http://localhost:8004")
WORDPRESS_BOOKING_URL = os.getenv("WORDPRESS_BOOKING_URL", "")
MS_BOOKINGS_URL = os.getenv("MS_BOOKINGS_URL", "https://outlook.office365.com/owa/calendar/IHR_BOOKINGS_NAME@robo-planet.de/bookings/")
DEFAULT_WAIT_MINUTES = 5
SENDER_EMAIL = os.getenv("SENDER_EMAIL", "info@robo-planet.de")
TEST_RECEIVER_EMAIL = "floke.com@gmail.com"
# Mailbox whose calendar is queried for availability and who attends every meeting.
CALENDAR_OWNER_EMAIL = "e.melcer@robo-planet.de"
# Upper bound (seconds) for every outgoing HTTP call so a stalled request
# cannot block a worker indefinitely.
REQUEST_TIMEOUT_SECONDS = 30
SIGNATURE_FILE_PATH = os.path.join(os.path.dirname(__file__), "signature.html")
BANNER_FILE_PATH = os.path.join(os.path.dirname(__file__), "RoboPlanetBannerWebinarEinladung.png")

# Credentials
AZURE_CLIENT_ID = os.getenv("INFO_Application_ID")
AZURE_CLIENT_SECRET = os.getenv("INFO_Secret")
AZURE_TENANT_ID = os.getenv("INFO_Tenant_ID")
CAL_APPID = os.getenv("CAL_APPID")
CAL_SECRET = os.getenv("CAL_SECRET")
CAL_TENNANT_ID = os.getenv("CAL_TENNANT_ID")
GRAPH_API_ENDPOINT = "https://graph.microsoft.com/v1.0"


def format_date_for_email(dt: datetime) -> str:
    """Format a datetime as 'Heute HH:MM Uhr', 'Morgen HH:MM Uhr' or 'DD.MM. HH:MM Uhr'.

    Timezone-aware values are converted to Berlin time first so that both the
    "today/tomorrow" decision and the printed clock time refer to the same
    zone. Naive values are used as given.
    """
    local_dt = dt.astimezone(TZ_BERLIN) if dt.tzinfo is not None else dt
    today = datetime.now(TZ_BERLIN).date()
    if local_dt.date() == today:
        return local_dt.strftime("Heute %H:%M Uhr")
    if local_dt.date() == today + timedelta(days=1):
        return local_dt.strftime("Morgen %H:%M Uhr")
    return local_dt.strftime("%d.%m. %H:%M Uhr")


# --- Auth & Calendar Logic ---
def get_access_token(client_id, client_secret, tenant_id):
    """Acquire an app-only access token via MSAL client credentials.

    Returns:
        The token string, or None when a credential is missing or the token
        response carries no 'access_token'.
    """
    if not all([client_id, client_secret, tenant_id]):
        return None
    authority = f"https://login.microsoftonline.com/{tenant_id}"
    app = msal.ConfidentialClientApplication(client_id=client_id, authority=authority, client_credential=client_secret)
    result = app.acquire_token_silent([".default"], account=None) or app.acquire_token_for_client(scopes=[".default"])
    return result.get('access_token')


def get_availability(target_email, app_creds):
    """Fetch the availability view of `target_email` for the next three days.

    Args:
        target_email: Mailbox whose schedule is requested.
        app_creds: Tuple (client_id, client_secret, tenant_id).

    Returns:
        Tuple (start_time, view, 15), where `start_time` is today's midnight in
        Berlin and `view` is the availability string in 15-minute steps, or
        None when the token or the Graph call fails.
    """
    logging.info(f"Requesting availability for {target_email}")
    token = get_access_token(*app_creds)
    if not token:
        logging.error("Failed to acquire access token for calendar.")
        return None

    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Prefer": 'outlook.timezone="Europe/Berlin"'}
    start_time = datetime.now(TZ_BERLIN).replace(hour=0, minute=0, second=0, microsecond=0)
    end_time = start_time + timedelta(days=3)
    # Use 15-minute intervals for finer granularity
    payload = {
        "schedules": [target_email],
        "startTime": {"dateTime": start_time.isoformat(), "timeZone": str(TZ_BERLIN)},
        "endTime": {"dateTime": end_time.isoformat(), "timeZone": str(TZ_BERLIN)},
        "availabilityViewInterval": 15,
    }

    try:
        url = f"{GRAPH_API_ENDPOINT}/users/{target_email}/calendar/getSchedule"
        r = requests.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT_SECONDS)
        logging.info(f"Graph API getSchedule status code: {r.status_code}")
        if r.status_code == 200:
            view = r.json()['value'][0].get('availabilityView', '')
            logging.info(f"Availability View received (Length: {len(view)})")
            return start_time, view, 15
        logging.error(f"Graph API Error Response: {r.text}")
    except Exception as e:
        logging.error(f"Exception during Graph API call: {e}")
    return None


def _easter_sunday(year: int) -> date:
    """Return the Gregorian Easter Sunday of `year` (anonymous Gregorian algorithm)."""
    a = year % 19
    b, c = divmod(year, 100)
    d, e = divmod(b, 4)
    f = (b + 8) // 25
    g = (b - f + 1) // 3
    h = (19 * a + b - d - g + 15) % 30
    i, k = divmod(c, 4)
    l = (32 + 2 * e + 2 * i - h - k) % 7
    m = (a + 11 * h + 22 * l) // 451
    month, day = divmod(h + l - 7 * m + 114, 31)
    return date(year, month, day + 1)


@lru_cache(maxsize=None)
def _bavarian_holidays(year: int) -> frozenset:
    """Return the Bavarian public holidays of `year` as (month, day) pairs.

    Covers the same thirteen holidays the original 2026 table listed; the
    movable ones are derived from Easter so the set stays right in later years.
    """
    fixed = {
        (1, 1),    # Neujahr
        (1, 6),    # Heilige Drei Könige
        (5, 1),    # Maifeiertag
        (8, 15),   # Mariä Himmelfahrt
        (10, 3),   # Tag der Deutschen Einheit
        (11, 1),   # Allerheiligen
        (12, 25),  # 1. Weihnachtstag
        (12, 26),  # 2. Weihnachtstag
    }
    easter = _easter_sunday(year)
    # Karfreitag, Ostermontag, Christi Himmelfahrt, Pfingstmontag, Fronleichnam
    movable = {
        ((easter + timedelta(days=offset)).month, (easter + timedelta(days=offset)).day)
        for offset in (-2, 1, 39, 50, 60)
    }
    return frozenset(fixed | movable)


def find_slots(start, view, interval):
    """Pick up to two free slot start times from an availability string.

    Only the character '0' (free) is considered. A slot qualifies when it is
    in the future, on Mon-Fri, not on a Bavarian public holiday, starts
    between 09:00 and 16:30 and is outside the 12:00-12:30 lunch break. The
    second slot must start at least three hours after the first.

    Args:
        start: Timezone-aware datetime of the first character of `view`.
        view: Availability string, one character per interval.
        interval: Length of one interval in minutes.

    Returns:
        A list with zero, one or two datetimes.
    """
    now = datetime.now(TZ_BERLIN)
    slots = []
    first_slot = None

    for i, status in enumerate(view):
        if status != '0':  # '0' means Free
            continue
        slot_time = start + timedelta(minutes=i * interval)

        if slot_time <= now:
            continue
        if slot_time.weekday() >= 5:
            continue
        if (slot_time.month, slot_time.day) in _bavarian_holidays(slot_time.year):
            continue
        # Business hours with a latest start of 16:30.
        if not (9, 0) <= (slot_time.hour, slot_time.minute) <= (16, 30):
            continue
        # Lunch break 12:00 - 12:30
        if slot_time.hour == 12 and slot_time.minute < 30:
            continue

        if first_slot is None:
            first_slot = slot_time
            slots.append(first_slot)
        elif slot_time >= first_slot + timedelta(hours=3):
            # Second slot should be at least 3 hours after the first
            slots.append(slot_time)
            break
    return slots


def is_slot_free(target_email: str, app_creds: tuple, target_time: datetime) -> bool:
    """Checks via MS Graph API if a specific 15-minute slot is still free.

    Deliberately optimistic: when the token, the request or the response is
    unusable, the slot is reported as free so that a booking is not blocked.
    """
    logging.info(f"Live-Checking slot availability for {target_email} at {target_time}")
    token = get_access_token(*app_creds)
    if not token:
        logging.warning("Failed to get token for live-check. Proceeding anyway.")
        return True  # Fallback: assume free to not block booking

    end_time = target_time + timedelta(minutes=15)
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "Prefer": 'outlook.timezone="Europe/Berlin"'}
    payload = {
        "schedules": [target_email],
        "startTime": {"dateTime": target_time.isoformat(), "timeZone": "Europe/Berlin"},
        "endTime": {"dateTime": end_time.isoformat(), "timeZone": "Europe/Berlin"},
        "availabilityViewInterval": 15
    }

    try:
        url = f"{GRAPH_API_ENDPOINT}/users/{target_email}/calendar/getSchedule"
        r = requests.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT_SECONDS)
        if r.status_code == 200:
            view = r.json().get('value', [{}])[0].get('availabilityView', '')
            if view:
                logging.info(f"Live-Check view for {target_time}: '{view}'")
                return view[0] == '0'  # '0' means free
        else:
            logging.error(f"Live-check failed with status {r.status_code}: {r.text}")
    except Exception as e:
        logging.error(f"Live-check threw an exception: {e}")
    return True  # Fallback


def create_calendar_invite(lead_email, company, start_time):
    """Create a 15-minute Teams meeting in the sender's calendar.

    The lead (or the EMAIL_CATCHALL address, when that variable is set) and the
    calendar owner are invited.

    Returns:
        True when Graph answers 200/201; False when no token is available, the
        request fails or Graph answers with another status.
    """
    catchall = os.getenv("EMAIL_CATCHALL")
    lead_email = catchall if catchall else lead_email
    token = get_access_token(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
    if not token:
        return False

    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    end_time = start_time + timedelta(minutes=15)
    payload = {
        "subject": f"Kennenlerngespräch RoboPlanet <> {company}",
        "body": {"contentType": "HTML", "content": "Vielen Dank für die Terminbuchung."},
        "start": {"dateTime": start_time.isoformat(), "timeZone": "Europe/Berlin"},
        "end": {"dateTime": end_time.isoformat(), "timeZone": "Europe/Berlin"},
        "attendees": [{"emailAddress": {"address": lead_email}}, {"emailAddress": {"address": CALENDAR_OWNER_EMAIL}}],
        "isOnlineMeeting": True,
        "onlineMeetingProvider": "teamsForBusiness"
    }
    try:
        r = requests.post(
            f"{GRAPH_API_ENDPOINT}/users/{SENDER_EMAIL}/calendar/events",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException as e:
        # A network failure is reported like any other failed booking.
        logging.error(f"Calendar invite request failed: {e}")
        return False
    return r.status_code in [200, 201]


# --- FastAPI Server ---
app = FastAPI()

# --- Webhook Endpoints for Smartlead ---
SMARTLEAD_LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "Log")
SMARTLEAD_LOG_FILE = os.path.join(SMARTLEAD_LOG_DIR, "smartlead_webhooks.log")

# Ensure log directory exists
os.makedirs(SMARTLEAD_LOG_DIR, exist_ok=True)


async def log_webhook_data(webhook_type: str, request: Request):
    """Append the JSON body of an incoming webhook to the Smartlead log file.

    Returns:
        A success dict, a 400 response for an invalid JSON body, or a 500
        response for any other error.
    """
    try:
        data = await request.json()
        log_entry = {
            "timestamp": datetime.now(TZ_BERLIN).isoformat(),
            "webhook_type": webhook_type,
            # request.client may be None; a missing peer address must not
            # turn a valid webhook into a 500.
            "source_ip": request.client.host if request.client else None,
            "payload": data
        }
        with open(SMARTLEAD_LOG_FILE, "a", encoding="utf-8") as f:
            f.write(json.dumps(log_entry) + "\n")

        logging.info(f"Successfully processed and logged '{webhook_type}' webhook.")
        return {"status": "success", "message": f"{webhook_type} webhook received"}
    except json.JSONDecodeError:
        logging.error("Webhook received with invalid JSON format.")
        return Response(content='{"status": "error", "message": "Invalid JSON format"}', status_code=400, media_type="application/json")
    except Exception as e:
        logging.error(f"Error processing '{webhook_type}' webhook: {e}")
        return Response(content='{"status": "error", "message": "Internal server error"}', status_code=500, media_type="application/json")


@app.post("/webhook/hot-lead", status_code=202)
async def webhook_hot_lead(request: Request):
    """Webhook endpoint for 'hot leads' from Smartlead."""
    return await log_webhook_data("hot-lead", request)


@app.post("/webhook/follow-up-lead", status_code=202)
async def webhook_follow_up_lead(request: Request):
    """Webhook endpoint for 'follow-up leads' from Smartlead."""
    return await log_webhook_data("follow-up-lead", request)
# --- END Webhook Endpoints ---


@app.get("/test_lead", status_code=202)
def trigger_test_lead(background_tasks: BackgroundTasks):
    """Queue a lead process with fixed demo data, addressed to TEST_RECEIVER_EMAIL."""
    req_id = f"test_{int(time.time())}"
    background_tasks.add_task(process_lead, req_id, "Testfirma GmbH", "Wir haben Ihre Anfrage erhalten.", TEST_RECEIVER_EMAIL, "Max Mustermann")
    return {"status": "Test lead triggered", "id": req_id}


@app.post("/test_specific_lead", status_code=202)
def trigger_specific_test_lead(payload: TestLeadPayload, background_tasks: BackgroundTasks):
    """Triggers a lead process with specific data but sends email to the TEST_RECEIVER_EMAIL."""
    req_id = f"test_specific_{int(time.time())}"
    # Key difference: Use data from payload, but force the receiver email
    background_tasks.add_task(
        process_lead,
        request_id=req_id,
        company=payload.company_name,
        opener=payload.opener,
        receiver=TEST_RECEIVER_EMAIL,  # <--- FORCED TEST EMAIL
        name=payload.contact_name
    )
    return {"status": "Specific test lead triggered", "id": req_id}


def _set_job_status(job_uuid: str, new_status: str, confirmation: str) -> Response:
    """Set the status of the job with `job_uuid`; 404 response when it does not exist.

    The session is closed in `finally` so it is released even if the query or
    the commit raises.
    """
    db = SessionLocal()
    try:
        job = db.query(ProposalJob).filter(ProposalJob.job_uuid == job_uuid).first()
        if not job:
            return Response("Not Found", 404)
        job.status = new_status
        db.commit()
        return Response(confirmation)
    finally:
        db.close()


@app.get("/stop/{job_uuid}")
def stop(job_uuid: str):
    """Mark the job as cancelled so that `process_lead` sends no email."""
    return _set_job_status(job_uuid, "cancelled", "Gestoppt.")


@app.get("/send_now/{job_uuid}")
def send_now(job_uuid: str):
    """Mark the job as 'send_now' so that `process_lead` stops waiting."""
    return _set_job_status(job_uuid, "send_now", "Wird gesendet.")


# NOTE(review): the page templates below contain no HTML markup although they
# are served through HTMLResponse - the tags look stripped; confirm against
# the deployed version.
SUCCESS_HTML = """

Termin erfolgreich gebucht!

Vielen Dank für die Terminbuchung.

Wir bestätigen Ihren Termin am {date} um {time} Uhr.

Sie erhalten in Kürze eine separate Kalendereinladung inkl. Microsoft Teams-Link an Ihre E-Mail-Adresse.

"""

# The template is formatted with `ms_bookings_url`; the placeholder below makes
# the announced calendar link actually appear on the page.
FALLBACK_HTML = """

⚠️ Termin leider nicht mehr verfügbar

Der von Ihnen gewählte Termin wurde in der Zwischenzeit leider anderweitig vergeben.
Bitte wählen Sie direkt hier einen neuen, passenden Termin aus unserem Kalender:

<a href="{ms_bookings_url}">{ms_bookings_url}</a>

"""

BOOKING_ERROR_HTML = "\n\nEs gab einen internen Fehler bei der Kalenderbuchung. Bitte versuchen Sie es später erneut.\n\n"


def _fallback_page() -> HTMLResponse:
    """Build the page that points the visitor to the public booking calendar."""
    return HTMLResponse(content=FALLBACK_HTML.format(ms_bookings_url=html.escape(MS_BOOKINGS_URL, quote=True)))


@app.get("/book_slot/{job_uuid}/{ts}")
def book_slot(job_uuid: str, ts: int):
    """Book the slot starting at Unix timestamp `ts` for the given job.

    Returns the success page after the calendar invite was created, the
    fallback page when the job is unknown, already booked, the timestamp is
    unusable or the slot is taken, and a 500 page when the invite could not be
    created.
    """
    try:
        slot_time = datetime.fromtimestamp(ts, tz=TZ_BERLIN)
    except (OverflowError, OSError, ValueError):
        # Out-of-range timestamps come from broken or tampered links.
        logging.warning(f"Invalid slot timestamp {ts} for job {job_uuid}.")
        return _fallback_page()

    db = SessionLocal()
    try:
        job = db.query(ProposalJob).filter(ProposalJob.job_uuid == job_uuid).first()

        if not job or job.status == "booked":
            # If job doesn't exist or is already booked, show the fallback calendar to allow a new booking safely
            return _fallback_page()

        # LIVE CHECK: Is the slot still free in the calendar?
        app_creds = (CAL_APPID, CAL_SECRET, CAL_TENNANT_ID)
        if not is_slot_free(CALENDAR_OWNER_EMAIL, app_creds, slot_time):
            logging.warning(f"RACE CONDITION PREVENTED: Slot {slot_time} for job {job_uuid} is taken!")
            return _fallback_page()

        if not create_calendar_invite(job.customer_email, job.customer_company, slot_time):
            return HTMLResponse(content=BOOKING_ERROR_HTML, status_code=500)

        # Account Matching in SuperOffice (via Company Explorer); best effort,
        # a matching failure must not undo a successful booking.
        try:
            logging.info(f"MATCHING ACCOUNT: Checking for '{job.customer_company}' before Sale creation...")
            match_res = match_company(job.customer_company)
            if match_res.get("match_found"):
                best = match_res["best_match"]
                logging.info(f"✅ MATCH SUCCESS: Found CRM ID {best['crm_id']} for '{best['name']}' (Score: {best['score']})")
                # This CRM ID can now be used for the subsequent Sale creation in the next task.
            else:
                logging.warning(f"⚠️ NO MATCH FOUND in SuperOffice for '{job.customer_company}'. A new account will be needed.")
        except Exception as e:
            logging.error(f"❌ ERROR during SuperOffice Matching check: {e}")

        job.status = "booked"
        db.commit()

        date_str = slot_time.strftime('%d.%m.%Y')
        time_str = slot_time.strftime('%H:%M')
        return HTMLResponse(content=SUCCESS_HTML.format(date=date_str, time=time_str))
    finally:
        # Release the session on every path, including unexpected exceptions.
        db.close()


# --- Workflow Logic ---
def _load_signature_attachments():
    """Return the signature images ('image*.png' next to the signature file) as inline Graph attachments."""
    attachments = []
    image_dir = os.path.dirname(SIGNATURE_FILE_PATH)
    image_files = [f for f in os.listdir(image_dir) if f.startswith('image') and f.endswith('.png')]

    for filename in image_files:
        try:
            with open(os.path.join(image_dir, filename), "rb") as f:
                content_b64 = base64.b64encode(f.read()).decode("utf-8")
            attachments.append({
                "@odata.type": "#microsoft.graph.fileAttachment",
                "name": filename,
                "contentBytes": content_b64,
                "isInline": True,
                "contentId": filename
            })
        except Exception as e:
            logging.error(f"Could not process image {filename}: {e}")
    return attachments


def send_email(subject, body, to_email):
    """
    Sends an email using Microsoft Graph API, attaching a dynamically generated
    HTML signature with multiple inline images.

    When EMAIL_CATCHALL is set, the mail goes to that address instead of
    `to_email`.

    Returns:
        True when Graph accepted the message (200/202), otherwise False.
    """
    logging.info(f"Preparing to send email to {to_email} with subject: '{subject}'")

    # 1. Read the signature file
    try:
        with open(SIGNATURE_FILE_PATH, 'r', encoding='utf-8') as f:
            signature_html = f.read()
    except Exception as e:
        logging.error(f"Could not read signature file: {e}")
        signature_html = ""  # Fallback to no signature

    # 2. Find and prepare all signature images as attachments
    attachments = _load_signature_attachments()

    # 3. Get access token
    catchall = os.getenv("EMAIL_CATCHALL")
    to_email = catchall if catchall else to_email
    token = get_access_token(AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID)
    if not token:
        logging.error("Failed to get access token for sending email.")
        return False

    # 4. Construct and send the email
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    full_body = body + signature_html

    payload = {
        "message": {
            "subject": subject,
            "body": {"contentType": "HTML", "content": full_body},
            "toRecipients": [{"emailAddress": {"address": to_email}}]
        },
        "saveToSentItems": "true"
    }

    if attachments:
        payload["message"]["attachments"] = attachments

    try:
        response = requests.post(
            f"{GRAPH_API_ENDPOINT}/users/{SENDER_EMAIL}/sendMail",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException as e:
        logging.error(f"Error sending mail: {e}")
        return False

    logging.info(f"Send mail API response status: {response.status_code}")
    if response.status_code not in [200, 202]:
        logging.error(f"Error sending mail: {response.text}")
        return False
    return True


def _build_proposal_email(request_id, opener, name, suggestions):
    """Build the HTML body of the proposal mail: greeting, opener, slot links, calendar link."""
    # Lead-supplied text is escaped before it is placed into HTML.
    opener_html = "".join(
        f"<p>{html.escape(line)}</p>" for line in opener.split('\n') if line.strip()
    )

    # One booking link per proposed slot; the timestamp is what /book_slot expects.
    slot_items = "".join(
        f'<li><a href="{FEEDBACK_SERVER_BASE_URL}/book_slot/{request_id}/{int(s.timestamp())}">'
        f'{format_date_for_email(s)}</a></li>'
        for s in suggestions
    )

    fallback_url = WORDPRESS_BOOKING_URL if WORDPRESS_BOOKING_URL else MS_BOOKINGS_URL
    booking_html = (
        '<p>Alternativ buchen Sie direkt einen passenden Termin '
        f'<a href="{html.escape(fallback_url, quote=True)}">in meinem Kalender</a>.</p>'
    )

    return (
        f"<p>Hallo {html.escape(name)},</p>"
        f"{opener_html}"
        "<p>Ich freue mich auf den Austausch und schlage Ihnen hierfür konkrete Termine vor:</p>"
        f"<ul>{slot_items}</ul>"
        f"{booking_html}"
        "<p>Mit freundlichen Grüßen,</p>"
    )


def process_lead(request_id, company, opener, receiver, name):
    """Run the proposal workflow for one lead.

    Steps: store the job, determine slot suggestions (calendar lookup, else two
    fixed fallback slots), store them, send the Teams approval card, wait up to
    DEFAULT_WAIT_MINUTES for 'cancelled' / 'send_now', then send the proposal
    mail. The job is marked 'sent' only when the mail was accepted.

    All exceptions are logged and swallowed because this runs as a background
    task; the DB session is always closed.
    """
    logging.info(f"--- Starting process_lead for request_id: {request_id} ---")
    db = SessionLocal()
    try:
        job = ProposalJob(job_uuid=request_id, customer_email=receiver, customer_company=company, customer_name=name, status="pending")
        db.add(job)
        db.commit()
        logging.info(f"Job {request_id} created and saved to DB.")

        cal_data = get_availability(CALENDAR_OWNER_EMAIL, (CAL_APPID, CAL_SECRET, CAL_TENNANT_ID))
        suggestions = find_slots(*cal_data) if cal_data else []

        if not suggestions:
            logging.warning(f"No slots found via API for job {request_id}. Creating fallback slots.")
            now = datetime.now(TZ_BERLIN)
            tomorrow = (now + timedelta(days=1)).replace(hour=10, minute=0, second=0, microsecond=0)
            overmorrow = (now + timedelta(days=2)).replace(hour=14, minute=0, second=0, microsecond=0)
            suggestions = [tomorrow, overmorrow]

        logging.info(f"Found/created {len(suggestions)} slot suggestions for job {request_id}.")
        for s in suggestions:
            db.add(ProposedSlot(job_id=job.id, start_time=s, end_time=s + timedelta(minutes=15)))
        db.commit()

        # One deadline for both the time shown on the card and the wait loop.
        wait_until = datetime.now(TZ_BERLIN) + timedelta(minutes=DEFAULT_WAIT_MINUTES)
        logging.info(f"Sending Teams approval card for job {request_id}.")
        from .teams_notification import send_approval_card
        send_approval_card(job_uuid=request_id, customer_name=company, time_string=wait_until.strftime("%H:%M"), webhook_url=TEAMS_WEBHOOK_URL, api_base_url=FEEDBACK_SERVER_BASE_URL)

        logging.info(f"Waiting for response or timeout until {wait_until.strftime('%H:%M:%S')} for job {request_id}")
        while datetime.now(TZ_BERLIN) < wait_until:
            # The status is changed by the /stop and /send_now endpoints.
            db.refresh(job)
            if job.status in ["cancelled", "send_now"]:
                logging.info(f"Status for job {request_id} changed to '{job.status}'. Exiting wait loop.")
                break
            time.sleep(5)

        db.refresh(job)
        if job.status == "cancelled":
            logging.info(f"Job {request_id} was cancelled. No email will be sent.")
            return

        logging.info(f"Timeout reached or 'Send Now' clicked for job {request_id}. Proceeding to send email.")
        email_body = _build_proposal_email(request_id, opener, name, suggestions)

        # The signature is appended by send_email, so it is not read here.
        if send_email(f"Ihr Kontakt mit RoboPlanet - {company}", email_body, receiver):
            job.status = "sent"
            db.commit()
        else:
            logging.error(f"Email for job {request_id} could not be sent; job is not marked as sent.")
        logging.info(f"--- Finished process_lead for request_id: {request_id} ---")

    except Exception as e:
        logging.error(f"FATAL error in process_lead for request_id {request_id}: {e}", exc_info=True)
    finally:
        db.close()


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8004)