Files
Brancheneinstufung2/fotograf-de-scraper/backend/main.py

383 lines
15 KiB
Python

import os
import logging
import datetime
import base64
import re
import pandas as pd
from jinja2 import Environment, FileSystemLoader
from weasyprint import HTML
import tempfile
import shutil
import time
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from typing import List, Dict, Any, Optional
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# --- Logging Configuration ---
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
logger = logging.getLogger("fotograf-scraper")
# Load environment variables
load_dotenv()
app = FastAPI(title="Fotograf.de Scraper & ERP API")
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- Configuration & Constants ---
LOGIN_URL = 'https://app.fotograf.de/login/login'
SELECTORS = {
"cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"login_user": "#login-email",
"login_pass": "#login-password",
"login_button": "#login-submit",
"dashboard_jobs_table_rows": "//tr[.//a[contains(@data-qa-id, 'link:photo-jobs-name-')]]",
"job_row_name_link": ".//a[contains(@data-qa-id, 'link:photo-jobs-name-')]",
"job_row_status": ".//td[count(//th[contains(., 'Status')]/preceding-sibling::th) + 1]",
"job_row_date": ".//td[count(//th[contains(., 'Datum')]/preceding-sibling::th) + 1]",
"job_row_shooting_type": ".//td[count(//th[contains(., 'Typ')]/preceding-sibling::th) + 1]",
"export_dropdown": "[data-qa-id='dropdown:export']",
"export_csv_link": "//a[contains(text(), 'CSV') or contains(., 'CSV')]",
}
# --- PDF Generation Logic ---
def get_logo_base64():
logo_path = os.path.join(os.path.dirname(__file__), "assets", "logo.png")
logger.debug(f"Loading logo from: {logo_path}")
try:
with open(logo_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
except FileNotFoundError:
logger.warning(f"Logo file not found at {logo_path}")
return None
def generate_pdf_from_csv(csv_path: str, institution: str, date_info: str, list_type: str, output_path: str):
logger.info(f"Generating PDF for {institution} from {csv_path}")
df = None
for sep in [';', ',']:
try:
logger.debug(f"Trying CSV separator: '{sep}'")
test_df = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig", nrows=5)
if len(test_df.columns) > 1:
df = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig")
logger.debug(f"Successfully read CSV with separator '{sep}'")
break
except Exception as e:
logger.debug(f"Failed to read with separator '{sep}': {e}")
continue
if df is None:
logger.error("Could not read CSV with standard separators.")
try:
df = pd.read_csv(csv_path, sep=";", encoding="latin1")
logger.info("Fallback to latin1 encoding successful.")
except:
raise Exception("CSV konnte nicht gelesen werden.")
df.columns = df.columns.str.strip().str.replace("\"", "")
logger.debug(f"CSV Columns: {list(df.columns)}")
group_label = "Gruppe" if list_type == 'k' else "Klasse"
person_label_plural = "Kinder" if list_type == 'k' else "Schüler"
col_mapping = {}
for col in df.columns:
lower_col = col.lower().strip()
if lower_col in ["vorname kind", "vorname", "first name"]:
col_mapping[col] = "Vorname"
elif lower_col in ["nachname kind", "nachname", "last name"]:
col_mapping[col] = "Nachname"
elif lower_col in ["gruppe", "klasse", "group", "class"]:
col_mapping[col] = group_label
df = df.rename(columns=col_mapping)
df = df.fillna("")
for col in ["Vorname", "Nachname", group_label]:
if col not in df.columns:
logger.warning(f"Column '{col}' not found in CSV, using default values.")
df[col] = "Alle" if col == group_label else ""
df = df.sort_values(by=[group_label, "Nachname", "Vorname"])
grouped = df.groupby(group_label)
class_data = []
for class_name, group in grouped:
class_data.append({"name": class_name, "students": group.to_dict("records")})
class_counts = [{"name": c, "count": len(g)} for c, g in grouped]
total_students = len(df)
template_dir = os.path.join(os.path.dirname(__file__), "templates")
logger.debug(f"Using template directory: {template_dir}")
env = Environment(loader=FileSystemLoader(template_dir))
template = env.get_template("school_list.html")
current_time = datetime.datetime.now().strftime("%d.%m.%Y %H:%M Uhr")
logo_base64 = get_logo_base64()
render_context = {
"institution": institution,
"date_info": date_info,
"class_counts": class_counts,
"total_students": total_students,
"class_data": class_data,
"current_time": current_time,
"logo_base64": logo_base64,
"group_label": group_label,
"person_label_plural": person_label_plural,
"group_column_name": group_label
}
logger.debug("Rendering HTML template...")
html_out = template.render(render_context)
logger.info(f"Writing PDF to: {output_path}")
HTML(string=html_out).write_pdf(output_path)
# --- Selenium Scraper Functions ---
def take_error_screenshot(driver, error_name):
errors_dir = os.path.join(os.path.dirname(__file__), 'errors')
os.makedirs(errors_dir, exist_ok=True)
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"error_{error_name}_{timestamp}.png"
filepath = os.path.join(errors_dir, filename)
try:
driver.save_screenshot(filepath)
logger.error(f"!!! Error screenshot saved to: {filepath}")
except Exception as e:
logger.error(f"!!! Could not save screenshot: {e}")
def setup_driver(download_path: str = None):
logger.info("Initializing Chrome WebDriver...")
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--window-size=1920,1200')
options.binary_location = '/usr/bin/chromium'
if download_path:
logger.debug(f"Configuring download path: {download_path}")
prefs = {
"download.default_directory": download_path,
"download.prompt_for_download": False,
"download.directory_upgrade": True,
"safebrowsing.enabled": True
}
options.add_experimental_option("prefs", prefs)
try:
driver = webdriver.Chrome(options=options)
if download_path:
logger.debug("Allowing downloads in headless mode via CDP...")
driver.execute_cdp_cmd('Page.setDownloadBehavior', {
'behavior': 'allow',
'downloadPath': download_path
})
return driver
except Exception as e:
logger.error(f"Failed to initialize WebDriver: {e}")
return None
def login(driver, username, password):
logger.info(f"Starting login process for user: {username}")
try:
driver.get(LOGIN_URL)
wait = WebDriverWait(driver, 30)
try:
logger.debug("Checking for cookie banner...")
cookie_wait = WebDriverWait(driver, 5)
cookie_wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"]))).click()
logger.info("Cookie banner accepted.")
except:
logger.debug("No cookie banner found.")
logger.debug("Entering credentials...")
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
logger.info("Clicking login button...")
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
logger.info("Waiting for dashboard redirect...")
wait.until(EC.url_contains('/config_dashboard/index'))
logger.info("Login successful!")
return True
except Exception as e:
logger.error(f"Login failed: {e}")
take_error_screenshot(driver, "login_error")
return False
def get_jobs_list(driver) -> List[Dict[str, Any]]:
jobs_list_url = "https://app.fotograf.de/config_jobs/index"
logger.info(f"Navigating to jobs list: {jobs_list_url}")
driver.get(jobs_list_url)
wait = WebDriverWait(driver, 30)
jobs = []
try:
logger.debug("Waiting for job rows to appear...")
job_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["dashboard_jobs_table_rows"])))
logger.info(f"Found {len(job_rows)} job rows.")
for row in job_rows:
try:
name_element = row.find_element(By.XPATH, SELECTORS["job_row_name_link"])
job_name = name_element.text.strip()
job_url = name_element.get_attribute('href')
job_id_match = re.search(r'/(\d+)$', job_url)
job_id = job_id_match.group(1) if job_id_match else None
logger.debug(f"Parsing job: {job_name} (ID: {job_id})")
status_element = row.find_element(By.XPATH, SELECTORS["job_row_status"])
job_status = status_element.text.strip()
date_element = row.find_element(By.XPATH, SELECTORS["job_row_date"])
job_date = date_element.text.strip()
type_element = row.find_element(By.XPATH, SELECTORS["job_row_shooting_type"])
shooting_type = type_element.text.strip()
jobs.append({
"id": job_id,
"name": job_name,
"url": job_url,
"status": job_status,
"date": job_date,
"shooting_type": shooting_type,
})
except Exception as e:
logger.warning(f"Error parsing single job row: {e}")
continue
except Exception as e:
logger.error(f"Error retrieving job list: {e}")
take_error_screenshot(driver, "job_list_error")
return jobs
# --- API Endpoints ---
@app.get("/health")
async def health_check():
return {"status": "ok"}
@app.get("/api/jobs", response_model=List[Dict[str, Any]])
async def get_jobs(account_type: str):
logger.info(f"API Request: GET /api/jobs for {account_type}")
username = os.getenv(f"{account_type.upper()}_USER")
password = os.getenv(f"{account_type.upper()}_PW")
if not username or not password:
logger.error(f"Credentials for {account_type} not found in .env")
raise HTTPException(status_code=400, detail="Credentials not found.")
driver = None
try:
driver = setup_driver()
if not driver or not login(driver, username, password):
raise HTTPException(status_code=401, detail="Login failed.")
return get_jobs_list(driver)
finally:
if driver:
logger.debug("Closing driver.")
driver.quit()
@app.get("/api/jobs/{job_id}/generate-pdf")
async def generate_pdf(job_id: str, account_type: str):
logger.info(f"API Request: Generate PDF for job {job_id} ({account_type})")
username = os.getenv(f"{account_type.upper()}_USER")
password = os.getenv(f"{account_type.upper()}_PW")
with tempfile.TemporaryDirectory() as temp_dir:
logger.debug(f"Using temp directory for download: {temp_dir}")
driver = setup_driver(download_path=temp_dir)
try:
if not login(driver, username, password):
raise HTTPException(status_code=401, detail="Login failed.")
reg_url = f"https://app.fotograf.de/config_children/view_registrations/{job_id}"
logger.info(f"Navigating to registrations page: {reg_url}")
driver.get(reg_url)
wait = WebDriverWait(driver, 30)
try:
institution = driver.find_element(By.TAG_NAME, "h1").text.strip()
logger.debug(f"Detected institution name: {institution}")
except:
institution = "Fotoauftrag"
logger.info("Triggering CSV Export...")
export_btn = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["export_dropdown"])))
export_btn.click()
logger.debug("Export dropdown clicked, waiting for menu items...")
time.sleep(1)
csv_btn = wait.until(EC.element_to_be_clickable((By.XPATH, SELECTORS["export_csv_link"])))
csv_btn.click()
logger.info("CSV Export link clicked.")
# Wait for file to appear
logger.debug("Waiting for CSV file in download directory...")
timeout = 45
start_time = time.time()
csv_file = None
while time.time() - start_time < timeout:
files = os.listdir(temp_dir)
csv_files = [f for f in files if f.endswith('.csv')]
if csv_files:
csv_file = os.path.join(temp_dir, csv_files[0])
logger.info(f"Download complete: {csv_file}")
break
time.sleep(1)
if not csv_file:
logger.error(f"Download timed out after {timeout} seconds.")
take_error_screenshot(driver, "download_timeout")
raise HTTPException(status_code=500, detail="CSV Download fehlgeschlagen.")
output_pdf_name = f"Listen_{job_id}.pdf"
output_pdf_path = os.path.join(temp_dir, output_pdf_name)
generate_pdf_from_csv(
csv_path=csv_file,
institution=institution,
date_info=datetime.datetime.now().strftime("%d.%m.%Y"),
list_type=account_type,
output_path=output_pdf_path
)
final_storage = os.path.join("/tmp", output_pdf_name)
logger.info(f"PDF successfully generated. Copying to {final_storage}")
shutil.copy(output_pdf_path, final_storage)
return FileResponse(path=final_storage, filename=output_pdf_name, media_type="application/pdf")
except Exception as e:
logger.exception("Unexpected error during PDF generation")
raise HTTPException(status_code=500, detail=str(e))
finally:
if driver:
logger.debug("Closing driver.")
driver.quit()