Files
Brancheneinstufung2/fotograf-de-scraper/backend/main.py

403 lines
16 KiB
Python

import base64
import datetime
import logging
import os
import re
import shutil
import tempfile
import time
from typing import List, Dict, Any, Optional

import pandas as pd
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from jinja2 import Environment, FileSystemLoader
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from weasyprint import HTML
# --- Logging Configuration ---
# Configure the root logger at import time: DEBUG threshold, timestamped
# records, written through a single StreamHandler.
# NOTE(review): this is the root logger, so third-party libraries presumably
# emit DEBUG records as well — confirm that verbosity is wanted in production.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
# Named logger shared by the functions below.
logger = logging.getLogger("fotograf-scraper")
# Load environment variables
# (done at import time, before any endpoint looks up credentials via os.getenv)
load_dotenv()
app = FastAPI(title="Fotograf.de Scraper & ERP API")
# Configure CORS
# NOTE(review): every origin, method and header is allowed while
# allow_credentials is True. The FastAPI CORS documentation advises listing
# origins explicitly when credentials are allowed — confirm this permissive
# setup is intentional before exposing the service.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Configuration & Constants ---
LOGIN_URL = 'https://app.fotograf.de/login/login'

# Selectors are grouped by the page they belong to and merged into the single
# SELECTORS mapping that the scraper functions index into.

# CSS selectors used on the login page.
_LOGIN_SELECTORS = {
    "cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
    "login_user": "#login-email",
    "login_pass": "#login-password",
    "login_button": "#login-submit",
}

# XPath selectors used on the job list. The three column selectors locate a
# cell by counting the header cells that precede the matching <th>.
_JOB_LIST_SELECTORS = {
    "dashboard_jobs_table_rows": "//tr[.//a[contains(@data-qa-id, 'link:photo-jobs-name-')]]",
    "job_row_name_link": ".//a[contains(@data-qa-id, 'link:photo-jobs-name-')]",
    "job_row_status": ".//td[count(//th[contains(., 'Status')]/preceding-sibling::th) + 1]",
    "job_row_date": ".//td[count(//th[contains(., 'Datum')]/preceding-sibling::th) + 1]",
    "job_row_shooting_type": ".//td[count(//th[contains(., 'Typ')]/preceding-sibling::th) + 1]",
}

# CSS selectors used for the CSV export controls.
_EXPORT_SELECTORS = {
    "export_dropdown": "[data-qa-id='dropdown:export']",
    "export_csv_link": "button[data-qa-id='button:csv']",
}

SELECTORS = {
    **_LOGIN_SELECTORS,
    **_JOB_LIST_SELECTORS,
    **_EXPORT_SELECTORS,
}
# --- PDF Generation Logic ---
def get_logo_base64():
    """Return ``assets/logo.png`` (next to this module) as a base64 string.

    Returns:
        The base64 text decoded as UTF-8, or None when the file does not exist.
    """
    module_dir = os.path.dirname(__file__)
    logo_path = os.path.join(module_dir, "assets", "logo.png")
    logger.debug(f"Loading logo from: {logo_path}")
    try:
        image_file = open(logo_path, "rb")
    except FileNotFoundError:
        # A missing logo is not fatal; the caller receives None instead.
        logger.warning(f"Logo file not found at {logo_path}")
        return None
    with image_file:
        raw_bytes = image_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
