# File: Brancheneinstufung2/fotograf-de-scraper/backend/main.py
# (322 lines, 12 KiB, Python)

import os
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from typing import List, Dict, Any, Optional
import time
import datetime
import base64
import re
import pandas as pd
from jinja2 import Environment, FileSystemLoader
from weasyprint import HTML
import tempfile
import shutil
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# Load environment variables (python-dotenv) so the per-account credentials
# can later be read with os.getenv() inside the endpoints.
load_dotenv()
# FastAPI application object; the route decorators further down register on it.
app = FastAPI(title="Fotograf.de Scraper & ERP API")
# Configure CORS: every origin, HTTP method and request header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended outside of local development.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- Configuration & Constants ---
# Page that login() opens before filling in the credentials.
LOGIN_URL = 'https://app.fotograf.de/login/login'
# Locator strings used by the Selenium helpers in this module. Values that
# start with "//" or ".//" are XPath expressions (looked up via By.XPATH);
# all other values are CSS selectors (looked up via By.CSS_SELECTOR).
SELECTORS = {
# Cookie banner and login form.
"cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"login_user": "#login-email",
"login_pass": "#login-password",
"login_button": "#login-submit",
# Job overview table: every <tr> that contains a job-name link.
"dashboard_jobs_table_rows": "//tr[.//a[contains(@data-qa-id, 'link:photo-jobs-name-')]]",
# Row-relative lookups. The status/date/type expressions pick the <td> whose
# position equals the position of the <th> containing the given header text.
"job_row_name_link": ".//a[contains(@data-qa-id, 'link:photo-jobs-name-')]",
"job_row_status": ".//td[count(//th[contains(., 'Status')]/preceding-sibling::th) + 1]",
"job_row_date": ".//td[count(//th[contains(., 'Datum')]/preceding-sibling::th) + 1]",
"job_row_shooting_type": ".//td[count(//th[contains(., 'Typ')]/preceding-sibling::th) + 1]",
# Export menu on the registrations page.
"export_dropdown": "[data-qa-id='dropdown:export']",
"export_csv_link": "//a[contains(text(), 'CSV') or contains(., 'CSV')]", # Common pattern for CSV export in dropdowns
}
# --- PDF Generation Logic (Reused from List-Generator) ---
def get_logo_base64() -> Optional[str]:
    """Return the bundled logo as a base64 string, or ``None`` if unreadable.

    The logo is read from ``assets/logo.png`` next to this module. It is an
    optional part of the PDF, so any failure to read it is reported on stdout
    and signalled with ``None`` instead of aborting the PDF generation.

    Returns:
        The base64-encoded file content, or ``None`` when the file is missing
        or cannot be read.
    """
    logo_path = os.path.join(os.path.dirname(__file__), "assets", "logo.png")
    try:
        with open(logo_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")
    except FileNotFoundError:
        print(f"Warning: Logo file not found at {logo_path}")
    except OSError as exc:
        # Permission problems, a directory at that path, I/O errors: the logo
        # is optional, so degrade the same way as for a missing file.
        print(f"Warning: Logo file could not be read at {logo_path}: {exc}")
    return None
def _read_registrations_csv(csv_path: str) -> "pd.DataFrame":
    """Load the export CSV, probing the usual encoding/delimiter combinations."""
    for encoding in ("utf-8-sig", "latin1"):
        for sep in (";", ","):
            try:
                # Cheap probe first: a wrong delimiter collapses every row
                # into a single column.
                preview = pd.read_csv(csv_path, sep=sep, encoding=encoding, nrows=5)
                if len(preview.columns) > 1:
                    return pd.read_csv(csv_path, sep=sep, encoding=encoding)
            except Exception:
                continue
    # No combination produced more than one column: accept a single-column
    # file using the most tolerant settings, or give up with the root cause.
    try:
        return pd.read_csv(csv_path, sep=";", encoding="latin1")
    except Exception as exc:
        raise Exception("CSV konnte nicht gelesen werden.") from exc


def generate_pdf_from_csv(csv_path: str, institution: str, date_info: str, list_type: str, output_path: str) -> None:
    """Render the registration CSV as a grouped name list and write it as PDF.

    Args:
        csv_path: Path of the exported CSV file.
        institution: Headline passed to the template.
        date_info: Date text passed to the template.
        list_type: ``'k'`` selects the "Gruppe"/"Kinder" wording; any other
            value selects "Klasse"/"Schüler".
        output_path: Destination path of the generated PDF.

    Raises:
        Exception: "CSV konnte nicht gelesen werden." when the CSV cannot be
            parsed with any supported encoding/delimiter combination.
    """
    df = _read_registrations_csv(csv_path)
    # Header cells may carry stray whitespace or literal quote characters.
    df.columns = df.columns.str.strip().str.replace("\"", "")

    group_label = "Gruppe" if list_type == 'k' else "Klasse"
    person_label_plural = "Kinder" if list_type == 'k' else "Schüler"

    # Map the known German/English header variants onto the canonical names.
    col_mapping = {}
    for col in df.columns:
        lower_col = col.lower().strip()
        if lower_col in ["vorname kind", "vorname", "first name"]:
            col_mapping[col] = "Vorname"
        elif lower_col in ["nachname kind", "nachname", "last name"]:
            col_mapping[col] = "Nachname"
        elif lower_col in ["gruppe", "klasse", "group", "class"]:
            col_mapping[col] = group_label
    df = df.rename(columns=col_mapping)
    df = df.fillna("")

    # Guarantee the three columns used for sorting and grouping exist.
    for col in ["Vorname", "Nachname", group_label]:
        if col not in df.columns:
            df[col] = "Alle" if col == group_label else ""
    df = df.sort_values(by=[group_label, "Nachname", "Vorname"])

    # One pass over the groups yields both the per-group rows and the counts.
    class_data = []
    class_counts = []
    for class_name, group in df.groupby(group_label):
        class_data.append({"name": class_name, "students": group.to_dict("records")})
        class_counts.append({"name": class_name, "count": len(group)})
    total_students = len(df)

    template_dir = os.path.join(os.path.dirname(__file__), "templates")
    # Names and the institution title come from external data; autoescaping
    # keeps markup in those values from being interpreted as HTML.
    env = Environment(loader=FileSystemLoader(template_dir), autoescape=True)
    template = env.get_template("school_list.html")
    current_time = datetime.datetime.now().strftime("%d.%m.%Y %H:%M Uhr")
    logo_base64 = get_logo_base64()
    render_context = {
        "institution": institution,
        "date_info": date_info,
        "class_counts": class_counts,
        "total_students": total_students,
        "class_data": class_data,
        "current_time": current_time,
        "logo_base64": logo_base64,
        "group_label": group_label,
        "person_label_plural": person_label_plural,
        "group_column_name": group_label,
    }
    html_out = template.render(render_context)
    HTML(string=html_out).write_pdf(output_path)
# --- Selenium Scraper Functions ---
def setup_driver(download_path: Optional[str] = None):
    """Start a headless Chromium WebDriver.

    Args:
        download_path: Directory that downloads are saved to. When given,
            the download preferences are set and downloads are explicitly
            allowed through the DevTools protocol.

    Returns:
        The started driver, or ``None`` when start-up or configuration failed.
        A browser that was started but could not be configured is shut down
        before ``None`` is returned.
    """
    print("Initialisiere Chrome WebDriver...")
    options = Options()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--window-size=1920,1200')
    options.binary_location = '/usr/bin/chromium'
    if download_path:
        prefs = {
            "download.default_directory": download_path,
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
            "safebrowsing.enabled": True
        }
        options.add_experimental_option("prefs", prefs)

    driver = None
    try:
        driver = webdriver.Chrome(options=options)
        if download_path:
            # Crucial for headless mode: Allow downloads
            driver.execute_cdp_cmd('Page.setDownloadBehavior', {
                'behavior': 'allow',
                'downloadPath': download_path
            })
        return driver
    except Exception as e:
        print(f"Fehler bei der Initialisierung des WebDrivers: {e}")
        if driver is not None:
            # The browser did start; do not leave the process running.
            try:
                driver.quit()
            except Exception as quit_error:
                print(f"Fehler beim Beenden des WebDrivers: {quit_error}")
        return None
def login(driver, username, password) -> bool:
    """Log the given WebDriver session in to fotograf.de.

    Args:
        driver: Selenium WebDriver, or ``None`` when driver start-up failed.
        username: Value typed into the e-mail field.
        password: Value typed into the password field.

    Returns:
        ``True`` once the browser URL contains the dashboard path, ``False``
        on any failure (including a missing driver).
    """
    print("Starte Login-Vorgang...")
    if driver is None:
        print("Login fehlgeschlagen: kein WebDriver verfügbar.")
        return False
    try:
        driver.get(LOGIN_URL)
        wait = WebDriverWait(driver, 45)
        try:
            # Best effort: the cookie banner is not always shown.
            cookie_wait = WebDriverWait(driver, 5)
            cookie_wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"]))).click()
            time.sleep(1)
        except Exception:
            pass
        wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
        driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
        driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
        wait.until(EC.url_contains('/config_dashboard/index'))
        return True
    except Exception as e:
        print(f"Login fehlgeschlagen: {e}")
        return False
def get_jobs_list(driver) -> List[Dict[str, Any]]:
    """Scrape the job overview table of a logged-in session.

    Args:
        driver: Selenium WebDriver that is already logged in.

    Returns:
        One dict per table row with the keys ``id``, ``name``, ``url``,
        ``status``, ``date`` and ``shooting_type``. Rows that cannot be
        parsed are skipped; an empty list is returned when no row appears
        within the wait time.
    """
    jobs_list_url = "https://app.fotograf.de/config_jobs/index"
    driver.get(jobs_list_url)
    wait = WebDriverWait(driver, 45)
    jobs: List[Dict[str, Any]] = []
    try:
        job_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["dashboard_jobs_table_rows"])))
    except Exception as e:
        # Typically a timeout: the table never showed up.
        print(f"Keine Auftragszeilen gefunden: {e}")
        return jobs

    for row in job_rows:
        try:
            name_element = row.find_element(By.XPATH, SELECTORS["job_row_name_link"])
            job_name = name_element.text.strip()
            job_url = name_element.get_attribute('href')
            # The job id is the trailing numeric path segment of the link.
            job_id_match = re.search(r'/(\d+)$', job_url)
            job_id = job_id_match.group(1) if job_id_match else None
            status_element = row.find_element(By.XPATH, SELECTORS["job_row_status"])
            job_status = status_element.text.strip()
            date_element = row.find_element(By.XPATH, SELECTORS["job_row_date"])
            job_date = date_element.text.strip()
            type_element = row.find_element(By.XPATH, SELECTORS["job_row_shooting_type"])
            shooting_type = type_element.text.strip()
        except Exception as e:
            # One malformed row must not discard the rest of the table.
            print(f"Auftragszeile übersprungen: {e}")
            continue
        jobs.append({
            "id": job_id,
            "name": job_name,
            "url": job_url,
            "status": job_status,
            "date": job_date,
            "shooting_type": shooting_type,
        })
    return jobs
