import os
import logging
import datetime
from zoneinfo import ZoneInfo
import base64
import re
import pandas as pd
from jinja2 import Environment, FileSystemLoader
from weasyprint import HTML
import tempfile
import shutil
import time
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from database import get_db, Job as DBJob, engine, Base, JobParticipant, SessionLocal
import math
import uuid
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# --- Logging Configuration ---
# Root logger: DEBUG level, one stream handler (stderr by default), timestamped lines.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger("fotograf-scraper")

# --- Global State for Last Generated File ---
# Module-level record of the most recently generated file. It lives in process
# memory only, so it is gone after a restart.
#   path         -> location of the stable copy on disk
#   display_name -> human-readable label
#   timestamp    -> formatted creation time
#   type         -> 'pdf' or 'csv'
LATEST_FILE_STATE = dict.fromkeys(("path", "display_name", "timestamp", "type"))
def update_latest_file(file_path: str, display_name: str, file_type: str):
    """Remember *file_path* as the most recently generated file.

    The file is copied to ``/tmp/latest_result_<file_type>.<file_type>`` and
    ``LATEST_FILE_STATE`` is pointed at that copy. This is best effort: any
    failure is logged and swallowed so that it never breaks the caller.

    Args:
        file_path: Existing file to copy.
        display_name: Label stored for later display.
        file_type: File extension, also stored as the state's ``type``.
    """
    try:
        # One fixed slot per file type: a newer file of the same type overwrites the older copy.
        target = os.path.join("/tmp", f"latest_result_{file_type}.{file_type}")
        shutil.copy2(file_path, target)
        stamp = datetime.datetime.now(ZoneInfo("Europe/Berlin")).strftime("%H:%M Uhr")
        LATEST_FILE_STATE.update(
            {
                "path": target,
                "display_name": display_name,
                "timestamp": stamp,
                "type": file_type,
            }
        )
        logger.info(f"Updated latest file state: {display_name}")
    except Exception as e:
        logger.error(f"Failed to update latest file state: {e}")
def get_berlin_now_str():
    """Return the current Europe/Berlin time formatted as ``DD.MM.YYYY HH:MM Uhr``."""
    berlin_now = datetime.datetime.now(tz=ZoneInfo("Europe/Berlin"))
    return f"{berlin_now:%d.%m.%Y %H:%M} Uhr"
def format_job_date(date_str: str) -> str:
    """Turn the first ``DD.MM.YYYY`` date found in *date_str* into a two-day label.

    The label covers the matched day and the day after it, e.g.
    ``"15.04.2026"`` -> ``"15. + 16.04.2026"``. When the second day falls into
    another month or year, the first day keeps its own month (and year) so the
    label stays unambiguous: ``"30.04. + 01.05.2026"``,
    ``"31.12.2025 + 01.01.2026"``.

    Args:
        date_str: Free text that may contain a date in ``DD.MM.YYYY`` form.

    Returns:
        The formatted range, or *date_str* unchanged when it is empty, holds
        no date, or holds an impossible calendar date (e.g. ``31.02.2026``).
    """
    if not date_str:
        return date_str

    # Looks for a date in DD.MM.YYYY format anywhere in the text.
    match = re.search(r'(\d{2})\.(\d{2})\.(\d{4})', date_str)
    if match is None:
        return date_str

    day, month, year = (int(part) for part in match.groups())
    try:
        first = datetime.date(year, month, day)
        second = first + datetime.timedelta(days=1)
    except (ValueError, OverflowError):
        # Not a real calendar date (or no representable next day): leave the text alone.
        return date_str

    if first.year != second.year:
        head = f"{first.day:02d}.{first.month:02d}.{first.year:04d}"
    elif first.month != second.month:
        head = f"{first.day:02d}.{first.month:02d}."
    else:
        head = f"{first.day:02d}."
    return f"{head} + {second.day:02d}.{second.month:02d}.{second.year:04d}"
# Load environment variables (credentials and tokens are read via os.getenv further down)
load_dotenv()
# Ensure DB is created: creates any tables declared on Base that do not exist yet
Base.metadata.create_all(bind=engine)
# Imported here rather than at the top of the file, i.e. after the two calls above have run.
# NOTE(review): presumably the module depends on that setup — confirm before moving the import.
import publish_request_api
app = FastAPI(title="Fotograf.de Scraper & ERP API")
# Mount the routes defined in publish_request_api on this application
app.include_router(publish_request_api.router)
# Configure CORS
# NOTE(review): every origin, method and header is allowed together with
# credentials — very permissive; confirm this is intended outside development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Configuration & Constants ---
# Login page that the scraper opens first.
LOGIN_URL = 'https://app.fotograf.de/login/login'
# Element locators for the fotograf.de web UI. Values starting with "/" or "./" are
# XPath expressions (used with By.XPATH); the others are CSS selectors (By.CSS_SELECTOR).
SELECTORS = {
    # Cookie consent and login form
    "cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
    "login_user": "#login-email",
    "login_pass": "#login-password",
    "login_button": "#login-submit",
    # Jobs table: one row per job, found via the job-name link it contains
    "dashboard_jobs_table_rows": "//tr[.//a[contains(@data-qa-id, 'link:photo-jobs-name-')]]",
    "job_row_name_link": ".//a[contains(@data-qa-id, 'link:photo-jobs-name-')]",
    # Row cells are located by the position of the header whose text contains the given label
    "job_row_status": ".//td[count(//th[contains(., 'Status')]/preceding-sibling::th) + 1]",
    "job_row_date": ".//td[count(//th[contains(., 'Datum')]/preceding-sibling::th) + 1]",
    "job_row_shooting_type": ".//td[count(//th[contains(., 'Typ')]/preceding-sibling::th) + 1]",
    # Export dropdown and its CSV entry
    "export_dropdown": "[data-qa-id='dropdown:export']",
    "export_csv_link": "button[data-qa-id='button:csv']",
    # --- Statistics Selectors ---
    # NOTE(review): none of the keys below is referenced in the visible part of this
    # file — verify they are still in use before changing or removing them.
    "album_overview_rows": "//table/tbody/tr",
    "album_overview_link": ".//td[2]//a",
    "access_code_count": "//span[text()='Zugangscodes']/following-sibling::strong",
    "person_rows": "//div[contains(@class, 'border-legacy-silver-550') and .//span[text()='Logins']]",
    "person_all_photos": ".//div[@data-key]",
    "person_purchased_photos": ".//div[@data-key and .//img[@alt='Bestellungen mit diesem Foto']]",
    "person_access_card_photo": ".//div[@data-key and contains(@class, 'opacity-50')]",
    "potential_buyer_link": "//a[contains(@href, '/config_customers/view_customer')]",
    "quick_login_url": "//a[@id='quick-login-url']",
    "buyer_email": "//span[contains(., '@')]",
}
# --- PDF Generation Logic ---
def get_logo_base64():
    """Return ``assets/logo.png`` (next to this module) as a base64 string.

    Returns:
        The base64-encoded file content decoded as UTF-8 text, or ``None``
        (after logging a warning) when the logo file does not exist.
    """
    logo_path = os.path.join(os.path.dirname(__file__), "assets", "logo.png")
    logger.debug(f"Loading logo from: {logo_path}")
    try:
        with open(logo_path, "rb") as logo_file:
            raw_bytes = logo_file.read()
    except FileNotFoundError:
        logger.warning(f"Logo file not found at {logo_path}")
        return None
    return base64.b64encode(raw_bytes).decode('utf-8')
def _load_participant_csv(csv_path: str):
    """Read the participant CSV into a DataFrame, guessing separator and encoding.

    ``;`` and then ``,`` are tried with UTF-8 (a BOM is tolerated). A separator
    is accepted when a five-row probe yields more than one column. If neither
    works, a ``;``-separated latin1 read is attempted as a last resort.

    Raises:
        Exception: ``"CSV konnte nicht gelesen werden."`` when the fallback
            fails as well; the underlying error is chained as its cause.
    """
    for sep in (";", ","):
        try:
            logger.debug(f"Trying CSV separator: '{sep}'")
            probe = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig", nrows=5)
            if len(probe.columns) > 1:
                df = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig")
                logger.debug(f"Successfully read CSV with separator '{sep}'")
                return df
        except Exception as e:
            logger.debug(f"Failed to read with separator '{sep}': {e}")

    logger.error("Could not read CSV with standard separators.")
    try:
        df = pd.read_csv(csv_path, sep=";", encoding="latin1")
    except Exception as err:
        # Keep the message callers already see, but preserve the real cause.
        raise Exception("CSV konnte nicht gelesen werden.") from err
    logger.info("Fallback to latin1 encoding successful.")
    return df