def generate_pdf_from_csv(csv_path: str, institution: str, date_info: str, list_type: str, output_path: str):
    """Render the roster CSV at ``csv_path`` as a grouped PDF list.

    The CSV is read with pandas, its name/group columns are normalised to
    ``Vorname`` / ``Nachname`` / <group label>, rows are sorted and grouped by
    the group column, and the result is rendered through the
    ``templates/school_list.html`` Jinja2 template and written with WeasyPrint.

    Args:
        csv_path: Path of the CSV file to read.
        institution: Passed to the template as ``institution``.
        date_info: Passed to the template as ``date_info``.
        list_type: ``'k'`` selects the labels "Gruppe" / "Kinder"; any other
            value selects "Klasse" / "Schüler".
        output_path: Destination path of the generated PDF.

    Raises:
        Exception: "CSV konnte nicht gelesen werden." when every read attempt
            fails (the underlying error is chained as the cause).
    """
    logger.info(f"Generating PDF for {institution} from {csv_path}")

    # Probe the first rows with each separator; a separator is accepted only
    # when it yields more than one column.
    df = None
    for sep in (";", ","):
        try:
            logger.debug(f"Trying CSV separator: '{sep}'")
            test_df = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig", nrows=5)
            if len(test_df.columns) > 1:
                df = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig")
                logger.debug(f"Successfully read CSV with separator '{sep}'")
                break
        except Exception as e:
            logger.debug(f"Failed to read with separator '{sep}': {e}")

    if df is None:
        logger.error("Could not read CSV with standard separators.")
        try:
            df = pd.read_csv(csv_path, sep=";", encoding="latin1")
            logger.info("Fallback to latin1 encoding successful.")
        except Exception as e:
            # Was a bare ``except:``; keep the message but preserve the cause.
            raise Exception("CSV konnte nicht gelesen werden.") from e

    df.columns = df.columns.str.strip().str.replace("\"", "", regex=False)
    logger.debug(f"CSV Columns: {list(df.columns)}")

    group_label = "Gruppe" if list_type == 'k' else "Klasse"
    person_label_plural = "Kinder" if list_type == 'k' else "Schüler"

    # Map the known header variants onto the canonical column names.
    col_mapping = {}
    for col in df.columns:
        lower_col = col.lower().strip()
        if lower_col in ("vorname kind", "vorname", "first name"):
            col_mapping[col] = "Vorname"
        elif lower_col in ("nachname kind", "nachname", "last name"):
            col_mapping[col] = "Nachname"
        elif lower_col in ("gruppe", "klasse", "group", "class"):
            col_mapping[col] = group_label
    df = df.rename(columns=col_mapping)

    # Two source columns can map to the same canonical name (e.g. "Vorname"
    # and "First Name"); duplicate labels would make sort_values/groupby
    # raise, so keep only the first occurrence of each label.
    duplicate_mask = df.columns.duplicated()
    if duplicate_mask.any():
        logger.warning(f"Dropping duplicate columns after rename: {list(df.columns[duplicate_mask])}")
        df = df.loc[:, ~duplicate_mask]

    # NOTE(review): pandas infers dtypes, so a numeric group column containing
    # blanks presumably becomes mixed float/str after fillna("") — confirm
    # sorting copes with the real exports.
    df = df.fillna("")
    for col in ["Vorname", "Nachname", group_label]:
        if col not in df.columns:
            logger.warning(f"Column '{col}' not found in CSV, using default values.")
            df[col] = "Alle" if col == group_label else ""

    df = df.sort_values(by=[group_label, "Nachname", "Vorname"])

    # Build the per-group member lists and the per-group counts in one pass.
    class_data = []
    class_counts = []
    for class_name, group in df.groupby(group_label):
        class_data.append({"name": class_name, "students": group.to_dict("records")})
        class_counts.append({"name": class_name, "count": len(group)})
    total_students = len(df)

    template_dir = os.path.join(os.path.dirname(__file__), "templates")
    logger.debug(f"Using template directory: {template_dir}")
    # Autoescape is enabled because institution names and CSV cells come from
    # an external site and are interpolated into HTML that WeasyPrint renders.
    env = Environment(loader=FileSystemLoader(template_dir), autoescape=True)
    template = env.get_template("school_list.html")

    current_time = datetime.datetime.now().strftime("%d.%m.%Y %H:%M Uhr")
    logo_base64 = get_logo_base64()
    render_context = {
        "institution": institution,
        "date_info": date_info,
        "class_counts": class_counts,
        "total_students": total_students,
        "class_data": class_data,
        "current_time": current_time,
        "logo_base64": logo_base64,
        "group_label": group_label,
        "person_label_plural": person_label_plural,
        "group_column_name": group_label,
    }

    logger.debug("Rendering HTML template...")
    html_out = template.render(render_context)
    logger.info(f"Writing PDF to: {output_path}")
    HTML(string=html_out).write_pdf(output_path)