# --- API Endpoints ---
@app.get("/health")
async def health_check():
    """Liveness probe: always answers with a static OK payload."""
    payload = {"status": "ok"}
    return payload
def _fetch_jobs_blocking(username: str, password: str) -> List[Dict[str, Any]]:
    """Log in with a fresh browser and scrape the job list (blocking call)."""
    driver = None
    try:
        driver = setup_driver()
        if not driver or not login(driver, username, password):
            raise HTTPException(status_code=401, detail="Login failed.")
        return get_jobs_list(driver)
    finally:
        if driver:
            driver.quit()


@app.get("/api/jobs", response_model=List[Dict[str, Any]])
async def get_jobs(account_type: str):
    """Return the photo jobs of the account selected by ``account_type``.

    Credentials are read from the environment variables
    ``<ACCOUNT_TYPE>_USER`` and ``<ACCOUNT_TYPE>_PW``.

    Raises:
        HTTPException: 400 when the credentials are not configured, 401 when
            the browser could not be started or the login failed.
    """
    # Local import keeps this block self-contained.
    import asyncio

    username = os.getenv(f"{account_type.upper()}_USER")
    password = os.getenv(f"{account_type.upper()}_PW")
    if not username or not password:
        raise HTTPException(status_code=400, detail="Credentials not found.")

    # Selenium blocks for many seconds; running it on the event loop would
    # stall every other request, so hand it to the default thread pool.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _fetch_jobs_blocking, username, password)
