# Brancheneinstufung2/fotograf-de-scraper/backend/main.py
import os
import logging
import datetime
import base64
import re
import pandas as pd
from jinja2 import Environment, FileSystemLoader
from weasyprint import HTML
import tempfile
import shutil
import time
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, UploadFile, File, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from database import get_db, Job as DBJob, engine, Base
import math
import uuid
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# --- Logging Configuration ---
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
logger = logging.getLogger("fotograf-scraper")
# Load environment variables
load_dotenv()
# Ensure DB is created
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Fotograf.de Scraper & ERP API")
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
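# Note: browsers reject credentialed cross-origin requests (cookies,
# Authorization headers) when Access-Control-Allow-Origin is the wildcard "*",
# so allow_credentials=True is effectively inert alongside allow_origins=["*"].
# If the frontend needs credentials, list its origin explicitly
# (e.g. allow_origins=["http://localhost:3000"] -- a hypothetical dev URL).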
# --- Configuration & Constants ---
LOGIN_URL = 'https://app.fotograf.de/login/login'
SELECTORS = {
"cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"login_user": "#login-email",
"login_pass": "#login-password",
"login_button": "#login-submit",
"dashboard_jobs_table_rows": "//tr[.//a[contains(@data-qa-id, 'link:photo-jobs-name-')]]",
"job_row_name_link": ".//a[contains(@data-qa-id, 'link:photo-jobs-name-')]",
"job_row_status": ".//td[count(//th[contains(., 'Status')]/preceding-sibling::th) + 1]",
"job_row_date": ".//td[count(//th[contains(., 'Datum')]/preceding-sibling::th) + 1]",
"job_row_shooting_type": ".//td[count(//th[contains(., 'Typ')]/preceding-sibling::th) + 1]",
"export_dropdown": "[data-qa-id='dropdown:export']",
"export_csv_link": "button[data-qa-id='button:csv']",
# --- Statistics Selectors ---
"album_overview_rows": "//table/tbody/tr",
"album_overview_link": ".//td[2]//a",
"access_code_count": "//span[text()='Zugangscodes']/following-sibling::strong",
"person_rows": "//div[contains(@class, 'border-legacy-silver-550') and .//span[text()='Logins']]",
"person_all_photos": ".//div[@data-key]",
"person_purchased_photos": ".//div[@data-key and .//img[@alt='Bestellungen mit diesem Foto']]",
"person_access_card_photo": ".//div[@data-key and contains(@class, 'opacity-50')]",
"potential_buyer_link": "//a[contains(@href, '/config_customers/view_customer')]",
"quick_login_url": "//a[@id='quick-login-url']",
"buyer_email": "//span[contains(., '@')]",
}
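# The job-row cell selectors above resolve a column by its header text instead
# of a fixed position: count(//th[contains(., 'Status')]/preceding-sibling::th) + 1
# evaluates to the 1-based index of the matching <th>, so the td at that index
# keeps working if fotograf.de reorders the table columns; it only breaks if
# the header labels themselves change.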
# --- PDF Generation Logic ---
def get_logo_base64():
logo_path = os.path.join(os.path.dirname(__file__), "assets", "logo.png")
logger.debug(f"Loading logo from: {logo_path}")
try:
with open(logo_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
except FileNotFoundError:
logger.warning(f"Logo file not found at {logo_path}")
return None
def generate_pdf_from_csv(csv_path: str, institution: str, date_info: str, list_type: str, output_path: str):
logger.info(f"Generating PDF for {institution} from {csv_path}")
df = None
for sep in [";", ","]:
try:
logger.debug(f"Trying CSV separator: '{sep}'")
test_df = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig", nrows=5)
if len(test_df.columns) > 1:
df = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig")
logger.debug(f"Successfully read CSV with separator '{sep}'")
break
except Exception as e:
logger.debug(f"Failed to read with separator '{sep}': {e}")
continue
if df is None:
logger.error("Could not read CSV with standard separators.")
try:
df = pd.read_csv(csv_path, sep=";", encoding="latin1")
logger.info("Fallback to latin1 encoding successful.")
        except Exception as e:
            raise ValueError("CSV konnte nicht gelesen werden.") from e
df.columns = df.columns.str.strip().str.replace("\"", "")
logger.debug(f"CSV Columns: {list(df.columns)}")
group_label = "Gruppe" if list_type == 'k' else "Klasse"
person_label_plural = "Kinder" if list_type == 'k' else "Schüler"
col_mapping = {}
for col in df.columns:
lower_col = col.lower().strip()
if lower_col in ["vorname kind", "vorname", "first name"]:
col_mapping[col] = "Vorname"
elif lower_col in ["nachname kind", "nachname", "last name"]:
col_mapping[col] = "Nachname"
elif lower_col in ["gruppe", "klasse", "group", "class"]:
col_mapping[col] = group_label
df = df.rename(columns=col_mapping)
df = df.fillna("")
for col in ["Vorname", "Nachname", group_label]:
if col not in df.columns:
logger.warning(f"Column '{col}' not found in CSV, using default values.")
df[col] = "Alle" if col == group_label else ""
df = df.sort_values(by=[group_label, "Nachname", "Vorname"])
grouped = df.groupby(group_label)
class_data = []
for class_name, group in grouped:
class_data.append({"name": class_name, "students": group.to_dict("records")})
class_counts = [{"name": c, "count": len(g)} for c, g in grouped]
total_students = len(df)
template_dir = os.path.join(os.path.dirname(__file__), "templates")
logger.debug(f"Using template directory: {template_dir}")
env = Environment(loader=FileSystemLoader(template_dir))
template = env.get_template("school_list.html")
current_time = datetime.datetime.now().strftime("%d.%m.%Y %H:%M Uhr")
logo_base64 = get_logo_base64()
render_context = {
"institution": institution,
"date_info": date_info,
"class_counts": class_counts,
"total_students": total_students,
"class_data": class_data,
"current_time": current_time,
"logo_base64": logo_base64,
"group_label": group_label,
"person_label_plural": person_label_plural,
"group_column_name": group_label
}
logger.debug("Rendering HTML template...")
html_out = template.render(render_context)
logger.info(f"Writing PDF to: {output_path}")
HTML(string=html_out).write_pdf(output_path)
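# Illustrative sketch of a CSV that generate_pdf_from_csv() accepts -- the real
# file comes from the fotograf.de export, so exact headers may differ; matching
# is case-insensitive and "Gruppe"/"Klasse"/"group"/"class" all map to the
# group column:
#
#   Vorname Kind;Nachname Kind;Klasse
#   Anna;Muster;1a
#   Ben;Beispiel;1b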
def generate_appointment_overview_pdf(raw_events: list, job_name: str, event_type_name: str, output_path: str):
from collections import defaultdict
from zoneinfo import ZoneInfo
parsed_events = []
for event in raw_events:
start_dt = datetime.datetime.fromisoformat(event['start_time'].replace('Z', '+00:00'))
start_dt = start_dt.astimezone(ZoneInfo("Europe/Berlin"))
num_children = ""
has_consent = False
for qa in event.get('questions_and_answers', []):
q_text = qa.get('question', '').lower()
a_text = qa.get('answer', '')
if any(kw in q_text for kw in ["wie viele kinder", "anzahl kinder", "wieviele kinder"]):
num_children = a_text
elif any(kw in q_text for kw in ["veröffentlichen", "bilder"]):
if "ja" in a_text.lower() or "gerne" in a_text.lower():
has_consent = True
parsed_events.append({
"dt": start_dt,
"name": event['invitee_name'],
"children": num_children,
"consent": has_consent
})
grouped = defaultdict(list)
for e in parsed_events:
date_str = e['dt'].strftime("%d.%m.%Y")
grouped[date_str].append(e)
final_grouped = {}
for date_str, events in grouped.items():
events.sort(key=lambda x: x['dt'])
min_dt = events[0]['dt']
max_dt = events[-1]['dt']
slots = []
curr_dt = min_dt
event_idx = 0
while curr_dt <= max_dt or event_idx < len(events):
next_dt = curr_dt + datetime.timedelta(minutes=6)
events_in_slot = []
while event_idx < len(events) and events[event_idx]['dt'] < next_dt:
events_in_slot.append(events[event_idx])
event_idx += 1
if events_in_slot:
for e in events_in_slot:
slots.append({
"time_str": e['dt'].strftime("%H:%M"),
"name": e['name'],
"children": e['children'],
"consent": e['consent'],
"booked": True,
"dt": e['dt']
})
else:
if curr_dt <= max_dt:
slots.append({
"time_str": curr_dt.strftime("%H:%M"),
"name": "",
"children": "",
"consent": False,
"booked": False,
"dt": curr_dt
})
curr_dt = next_dt
# Compress empty slots if there are more than 2 in a row
compressed_slots = []
empty_streak = []
for slot in slots:
if slot["booked"]:
if len(empty_streak) > 2:
start_time = empty_streak[0]["time_str"]
end_dt = empty_streak[-1]["dt"] + datetime.timedelta(minutes=6)
end_time = end_dt.strftime("%H:%M")
compressed_slots.append({
"is_compressed": True,
"time_str": f"{start_time} - {end_time}",
"name": "--- Freie Zeit / Pause ---",
"children": "",
"consent": False,
"booked": False
})
else:
compressed_slots.extend(empty_streak)
empty_streak = []
compressed_slots.append(slot)
else:
empty_streak.append(slot)
if len(empty_streak) > 2:
start_time = empty_streak[0]["time_str"]
end_dt = empty_streak[-1]["dt"] + datetime.timedelta(minutes=6)
end_time = end_dt.strftime("%H:%M")
compressed_slots.append({
"is_compressed": True,
"time_str": f"{start_time} - {end_time}",
"name": "--- Freie Zeit / Pause ---",
"children": "",
"consent": False,
"booked": False
})
else:
compressed_slots.extend(empty_streak)
final_grouped[date_str] = compressed_slots
template_dir = os.path.join(os.path.dirname(__file__), "templates")
env = Environment(loader=FileSystemLoader(template_dir))
template = env.get_template("appointment_list.html")
current_time = datetime.datetime.now().strftime("%d.%m.%Y %H:%M Uhr")
logo_base64 = get_logo_base64()
render_context = {
"job_name": job_name,
"event_type_name": event_type_name or "Alle Events",
"current_time": current_time,
"logo_base64": logo_base64,
"grouped_slots": final_grouped
}
html_out = template.render(render_context)
HTML(string=html_out).write_pdf(output_path)
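# Worked example of the 6-minute grid above: bookings at 09:00 and 09:30
# produce the 09:00 slot, four empty slots (09:06, 09:12, 09:18, 09:24), then
# the 09:30 slot. Because the empty streak is longer than two slots, it is
# collapsed into a single "09:06 - 09:30" row labelled "--- Freie Zeit / Pause ---".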
# --- Selenium Scraper Functions ---
def take_error_screenshot(driver, error_name):
errors_dir = os.path.join(os.path.dirname(__file__), 'errors')
os.makedirs(errors_dir, exist_ok=True)
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"error_{error_name}_{timestamp}.png"
filepath = os.path.join(errors_dir, filename)
try:
driver.save_screenshot(filepath)
logger.error(f"!!! Error screenshot saved to: {filepath}")
except Exception as e:
logger.error(f"!!! Could not save screenshot: {e}")
def setup_driver(download_path: str = None):
logger.info("Initializing Chrome WebDriver...")
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--window-size=1920,1200')
options.binary_location = '/usr/bin/chromium'
if download_path:
logger.debug(f"Configuring download path: {download_path}")
prefs = {
"download.default_directory": download_path,
"download.prompt_for_download": False,
"download.directory_upgrade": True,
"safebrowsing.enabled": True
}
options.add_experimental_option("prefs", prefs)
try:
driver = webdriver.Chrome(options=options)
if download_path:
logger.debug("Allowing downloads in headless mode via CDP...")
driver.execute_cdp_cmd('Page.setDownloadBehavior', {
'behavior': 'allow',
'downloadPath': download_path
})
return driver
except Exception as e:
logger.error(f"Failed to initialize WebDriver: {e}")
return None
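# setup_driver() assumes a Chromium binary at /usr/bin/chromium (typical for a
# Debian-based container image); adjust options.binary_location for other
# environments. The Page.setDownloadBehavior CDP call is the piece that
# actually permits downloads in headless Chrome -- the "prefs" block alone only
# configures the target directory.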
def login(driver, username, password):
logger.info(f"Starting login process for user: {username}")
try:
driver.get(LOGIN_URL)
wait = WebDriverWait(driver, 30)
try:
logger.debug("Checking for cookie banner...")
cookie_wait = WebDriverWait(driver, 5)
cookie_wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"]))).click()
logger.info("Cookie banner accepted.")
        except Exception:
            logger.debug("No cookie banner found.")
logger.debug("Entering credentials...")
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
logger.info("Clicking login button...")
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
logger.info("Waiting for dashboard redirect...")
wait.until(EC.url_contains('/config_dashboard/index'))
logger.info("Login successful!")
return True
except Exception as e:
logger.error(f"Login failed: {e}")
take_error_screenshot(driver, "login_error")
return False
def get_jobs_list(driver) -> List[Dict[str, Any]]:
jobs_list_url = "https://app.fotograf.de/config_jobs/index"
logger.info(f"Navigating to jobs list: {jobs_list_url}")
driver.get(jobs_list_url)
wait = WebDriverWait(driver, 30)
jobs = []
try:
logger.debug("Waiting for job rows to appear...")
job_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["dashboard_jobs_table_rows"])))
logger.info(f"Found {len(job_rows)} job rows.")
for row in job_rows:
try:
name_element = row.find_element(By.XPATH, SELECTORS["job_row_name_link"])
job_name = name_element.text.strip()
job_url = name_element.get_attribute('href')
job_id_match = re.search(r'/(\d+)$', job_url)
job_id = job_id_match.group(1) if job_id_match else None
logger.debug(f"Parsing job: {job_name} (ID: {job_id})")
status_element = row.find_element(By.XPATH, SELECTORS["job_row_status"])
job_status = status_element.text.strip()
date_element = row.find_element(By.XPATH, SELECTORS["job_row_date"])
job_date = date_element.text.strip()
type_element = row.find_element(By.XPATH, SELECTORS["job_row_shooting_type"])
shooting_type = type_element.text.strip()
jobs.append({
"id": job_id,
"name": job_name,
"url": job_url,
"status": job_status,
"date": job_date,
"shooting_type": shooting_type,
})
except Exception as e:
logger.warning(f"Error parsing single job row: {e}")
continue
except Exception as e:
logger.error(f"Error retrieving job list: {e}")
take_error_screenshot(driver, "job_list_error")
return jobs
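# Shape of one entry returned by get_jobs_list() (illustrative values only):
# {"id": "12345", "name": "Grundschule Beispielstadt",
#  "url": "https://app.fotograf.de/...",
#  "status": "Aktiv", "date": "01.09.2025", "shooting_type": "Kita"}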
# --- Background Task Engine ---
task_store: Dict[str, Dict[str, Any]] = {}
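# task_store is plain process memory: task state vanishes on restart and is
# not shared between workers, so the polling endpoints below assume a single
# uvicorn worker. A more durable variant would keep task state in Redis or in
# the existing SQLAlchemy database instead of this dict.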
def process_statistics(task_id: str, job_id: str, account_type: str):
logger.info(f"Task {task_id}: Starting statistics calculation for job {job_id}")
task_store[task_id] = {"status": "running", "progress": "Initialisiere Browser...", "result": None}
username = os.getenv(f"{account_type.upper()}_USER")
password = os.getenv(f"{account_type.upper()}_PW")
driver = None
try:
driver = setup_driver()
if not driver or not login(driver, username, password):
task_store[task_id] = {"status": "error", "progress": "Login fehlgeschlagen. Überprüfe die Zugangsdaten."}
return
        task_store[task_id]["progress"] = "Lade Alben-Übersicht für Auftrag..."
albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
logger.info(f"Navigating to albums: {albums_overview_url}")
driver.get(albums_overview_url)
wait = WebDriverWait(driver, 15)
albums_to_visit = []
try:
album_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_rows"])))
for row in album_rows:
try:
album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"])
albums_to_visit.append({"name": album_link.text, "url": album_link.get_attribute('href')})
except NoSuchElementException:
continue
except TimeoutException:
task_store[task_id] = {"status": "error", "progress": "Konnte die Album-Liste nicht finden."}
return
total_albums = len(albums_to_visit)
task_store[task_id]["progress"] = f"{total_albums} Alben gefunden. Starte Auswertung..."
statistics = []
for index, album in enumerate(albums_to_visit):
album_name = album['name']
task_store[task_id]["progress"] = f"Bearbeite Album {index + 1}/{total_albums}: '{album_name}'..."
driver.get(album['url'])
try:
total_codes_text = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["access_code_count"]))).text
num_pages = math.ceil(int(total_codes_text) / 20)
total_children_in_album = 0
children_with_purchase = 0
children_with_all_purchased = 0
for page_num in range(1, num_pages + 1):
task_store[task_id]["progress"] = f"Bearbeite Album {index + 1}/{total_albums}: '{album_name}' (Seite {page_num}/{num_pages})..."
if page_num > 1:
driver.get(album['url'] + f"?page_guest_accesses={page_num}")
person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
for person_row in person_rows:
total_children_in_album += 1
try:
photo_container = person_row.find_element(By.XPATH, "./following-sibling::div[1]")
num_total_photos = len(photo_container.find_elements(By.XPATH, SELECTORS["person_all_photos"]))
num_purchased_photos = len(photo_container.find_elements(By.XPATH, SELECTORS["person_purchased_photos"]))
num_access_cards = len(photo_container.find_elements(By.XPATH, SELECTORS["person_access_card_photo"]))
buyable_photos = num_total_photos - num_access_cards
if num_purchased_photos > 0:
children_with_purchase += 1
if buyable_photos > 0 and buyable_photos == num_purchased_photos:
children_with_all_purchased += 1
except NoSuchElementException:
continue
statistics.append({
"Album": album_name,
"Kinder_insgesamt": total_children_in_album,
"Kinder_mit_Käufen": children_with_purchase,
"Kinder_Alle_Bilder_gekauft": children_with_all_purchased
})
except Exception as e:
logger.error(f"Fehler bei Auswertung von Album '{album_name}': {e}")
continue
task_store[task_id] = {
"status": "completed",
"progress": "Auswertung erfolgreich abgeschlossen!",
"result": statistics
}
except Exception as e:
logger.exception(f"Unexpected error in task {task_id}")
task_store[task_id] = {"status": "error", "progress": f"Unerwarteter Fehler: {str(e)}"}
finally:
if driver:
logger.debug(f"Task {task_id}: Closing driver.")
driver.quit()
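# Each statistics row appended above has this shape (illustrative values):
# {"Album": "Klasse 1a", "Kinder_insgesamt": 24,
#  "Kinder_mit_Käufen": 18, "Kinder_Alle_Bilder_gekauft": 5}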
def process_reminder_analysis(task_id: str, job_id: str, account_type: str):
logger.info(f"Task {task_id}: Starting reminder analysis for job {job_id}")
task_store[task_id] = {"status": "running", "progress": "Initialisiere Browser...", "result": None}
username = os.getenv(f"{account_type.upper()}_USER")
password = os.getenv(f"{account_type.upper()}_PW")
driver = None
try:
driver = setup_driver()
if not driver or not login(driver, username, password):
task_store[task_id] = {"status": "error", "progress": "Login fehlgeschlagen."}
return
wait = WebDriverWait(driver, 15)
# 1. Navigate to albums overview
albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
task_store[task_id]["progress"] = "Lade Alben-Übersicht..."
driver.get(albums_overview_url)
albums_to_visit = []
try:
album_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_rows"])))
for row in album_rows:
try:
album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"])
albums_to_visit.append({"name": album_link.text, "url": album_link.get_attribute('href')})
except NoSuchElementException:
continue
except TimeoutException:
task_store[task_id] = {"status": "error", "progress": "Konnte die Album-Liste nicht finden."}
return
raw_results = []
total_albums = len(albums_to_visit)
for index, album in enumerate(albums_to_visit):
album_name = album['name']
task_store[task_id]["progress"] = f"Album {index+1}/{total_albums}: '{album_name}'..."
driver.get(album['url'])
try:
total_codes_text = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["access_code_count"]))).text
num_pages = math.ceil(int(total_codes_text) / 20)
for page_num in range(1, num_pages + 1):
task_store[task_id]["progress"] = f"Album {index+1}/{total_albums}: '{album_name}' (Seite {page_num}/{num_pages})..."
if page_num > 1:
driver.get(album['url'] + f"?page_guest_accesses={page_num}")
person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
num_persons = len(person_rows)
for i in range(num_persons):
# Re-locate rows to avoid stale element reference
person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
person_row = person_rows[i]
login_count_text = person_row.find_element(By.XPATH, ".//span[text()='Logins']/following-sibling::strong").text
                        # Only people with 0 or 1 logins are treated as reminder candidates.
                        # Anyone without a purchase could arguably need a reminder regardless
                        # of login count, but the legacy scraper filtered on login_count <= 1,
                        # so that behavior is kept here.
if int(login_count_text) <= 1:
vorname = person_row.find_element(By.XPATH, ".//span[text()='Vorname']/following-sibling::strong").text
try:
photo_container = person_row.find_element(By.XPATH, "./following-sibling::div[1]")
purchase_icons = photo_container.find_elements(By.XPATH, ".//img[@alt='Bestellungen mit diesem Foto']")
if len(purchase_icons) > 0:
continue
except NoSuchElementException:
pass
# Potential candidate
access_code_page_url = person_row.find_element(By.XPATH, ".//a[contains(@data-qa-id, 'guest-access-banner-access-code')]").get_attribute('href')
                            # Navigate to the access-code detail page and back afterwards,
                            # mirroring the legacy Scraper.py flow.
driver.get(access_code_page_url)
try:
                                quick_login_url = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["quick_login_url"]))).get_attribute('href')
                                potential_buyer_element = driver.find_element(By.XPATH, SELECTORS["potential_buyer_link"])
                                buyer_name = potential_buyer_element.text
                                potential_buyer_element.click()
                                email = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["buyer_email"]))).text
raw_results.append({
"child_name": vorname,
"buyer_name": buyer_name,
"email": email,
"quick_login": quick_login_url
})
except Exception as e:
logger.warning(f"Error getting details for {vorname}: {e}")
# Go back to the album page
driver.get(album['url'] + (f"?page_guest_accesses={page_num}" if page_num > 1 else ""))
wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"])))
except Exception as e:
logger.error(f"Fehler bei Album '{album_name}': {e}")
continue
# Aggregate Results
task_store[task_id]["progress"] = "Aggregiere Ergebnisse..."
aggregated_data = {}
for res in raw_results:
email = res['email']
child_name = "Familienbilder" if res['child_name'] == "Familie" else res['child_name']
html_link = f'<a href="{res["quick_login"]}">Fotos von {child_name}</a>'
if email not in aggregated_data:
aggregated_data[email] = {
'buyer_first_name': res['buyer_name'].split(' ')[0],
'email': email,
'children': [child_name],
'links': [html_link]
}
else:
if child_name not in aggregated_data[email]['children']:
aggregated_data[email]['children'].append(child_name)
aggregated_data[email]['links'].append(html_link)
final_list = []
for email, data in aggregated_data.items():
names = data['children']
if len(names) > 2:
names_str = ', '.join(names[:-1]) + ' und ' + names[-1]
else:
names_str = ' und '.join(names)
final_list.append({
'Name Käufer': data['buyer_first_name'],
'E-Mail-Adresse Käufer': email,
'Kindernamen': names_str,
'LinksHTML': '<br><br>'.join(data['links'])
})
task_store[task_id] = {
"status": "completed",
"progress": "Analyse abgeschlossen!",
"result": final_list
}
except Exception as e:
logger.exception(f"Error in task {task_id}")
task_store[task_id] = {"status": "error", "progress": f"Fehler: {str(e)}"}
finally:
        if driver:
            driver.quit()
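# Each aggregated row in `final_list` has this shape (illustrative values):
# {"Name Käufer": "Max", "E-Mail-Adresse Käufer": "max@example.com",
#  "Kindernamen": "Anna und Ben",
#  "LinksHTML": '<a href="...">Fotos von Anna</a><br><br><a href="...">Fotos von Ben</a>'}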
from qr_generator import get_calendly_events, overlay_text_on_pdf, get_calendly_event_types
# --- API Endpoints ---
@app.get("/api/calendly/event-types")
async def fetch_calendly_event_types():
api_token = os.getenv("CALENDLY_TOKEN")
if not api_token:
raise HTTPException(status_code=400, detail="Calendly API token missing.")
try:
types = get_calendly_event_types(api_token)
return {"event_types": types}
except Exception as e:
logger.error(f"Error fetching Calendly event types: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/calendly/events")
async def fetch_calendly_events(start_time: str, end_time: str, event_type_name: Optional[str] = None):
"""
Debug endpoint to fetch and inspect raw Calendly data.
"""
api_token = os.getenv("CALENDLY_TOKEN")
if not api_token:
raise HTTPException(status_code=400, detail="Calendly API token missing.")
try:
from qr_generator import get_calendly_events_raw
raw_data = get_calendly_events_raw(api_token, start_time, end_time, event_type_name)
return {"count": len(raw_data), "events": raw_data}
except Exception as e:
logger.error(f"Error fetching Calendly events: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/qr-cards/generate")
async def generate_qr_cards(
start_time: str = Form(None),
end_time: str = Form(None),
event_type_name: str = Form(None),
pdf_file: UploadFile = File(...)
):
logger.info(f"API Request: Generate QR cards from {start_time} to {end_time} for event type '{event_type_name}'")
api_token = os.getenv("CALENDLY_TOKEN")
if not api_token:
raise HTTPException(status_code=400, detail="Calendly API token missing.")
try:
# Save uploaded PDF temporarily
temp_dir = tempfile.gettempdir()
base_pdf_path = os.path.join(temp_dir, f"upload_{uuid.uuid4()}.pdf")
with open(base_pdf_path, "wb") as buffer:
shutil.copyfileobj(pdf_file.file, buffer)
# 1. Fetch formatted data from Calendly
texts = get_calendly_events(api_token, start_time, end_time, event_type_name)
if not texts:
os.remove(base_pdf_path)
return JSONResponse(status_code=404, content={"message": "Keine passenden Termine gefunden."})
# 2. Overlay text on blank PDF
output_name = f"QR_Karten_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
output_path = os.path.join(temp_dir, output_name)
overlay_text_on_pdf(base_pdf_path, output_path, texts)
# Cleanup uploaded file
os.remove(base_pdf_path)
return FileResponse(path=output_path, filename=output_name, media_type="application/pdf")
except Exception as e:
logger.error(f"Error generating QR cards: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/jobs/{job_id}/appointment-list")
async def generate_appointment_list(job_id: str, event_type_name: str, db: Session = Depends(get_db)):
logger.info(f"API Request: Generate appointment list for job {job_id}, event_type '{event_type_name}'")
api_token = os.getenv("CALENDLY_TOKEN")
if not api_token:
raise HTTPException(status_code=400, detail="Calendly API token missing.")
# 1. Fetch job name from DB
job = db.query(DBJob).filter(DBJob.id == job_id).first()
job_name = job.name if job else f"Auftrag {job_id}"
# Clean job name: remove (JOB00005) or similar anywhere in string
job_name_clean = re.sub(r'\(?JOB\d+\)?', '', job_name).strip()
# 2. Fetch raw Calendly events
try:
from qr_generator import get_calendly_events_raw
raw_events = get_calendly_events_raw(api_token, event_type_name=event_type_name)
except Exception as e:
logger.error(f"Error fetching raw Calendly events: {e}")
raise HTTPException(status_code=500, detail=str(e))
if not raw_events:
return JSONResponse(status_code=404, content={"message": "Keine passenden Termine für diesen Event-Typ gefunden."})
# 3. Generate PDF
temp_dir = tempfile.gettempdir()
output_name = f"Terminuebersicht_{job_id}_{datetime.datetime.now().strftime('%Y%m%d')}.pdf"
output_path = os.path.join(temp_dir, output_name)
try:
generate_appointment_overview_pdf(raw_events, job_name_clean, event_type_name, output_path)
return FileResponse(path=output_path, filename=output_name, media_type="application/pdf")
except Exception as e:
logger.error(f"Error generating appointment overview pdf: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
return {"status": "ok"}
@app.get("/api/jobs", response_model=List[Dict[str, Any]])
async def get_jobs(account_type: str, force_refresh: bool = False, db: Session = Depends(get_db)):
logger.info(f"API Request: GET /api/jobs for {account_type} (force_refresh={force_refresh})")
# 1. Check database first if not forcing a refresh
if not force_refresh:
cached_jobs = db.query(DBJob).filter(DBJob.account_type == account_type).all()
if cached_jobs:
logger.info(f"Returning {len(cached_jobs)} cached jobs for {account_type}")
return [
{
"id": job.id,
"name": job.name,
"url": job.url,
"status": job.status,
"date": job.date,
"shooting_type": job.shooting_type,
"last_updated": job.last_updated.isoformat() if job.last_updated else None
}
for job in cached_jobs
]
else:
logger.info(f"No cached jobs found for {account_type}. Initiating scrape...")
# 2. Scrape from fotograf.de if forcing refresh or no cached jobs
username = os.getenv(f"{account_type.upper()}_USER")
password = os.getenv(f"{account_type.upper()}_PW")
if not username or not password:
logger.error(f"Credentials for {account_type} not found in .env")
raise HTTPException(status_code=400, detail="Credentials not found.")
driver = None
try:
driver = setup_driver()
if not driver or not login(driver, username, password):
raise HTTPException(status_code=401, detail="Login failed.")
scraped_jobs = get_jobs_list(driver)
# 3. Save to database
if scraped_jobs:
logger.info(f"Saving {len(scraped_jobs)} jobs to database for {account_type}...")
# Clear old jobs for this account type
db.query(DBJob).filter(DBJob.account_type == account_type).delete()
# Insert new jobs
now = datetime.datetime.utcnow()
for job_data in scraped_jobs:
if job_data["id"]: # Ensure we have an ID
new_job = DBJob(
id=job_data["id"],
name=job_data["name"],
url=job_data["url"],
status=job_data["status"],
date=job_data["date"],
shooting_type=job_data["shooting_type"],
account_type=account_type,
last_updated=now
)
db.add(new_job)
# Update dict for return value
job_data["last_updated"] = now.isoformat()
db.commit()
logger.info("Database updated successfully.")
return scraped_jobs
except Exception as e:
logger.error(f"Error during scraping or database save: {e}")
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
finally:
if driver:
logger.debug("Closing driver.")
driver.quit()
@app.get("/api/tasks/{task_id}")
async def get_task_status(task_id: str):
logger.debug(f"API Request: Check task status for {task_id}")
if task_id not in task_store:
raise HTTPException(status_code=404, detail="Task nicht gefunden.")
return task_store[task_id]
@app.post("/api/jobs/{job_id}/statistics")
async def start_statistics(job_id: str, account_type: str, background_tasks: BackgroundTasks):
logger.info(f"API Request: Start statistics for job {job_id} ({account_type})")
task_id = str(uuid.uuid4())
background_tasks.add_task(process_statistics, task_id, job_id, account_type)
return {"task_id": task_id}
@app.post("/api/jobs/{job_id}/reminder-analysis")
async def start_reminder_analysis(job_id: str, account_type: str, background_tasks: BackgroundTasks):
logger.info(f"API Request: Start reminder analysis for job {job_id} ({account_type})")
task_id = str(uuid.uuid4())
background_tasks.add_task(process_reminder_analysis, task_id, job_id, account_type)
return {"task_id": task_id}
@app.get("/api/tasks/{task_id}/download-csv")
async def download_task_csv(task_id: str):
if task_id not in task_store or task_store[task_id]["status"] != "completed":
raise HTTPException(status_code=404, detail="Ergebnis nicht gefunden oder Task noch nicht abgeschlossen.")
result = task_store[task_id]["result"]
if not result or not isinstance(result, list):
raise HTTPException(status_code=400, detail="Keine Daten zum Exportieren vorhanden.")
try:
df = pd.DataFrame(result)
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
df.to_csv(temp_file.name, index=False, encoding='utf-8-sig')
return FileResponse(path=temp_file.name, filename=f"Supermailer_Liste_{task_id[:8]}.csv", media_type="text/csv")
except Exception as e:
logger.error(f"Export error: {e}")
raise HTTPException(status_code=500, detail="CSV Export fehlgeschlagen.")
@app.get("/api/jobs/{job_id}/generate-pdf")
async def generate_pdf(job_id: str, account_type: str):
logger.info(f"API Request: Generate PDF for job {job_id} ({account_type})")
username = os.getenv(f"{account_type.upper()}_USER")
password = os.getenv(f"{account_type.upper()}_PW")
with tempfile.TemporaryDirectory() as temp_dir:
logger.debug(f"Using temp directory for download: {temp_dir}")
driver = setup_driver(download_path=temp_dir)
try:
            if not driver or not login(driver, username, password):
raise HTTPException(status_code=401, detail="Login failed.")
# 1. Navigate to job settings page first
job_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
logger.info(f"Navigating to job main page: {job_url}")
driver.get(job_url)
wait = WebDriverWait(driver, 30)
# Get Institution Name for PDF
try:
institution = driver.find_element(By.TAG_NAME, "h1").text.strip()
logger.debug(f"Detected institution name: {institution}")
            except NoSuchElementException:
                institution = "Fotoauftrag"
# 1.5 Click on the "Personen" tab
logger.info("Clicking on 'Personen' tab...")
personen_tab = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "[data-qa-id='link:photo-jobs-tabs-names_list']")))
personen_tab.click()
# Wait for the export button to become present on the new tab
logger.info("Waiting for Export Dropdown...")
export_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_dropdown"])))
# Scroll to it and click via JS to avoid obscuring elements
driver.execute_script("arguments[0].scrollIntoView(true);", export_btn)
time.sleep(1)
logger.info("Clicking Export Dropdown...")
driver.execute_script("arguments[0].click();", export_btn)
logger.debug("Export dropdown clicked, waiting for menu items...")
time.sleep(2)
try:
csv_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_csv_link"])))
logger.info("CSV Export button found. Clicking...")
driver.execute_script("arguments[0].click();", csv_btn)
except TimeoutException:
logger.error("CSV Button not found after clicking dropdown.")
take_error_screenshot(driver, "csv_button_missing")
raise HTTPException(status_code=500, detail="CSV Export Button konnte nicht gefunden werden.")
# Wait for file to appear
logger.debug("Waiting for CSV file in download directory...")
timeout = 45
start_time = time.time()
csv_file = None
while time.time() - start_time < timeout:
files = os.listdir(temp_dir)
csv_files = [f for f in files if f.endswith('.csv')]
if csv_files:
csv_file = os.path.join(temp_dir, csv_files[0])
logger.info(f"Download complete: {csv_file}")
break
time.sleep(1)
if not csv_file:
logger.error(f"Download timed out after {timeout} seconds.")
take_error_screenshot(driver, "download_timeout")
raise HTTPException(status_code=500, detail="CSV Download fehlgeschlagen.")
output_pdf_name = f"Listen_{job_id}.pdf"
output_pdf_path = os.path.join(temp_dir, output_pdf_name)
generate_pdf_from_csv(
csv_path=csv_file,
institution=institution,
date_info=datetime.datetime.now().strftime("%d.%m.%Y"),
list_type=account_type,
output_path=output_pdf_path
)
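            # Copy the PDF out of the TemporaryDirectory before the `with` block
            # closes and deletes it: FileResponse only streams the file after this
            # handler returns. Note that the /tmp copies are never cleaned up here.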
final_storage = os.path.join("/tmp", output_pdf_name)
logger.info(f"PDF successfully generated. Copying to {final_storage}")
shutil.copy(output_pdf_path, final_storage)
return FileResponse(path=final_storage, filename=output_pdf_name, media_type="application/pdf")
except HTTPException as he:
raise he
except Exception as e:
logger.exception("Unexpected error during PDF generation")
raise HTTPException(status_code=500, detail=str(e))
finally:
if driver:
logger.debug("Closing driver.")
driver.quit()
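# Minimal local-dev entry point -- a sketch assuming uvicorn is installed and
# port 8000 is free (both assumptions; in deployment this app is presumably
# served via `uvicorn main:app` or similar).
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)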