# --- Selenium Scraper Functions ---
def take_error_screenshot(driver, error_name):
    """Save a timestamped screenshot of ``driver`` into the ``errors`` folder.

    The folder lives next to this module and is created on demand. Failures
    while saving are logged and never propagated.

    Args:
        driver: Object exposing ``save_screenshot(path)``.
        error_name: Label embedded in the file name.
    """
    module_dir = os.path.dirname(__file__)
    errors_dir = os.path.join(module_dir, 'errors')
    os.makedirs(errors_dir, exist_ok=True)

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filepath = os.path.join(errors_dir, f"error_{error_name}_{timestamp}.png")

    try:
        driver.save_screenshot(filepath)
    except Exception as e:
        logger.error(f"!!! Could not save screenshot: {e}")
    else:
        logger.error(f"!!! Error screenshot saved to: {filepath}")
def setup_driver(download_path: Optional[str] = None):
    """Create a headless Chrome WebDriver using the binary at ``/usr/bin/chromium``.

    Args:
        download_path: When given, Chrome is configured to download into this
            directory without prompting, and downloads are additionally
            allowed through the ``Page.setDownloadBehavior`` CDP command.

    Returns:
        The started driver, or None when start-up or configuration fails.
    """
    logger.info("Initializing Chrome WebDriver...")
    options = Options()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--window-size=1920,1200')
    options.binary_location = '/usr/bin/chromium'

    if download_path:
        logger.debug(f"Configuring download path: {download_path}")
        prefs = {
            "download.default_directory": download_path,
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
            "safebrowsing.enabled": True,
        }
        options.add_experimental_option("prefs", prefs)

    driver = None
    try:
        driver = webdriver.Chrome(options=options)
        if download_path:
            logger.debug("Allowing downloads in headless mode via CDP...")
            driver.execute_cdp_cmd('Page.setDownloadBehavior', {
                'behavior': 'allow',
                'downloadPath': download_path
            })
        return driver
    except Exception as e:
        logger.error(f"Failed to initialize WebDriver: {e}")
        # If Chrome started but the CDP call failed, the caller only sees None
        # and cannot quit the browser, so shut it down here.
        if driver is not None:
            try:
                driver.quit()
            except Exception as quit_error:
                logger.debug(f"Ignoring error while closing half-initialized driver: {quit_error}")
        return None
def login(driver, username, password):
    """Log in at ``LOGIN_URL`` and wait for the dashboard redirect.

    Args:
        driver: Selenium WebDriver used for the session.
        username: Value typed into the e-mail field.
        password: Value typed into the password field.

    Returns:
        True once the URL contains ``/config_dashboard/index``; False when any
        step fails (an error screenshot is attempted in that case).
    """
    logger.info(f"Starting login process for user: {username}")
    try:
        driver.get(LOGIN_URL)
        wait = WebDriverWait(driver, 30)

        # The cookie banner is optional: give it a short wait and carry on
        # when it does not show up or cannot be clicked.
        try:
            logger.debug("Checking for cookie banner...")
            cookie_wait = WebDriverWait(driver, 5)
            cookie_wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"]))).click()
            logger.info("Cookie banner accepted.")
        except Exception:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt
            # and SystemExit.
            logger.debug("No cookie banner found.")

        logger.debug("Entering credentials...")
        wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
        driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
        logger.info("Clicking login button...")
        driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()

        logger.info("Waiting for dashboard redirect...")
        wait.until(EC.url_contains('/config_dashboard/index'))
        logger.info("Login successful!")
        return True
    except Exception as e:
        logger.error(f"Login failed: {e}")
        take_error_screenshot(driver, "login_error")
        return False
def _parse_job_row(row) -> Dict[str, Any]:
    """Extract id, name, url, status, date and shooting type from one table row."""
    link = row.find_element(By.XPATH, SELECTORS["job_row_name_link"])
    job_name = link.text.strip()
    job_url = link.get_attribute('href')

    # The job id is the trailing run of digits in the link target.
    id_match = re.search(r'/(\d+)$', job_url)
    job_id = None if not id_match else id_match.group(1)
    logger.debug(f"Parsing job: {job_name} (ID: {job_id})")

    # Read the remaining cells in the same order: status, date, type.
    cell_texts = {}
    for field, selector_key in (
        ("status", "job_row_status"),
        ("date", "job_row_date"),
        ("shooting_type", "job_row_shooting_type"),
    ):
        cell = row.find_element(By.XPATH, SELECTORS[selector_key])
        cell_texts[field] = cell.text.strip()

    return {
        "id": job_id,
        "name": job_name,
        "url": job_url,
        "status": cell_texts["status"],
        "date": cell_texts["date"],
        "shooting_type": cell_texts["shooting_type"],
    }


