import os import logging import datetime import base64 import re import pandas as pd from jinja2 import Environment, FileSystemLoader from weasyprint import HTML import tempfile import shutil import time from dotenv import load_dotenv from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from typing import List, Dict, Any, Optional from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException # --- Logging Configuration --- logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler() ] ) logger = logging.getLogger("fotograf-scraper") # Load environment variables load_dotenv() app = FastAPI(title="Fotograf.de Scraper & ERP API") # Configure CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # --- Configuration & Constants --- LOGIN_URL = 'https://app.fotograf.de/login/login' SELECTORS = { "cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll", "login_user": "#login-email", "login_pass": "#login-password", "login_button": "#login-submit", "dashboard_jobs_table_rows": "//tr[.//a[contains(@data-qa-id, 'link:photo-jobs-name-')]]", "job_row_name_link": ".//a[contains(@data-qa-id, 'link:photo-jobs-name-')]", "job_row_status": ".//td[count(//th[contains(., 'Status')]/preceding-sibling::th) + 1]", "job_row_date": ".//td[count(//th[contains(., 'Datum')]/preceding-sibling::th) + 1]", "job_row_shooting_type": ".//td[count(//th[contains(., 'Typ')]/preceding-sibling::th) + 1]", "export_dropdown": "[data-qa-id='dropdown:export']", "export_csv_link": "button[data-qa-id='button:csv']", } # --- PDF Generation Logic --- def get_logo_base64(): logo_path = os.path.join(os.path.dirname(__file__), "assets", "logo.png") logger.debug(f"Loading logo from: {logo_path}") try: with open(logo_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') except FileNotFoundError: logger.warning(f"Logo file not found at {logo_path}") return None def generate_pdf_from_csv(csv_path: str, institution: str, date_info: str, list_type: str, output_path: str): logger.info(f"Generating PDF for {institution} from {csv_path}") df = None for sep in [";", ","]: try: logger.debug(f"Trying CSV separator: '{sep}'") test_df = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig", nrows=5) if len(test_df.columns) > 1: df = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig") logger.debug(f"Successfully read CSV with separator '{sep}'") break except Exception as e: logger.debug(f"Failed to read with separator '{sep}': {e}") continue if df is None: logger.error("Could not read CSV with standard separators.") try: df = pd.read_csv(csv_path, sep=";", encoding="latin1") logger.info("Fallback to latin1 encoding successful.") except: raise Exception("CSV konnte nicht gelesen werden.") df.columns = df.columns.str.strip().str.replace("\"", "") logger.debug(f"CSV Columns: {list(df.columns)}") group_label = "Gruppe" if list_type == 'k' else "Klasse" person_label_plural = "Kinder" if list_type == 'k' else "Schüler" col_mapping = {} for col in df.columns: lower_col = col.lower().strip() if lower_col in ["vorname kind", "vorname", "first name"]: col_mapping[col] = "Vorname" elif lower_col in ["nachname kind", "nachname", "last name"]: col_mapping[col] = "Nachname" elif lower_col in ["gruppe", "klasse", "group", "class"]: col_mapping[col] = group_label df = df.rename(columns=col_mapping) df = df.fillna("") for col in ["Vorname", "Nachname", group_label]: if col not in df.columns: logger.warning(f"Column '{col}' not found in CSV, using default values.") df[col] = "Alle" if col == group_label else "" df = df.sort_values(by=[group_label, "Nachname", "Vorname"]) grouped = df.groupby(group_label) class_data = [] for class_name, group in grouped: class_data.append({"name": class_name, "students": group.to_dict("records")}) class_counts = [{"name": c, "count": len(g)} for c, g in grouped] total_students = len(df) template_dir = os.path.join(os.path.dirname(__file__), "templates") logger.debug(f"Using template directory: {template_dir}") env = Environment(loader=FileSystemLoader(template_dir)) template = env.get_template("school_list.html") current_time = datetime.datetime.now().strftime("%d.%m.%Y %H:%M Uhr") logo_base64 = get_logo_base64() render_context = { "institution": institution, "date_info": date_info, "class_counts": class_counts, "total_students": total_students, "class_data": class_data, "current_time": current_time, "logo_base64": logo_base64, "group_label": group_label, "person_label_plural": person_label_plural, "group_column_name": group_label } logger.debug("Rendering HTML template...") html_out = template.render(render_context) logger.info(f"Writing PDF to: {output_path}") HTML(string=html_out).write_pdf(output_path) # --- Selenium Scraper Functions --- def take_error_screenshot(driver, error_name): errors_dir = os.path.join(os.path.dirname(__file__), 'errors') os.makedirs(errors_dir, exist_ok=True) timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"error_{error_name}_{timestamp}.png" filepath = os.path.join(errors_dir, filename) try: driver.save_screenshot(filepath) logger.error(f"!!! Error screenshot saved to: {filepath}") except Exception as e: logger.error(f"!!! Could not save screenshot: {e}") def setup_driver(download_path: str = None): logger.info("Initializing Chrome WebDriver...") options = Options() options.add_argument('--headless') options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') options.add_argument('--window-size=1920,1200') options.binary_location = '/usr/bin/chromium' if download_path: logger.debug(f"Configuring download path: {download_path}") prefs = { "download.default_directory": download_path, "download.prompt_for_download": False, "download.directory_upgrade": True, "safebrowsing.enabled": True } options.add_experimental_option("prefs", prefs) try: driver = webdriver.Chrome(options=options) if download_path: logger.debug("Allowing downloads in headless mode via CDP...") driver.execute_cdp_cmd('Page.setDownloadBehavior', { 'behavior': 'allow', 'downloadPath': download_path }) return driver except Exception as e: logger.error(f"Failed to initialize WebDriver: {e}") return None def login(driver, username, password): logger.info(f"Starting login process for user: {username}") try: driver.get(LOGIN_URL) wait = WebDriverWait(driver, 30) try: logger.debug("Checking for cookie banner...") cookie_wait = WebDriverWait(driver, 5) cookie_wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"]))).click() logger.info("Cookie banner accepted.") except: logger.debug("No cookie banner found.") logger.debug("Entering credentials...") wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username) driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password) logger.info("Clicking login button...") driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click() logger.info("Waiting for dashboard redirect...") wait.until(EC.url_contains('/config_dashboard/index')) logger.info("Login successful!") return True except Exception as e: logger.error(f"Login failed: {e}") take_error_screenshot(driver, "login_error") return False def get_jobs_list(driver) -> List[Dict[str, Any]]: jobs_list_url = "https://app.fotograf.de/config_jobs/index" logger.info(f"Navigating to jobs list: {jobs_list_url}") driver.get(jobs_list_url) wait = WebDriverWait(driver, 30) jobs = [] try: logger.debug("Waiting for job rows to appear...") job_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["dashboard_jobs_table_rows"]))) logger.info(f"Found {len(job_rows)} job rows.") for row in job_rows: try: name_element = row.find_element(By.XPATH, SELECTORS["job_row_name_link"]) job_name = name_element.text.strip() job_url = name_element.get_attribute('href') job_id_match = re.search(r'/(\d+)$', job_url) job_id = job_id_match.group(1) if job_id_match else None logger.debug(f"Parsing job: {job_name} (ID: {job_id})") status_element = row.find_element(By.XPATH, SELECTORS["job_row_status"]) job_status = status_element.text.strip() date_element = row.find_element(By.XPATH, SELECTORS["job_row_date"]) job_date = date_element.text.strip() type_element = row.find_element(By.XPATH, SELECTORS["job_row_shooting_type"]) shooting_type = type_element.text.strip() jobs.append({ "id": job_id, "name": job_name, "url": job_url, "status": job_status, "date": job_date, "shooting_type": shooting_type, }) except Exception as e: logger.warning(f"Error parsing single job row: {e}") continue except Exception as e: logger.error(f"Error retrieving job list: {e}") take_error_screenshot(driver, "job_list_error") return jobs # --- API Endpoints --- @app.get("/health") async def health_check(): return {"status": "ok"} @app.get("/api/jobs", response_model=List[Dict[str, Any]]) async def get_jobs(account_type: str): logger.info(f"API Request: GET /api/jobs for {account_type}") username = os.getenv(f"{account_type.upper()}_USER") password = os.getenv(f"{account_type.upper()}_PW") if not username or not password: logger.error(f"Credentials for {account_type} not found in .env") raise HTTPException(status_code=400, detail="Credentials not found.") driver = None try: driver = setup_driver() if not driver or not login(driver, username, password): raise HTTPException(status_code=401, detail="Login failed.") return get_jobs_list(driver) finally: if driver: logger.debug("Closing driver.") driver.quit() @app.get("/api/jobs/{job_id}/generate-pdf") async def generate_pdf(job_id: str, account_type: str): logger.info(f"API Request: Generate PDF for job {job_id} ({account_type})") username = os.getenv(f"{account_type.upper()}_USER") password = os.getenv(f"{account_type.upper()}_PW") with tempfile.TemporaryDirectory() as temp_dir: logger.debug(f"Using temp directory for download: {temp_dir}") driver = setup_driver(download_path=temp_dir) try: if not login(driver, username, password): raise HTTPException(status_code=401, detail="Login failed.") # 1. Navigate to job settings page first job_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}" logger.info(f"Navigating to job main page: {job_url}") driver.get(job_url) wait = WebDriverWait(driver, 30) # Get Institution Name for PDF try: institution = driver.find_element(By.TAG_NAME, "h1").text.strip() logger.debug(f"Detected institution name: {institution}") except: institution = "Fotoauftrag" # 1.5 Click on the "Personen" tab logger.info("Clicking on 'Personen' tab...") personen_tab = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "[data-qa-id='link:photo-jobs-tabs-names_list']"))) personen_tab.click() # Wait for the export button to become present on the new tab logger.info("Waiting for Export Dropdown...") export_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_dropdown"]))) # Scroll to it and click via JS to avoid obscuring elements driver.execute_script("arguments[0].scrollIntoView(true);", export_btn) time.sleep(1) logger.info("Clicking Export Dropdown...") driver.execute_script("arguments[0].click();", export_btn) logger.debug("Export dropdown clicked, waiting for menu items...") time.sleep(2) try: csv_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_csv_link"]))) logger.info("CSV Export button found. Clicking...") driver.execute_script("arguments[0].click();", csv_btn) except TimeoutException: logger.error("CSV Button not found after clicking dropdown.") take_error_screenshot(driver, "csv_button_missing") raise HTTPException(status_code=500, detail="CSV Export Button konnte nicht gefunden werden.") # Wait for file to appear logger.debug("Waiting for CSV file in download directory...") timeout = 45 start_time = time.time() csv_file = None while time.time() - start_time < timeout: files = os.listdir(temp_dir) csv_files = [f for f in files if f.endswith('.csv')] if csv_files: csv_file = os.path.join(temp_dir, csv_files[0]) logger.info(f"Download complete: {csv_file}") break time.sleep(1) if not csv_file: logger.error(f"Download timed out after {timeout} seconds.") take_error_screenshot(driver, "download_timeout") raise HTTPException(status_code=500, detail="CSV Download fehlgeschlagen.") output_pdf_name = f"Listen_{job_id}.pdf" output_pdf_path = os.path.join(temp_dir, output_pdf_name) generate_pdf_from_csv( csv_path=csv_file, institution=institution, date_info=datetime.datetime.now().strftime("%d.%m.%Y"), list_type=account_type, output_path=output_pdf_path ) final_storage = os.path.join("/tmp", output_pdf_name) logger.info(f"PDF successfully generated. Copying to {final_storage}") shutil.copy(output_pdf_path, final_storage) return FileResponse(path=final_storage, filename=output_pdf_name, media_type="application/pdf") except HTTPException as he: raise he except Exception as e: logger.exception("Unexpected error during PDF generation") raise HTTPException(status_code=500, detail=str(e)) finally: if driver: logger.debug("Closing driver.") driver.quit()