def _wait_for_csv_download(download_dir: str, timeout: float = 30) -> Optional[str]:
    """Poll ``download_dir`` once per second; return the first ``.csv`` path or ``None``."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        csv_files = [f for f in os.listdir(download_dir) if f.endswith('.csv')]
        if csv_files:
            return os.path.join(download_dir, csv_files[0])
        time.sleep(1)
    return None


def _build_job_pdf_blocking(job_id: str, account_type: str, username: str, password: str) -> str:
    """Download the registration CSV of a job and render it to a PDF (blocking).

    Returns the path of a uniquely named PDF outside the scratch directory;
    the caller is responsible for deleting it.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        driver = setup_driver(download_path=temp_dir)
        try:
            if not driver or not login(driver, username, password):
                raise HTTPException(status_code=401, detail="Login failed.")
            # 1. Navigate to registrations page
            reg_url = f"https://app.fotograf.de/config_children/view_registrations/{job_id}"
            print(f"Navigiere zu Registrierungen: {reg_url}")
            driver.get(reg_url)
            wait = WebDriverWait(driver, 30)
            # Get Institution Name for PDF
            try:
                institution = driver.find_element(By.TAG_NAME, "h1").text.strip()
            except Exception:
                institution = "Fotoauftrag"
            # 2. Click Export and trigger CSV download
            print("Trigger Export...")
            export_btn = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["export_dropdown"])))
            export_btn.click()
            time.sleep(1)  # Wait for menu
            csv_btn = wait.until(EC.element_to_be_clickable((By.XPATH, SELECTORS["export_csv_link"])))
            csv_btn.click()
            # 3. Wait for download to finish
            print("Warte auf CSV Download...")
            csv_file = _wait_for_csv_download(temp_dir, timeout=30)
            if not csv_file:
                raise HTTPException(status_code=500, detail="CSV Download fehlgeschlagen.")
            # 4. Generate PDF
            print(f"Generiere PDF aus {csv_file}...")
            output_pdf_path = os.path.join(temp_dir, f"Listen_{job_id}.pdf")
            generate_pdf_from_csv(
                csv_path=csv_file,
                institution=institution,
                date_info=datetime.datetime.now().strftime("%d.%m.%Y"),
                list_type=account_type,  # 'k' or 'schule'
                output_path=output_pdf_path
            )
            # 5. Copy the PDF out of the scratch directory. A unique name
            # keeps concurrent requests for the same job from clobbering
            # each other's file.
            fd, final_storage = tempfile.mkstemp(prefix="fotograf_listen_", suffix=".pdf")
            os.close(fd)
            try:
                shutil.copy(output_pdf_path, final_storage)
            except Exception:
                os.remove(final_storage)
                raise
            return final_storage
        except HTTPException:
            # Keep the intended status code instead of turning it into a 500.
            raise
        except Exception as e:
            print(f"Fehler bei PDF Generierung: {e}")
            raise HTTPException(status_code=500, detail=str(e))
        finally:
            if driver:
                driver.quit()