def get_jobs_list(driver) -> List[Dict[str, Any]]:
    """Open the job overview page and return one dict per job row.

    Rows that cannot be parsed are skipped with a warning. If the rows never
    appear, the error is logged, a screenshot is attempted and the jobs
    collected so far (possibly none) are returned.
    """
    jobs_list_url = "https://app.fotograf.de/config_jobs/index"
    logger.info(f"Navigating to jobs list: {jobs_list_url}")
    driver.get(jobs_list_url)
    wait = WebDriverWait(driver, 30)
    jobs = []
    try:
        logger.debug("Waiting for job rows to appear...")
        rows_locator = (By.XPATH, SELECTORS["dashboard_jobs_table_rows"])
        job_rows = wait.until(EC.presence_of_all_elements_located(rows_locator))
        logger.info(f"Found {len(job_rows)} job rows.")
        for row in job_rows:
            try:
                jobs.append(_parse_job_row(row))
            except Exception as e:
                logger.warning(f"Error parsing single job row: {e}")
    except Exception as e:
        logger.error(f"Error retrieving job list: {e}")
        take_error_screenshot(driver, "job_list_error")
    return jobs
# --- API Endpoints ---
@app.get("/health")
async def health_check():
    """Liveness probe: always returns ``{"status": "ok"}``."""
    return {"status": "ok"}
def _fetch_jobs_blocking(username: str, password: str) -> List[Dict[str, Any]]:
    """Start a browser, log in and scrape the job list (blocking; run off the event loop)."""
    driver = None
    try:
        driver = setup_driver()
        if not driver or not login(driver, username, password):
            raise HTTPException(status_code=401, detail="Login failed.")
        return get_jobs_list(driver)
    finally:
        if driver:
            logger.debug("Closing driver.")
            driver.quit()


@app.get("/api/jobs", response_model=List[Dict[str, Any]])
async def get_jobs(account_type: str):
    """Return the photo jobs of the account selected by ``account_type``.

    Credentials are read from the environment variables
    ``<ACCOUNT_TYPE>_USER`` and ``<ACCOUNT_TYPE>_PW`` (upper-cased).

    Raises:
        HTTPException: 400 when the credentials are missing, 401 when the
            browser cannot be started or the login fails.
    """
    logger.info(f"API Request: GET /api/jobs for {account_type}")
    username = os.getenv(f"{account_type.upper()}_USER")
    password = os.getenv(f"{account_type.upper()}_PW")
    if not username or not password:
        logger.error(f"Credentials for {account_type} not found in .env")
        raise HTTPException(status_code=400, detail="Credentials not found.")
    # Selenium calls block for many seconds; running them directly inside an
    # ``async def`` endpoint would stall the event loop for every other
    # request, so the work is handed to the threadpool.
    return await run_in_threadpool(_fetch_jobs_blocking, username, password)
def _wait_for_csv_download(download_dir: str, timeout: int) -> Optional[str]:
    """Poll ``download_dir`` once per second until a ``.csv`` file shows up.

    Returns the path of the first CSV found, or None after ``timeout`` seconds.
    """
    start_time = time.time()
    while time.time() - start_time < timeout:
        csv_files = [f for f in os.listdir(download_dir) if f.endswith('.csv')]
        if csv_files:
            csv_file = os.path.join(download_dir, csv_files[0])
            logger.info(f"Download complete: {csv_file}")
            return csv_file
        time.sleep(1)
    return None