def generate_pdf_from_csv(csv_path: str, institution: str, date_info: str, list_type: str, output_path: str):
    """Render a participant list PDF from an exported CSV.

    The CSV columns are normalised to ``Vorname`` / ``Nachname`` / group
    column, rows are sorted and grouped by the group column, the
    ``school_list.html`` template is rendered and written to *output_path*
    as PDF. The result is then registered as the latest generated file.

    Args:
        csv_path: Path of the CSV export to read.
        institution: Institution name shown in the document.
        date_info: Date text shown in the document.
        list_type: List flavour; values starting with ``'kiga'`` use
            "Gruppe"/"Kinder", everything else "Klasse"/"Schüler".
        output_path: Where the PDF is written.

    Raises:
        Exception: When the CSV cannot be read at all.
    """
    logger.info(f"Generating PDF for {institution} from {csv_path}")
    df = _load_participant_csv(csv_path)

    df.columns = df.columns.str.strip().str.replace("\"", "")
    logger.debug(f"CSV Columns: {list(df.columns)}")

    is_kiga = list_type.startswith('kiga')
    group_label = "Gruppe" if is_kiga else "Klasse"
    person_label_plural = "Kinder" if is_kiga else "Schüler"

    # Map the various header spellings onto the three canonical column names.
    col_mapping = {}
    for col in df.columns:
        lower_col = col.lower().strip()
        if lower_col in ["vorname kind", "vorname", "first name"]:
            col_mapping[col] = "Vorname"
        elif lower_col in ["nachname kind", "nachname", "last name"]:
            col_mapping[col] = "Nachname"
        elif lower_col in ["gruppe", "klasse", "group", "class"]:
            col_mapping[col] = group_label
    df = df.rename(columns=col_mapping)
    df = df.fillna("")

    # Missing columns get defaults so sorting and grouping below still work.
    for col in ["Vorname", "Nachname", group_label]:
        if col not in df.columns:
            logger.warning(f"Column '{col}' not found in CSV, using default values.")
            df[col] = "Alle" if col == group_label else ""

    df = df.sort_values(by=[group_label, "Nachname", "Vorname"])

    class_data = []
    class_counts = []
    for class_name, group in df.groupby(group_label):
        class_data.append({"name": class_name, "students": group.to_dict("records")})
        class_counts.append({"name": class_name, "count": len(group)})
    total_students = len(df)

    template_dir = os.path.join(os.path.dirname(__file__), "templates")
    logger.debug(f"Using template directory: {template_dir}")
    env = Environment(loader=FileSystemLoader(template_dir))
    template = env.get_template("school_list.html")

    render_context = {
        "institution": institution,
        "date_info": date_info,
        "class_counts": class_counts,
        "total_students": total_students,
        "class_data": class_data,
        "current_time": get_berlin_now_str(),
        "logo_base64": get_logo_base64(),
        "group_label": group_label,
        "person_label_plural": person_label_plural,
        "group_column_name": group_label
    }
    logger.debug("Rendering HTML template...")
    html_out = template.render(render_context)

    logger.info(f"Writing PDF to: {output_path}")
    HTML(string=html_out).write_pdf(output_path)
    update_latest_file(output_path, f"Teilnehmerliste {institution}", "pdf")
def _parse_appointment_event(event: dict) -> dict:
    """Extract start time (Europe/Berlin), invitee name, child count and consent from one raw event."""
    start_dt = datetime.datetime.fromisoformat(event['start_time'].replace('Z', '+00:00'))
    start_dt = start_dt.astimezone(ZoneInfo("Europe/Berlin"))

    num_children = ""
    has_consent = False
    for qa in event.get('questions_and_answers') or []:
        # Either field may be missing or null; treat both as empty text.
        q_text = (qa.get('question') or '').lower()
        a_text = qa.get('answer') or ''
        if any(kw in q_text for kw in ["wie viele kinder", "anzahl kinder", "wieviele kinder"]):
            num_children = a_text
        elif any(kw in q_text for kw in ["veröffentlichen", "bilder"]):
            if "ja" in a_text.lower() or "gerne" in a_text.lower():
                has_consent = True

    return {
        "dt": start_dt,
        "name": event['invitee_name'],
        "children": num_children,
        "consent": has_consent
    }


def _build_day_slots(events: list, slot_length: datetime.timedelta) -> list:
    """Walk one day's events (sorted by start) in fixed steps, emitting booked and empty slots."""
    min_dt = events[0]['dt']
    max_dt = events[-1]['dt']
    slots = []
    curr_dt = min_dt
    event_idx = 0
    while curr_dt <= max_dt or event_idx < len(events):
        next_dt = curr_dt + slot_length
        events_in_slot = []
        while event_idx < len(events) and events[event_idx]['dt'] < next_dt:
            events_in_slot.append(events[event_idx])
            event_idx += 1

        if events_in_slot:
            # Every booking starting inside this window gets its own row with its real time.
            for e in events_in_slot:
                slots.append({
                    "time_str": e['dt'].strftime("%H:%M"),
                    "name": e['name'],
                    "children": e['children'],
                    "consent": e['consent'],
                    "booked": True,
                    "dt": e['dt']
                })
        elif curr_dt <= max_dt:
            slots.append({
                "time_str": curr_dt.strftime("%H:%M"),
                "name": "",
                "children": "",
                "consent": False,
                "booked": False,
                "dt": curr_dt
            })
        curr_dt = next_dt
    return slots


def _compress_empty_slots(slots: list, slot_length: datetime.timedelta) -> list:
    """Replace every run of more than two consecutive empty slots with one summary row."""
    compressed = []
    streak = []

    def flush_streak():
        if len(streak) > 2:
            end_dt = streak[-1]["dt"] + slot_length
            compressed.append({
                "is_compressed": True,
                "time_str": f"{streak[0]['time_str']} - {end_dt.strftime('%H:%M')}",
                "name": "--- Freie Zeit / Pause ---",
                "children": "",
                "consent": False,
                "booked": False
            })
        else:
            compressed.extend(streak)
        streak.clear()

    for slot in slots:
        if slot["booked"]:
            flush_streak()
            compressed.append(slot)
        else:
            streak.append(slot)
    flush_streak()
    return compressed


def generate_appointment_overview_pdf(raw_events: list, job_name: str, event_type_name: str,
                                      output_path: str, slot_minutes: int = 6):
    """Render the appointment overview PDF for a list of raw Calendly events.

    Events are converted to Europe/Berlin time, grouped per day (days in
    chronological order) and laid out on a fixed time grid. Unbooked grid
    positions between the first and last booking of a day appear as empty
    rows; runs of more than two empty rows collapse into a single
    "Freie Zeit / Pause" row. The ``appointment_list.html`` template is
    rendered to *output_path* and the file is registered as the latest result.

    Args:
        raw_events: Dicts with at least ``start_time`` (ISO 8601) and
            ``invitee_name``; ``questions_and_answers`` is optional.
        job_name: Job title shown in the document.
        event_type_name: Event type shown in the document; a falsy value is
            rendered as "Alle Events".
        output_path: Where the PDF is written.
        slot_minutes: Length of one grid slot in minutes (default 6).

    Raises:
        ValueError: If *slot_minutes* is not positive, or a start time is not
            valid ISO 8601.
        KeyError: If an event lacks ``start_time`` or ``invitee_name``.
    """
    if slot_minutes <= 0:
        # A non-positive step would never advance the slot cursor.
        raise ValueError("slot_minutes must be positive")
    slot_length = datetime.timedelta(minutes=slot_minutes)

    from collections import defaultdict

    # Sorting once up front keeps both the days and the events within a day chronological.
    parsed_events = sorted(
        (_parse_appointment_event(event) for event in raw_events),
        key=lambda e: e['dt'],
    )

    grouped = defaultdict(list)
    for e in parsed_events:
        grouped[e['dt'].strftime("%d.%m.%Y")].append(e)

    final_grouped = {
        date_str: _compress_empty_slots(_build_day_slots(events, slot_length), slot_length)
        for date_str, events in grouped.items()
    }

    template_dir = os.path.join(os.path.dirname(__file__), "templates")
    env = Environment(loader=FileSystemLoader(template_dir))
    template = env.get_template("appointment_list.html")

    render_context = {
        "job_name": job_name,
        "event_type_name": event_type_name or "Alle Events",
        "current_time": get_berlin_now_str(),
        "logo_base64": get_logo_base64(),
        "grouped_slots": final_grouped
    }
    html_out = template.render(render_context)
    HTML(string=html_out).write_pdf(output_path)
    update_latest_file(output_path, f"Terminübersicht {job_name}", "pdf")