@app.get("/api/jobs/{job_id}/generate-pdf")
async def generate_pdf(job_id: str, account_type: str):
    """Generate the name-list PDF for a job and return it as a download.

    Credentials are read from the environment variables
    ``<ACCOUNT_TYPE>_USER`` and ``<ACCOUNT_TYPE>_PW``.

    Raises:
        HTTPException: 400 when the credentials are not configured, 401 when
            the browser could not be started or the login failed, 500 for
            any other failure (download, CSV parsing, PDF rendering).
    """
    # Local imports keep this block self-contained.
    import asyncio
    from fastapi import BackgroundTasks

    username = os.getenv(f"{account_type.upper()}_USER")
    password = os.getenv(f"{account_type.upper()}_PW")
    if not username or not password:
        raise HTTPException(status_code=400, detail="Credentials not found.")

    # Selenium and the download polling block for a long time; run them in
    # the default thread pool so the event loop keeps serving other requests.
    loop = asyncio.get_running_loop()
    pdf_path = await loop.run_in_executor(
        None, _build_job_pdf_blocking, job_id, account_type, username, password
    )

    # Delete the generated file once the response has been sent.
    cleanup = BackgroundTasks()
    cleanup.add_task(os.remove, pdf_path)
    return FileResponse(
        path=pdf_path,
        filename=f"Listen_{job_id}.pdf",
        media_type="application/pdf",
        background=cleanup,
    )