def _build_job_pdf(job_id: str, account_type: str, username: str, password: str) -> str:
    """Export the job's CSV via the browser, render the PDF and return its path.

    Blocking (Selenium, polling, PDF rendering); meant to run in a worker
    thread. Raises HTTPException for every failure.
    """
    output_pdf_name = f"Listen_{job_id}.pdf"
    with tempfile.TemporaryDirectory() as temp_dir:
        logger.debug(f"Using temp directory for download: {temp_dir}")
        driver = setup_driver(download_path=temp_dir)
        try:
            # Same guard as the job-list endpoint: no driver counts as a failed login.
            if not driver or not login(driver, username, password):
                raise HTTPException(status_code=401, detail="Login failed.")

            # 1. Navigate to job settings page first
            job_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
            logger.info(f"Navigating to job main page: {job_url}")
            driver.get(job_url)
            wait = WebDriverWait(driver, 30)

            # Get Institution Name for PDF; fall back to a generic title.
            try:
                institution = driver.find_element(By.TAG_NAME, "h1").text.strip()
                logger.debug(f"Detected institution name: {institution}")
            except Exception:
                institution = "Fotoauftrag"

            # 1.5 Click on the "Personen" tab
            logger.info("Clicking on 'Personen' tab...")
            personen_tab = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "[data-qa-id='link:photo-jobs-tabs-names_list']")))
            personen_tab.click()

            # Wait for the export button to become present on the new tab
            logger.info("Waiting for Export Dropdown...")
            export_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_dropdown"])))

            # Scroll to it and click via JS to avoid obscuring elements
            driver.execute_script("arguments[0].scrollIntoView(true);", export_btn)
            time.sleep(1)
            logger.info("Clicking Export Dropdown...")
            driver.execute_script("arguments[0].click();", export_btn)
            logger.debug("Export dropdown clicked, waiting for menu items...")
            time.sleep(2)

            try:
                csv_btn = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, SELECTORS["export_csv_link"])))
                logger.info("CSV Export button found. Clicking...")
                driver.execute_script("arguments[0].click();", csv_btn)
            except TimeoutException:
                logger.error("CSV Button not found after clicking dropdown.")
                take_error_screenshot(driver, "csv_button_missing")
                raise HTTPException(status_code=500, detail="CSV Export Button konnte nicht gefunden werden.")

            # Wait for file to appear
            logger.debug("Waiting for CSV file in download directory...")
            timeout = 45
            csv_file = _wait_for_csv_download(temp_dir, timeout)
            if not csv_file:
                logger.error(f"Download timed out after {timeout} seconds.")
                take_error_screenshot(driver, "download_timeout")
                raise HTTPException(status_code=500, detail="CSV Download fehlgeschlagen.")

            output_pdf_path = os.path.join(temp_dir, output_pdf_name)
            generate_pdf_from_csv(
                csv_path=csv_file,
                institution=institution,
                date_info=datetime.datetime.now().strftime("%d.%m.%Y"),
                list_type=account_type,
                output_path=output_pdf_path
            )

            # The temporary directory is deleted when the ``with`` block exits,
            # so the PDF is copied out before the response is built.
            # NOTE(review): the copy in /tmp is never removed, and concurrent
            # requests for the same job share this path — confirm acceptable.
            final_storage = os.path.join("/tmp", output_pdf_name)
            logger.info(f"PDF successfully generated. Copying to {final_storage}")
            shutil.copy(output_pdf_path, final_storage)
            return final_storage
        except HTTPException:
            raise
        except Exception as e:
            logger.exception("Unexpected error during PDF generation")
            raise HTTPException(status_code=500, detail=str(e)) from e
        finally:
            if driver:
                logger.debug("Closing driver.")
                driver.quit()


@app.get("/api/jobs/{job_id}/generate-pdf")
async def generate_pdf(job_id: str, account_type: str):
    """Export the person list of job ``job_id`` as CSV and return it as a PDF.

    Credentials are read from the environment variables
    ``<ACCOUNT_TYPE>_USER`` and ``<ACCOUNT_TYPE>_PW`` (upper-cased).
    ``account_type`` is also forwarded as ``list_type`` to the PDF generator.

    Returns:
        A FileResponse serving ``/tmp/Listen_<job_id>.pdf``.

    Raises:
        HTTPException: 400 when credentials are missing, 401 when the browser
            cannot be started or the login fails, 500 for any scraping,
            download or rendering error.
    """
    logger.info(f"API Request: Generate PDF for job {job_id} ({account_type})")
    username = os.getenv(f"{account_type.upper()}_USER")
    password = os.getenv(f"{account_type.upper()}_PW")
    # Fail early with the same 400 the job-list endpoint returns, instead of
    # starting a browser only to fail while typing missing credentials.
    if not username or not password:
        logger.error(f"Credentials for {account_type} not found in .env")
        raise HTTPException(status_code=400, detail="Credentials not found.")
    # The scraping flow blocks (Selenium waits, time.sleep, PDF rendering), so
    # it runs in the threadpool rather than on the event loop.
    final_storage = await run_in_threadpool(_build_job_pdf, job_id, account_type, username, password)
    return FileResponse(path=final_storage, filename=f"Listen_{job_id}.pdf", media_type="application/pdf")