# --- Selenium Scraper Functions ---
def take_error_screenshot(driver, error_name):
    """Save a browser screenshot into the ``errors`` folder next to this module.

    The file is named ``error_<error_name>_<YYYYmmdd_HHMMSS>.png``. A failure
    to save is logged and never raised.

    Args:
        driver: WebDriver instance to capture.
        error_name: Short tag that becomes part of the file name.
    """
    target_dir = os.path.join(os.path.dirname(__file__), 'errors')
    os.makedirs(target_dir, exist_ok=True)
    stamp = f"{datetime.datetime.now():%Y%m%d_%H%M%S}"
    filepath = os.path.join(target_dir, f"error_{error_name}_{stamp}.png")
    try:
        driver.save_screenshot(filepath)
    except Exception as e:
        logger.error(f"!!! Could not save screenshot: {e}")
    else:
        logger.error(f"!!! Error screenshot saved to: {filepath}")
def setup_driver(download_path: Optional[str] = None):
    """Create a headless Chromium WebDriver, optionally prepared for file downloads.

    Args:
        download_path: Directory that downloads should land in. When given,
            the Chrome download preferences are set and downloads are
            explicitly allowed through the DevTools protocol.

    Returns:
        The ready WebDriver, or ``None`` when initialisation failed (the error
        is logged). A browser that was started before the failure occurred is
        shut down again so that no Chromium process is left behind.
    """
    logger.info("Initializing Chrome WebDriver...")
    options = Options()
    for flag in ('--headless', '--no-sandbox', '--disable-dev-shm-usage', '--window-size=1920,1200'):
        options.add_argument(flag)
    options.binary_location = '/usr/bin/chromium'

    if download_path:
        logger.debug(f"Configuring download path: {download_path}")
        prefs = {
            "download.default_directory": download_path,
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
            "safebrowsing.enabled": True
        }
        options.add_experimental_option("prefs", prefs)

    driver = None
    try:
        driver = webdriver.Chrome(options=options)
        if download_path:
            logger.debug("Allowing downloads in headless mode via CDP...")
            driver.execute_cdp_cmd('Page.setDownloadBehavior', {
                'behavior': 'allow',
                'downloadPath': download_path
            })
        return driver
    except Exception as e:
        logger.error(f"Failed to initialize WebDriver: {e}")
        if driver is not None:
            # The browser started but a later step failed: close it, since the
            # caller only receives None and could never quit it.
            try:
                driver.quit()
            except Exception as quit_err:
                logger.debug(f"Could not quit partially initialized driver: {quit_err}")
        return None
def login(driver, username, password):
    """Log in to fotograf.de with the given credentials.

    Opens the login page, dismisses the cookie banner if one becomes
    clickable within five seconds, submits the form and waits up to thirty
    seconds for the redirect to the dashboard.

    Args:
        driver: WebDriver to drive.
        username: Account e-mail address.
        password: Account password.

    Returns:
        ``True`` on success. ``False`` on any failure; the error is logged
        and an error screenshot is attempted.
    """
    logger.info(f"Starting login process for user: {username}")
    try:
        driver.get(LOGIN_URL)
        wait = WebDriverWait(driver, 30)

        try:
            logger.debug("Checking for cookie banner...")
            cookie_wait = WebDriverWait(driver, 5)
            cookie_wait.until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"]))
            ).click()
            logger.info("Cookie banner accepted.")
        except Exception as banner_err:
            # Best effort: the banner is optional. Only ordinary errors are
            # ignored, so KeyboardInterrupt/SystemExit still propagate.
            logger.debug(f"No cookie banner found ({type(banner_err).__name__}).")

        logger.debug("Entering credentials...")
        wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
        driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)

        logger.info("Clicking login button...")
        driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()

        logger.info("Waiting for dashboard redirect...")
        wait.until(EC.url_contains('/config_dashboard/index'))
        logger.info("Login successful!")
        return True
    except Exception as e:
        logger.error(f"Login failed: {e}")
        take_error_screenshot(driver, "login_error")
        return False
def get_jobs_list(driver) -> List[Dict[str, Any]]:
    """Scrape the jobs overview page into a list of job dicts.

    Each dict has the keys ``id``, ``name``, ``url``, ``status``, ``date`` and
    ``shooting_type``. ``id`` is the trailing number of the job link, or
    ``None`` when the link does not end in digits. Rows that cannot be parsed
    are skipped with a warning. If the table never shows up, the error is
    logged, a screenshot is attempted and whatever was collected so far
    (possibly nothing) is returned.

    Args:
        driver: A logged-in WebDriver.
    """
    jobs_list_url = "https://app.fotograf.de/config_jobs/index"
    logger.info(f"Navigating to jobs list: {jobs_list_url}")
    driver.get(jobs_list_url)
    wait = WebDriverWait(driver, 30)

    # Output key -> SELECTORS key for the plain text cells of a row.
    text_cells = (
        ("status", "job_row_status"),
        ("date", "job_row_date"),
        ("shooting_type", "job_row_shooting_type"),
    )

    jobs = []
    try:
        logger.debug("Waiting for job rows to appear...")
        rows = wait.until(
            EC.presence_of_all_elements_located((By.XPATH, SELECTORS["dashboard_jobs_table_rows"]))
        )
        logger.info(f"Found {len(rows)} job rows.")

        for row in rows:
            try:
                link = row.find_element(By.XPATH, SELECTORS["job_row_name_link"])
                job_name = link.text.strip()
                job_url = link.get_attribute('href')
                id_match = re.search(r'/(\d+)$', job_url)
                job_id = id_match.group(1) if id_match else None
                logger.debug(f"Parsing job: {job_name} (ID: {job_id})")

                cell_values = {}
                for field, selector_key in text_cells:
                    cell = row.find_element(By.XPATH, SELECTORS[selector_key])
                    cell_values[field] = cell.text.strip()

                jobs.append({"id": job_id, "name": job_name, "url": job_url, **cell_values})
            except Exception as e:
                logger.warning(f"Error parsing single job row: {e}")
                continue
    except Exception as e:
        logger.error(f"Error retrieving job list: {e}")
        take_error_screenshot(driver, "job_list_error")
    return jobs
# --- Background Task Engine ---
# In-memory registry of background tasks, keyed by task id. Each value is a dict with
# "status" and "progress"; running and completed tasks also carry "result".
# NOTE(review): nothing in the visible code removes entries — confirm whether cleanup is needed.
task_store: Dict[str, Dict[str, Any]] = {}
def process_statistics(task_id: str, job_id: str, account_type: str):
    """Background task: compute per-group purchase statistics for a job.

    Participants are synced first. If the sync fails but participants for the
    job already exist in the database, the calculation continues with them;
    with no data at all the task ends in state ``error``. The outcome is
    published through ``task_store[task_id]``.

    Args:
        task_id: Key under which progress and result are stored.
        job_id: Job whose participants are evaluated.
        account_type: Account selector passed on to the sync.
    """
    logger.info(f"Task {task_id}: Starting fast statistics calculation for job {job_id}")
    task_store[task_id] = {"status": "running", "progress": "Synchronisiere Daten von Fotograf.de...", "result": None}
    db = SessionLocal()
    try:
        # 1. Sync data from CSV
        try:
            sync_participants(job_id, account_type, db)
        except Exception as sync_err:
            logger.error(f"Sync failed during statistics: {sync_err}")
            existing = db.query(JobParticipant).filter(JobParticipant.job_id == job_id).count()
            if existing == 0:
                task_store[task_id] = {"status": "error", "progress": f"Synchronisierung fehlgeschlagen: {str(sync_err)}"}
                return

        # 2. Query DB and tally per group
        task_store[task_id]["progress"] = "Berechne Statistiken..."
        participants = db.query(JobParticipant).filter(JobParticipant.job_id == job_id).all()

        per_group = {}
        for participant in participants:
            label = participant.gruppe or "Unbekannt"
            row = per_group.setdefault(label, {
                "Album": label,
                "Kinder_insgesamt": 0,
                "Kinder_mit_Käufen": 0,
                "Kinder_Alle_Bilder_gekauft": 0  # Not available in CSV, stays 0
            })
            row["Kinder_insgesamt"] += 1
            if participant.has_orders:
                row["Kinder_mit_Käufen"] += 1

        rows_by_album = sorted(per_group.values(), key=lambda row: row["Album"])
        task_store[task_id] = {
            "status": "completed",
            "progress": "Statistik erfolgreich berechnet!",
            "result": rows_by_album
        }
    except Exception as e:
        logger.exception(f"Unexpected error in statistics task {task_id}")
        task_store[task_id] = {"status": "error", "progress": f"Unerwarteter Fehler: {str(e)}"}
    finally:
        db.close()
def process_reminder_analysis(task_id: str, job_id: str, account_type: str):
    """Background task: build the reminder mailing list for a job.

    Participants are synced, then every participant with a parent e-mail, at
    most one login and no order is selected. Candidates are merged per e-mail
    address into one row holding the child names and one HTML quick-login link
    per child. The outcome is published through ``task_store[task_id]``.

    If the sync fails but participants for the job already exist in the
    database, the analysis continues with them; with no data at all the task
    ends in state ``error``.

    Args:
        task_id: Key under which progress and result are stored.
        job_id: Job whose participants are analysed.
        account_type: Account selector passed on to the sync.
    """
    import html  # local import: only needed for escaping the generated link markup

    logger.info(f"Task {task_id}: Starting fast reminder analysis for job {job_id}")
    task_store[task_id] = {"status": "running", "progress": "Synchronisiere Daten von Fotograf.de...", "result": None}
    db = SessionLocal()
    try:
        # 1. Sync data from CSV
        try:
            sync_participants(job_id, account_type, db)
        except Exception as sync_err:
            logger.error(f"Sync failed during reminder analysis: {sync_err}")
            # Continue anyway if we have some data, or fail if we have none
            count = db.query(JobParticipant).filter(JobParticipant.job_id == job_id).count()
            if count == 0:
                task_store[task_id] = {"status": "error", "progress": f"Synchronisierung fehlgeschlagen: {str(sync_err)}"}
                return

        # 2. Query DB for potential candidates (Logins <= 1 and No Orders)
        task_store[task_id]["progress"] = "Analysiere Datenbank-Einträge..."
        candidates = db.query(JobParticipant).filter(
            JobParticipant.job_id == job_id,
            JobParticipant.has_orders == 0,
            JobParticipant.logins <= 1,
            JobParticipant.email_eltern != "",
            JobParticipant.email_eltern != None  # noqa: E711 - SQLAlchemy column comparison, not a Python identity test
        ).all()

        if not candidates:
            task_store[task_id] = {
                "status": "completed",
                "progress": "Keine passenden Empfänger (0-1 Logins, keine Bestellung) gefunden.",
                "result": []
            }
            return

        # 3. Aggregate results by Email
        aggregation = {}
        for c in candidates:
            email = c.email_eltern
            entry = aggregation.setdefault(email, {
                "email": email,
                "parent_name": c.vorname_eltern or "Liebe Eltern",
                "children": [],
                "links": []
            })

            # Add child name
            child_name = c.vorname_kind or ""
            child_label = "Familienbilder" if child_name.lower() == "familie" else child_name
            if child_label and child_label not in entry["children"]:
                entry["children"].append(child_label)

            # Add Quick Login Link; without an access code there is nothing to link to.
            if not c.zugangscode:
                continue
            link = f"https://www.kinderfotos-erding.de/a/{c.zugangscode}"
            link_text = f"Fotos von {child_label or 'Eurem Kind'}"
            # Names and codes come from imported data, so escape them before building markup.
            html_link = f'<a href="{html.escape(link, quote=True)}">{html.escape(link_text)}</a>'
            if html_link not in entry["links"]:
                entry["links"].append(html_link)

        # 4. Format for Supermailer/Gmail
        final_result = []
        for email, data in aggregation.items():
            children_str = " und ".join(data["children"]) or "Eurem Kind"
            links_html = "".join(f"{entry_link}<br>" for entry_link in data["links"])
            final_result.append({
                "E-Mail-Adresse Käufer": email,
                "Name Käufer": data["parent_name"],
                "Kindernamen": children_str,
                "Anzahl Kinder": len(data["children"]),
                "LinksHTML": links_html
            })

        task_store[task_id] = {
            "status": "completed",
            "progress": f"Analyse fertig! {len(final_result)} Empfänger identifiziert.",
            "result": final_result
        }
    except Exception as e:
        logger.exception(f"Error in task {task_id}")
        task_store[task_id] = {"status": "error", "progress": f"Fehler: {str(e)}"}
    finally:
        db.close()
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, UploadFile, File, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, RedirectResponse
from typing import List, Dict, Any, Optional
from pydantic import BaseModel
from sqlalchemy.orm import Session
from database import get_db, Job as DBJob, engine, Base
import math
import uuid
from qr_generator import get_calendly_events, overlay_text_on_pdf, get_calendly_event_types
from gmail_service import GmailService
# --- API Endpoints ---
@app.get("/api/calendly/event-types")
async def fetch_calendly_event_types():
    """Return the Calendly event types as ``{"event_types": [...]}``.

    Raises:
        HTTPException: 400 when ``CALENDLY_TOKEN`` is not set, 500 when the
            lookup fails.
    """
    token = os.getenv("CALENDLY_TOKEN")
    if not token:
        raise HTTPException(status_code=400, detail="Calendly API token missing.")

    try:
        event_types = get_calendly_event_types(token)
    except Exception as e:
        logger.error(f"Error fetching Calendly event types: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    return {"event_types": event_types}
@app.get("/api/calendly/events")
async def fetch_calendly_events(start_time: str, end_time: str, event_type_name: Optional[str] = None):
    """Debug endpoint: return the raw Calendly events together with their count.

    Raises:
        HTTPException: 400 when ``CALENDLY_TOKEN`` is not set, 500 when
            fetching the events fails.
    """
    token = os.getenv("CALENDLY_TOKEN")
    if not token:
        raise HTTPException(status_code=400, detail="Calendly API token missing.")

    try:
        from qr_generator import get_calendly_events_raw
        events = get_calendly_events_raw(token, start_time, end_time, event_type_name)
        payload = {"count": len(events), "events": events}
    except Exception as e:
        logger.error(f"Error fetching Calendly events: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    return payload
@app.post("/api/qr-cards/generate")
async def generate_qr_cards(
    start_time: str = Form(None),
    end_time: str = Form(None),
    event_type_name: str = Form(None),
    pdf_file: UploadFile = File(...)
):
    """Overlay Calendly appointment texts onto an uploaded PDF and return the result.

    The upload is stored in a temporary file, the formatted appointment texts
    are fetched from Calendly and written onto the PDF, and the generated file
    is registered as the latest result. The temporary upload is removed in
    every case, including when a step fails.

    Returns:
        The generated PDF, or a 404 JSON response when no matching
        appointments exist.

    Raises:
        HTTPException: 400 when ``CALENDLY_TOKEN`` is not set, 500 on any
            failure while generating the cards.
    """
    logger.info(f"API Request: Generate QR cards from {start_time} to {end_time} for event type '{event_type_name}'")
    api_token = os.getenv("CALENDLY_TOKEN")
    if not api_token:
        raise HTTPException(status_code=400, detail="Calendly API token missing.")

    temp_dir = tempfile.gettempdir()
    base_pdf_path = os.path.join(temp_dir, f"upload_{uuid.uuid4()}.pdf")
    try:
        # Save uploaded PDF temporarily
        with open(base_pdf_path, "wb") as buffer:
            shutil.copyfileobj(pdf_file.file, buffer)

        # 1. Fetch formatted data from Calendly
        texts = get_calendly_events(api_token, start_time, end_time, event_type_name)
        if not texts:
            return JSONResponse(status_code=404, content={"message": "Keine passenden Termine gefunden."})

        # 2. Overlay text on blank PDF. The on-disk name carries a random prefix so that
        # two requests within the same second cannot overwrite each other's output.
        output_name = f"QR_Karten_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
        output_path = os.path.join(temp_dir, f"{uuid.uuid4().hex}_{output_name}")
        overlay_text_on_pdf(base_pdf_path, output_path, texts)

        # Update latest file tracking
        update_latest_file(output_path, f"QR-Karten ({event_type_name or 'Calendly'})", "pdf")
        return FileResponse(path=output_path, filename=output_name, media_type="application/pdf")
    except Exception as e:
        logger.error(f"Error generating QR cards: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Cleanup uploaded file on every path; it is only an input to the overlay step.
        try:
            os.remove(base_pdf_path)
        except FileNotFoundError:
            pass
        except OSError as cleanup_err:
            logger.warning(f"Could not remove temporary upload {base_pdf_path}: {cleanup_err}")
@app.get("/api/jobs/{job_id}/appointment-list")
async def generate_appointment_list(job_id: str, event_type_name: str, db: Session = Depends(get_db)):
    """Generate the appointment overview PDF for a job from Calendly events.

    Only events starting today or later (Europe/Berlin) are included. Events
    whose start time is missing or cannot be parsed are skipped with a
    warning, because the PDF generator cannot place them on the time grid.

    Returns:
        The generated PDF, or a 404 JSON response when there are no (future)
        events for the event type.

    Raises:
        HTTPException: 400 when ``CALENDLY_TOKEN`` is not set, 500 when
            fetching events or rendering the PDF fails.
    """
    logger.info(f"API Request: Generate appointment list for job {job_id}, event_type '{event_type_name}'")
    api_token = os.getenv("CALENDLY_TOKEN")
    if not api_token:
        raise HTTPException(status_code=400, detail="Calendly API token missing.")

    # 1. Fetch job name from DB
    job = db.query(DBJob).filter(DBJob.id == job_id).first()
    job_name = job.name if job else f"Auftrag {job_id}"
    # Clean job name: remove (JOB00005) or similar anywhere in string
    job_name_clean = re.sub(r'\(?JOB\d+\)?', '', job_name).strip()

    # 2. Fetch raw Calendly events
    try:
        from qr_generator import get_calendly_events_raw
        raw_events = get_calendly_events_raw(api_token, event_type_name=event_type_name)
    except Exception as e:
        logger.error(f"Error fetching raw Calendly events: {e}")
        raise HTTPException(status_code=500, detail=str(e))

    if not raw_events:
        return JSONResponse(status_code=404, content={"message": "Keine passenden Termine für diesen Event-Typ gefunden."})

    # Filter out old events (keep only today and future)
    berlin = ZoneInfo("Europe/Berlin")
    midnight_today = datetime.datetime.now(berlin).replace(hour=0, minute=0, second=0, microsecond=0)
    future_events = []
    for event in raw_events:
        try:
            start_dt = datetime.datetime.fromisoformat(event['start_time'].replace('Z', '+00:00'))
        except (KeyError, TypeError, AttributeError, ValueError) as e:
            # Keeping such an event would make the PDF step fail on the same value.
            logger.warning(f"Skipping event with unparseable start time: {e}")
            continue
        if start_dt.astimezone(berlin) >= midnight_today:
            future_events.append(event)

    if not future_events:
        return JSONResponse(status_code=404, content={"message": "Keine zukünftigen Termine für diesen Event-Typ gefunden."})

    # 3. Generate PDF
    temp_dir = tempfile.gettempdir()
    output_name = f"Terminuebersicht_{job_id}_{datetime.datetime.now().strftime('%Y%m%d')}.pdf"
    output_path = os.path.join(temp_dir, output_name)
    try:
        generate_appointment_overview_pdf(future_events, job_name_clean, event_type_name, output_path)
        return FileResponse(path=output_path, filename=output_name, media_type="application/pdf")
    except Exception as e:
        logger.error(f"Error generating appointment overview pdf: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/jobs/latest-file")
async def get_latest_file_info():
    """Describe the most recently generated file, if its copy still exists on disk.

    Returns:
        ``{"has_file": False}`` when nothing is recorded or the file is gone;
        otherwise ``has_file`` plus ``display_name``, ``timestamp`` and ``type``.
    """
    latest_path = LATEST_FILE_STATE["path"]
    if latest_path and os.path.exists(latest_path):
        details = {key: LATEST_FILE_STATE[key] for key in ("display_name", "timestamp", "type")}
        return {"has_file": True, **details}
    return {"has_file": False}
@app.get("/api/jobs/download-latest")
async def download_latest_file():
    """Send the most recently generated file as a download.

    The download is named ``Letzte_Datei_<type>.<type>``; the media type is
    PDF for type ``pdf`` and CSV for anything else.

    Raises:
        HTTPException: 404 when nothing is recorded or the file no longer exists.
    """
    latest_path = LATEST_FILE_STATE["path"]
    if not (latest_path and os.path.exists(latest_path)):
        raise HTTPException(status_code=404, detail="Keine Datei gefunden.")

    file_type = LATEST_FILE_STATE["type"]
    if file_type == "pdf":
        media_type = "application/pdf"
    else:
        media_type = "text/csv"
    return FileResponse(
        path=latest_path,
        filename=f"Letzte_Datei_{file_type}.{file_type}",
        media_type=media_type,
    )
@app.get("/health")
async def health_check():
    """Liveness probe: always answers with a static ``{"status": "ok"}`` payload."""
    payload = {"status": "ok"}
    return payload
# --- Gmail API Endpoints ---
@app.get("/api/auth/google")
async def get_google_auth_url(db: Session = Depends(get_db)):
    """Return the Google authorisation URL produced by ``GmailService`` as ``{"url": ...}``."""
    auth_url = GmailService(db).get_auth_url()
    return {"url": auth_url}
@app.get("/api/auth/callback")
async def google_auth_callback(code: str, db: Session = Depends(get_db)):
    """Finish the Google sign-in flow and send the browser back to the frontend.

    The authorisation *code* is handed to ``GmailService.handle_callback``.
    On success the client is redirected to ``FRONTEND_URL`` (or the built-in
    default); on failure a 500 JSON response with the error text is returned.
    """
    gmail = GmailService(db)
    try:
        gmail.handle_callback(code)
        # Redirect back to frontend.
        # The frontend lives at /fotograf-de/ through NGINX.
        target = os.getenv("FRONTEND_URL", "https://floke-ai.duckdns.org/fotograf-de/")
        return RedirectResponse(url=target)
    except Exception as e:
        logger.error(f"Auth callback failed: {e}")
        error_body = {"message": f"Authentifizierung fehlgeschlagen: {str(e)}"}
        return JSONResponse(status_code=500, content=error_body)
@app.get("/api/gmail/status")
async def get_gmail_status(db: Session = Depends(get_db)):
    """Report whether ``GmailService`` considers itself authenticated, as ``{"authenticated": ...}``."""
    is_authenticated = GmailService(db).is_authenticated()
    return {"authenticated": is_authenticated}
@app.get("/api/jobs", response_model=List[Dict[str, Any]])
async def get_jobs(account_type: str, force_refresh: bool = False, db: Session = Depends(get_db)):
    """Return the jobs of an account, from the database cache or freshly scraped.

    Without *force_refresh*, cached rows for *account_type* are returned when
    any exist. Otherwise the job list is scraped from fotograf.de, the cached
    rows of that account are replaced and the scraped list is returned.

    Args:
        account_type: Account selector; credentials are read from the
            ``<ACCOUNT_TYPE>_USER`` / ``<ACCOUNT_TYPE>_PW`` environment variables.
        force_refresh: Skip the cache and always scrape.
        db: Database session (injected).

    Raises:
        HTTPException: 400 when credentials are missing, 401 when the browser
            cannot be started or the login fails, 500 on any other scraping
            or database error.
    """
    logger.info(f"API Request: GET /api/jobs for {account_type} (force_refresh={force_refresh})")

    # 1. Check database first if not forcing a refresh
    if not force_refresh:
        cached_jobs = db.query(DBJob).filter(DBJob.account_type == account_type).all()
        if cached_jobs:
            logger.info(f"Returning {len(cached_jobs)} cached jobs for {account_type}")
            return [
                {
                    "id": job.id,
                    "name": job.name,
                    "url": job.url,
                    "status": job.status,
                    "date": job.date,
                    "shooting_type": job.shooting_type,
                    "last_updated": job.last_updated.isoformat() if job.last_updated else None
                }
                for job in cached_jobs
            ]
        logger.info(f"No cached jobs found for {account_type}. Initiating scrape...")

    # 2. Scrape from fotograf.de if forcing refresh or no cached jobs
    username = os.getenv(f"{account_type.upper()}_USER")
    password = os.getenv(f"{account_type.upper()}_PW")
    if not username or not password:
        logger.error(f"Credentials for {account_type} not found in .env")
        raise HTTPException(status_code=400, detail="Credentials not found.")

    # NOTE(review): the Selenium calls below are blocking and run inside an
    # async endpoint, so other requests wait while a scrape is in progress.
    driver = None
    try:
        driver = setup_driver()
        if not driver or not login(driver, username, password):
            raise HTTPException(status_code=401, detail="Login failed.")

        scraped_jobs = get_jobs_list(driver)

        # 3. Save to database
        if scraped_jobs:
            logger.info(f"Saving {len(scraped_jobs)} jobs to database for {account_type}...")
            # Clear old jobs for this account type
            db.query(DBJob).filter(DBJob.account_type == account_type).delete()
            # Insert new jobs
            now = datetime.datetime.utcnow()
            for job_data in scraped_jobs:
                if job_data["id"]:  # Ensure we have an ID
                    db.add(DBJob(
                        id=job_data["id"],
                        name=job_data["name"],
                        url=job_data["url"],
                        status=job_data["status"],
                        date=job_data["date"],
                        shooting_type=job_data["shooting_type"],
                        account_type=account_type,
                        last_updated=now
                    ))
                    # Update dict for return value
                    job_data["last_updated"] = now.isoformat()
            db.commit()
            logger.info("Database updated successfully.")
        return scraped_jobs
    except HTTPException:
        # Deliberate HTTP errors (the 401 above) must keep their status code
        # instead of being rewrapped as a 500 by the generic handler below.
        db.rollback()
        raise
    except Exception as e:
        logger.error(f"Error during scraping or database save: {e}")
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        if driver:
            logger.debug("Closing driver.")
            driver.quit()
@app.get("/api/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Return the stored state dict of a background task.

    Raises:
        HTTPException: 404 when no task with this id is known.
    """
    logger.debug(f"API Request: Check task status for {task_id}")
    task_state = task_store.get(task_id)
    if task_state is None:
        raise HTTPException(status_code=404, detail="Task nicht gefunden.")
    return task_state
@app.post("/api/jobs/{job_id}/statistics")
async def start_statistics(job_id: str, account_type: str, background_tasks: BackgroundTasks):
    """Schedule the statistics calculation for a job and return its task id.

    Returns:
        ``{"task_id": <uuid4 string>}`` for polling the task status.
    """
    logger.info(f"API Request: Start statistics for job {job_id} ({account_type})")
    new_task_id = f"{uuid.uuid4()}"
    background_tasks.add_task(process_statistics, new_task_id, job_id, account_type)
    return dict(task_id=new_task_id)
@app.post("/api/jobs/{job_id}/reminder-analysis")
async def start_reminder_analysis(job_id: str, account_type: str, background_tasks: BackgroundTasks):
    """Schedule the reminder analysis for a job and return its task id.

    Returns:
        ``{"task_id": <uuid4 string>}`` for polling the task status.
    """
    logger.info(f"API Request: Start reminder analysis for job {job_id} ({account_type})")
    new_task_id = f"{uuid.uuid4()}"
    background_tasks.add_task(process_reminder_analysis, new_task_id, job_id, account_type)
    return dict(task_id=new_task_id)
@app.get("/api/tasks/{task_id}/download-csv")
async def download_task_csv(task_id: str):
    """Export the result list of a completed task as a CSV download.

    The CSV is written with a UTF-8 BOM and without an index column, and is
    also registered as the latest generated file.

    Raises:
        HTTPException: 404 when the task is unknown or not completed, 400 when
            its result is empty or not a list, 500 when the export fails.
    """
    task = task_store.get(task_id)
    if task is None or task.get("status") != "completed":
        raise HTTPException(status_code=404, detail="Ergebnis nicht gefunden oder Task noch nicht abgeschlossen.")

    result = task.get("result")
    if not result or not isinstance(result, list):
        raise HTTPException(status_code=400, detail="Keine Daten zum Exportieren vorhanden.")

    try:
        df = pd.DataFrame(result)
        # Only reserve a unique path here and close the handle right away;
        # pandas reopens the file by name, so an open handle would just leak.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as temp_file:
            csv_path = temp_file.name
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')

        filename = f"Supermailer_Liste_{task_id[:8]}.csv"
        update_latest_file(csv_path, "Supermailer Liste", "csv")
        return FileResponse(path=csv_path, filename=filename, media_type="text/csv")
    except Exception as e:
        logger.error(f"Export error: {e}")
        raise HTTPException(status_code=500, detail="CSV Export fehlgeschlagen.")
class BulkEmailRequest(BaseModel):
    """Request body of the bulk e-mail endpoint: a list of messages to send."""
    # One dict per message; the send endpoint reads the "to", "subject" and "body" keys.
    emails: List[Dict[str, str]]
@app.post("/api/gmail/send-bulk")
async def send_bulk_emails(request: BulkEmailRequest, db: Session = Depends(get_db)):
    """Send every message of the request through Gmail and report the outcome.

    Each message is attempted independently: a missing recipient, a falsy
    return value or an exception from the sender marks only that message as
    failed, so the response always covers the whole batch.

    Returns:
        ``{"total": <int>, "success": <int>, "failed": [<recipient>, ...]}``.

    Raises:
        HTTPException: 401 when Gmail is not authenticated.
    """
    service = GmailService(db)
    if not service.is_authenticated():
        raise HTTPException(status_code=401, detail="Gmail nicht authentifiziert.")

    success_count = 0
    failed_emails = []
    for email_data in request.emails:
        to = email_data.get("to")
        subject = email_data.get("subject")
        body = email_data.get("body")

        sent = False
        if not to:
            logger.warning("Skipping bulk email entry without recipient.")
        else:
            try:
                sent = bool(service.send_email(to, subject, body))
            except Exception as e:
                # Without this, one failing message would abort the batch and the
                # client could not tell which messages had already gone out.
                logger.error(f"Sending email to {to} failed: {e}")

        if sent:
            success_count += 1
        else:
            failed_emails.append(to)

    return {
        "total": len(request.emails),
        "success": success_count,
        "failed": failed_emails
    }
def _participant_cell_text(value) -> str:
    """Return a CSV cell as stripped text; missing cells (None/NaN) become ""."""
    if value is None:
        # row.get() yields None for an absent column; str(None) would
        # otherwise be stored as the literal text "None".
        return ""
    text = str(value).strip()
    return "" if text.lower() == "nan" else text


def _participant_has_orders(value) -> int:
    """Return 1 if the orders cell indicates at least one order, else 0."""
    text = _participant_cell_text(value)
    if not text:
        return 0
    try:
        # pandas turns an integer column containing blanks into floats, so a
        # zero may arrive as "0.0"; compare numerically instead of by text.
        return 1 if float(text) != 0 else 0
    except ValueError:
        # Any non-numeric, non-empty marker counts as "has orders".
        return 1


def _participant_int(value, default: int = 0) -> int:
    """Convert a CSV cell to int, falling back to ``default`` when not numeric."""
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default


def sync_participants(job_id: str, account_type: str, db: Session):
    """Download a job's participant CSV from fotograf.de and upsert it into the DB.

    Logs in with the credentials from the ``<ACCOUNT_TYPE>_USER`` /
    ``<ACCOUNT_TYPE>_PW`` environment variables, opens the job's persons tab,
    triggers the CSV export and waits up to about 45 seconds for the download.
    Each CSV row with a non-empty "Zugangscode (1)" is matched to a
    ``JobParticipant`` by ``(job_id, zugangscode)`` and created if missing.

    Args:
        job_id: fotograf.de job id, used in the URL and as the DB key.
        account_type: prefix of the credential environment variables.
        db: SQLAlchemy session used for the upsert; committed once at the end.

    Returns:
        The number of rows in the downloaded CSV (including rows skipped for
        lacking an access code).

    Raises:
        Exception: login failed, the download timed out, or the CSV could not
            be parsed. Selenium errors propagate unchanged.
    """
    logger.info(f"Syncing participants for job {job_id} ({account_type})")
    username = os.getenv(f"{account_type.upper()}_USER")
    password = os.getenv(f"{account_type.upper()}_PW")
    with tempfile.TemporaryDirectory() as temp_dir:
        driver = setup_driver(download_path=temp_dir)
        try:
            if not login(driver, username, password):
                raise Exception("Login failed.")

            # Navigate to the Persons tab
            job_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
            driver.get(job_url)
            wait = WebDriverWait(driver, 30)
            personen_tab = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "[data-qa-id='link:photo-jobs-tabs-names_list']")))
            driver.execute_script("arguments[0].click();", personen_tab)

            # Click Export -> CSV
            export_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_dropdown"])))
            driver.execute_script("arguments[0].click();", export_btn)
            time.sleep(1)
            csv_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_csv_link"])))
            driver.execute_script("arguments[0].click();", csv_btn)

            # Wait for download: poll the download directory once per second
            csv_file = None
            for _ in range(45):
                csv_files = [f for f in os.listdir(temp_dir) if f.endswith('.csv')]
                if csv_files:
                    csv_file = os.path.join(temp_dir, csv_files[0])
                    break
                time.sleep(1)
            if not csv_file:
                raise Exception("CSV download timed out.")

            # Read CSV with pandas; try ";" first, then ",". A parse that
            # yields more than one column is taken as the right separator.
            df = None
            for sep in (";", ","):
                try:
                    candidate = pd.read_csv(csv_file, sep=sep, encoding="utf-8-sig")
                except Exception as parse_error:
                    logger.debug(f"CSV parse with separator {sep!r} failed: {parse_error}")
                    continue
                df = candidate
                if len(df.columns) > 1:
                    break
            if df is None:
                raise Exception("Could not parse CSV.")

            # Clean columns
            df.columns = df.columns.str.strip().str.replace("\"", "")
            logger.debug(f"Sync CSV Columns: {list(df.columns)}")

            # Upsert into database
            for _, row in df.iterrows():
                code = _participant_cell_text(row.get("Zugangscode (1)", ""))
                if not code:
                    continue

                participant = db.query(JobParticipant).filter(JobParticipant.job_id == job_id, JobParticipant.zugangscode == code).first()
                if not participant:
                    participant = JobParticipant(job_id=job_id, zugangscode=code)
                    db.add(participant)

                participant.child_id = _participant_cell_text(row.get("Child ID"))
                participant.vorname_kind = _participant_cell_text(row.get("Vorname Kind"))
                participant.nachname_kind = _participant_cell_text(row.get("Nachname Kind"))
                participant.vorname_eltern = _participant_cell_text(row.get("Vorname Eltern (1)"))
                participant.nachname_eltern = _participant_cell_text(row.get("Nachname Eltern (1)"))
                participant.email_eltern = _participant_cell_text(row.get("Email der Eltern (1)")).lower()
                # The group may be exported as "Gruppe" or as "Klasse"; use
                # whichever column actually carries a value for this row.
                participant.gruppe = (
                    _participant_cell_text(row.get("Gruppe"))
                    or _participant_cell_text(row.get("Klasse"))
                )
                participant.logins = _participant_int(row.get("Logins (1)", 0))
                participant.has_orders = _participant_has_orders(row.get("Bestellungen", "0"))
                participant.last_synced = datetime.datetime.utcnow()

            db.commit()
            logger.info(f"Successfully synced {len(df)} participants for job {job_id}")
            return len(df)
        finally:
            # Always shut the browser down, also when an error occurred.
            driver.quit()
@app.get("/api/jobs/{job_id}/fast-stats")
async def get_fast_stats(job_id: str, db: Session = Depends(get_db)):
    """Return per-group purchase statistics from the stored participants of a job.

    Participants are grouped by their ``gruppe`` value ("Unbekannt" when it
    is empty). For each group the total number of participants and the number
    with ``has_orders`` set are counted. "Kinder_Alle_Bilder_gekauft" is
    included in every entry but is never incremented here.

    Returns:
        A list of per-group dicts sorted by the "Album" name, or an empty
        list when the job has no stored participants.
    """
    stored = db.query(JobParticipant).filter(JobParticipant.job_id == job_id).all()
    if not stored:
        return []

    per_group = {}
    for participant in stored:
        label = participant.gruppe or "Unbekannt"
        entry = per_group.get(label)
        if entry is None:
            entry = {
                "Album": label,
                "Kinder_insgesamt": 0,
                "Kinder_mit_Käufen": 0,
                "Kinder_Alle_Bilder_gekauft": 0,
            }
            per_group[label] = entry
        entry["Kinder_insgesamt"] += 1
        if participant.has_orders:
            entry["Kinder_mit_Käufen"] += 1

    return sorted(per_group.values(), key=lambda entry: entry["Album"])
@app.post("/api/jobs/{job_id}/sync-participants")
async def sync_participants_api(job_id: str, account_type: str, db: Session = Depends(get_db)):
    """HTTP wrapper around ``sync_participants``.

    Returns:
        ``{"status": "success", "count": <value returned by sync_participants>}``.

    Raises:
        HTTPException 500: any exception from the sync; its message becomes
            the response detail and the traceback is logged.
    """
    try:
        synced_rows = sync_participants(job_id, account_type, db)
    except Exception as exc:
        logger.exception("Sync failed")
        raise HTTPException(status_code=500, detail=str(exc))
    return {"status": "success", "count": synced_rows}
@app.get("/api/jobs/{job_id}/generate-pdf")
async def generate_pdf(job_id: str, account_type: str, db: Session = Depends(get_db)):
    """Export a job's participant CSV from fotograf.de and render it as a PDF list.

    Logs in with the credentials from the ``<ACCOUNT_TYPE>_USER`` /
    ``<ACCOUNT_TYPE>_PW`` environment variables, downloads the persons CSV
    into a temporary directory, renders it with ``generate_pdf_from_csv`` and
    returns the PDF. The date on the list comes from the job's DB record when
    it has one, otherwise from today's date in Europe/Berlin.

    Raises:
        HTTPException 401: login failed.
        HTTPException 500: the CSV export button was not found, the download
            did not appear within 45 seconds, or any other error occurred.
    """
    logger.info(f"API Request: Generate PDF for job {job_id} ({account_type})")
    username = os.getenv(f"{account_type.upper()}_USER")
    password = os.getenv(f"{account_type.upper()}_PW")
    with tempfile.TemporaryDirectory() as temp_dir:
        logger.debug(f"Using temp directory for download: {temp_dir}")
        driver = setup_driver(download_path=temp_dir)
        try:
            if not login(driver, username, password):
                raise HTTPException(status_code=401, detail="Login failed.")

            # 1. Navigate to job settings page first
            job_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
            logger.info(f"Navigating to job main page: {job_url}")
            driver.get(job_url)
            wait = WebDriverWait(driver, 30)

            # Get Institution Name for PDF; fall back to a generic title when
            # the heading cannot be read.
            try:
                institution = driver.find_element(By.TAG_NAME, "h1").text.strip()
                logger.debug(f"Detected institution name: {institution}")
            except Exception:
                institution = "Fotoauftrag"

            # 1.5 Click on the "Personen" tab
            logger.info("Clicking on 'Personen' tab...")
            personen_tab = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "[data-qa-id='link:photo-jobs-tabs-names_list']")))
            # Use JS click to avoid 'element click intercepted' errors from loading overlays
            driver.execute_script("arguments[0].click();", personen_tab)

            # Wait for the export button to become present on the new tab
            logger.info("Waiting for Export Dropdown...")
            export_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_dropdown"])))
            # Scroll to it and click via JS to avoid obscuring elements
            driver.execute_script("arguments[0].scrollIntoView(true);", export_btn)
            time.sleep(1)
            logger.info("Clicking Export Dropdown...")
            driver.execute_script("arguments[0].click();", export_btn)
            logger.debug("Export dropdown clicked, waiting for menu items...")
            time.sleep(2)
            try:
                csv_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_csv_link"])))
                logger.info("CSV Export button found. Clicking...")
                driver.execute_script("arguments[0].click();", csv_btn)
            except TimeoutException:
                logger.error("CSV Button not found after clicking dropdown.")
                take_error_screenshot(driver, "csv_button_missing")
                raise HTTPException(status_code=500, detail="CSV Export Button konnte nicht gefunden werden.")

            # Wait for file to appear
            logger.debug("Waiting for CSV file in download directory...")
            timeout = 45
            start_time = time.time()
            csv_file = None
            while time.time() - start_time < timeout:
                csv_files = [f for f in os.listdir(temp_dir) if f.endswith('.csv')]
                if csv_files:
                    csv_file = os.path.join(temp_dir, csv_files[0])
                    logger.info(f"Download complete: {csv_file}")
                    break
                time.sleep(1)
            if not csv_file:
                logger.error(f"Download timed out after {timeout} seconds.")
                take_error_screenshot(driver, "download_timeout")
                raise HTTPException(status_code=500, detail="CSV Download fehlgeschlagen.")

            output_pdf_name = f"Listen_{job_id}.pdf"
            output_pdf_path = os.path.join(temp_dir, output_pdf_name)

            # Take the job date from the database if available
            job_record = db.query(DBJob).filter(DBJob.id == job_id).first()
            if job_record and job_record.date:
                final_date_info = format_job_date(job_record.date)
            else:
                final_date_info = datetime.datetime.now(ZoneInfo("Europe/Berlin")).strftime("%d.%m.%Y")

            generate_pdf_from_csv(
                csv_path=csv_file,
                institution=institution,
                date_info=final_date_info,
                list_type=account_type,
                output_path=output_pdf_path
            )

            # temp_dir is deleted when the `with` block exits, so the PDF is
            # copied to /tmp before it is handed to the response.
            final_storage = os.path.join("/tmp", output_pdf_name)
            logger.info(f"PDF successfully generated. Copying to {final_storage}")
            shutil.copy(output_pdf_path, final_storage)
            return FileResponse(path=final_storage, filename=output_pdf_name, media_type="application/pdf")
        except HTTPException:
            # Already carries the intended status code; pass it through as is.
            raise
        except Exception as e:
            logger.exception("Unexpected error during PDF generation")
            raise HTTPException(status_code=500, detail=str(e)) from e
        finally:
            if driver:
                logger.debug("Closing driver.")
                driver.quit()
@app.get("/api/jobs/{job_id}/siblings-list")
async def generate_siblings_list(job_id: str, account_type: str, event_type_name: str = "", db: Session = Depends(get_db)):
    """Build the siblings list PDF for a job and return a download link.

    Fetches all Calendly events (an empty list is used if that fails),
    downloads the job's persons CSV from fotograf.de and passes both to
    ``generate_siblings_pdf_from_csv``. The PDF is copied to /tmp and the
    response is JSON containing a download URL and the file name.

    ``event_type_name`` and ``db`` are accepted but not used by this body;
    Calendly is always queried with ``event_type_name=None``.

    Raises:
        HTTPException 400: the ``CALENDLY_TOKEN`` environment variable is unset.
        HTTPException 401: login failed.
        HTTPException 500: CSV export button missing, download timed out, or
            any other error.
    """
    logger.info(f"API Request: Generate siblings list for job {job_id}")
    username = os.getenv(f"{account_type.upper()}_USER")
    password = os.getenv(f"{account_type.upper()}_PW")
    api_token = os.getenv("CALENDLY_TOKEN")
    if not api_token:
        raise HTTPException(status_code=400, detail="Calendly API token missing.")

    # Get Calendly events
    from qr_generator import get_calendly_events_raw
    try:
        # Fetch ALL events to ensure we don't miss siblings due to event name mismatches
        calendly_events = get_calendly_events_raw(api_token, event_type_name=None)
        logger.info(f"Fetched {len(calendly_events)} total events from Calendly for siblings check.")
    except Exception as e:
        # Best effort: continue without Calendly data rather than failing.
        logger.error(f"Error fetching Calendly events: {e}")
        calendly_events = []

    with tempfile.TemporaryDirectory() as temp_dir:
        logger.debug(f"Using temp directory: {temp_dir}")
        driver = setup_driver(download_path=temp_dir)
        try:
            if not login(driver, username, password):
                raise HTTPException(status_code=401, detail="Login failed.")

            job_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
            driver.get(job_url)
            wait = WebDriverWait(driver, 30)

            # Institution name for the PDF header; generic title as fallback.
            try:
                institution = driver.find_element(By.TAG_NAME, "h1").text.strip()
            except Exception:
                institution = "Fotoauftrag"

            # Open the persons tab and trigger the CSV export (JS clicks).
            personen_tab = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "[data-qa-id='link:photo-jobs-tabs-names_list']")))
            driver.execute_script("arguments[0].click();", personen_tab)
            export_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_dropdown"])))
            driver.execute_script("arguments[0].scrollIntoView(true);", export_btn)
            time.sleep(1)
            driver.execute_script("arguments[0].click();", export_btn)
            time.sleep(2)
            try:
                csv_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_csv_link"])))
                driver.execute_script("arguments[0].click();", csv_btn)
            except TimeoutException:
                raise HTTPException(status_code=500, detail="CSV Export Button nicht gefunden.")

            # Poll the download directory until a .csv file shows up.
            timeout = 45
            start_time = time.time()
            csv_file = None
            while time.time() - start_time < timeout:
                csv_files = [f for f in os.listdir(temp_dir) if f.endswith('.csv')]
                if csv_files:
                    csv_file = os.path.join(temp_dir, csv_files[0])
                    break
                time.sleep(1)
            if not csv_file:
                raise HTTPException(status_code=500, detail="CSV Download fehlgeschlagen.")

            output_pdf_name = f"Geschwisterliste_{job_id}.pdf"
            output_pdf_path = os.path.join(temp_dir, output_pdf_name)

            from siblings_logic import generate_siblings_pdf_from_csv
            generate_siblings_pdf_from_csv(
                csv_path=csv_file,
                institution=institution,
                calendly_events=calendly_events,
                list_type=account_type,
                output_path=output_pdf_path
            )

            # temp_dir disappears when the `with` block exits, so keep a copy
            # in /tmp for the download endpoint.
            final_storage = os.path.join("/tmp", output_pdf_name)
            shutil.copy(output_pdf_path, final_storage)

            # Since the frontend has trouble triggering a blob download, return a JSON with a download link
            download_url = f"/api/jobs/download-qr/{job_id}/{output_pdf_name}"
            return JSONResponse(content={"status": "success", "download_url": download_url, "filename": output_pdf_name})
        except HTTPException:
            # Already carries the intended status code; pass it through as is.
            raise
        except Exception as e:
            logger.exception("Error generating siblings list")
            raise HTTPException(status_code=500, detail=str(e)) from e
        finally:
            if driver:
                driver.quit()
@app.post("/api/jobs/{job_id}/siblings-qr-cards")
async def generate_siblings_qr_endpoint(
    job_id: str,
    account_type: str,
    pdf_file: UploadFile = File(...),
    db: Session = Depends(get_db)
):
    """Overlay sibling QR codes onto an uploaded PDF and return a download link.

    The uploaded PDF is stored in a temporary directory, the job's persons
    CSV is downloaded from fotograf.de, sibling families are derived from it
    via ``get_sibling_families_from_csv`` (together with the Calendly events,
    or an empty list if they cannot be fetched), and
    ``generate_siblings_qr_overlay`` writes the result. The PDF is copied to
    /tmp and the response is JSON with a download URL and the file name.

    ``db`` is accepted but not used by this body.

    Raises:
        HTTPException 401: login failed.
        HTTPException 404: no sibling families were found.
        HTTPException 500: CSV export button missing, download timed out, or
            any other error.
    """
    logger.info(f"API Request: Generate siblings QR cards for job {job_id}")
    username = os.getenv(f"{account_type.upper()}_USER")
    password = os.getenv(f"{account_type.upper()}_PW")
    with tempfile.TemporaryDirectory() as temp_dir:
        # Persist the upload so the overlay step can read it from disk.
        input_pdf_path = os.path.join(temp_dir, "input.pdf")
        with open(input_pdf_path, "wb") as buffer:
            shutil.copyfileobj(pdf_file.file, buffer)

        driver = setup_driver(download_path=temp_dir)
        try:
            if not login(driver, username, password):
                raise HTTPException(status_code=401, detail="Login failed.")

            job_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
            driver.get(job_url)
            wait = WebDriverWait(driver, 30)

            # Open the persons tab and trigger the CSV export (JS clicks).
            personen_tab = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "[data-qa-id='link:photo-jobs-tabs-names_list']")))
            driver.execute_script("arguments[0].click();", personen_tab)
            export_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_dropdown"])))
            driver.execute_script("arguments[0].scrollIntoView(true);", export_btn)
            time.sleep(1)
            driver.execute_script("arguments[0].click();", export_btn)
            time.sleep(2)
            try:
                csv_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_csv_link"])))
                driver.execute_script("arguments[0].click();", csv_btn)
            except TimeoutException:
                raise HTTPException(status_code=500, detail="CSV Export Button nicht gefunden.")

            # Poll the download directory until a .csv file shows up.
            timeout = 45
            start_time = time.time()
            csv_file = None
            while time.time() - start_time < timeout:
                csv_files = [f for f in os.listdir(temp_dir) if f.endswith('.csv')]
                if csv_files:
                    csv_file = os.path.join(temp_dir, csv_files[0])
                    break
                time.sleep(1)
            if not csv_file:
                raise HTTPException(status_code=500, detail="CSV Download fehlgeschlagen.")

            output_pdf_name = f"Geschwister_QR_{job_id}.pdf"
            output_pdf_path = os.path.join(temp_dir, output_pdf_name)

            from siblings_logic import get_sibling_families_from_csv
            # Fetch Calendly events to exclude those who already have a meeting
            api_token = os.getenv("CALENDLY_TOKEN")
            from qr_generator import get_calendly_events_raw
            try:
                calendly_events = get_calendly_events_raw(api_token, event_type_name=None)
            except Exception as e:
                # Best effort: continue without Calendly data, but leave a
                # trace so a missing exclusion can be diagnosed later.
                logger.warning(f"Could not fetch Calendly events for siblings QR cards: {e}")
                calendly_events = []

            families = get_sibling_families_from_csv(csv_file, calendly_events=calendly_events)
            if not families:
                raise HTTPException(status_code=404, detail="Keine Geschwisterkinder für QR-Karten gefunden.")

            from qr_generator import generate_siblings_qr_overlay
            generate_siblings_qr_overlay(input_pdf_path, output_pdf_path, families)

            # temp_dir disappears when the `with` block exits, so keep a copy
            # in /tmp for the download endpoint.
            final_storage = os.path.join("/tmp", output_pdf_name)
            shutil.copy(output_pdf_path, final_storage)

            # Since the frontend has trouble triggering a blob download, return a JSON with a download link
            download_url = f"/api/jobs/download-qr/{job_id}/{output_pdf_name}"
            return JSONResponse(content={"status": "success", "download_url": download_url, "filename": output_pdf_name})
        except HTTPException:
            # Already carries the intended status code; pass it through as is.
            raise
        except Exception as e:
            logger.exception("Error generating siblings QR cards")
            raise HTTPException(status_code=500, detail=str(e)) from e
        finally:
            if driver:
                driver.quit()
@app.get("/api/jobs/download-qr/{job_id}/{filename}")
async def download_generated_qr(job_id: str, filename: str):
    """Serve a previously generated PDF from /tmp by file name.

    Only plain ``.pdf`` file names are accepted, so the endpoint cannot be
    used to fetch other files that happen to live in /tmp or to step outside
    that directory. ``job_id`` is part of the route but is not used for the
    lookup.

    Raises:
        HTTPException 404: the name is not a plain PDF file name or no such
            file exists.
    """
    safe_name = os.path.basename(filename)
    is_plain_pdf_name = safe_name == filename and safe_name.lower().endswith(".pdf")
    if is_plain_pdf_name:
        file_path = os.path.join("/tmp", safe_name)
        # isfile (not exists) so directories are never handed to FileResponse.
        if os.path.isfile(file_path):
            return FileResponse(path=file_path, filename=safe_name, media_type="application/pdf")
    raise HTTPException(status_code=404, detail="File not found")