[32788f42] feat: implement database persistence, modernized UI with Tailwind, and Calendly-integrated QR card generator for Fotograf.de scraper
BIN
fotograf-de-scraper/backend/data/blank.pdf
Normal file
32
fotograf-de-scraper/backend/database.py
Normal file
@@ -0,0 +1,32 @@
|
||||
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
import datetime
import os

# Database location; overridable via env for tests or non-SQLite deployments.
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:////app/data/fotograf_jobs.db")

# `check_same_thread=False` is a SQLite-only connect flag (FastAPI may use the
# session from a different thread than the one that opened the connection).
# Passing it to any other backend raises a TypeError, so only apply it when
# the configured URL is actually SQLite.
_connect_args = {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {}

engine = create_engine(DATABASE_URL, connect_args=_connect_args)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base all ORM models in this app derive from.
Base = declarative_base()
|
||||
|
||||
class Job(Base):
    """Cached fotograf.de job (photo-shooting order) scraped from the web UI."""

    __tablename__ = "jobs"

    id = Column(String, primary_key=True, index=True)  # fotograf.de job id
    name = Column(String, index=True)
    url = Column(String)
    status = Column(String)
    # Stored as the display string shown by the site, not a parsed date.
    date = Column(String)
    shooting_type = Column(String)
    account_type = Column(String, index=True)  # 'kiga' or 'schule'
    last_updated = Column(DateTime, default=datetime.datetime.utcnow)

    def __repr__(self) -> str:
        # Compact representation for logs and debugging sessions.
        return f"Job(id={self.id!r}, name={self.name!r}, account_type={self.account_type!r})"


# Create the tables on import so the API can start against a fresh DB file.
Base.metadata.create_all(bind=engine)
|
||||
|
||||
def get_db():
    """Yield a database session and guarantee it is closed afterwards.

    Intended for use as a FastAPI dependency (``Depends(get_db)``).
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs whether the request handler succeeded or raised.
        session.close()
|
||||
|
After Width: | Height: | Size: 76 KiB |
|
After Width: | Height: | Size: 119 KiB |
|
After Width: | Height: | Size: 119 KiB |
|
After Width: | Height: | Size: 119 KiB |
|
After Width: | Height: | Size: 31 KiB |
|
After Width: | Height: | Size: 31 KiB |
|
After Width: | Height: | Size: 105 KiB |
|
After Width: | Height: | Size: 185 KiB |
@@ -10,10 +10,14 @@ import tempfile
|
||||
import shutil
|
||||
import time
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import FileResponse
|
||||
from typing import List, Dict, Any, Optional
|
||||
from sqlalchemy.orm import Session
|
||||
from database import get_db, Job as DBJob, engine, Base
|
||||
import math
|
||||
import uuid
|
||||
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
@@ -35,6 +39,9 @@ logger = logging.getLogger("fotograf-scraper")
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Ensure DB is created
|
||||
Base.metadata.create_all(bind=engine)
|
||||
|
||||
app = FastAPI(title="Fotograf.de Scraper & ERP API")
|
||||
|
||||
# Configure CORS
|
||||
@@ -61,6 +68,14 @@ SELECTORS = {
|
||||
"job_row_shooting_type": ".//td[count(//th[contains(., 'Typ')]/preceding-sibling::th) + 1]",
|
||||
"export_dropdown": "[data-qa-id='dropdown:export']",
|
||||
"export_csv_link": "button[data-qa-id='button:csv']",
|
||||
# --- Statistics Selectors ---
|
||||
"album_overview_rows": "//table/tbody/tr",
|
||||
"album_overview_link": ".//td[2]//a",
|
||||
"access_code_count": "//span[text()='Zugangscodes']/following-sibling::strong",
|
||||
"person_rows": "//div[contains(@class, 'border-legacy-silver-550') and .//span[text()='Logins']]",
|
||||
"person_all_photos": ".//div[@data-key]",
|
||||
"person_purchased_photos": ".//div[@data-key and .//img[@alt='Bestellungen mit diesem Foto']]",
|
||||
"person_access_card_photo": ".//div[@data-key and contains(@class, 'opacity-50')]",
|
||||
}
|
||||
|
||||
# --- PDF Generation Logic ---
|
||||
@@ -278,15 +293,214 @@ def get_jobs_list(driver) -> List[Dict[str, Any]]:
|
||||
|
||||
return jobs
|
||||
|
||||
# --- Background Task Engine ---
|
||||
|
||||
# --- Background Task Engine ---

# In-memory registry of background statistics tasks, keyed by task id.
# NOTE(review): process-local and never evicted — entries do not survive a
# restart and this will not work across multiple workers; confirm deployment
# is single-process.
task_store: Dict[str, Dict[str, Any]] = {}


def process_statistics(task_id: str, job_id: str, account_type: str):
    """Scrape per-album purchase statistics for one job.

    Runs as a FastAPI background task: progress messages and the final
    outcome are published into ``task_store[task_id]`` (status is one of
    "running", "error", "completed") so the polling endpoint can report them.

    Args:
        task_id: Key under which status/result are stored in ``task_store``.
        job_id: fotograf.de job id whose albums are evaluated.
        account_type: Credential prefix ('kiga' or 'schule') used to look up
            ``<TYPE>_USER`` / ``<TYPE>_PW`` environment variables.
    """
    logger.info(f"Task {task_id}: Starting statistics calculation for job {job_id}")
    task_store[task_id] = {"status": "running", "progress": "Initialisiere Browser...", "result": None}

    # Credentials are selected by account type, e.g. KIGA_USER / KIGA_PW.
    username = os.getenv(f"{account_type.upper()}_USER")
    password = os.getenv(f"{account_type.upper()}_PW")
    driver = None

    try:
        driver = setup_driver()
        if not driver or not login(driver, username, password):
            task_store[task_id] = {"status": "error", "progress": "Login fehlgeschlagen. Überprüfe die Zugangsdaten."}
            return

        task_store[task_id]["progress"] = f"Lade Alben-Übersicht für Auftrag..."

        # Album overview page for the job; each row links to one album.
        albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
        logger.info(f"Navigating to albums: {albums_overview_url}")
        driver.get(albums_overview_url)
        wait = WebDriverWait(driver, 15)

        # Collect the name + URL of every album first, then visit them in turn.
        albums_to_visit = []
        try:
            album_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_rows"])))
            for row in album_rows:
                try:
                    album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"])
                    albums_to_visit.append({"name": album_link.text, "url": album_link.get_attribute('href')})
                except NoSuchElementException:
                    # Rows without a link (e.g. header/summary rows) are skipped.
                    continue
        except TimeoutException:
            task_store[task_id] = {"status": "error", "progress": "Konnte die Album-Liste nicht finden."}
            return

        total_albums = len(albums_to_visit)
        task_store[task_id]["progress"] = f"{total_albums} Alben gefunden. Starte Auswertung..."

        statistics = []

        for index, album in enumerate(albums_to_visit):
            album_name = album['name']
            task_store[task_id]["progress"] = f"Bearbeite Album {index + 1}/{total_albums}: '{album_name}'..."
            driver.get(album['url'])

            try:
                # Derive the number of result pages from the access-code count.
                # Assumes the site shows 20 guest accesses per page — TODO confirm.
                total_codes_text = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["access_code_count"]))).text
                num_pages = math.ceil(int(total_codes_text) / 20)

                total_children_in_album = 0
                children_with_purchase = 0
                children_with_all_purchased = 0

                for page_num in range(1, num_pages + 1):
                    task_store[task_id]["progress"] = f"Bearbeite Album {index + 1}/{total_albums}: '{album_name}' (Seite {page_num}/{num_pages})..."

                    # Page 1 is already loaded; subsequent pages are addressed
                    # via the page_guest_accesses query parameter.
                    if page_num > 1:
                        driver.get(album['url'] + f"?page_guest_accesses={page_num}")

                    person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))

                    for person_row in person_rows:
                        total_children_in_album += 1
                        try:
                            # The photo grid is the sibling element directly
                            # following the person's header row.
                            photo_container = person_row.find_element(By.XPATH, "./following-sibling::div[1]")

                            num_total_photos = len(photo_container.find_elements(By.XPATH, SELECTORS["person_all_photos"]))
                            num_purchased_photos = len(photo_container.find_elements(By.XPATH, SELECTORS["person_purchased_photos"]))
                            num_access_cards = len(photo_container.find_elements(By.XPATH, SELECTORS["person_access_card_photo"]))

                            # Access-card photos cannot be bought, so exclude
                            # them when checking for "bought everything".
                            buyable_photos = num_total_photos - num_access_cards

                            if num_purchased_photos > 0:
                                children_with_purchase += 1

                            if buyable_photos > 0 and buyable_photos == num_purchased_photos:
                                children_with_all_purchased += 1
                        except NoSuchElementException:
                            # Person without a photo grid: counted in the total
                            # but contributes no purchase data.
                            continue

                # German keys are intentional — they are rendered directly in
                # the frontend table.
                statistics.append({
                    "Album": album_name,
                    "Kinder_insgesamt": total_children_in_album,
                    "Kinder_mit_Käufen": children_with_purchase,
                    "Kinder_Alle_Bilder_gekauft": children_with_all_purchased
                })

            except Exception as e:
                # One broken album must not abort the whole evaluation.
                logger.error(f"Fehler bei Auswertung von Album '{album_name}': {e}")
                continue

        task_store[task_id] = {
            "status": "completed",
            "progress": "Auswertung erfolgreich abgeschlossen!",
            "result": statistics
        }

    except Exception as e:
        logger.exception(f"Unexpected error in task {task_id}")
        task_store[task_id] = {"status": "error", "progress": f"Unerwarteter Fehler: {str(e)}"}
    finally:
        # Always release the browser, even on failure paths.
        if driver:
            logger.debug(f"Task {task_id}: Closing driver.")
            driver.quit()
|
||||
|
||||
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, UploadFile, File, Form
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import FileResponse, JSONResponse
|
||||
from typing import List, Dict, Any, Optional
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
from database import get_db, Job as DBJob, engine, Base
|
||||
import math
|
||||
import uuid
|
||||
from qr_generator import get_calendly_events, overlay_text_on_pdf
|
||||
|
||||
# --- API Endpoints ---
|
||||
|
||||
@app.get("/api/calendly/events")
async def fetch_calendly_events(start_time: str, end_time: str, event_type_name: Optional[str] = None):
    """
    Debug endpoint to fetch and inspect raw Calendly data.
    """
    api_token = os.getenv("CALENDLY_TOKEN")
    if not api_token:
        raise HTTPException(status_code=400, detail="Calendly API token missing.")

    # Imported lazily so the endpoint module loads even if qr_generator's
    # heavier dependencies are unavailable at startup.
    from qr_generator import get_calendly_events_raw

    try:
        events = get_calendly_events_raw(api_token, start_time, end_time, event_type_name)
    except Exception as e:
        logger.error(f"Error fetching Calendly events: {e}")
        raise HTTPException(status_code=500, detail=str(e))

    return {"count": len(events), "events": events}
|
||||
|
||||
|
||||
@app.post("/api/qr-cards/generate")
async def generate_qr_cards(
    start_time: str = Form(...),
    end_time: str = Form(...),
    event_type_name: str = Form(None),
    pdf_file: UploadFile = File(...)
):
    """Generate a QR-card PDF from Calendly bookings.

    Fetches invitees in the given time window, formats one overlay line per
    invitee, stamps them onto the uploaded card-template PDF and streams the
    result back. Returns 404 when no matching appointments exist, 400 when
    the Calendly token is missing, 500 on any processing failure.
    """
    logger.info(f"API Request: Generate QR cards from {start_time} to {end_time} for event type '{event_type_name}'")
    api_token = os.getenv("CALENDLY_TOKEN")
    if not api_token:
        raise HTTPException(status_code=400, detail="Calendly API token missing.")

    base_pdf_path = None
    try:
        # Persist the uploaded template to a unique temp path so the overlay
        # code can read it by filename.
        temp_dir = tempfile.gettempdir()
        base_pdf_path = os.path.join(temp_dir, f"upload_{uuid.uuid4()}.pdf")
        with open(base_pdf_path, "wb") as buffer:
            shutil.copyfileobj(pdf_file.file, buffer)

        # 1. Fetch formatted overlay lines from Calendly.
        texts = get_calendly_events(api_token, start_time, end_time, event_type_name)
        if not texts:
            return JSONResponse(status_code=404, content={"message": "Keine passenden Termine gefunden."})

        # 2. Overlay the text onto the template.
        output_name = f"QR_Karten_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
        output_path = os.path.join(temp_dir, output_name)
        overlay_text_on_pdf(base_pdf_path, output_path, texts)

        # The output file is intentionally left on disk: FileResponse streams
        # it after this handler returns.
        return FileResponse(path=output_path, filename=output_name, media_type="application/pdf")

    except Exception as e:
        logger.error(f"Error generating QR cards: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always remove the uploaded source PDF — the previous version leaked
        # it whenever the Calendly fetch or the overlay step raised.
        if base_pdf_path and os.path.exists(base_pdf_path):
            os.remove(base_pdf_path)
|
||||
|
||||
@app.get("/health")
async def health_check():
    """Liveness probe: report that the API process is up."""
    return {"status": "ok"}
|
||||
|
||||
@app.get("/api/jobs", response_model=List[Dict[str, Any]])
|
||||
async def get_jobs(account_type: str):
|
||||
logger.info(f"API Request: GET /api/jobs for {account_type}")
|
||||
async def get_jobs(account_type: str, force_refresh: bool = False, db: Session = Depends(get_db)):
|
||||
logger.info(f"API Request: GET /api/jobs for {account_type} (force_refresh={force_refresh})")
|
||||
|
||||
# 1. Check database first if not forcing a refresh
|
||||
if not force_refresh:
|
||||
cached_jobs = db.query(DBJob).filter(DBJob.account_type == account_type).all()
|
||||
if cached_jobs:
|
||||
logger.info(f"Returning {len(cached_jobs)} cached jobs for {account_type}")
|
||||
return [
|
||||
{
|
||||
"id": job.id,
|
||||
"name": job.name,
|
||||
"url": job.url,
|
||||
"status": job.status,
|
||||
"date": job.date,
|
||||
"shooting_type": job.shooting_type,
|
||||
"last_updated": job.last_updated.isoformat() if job.last_updated else None
|
||||
}
|
||||
for job in cached_jobs
|
||||
]
|
||||
else:
|
||||
logger.info(f"No cached jobs found for {account_type}. Initiating scrape...")
|
||||
|
||||
# 2. Scrape from fotograf.de if forcing refresh or no cached jobs
|
||||
username = os.getenv(f"{account_type.upper()}_USER")
|
||||
password = os.getenv(f"{account_type.upper()}_PW")
|
||||
if not username or not password:
|
||||
@@ -298,12 +512,61 @@ async def get_jobs(account_type: str):
|
||||
driver = setup_driver()
|
||||
if not driver or not login(driver, username, password):
|
||||
raise HTTPException(status_code=401, detail="Login failed.")
|
||||
return get_jobs_list(driver)
|
||||
|
||||
scraped_jobs = get_jobs_list(driver)
|
||||
|
||||
# 3. Save to database
|
||||
if scraped_jobs:
|
||||
logger.info(f"Saving {len(scraped_jobs)} jobs to database for {account_type}...")
|
||||
# Clear old jobs for this account type
|
||||
db.query(DBJob).filter(DBJob.account_type == account_type).delete()
|
||||
|
||||
# Insert new jobs
|
||||
now = datetime.datetime.utcnow()
|
||||
for job_data in scraped_jobs:
|
||||
if job_data["id"]: # Ensure we have an ID
|
||||
new_job = DBJob(
|
||||
id=job_data["id"],
|
||||
name=job_data["name"],
|
||||
url=job_data["url"],
|
||||
status=job_data["status"],
|
||||
date=job_data["date"],
|
||||
shooting_type=job_data["shooting_type"],
|
||||
account_type=account_type,
|
||||
last_updated=now
|
||||
)
|
||||
db.add(new_job)
|
||||
|
||||
# Update dict for return value
|
||||
job_data["last_updated"] = now.isoformat()
|
||||
|
||||
db.commit()
|
||||
logger.info("Database updated successfully.")
|
||||
|
||||
return scraped_jobs
|
||||
except Exception as e:
|
||||
logger.error(f"Error during scraping or database save: {e}")
|
||||
db.rollback()
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
finally:
|
||||
if driver:
|
||||
logger.debug("Closing driver.")
|
||||
driver.quit()
|
||||
|
||||
@app.get("/api/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Return the live state (status/progress/result) of a background task."""
    logger.debug(f"API Request: Check task status for {task_id}")
    try:
        return task_store[task_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Task nicht gefunden.")
|
||||
|
||||
@app.post("/api/jobs/{job_id}/statistics")
async def start_statistics(job_id: str, account_type: str, background_tasks: BackgroundTasks):
    """Kick off background statistics scraping and return a task id to poll."""
    logger.info(f"API Request: Start statistics for job {job_id} ({account_type})")
    new_task_id = str(uuid.uuid4())
    background_tasks.add_task(process_statistics, new_task_id, job_id, account_type)
    return {"task_id": new_task_id}
|
||||
|
||||
@app.get("/api/jobs/{job_id}/generate-pdf")
|
||||
async def generate_pdf(job_id: str, account_type: str):
|
||||
logger.info(f"API Request: Generate PDF for job {job_id} ({account_type})")
|
||||
|
||||
200
fotograf-de-scraper/backend/qr_generator.py
Normal file
@@ -0,0 +1,200 @@
|
||||
import os
|
||||
import requests
|
||||
import io
|
||||
import datetime
|
||||
from reportlab.pdfgen import canvas
|
||||
from reportlab.lib.pagesizes import A4
|
||||
from reportlab.pdfbase import pdfmetrics
|
||||
from reportlab.pdfbase.ttfonts import TTFont
|
||||
from PyPDF2 import PdfReader, PdfWriter
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger("qr-card-generator")
|
||||
|
||||
def get_calendly_events_raw(api_token: str, start_time: str, end_time: str, event_type_name: str = None):
    """Fetch raw scheduled-event and invitee data from the Calendly v2 API.

    Args:
        api_token: Calendly personal access token.
        start_time: ISO-8601 lower bound passed as ``min_start_time``.
        end_time: ISO-8601 upper bound passed as ``max_start_time``.
        event_type_name: Optional case-insensitive substring filter applied
            to the event name; non-matching events are skipped.

    Returns:
        One dict per invitee with the event name/start time and the
        invitee's name, email and booking-form answers.

    Raises:
        Exception: when the user lookup or the event listing request fails.
    """
    headers = {
        'Authorization': f'Bearer {api_token}',
        'Content-Type': 'application/json'
    }

    # 1. Resolve the token owner's user URI — required to filter events.
    # All requests carry an explicit timeout so a stalled Calendly endpoint
    # cannot hang the worker indefinitely.
    user_url = "https://api.calendly.com/users/me"
    user_response = requests.get(user_url, headers=headers, timeout=30)
    if not user_response.ok:
        raise Exception(f"Calendly API Error: {user_response.status_code}")

    user_data = user_response.json()
    user_uri = user_data['resource']['uri']

    # 2. Page through ALL active events in the window. The API caps pages at
    # 100 items and links the next page via pagination.next_page; the old
    # code silently read only the first page.
    events = []
    events_url = "https://api.calendly.com/scheduled_events"
    params = {
        'user': user_uri,
        'min_start_time': start_time,
        'max_start_time': end_time,
        'status': 'active',
        'count': 100
    }
    while events_url:
        events_response = requests.get(events_url, headers=headers, params=params, timeout=30)
        if not events_response.ok:
            raise Exception(f"Calendly API Error: {events_response.status_code}")
        events_data = events_response.json()
        events.extend(events_data['collection'])
        events_url = events_data.get('pagination', {}).get('next_page')
        params = None  # next_page URLs already encode the query string

    raw_results = []

    # 3. Fetch the invitees of each (matching) event. Failed invitee lookups
    # are skipped — best-effort, same as before.
    for event in events:
        event_name = event.get('name', '')
        if event_type_name and event_type_name.lower() not in event_name.lower():
            continue

        event_uuid = event['uri'].split('/')[-1]
        invitees_url = f"https://api.calendly.com/scheduled_events/{event_uuid}/invitees"

        invitees_response = requests.get(invitees_url, headers=headers, timeout=30)
        if not invitees_response.ok:
            continue

        invitees_data = invitees_response.json()

        for invitee in invitees_data['collection']:
            raw_results.append({
                "event_name": event_name,
                "start_time": event['start_time'],
                "invitee_name": invitee['name'],
                "invitee_email": invitee['email'],
                "questions_and_answers": invitee.get('questions_and_answers', [])
            })

    return raw_results
|
||||
|
||||
def get_calendly_events(api_token: str, start_time: str, end_time: str, event_type_name: str = None):
    """
    Fetch Calendly events and build one formatted overlay line per invitee.

    Lines are shaped like ``"Halime Türe, 1 Kind // 12:00 Uhr"`` with an
    optional trailing ``"(note)"`` taken from the booking form.
    """
    raw_data = get_calendly_events_raw(api_token, start_time, end_time, event_type_name)
    formatted_data = []

    for entry in raw_data:
        # HH:MM representation of the event start (ISO string, 'Z' → UTC offset).
        start_dt = datetime.datetime.fromisoformat(entry['start_time'].replace('Z', '+00:00'))
        clock = start_dt.strftime('%H:%M')

        # Pull the child count and an optional free-text note out of the
        # booking-form answers, matched by German question keywords.
        kids_answer = ""
        note_answer = ""
        for qa in entry.get('questions_and_answers', []):
            question = qa.get('question', '').lower()
            answer = qa.get('answer', '')
            if "wie viele kinder" in question:
                kids_answer = answer
            elif "nachricht" in question or "anmerkung" in question:
                note_answer = answer

        # Assemble "Name, X Kinder // HH:MM Uhr (Notes)".
        pieces = [entry['invitee_name']]
        if kids_answer:
            pieces.append(f", {kids_answer}")
        pieces.append(f" // {clock} Uhr")
        if note_answer:
            pieces.append(f" ({note_answer})")

        formatted_data.append("".join(pieces))

    logger.info(f"Processed {len(formatted_data)} invitees.")
    return formatted_data
|
||||
|
||||
|
||||
def overlay_text_on_pdf(base_pdf_path: str, output_pdf_path: str, texts: list):
    """
    Stamp invitee text lines onto an existing card-template PDF.

    Two texts go on each template page: one 22mm and one 171mm from the TOP
    edge, both 72mm from the left. Template pages that receive no text are
    dropped from the output so no empty cards are produced.
    """
    # ReportLab works in points; the spec coordinates are millimetres.
    MM_TO_PT = 2.83465
    page_width, page_height = A4

    # Spec Y values are measured from the top-left corner, ReportLab's origin
    # is bottom-left — flip the Y axis.
    x_pos = 72 * MM_TO_PT
    y_top_slot = page_height - (22 * MM_TO_PT)
    y_bottom_slot = page_height - (171 * MM_TO_PT)

    template = PdfReader(base_pdf_path)
    writer = PdfWriter()

    # Two cards fit per template page; overflow invitees are dropped with a
    # warning rather than silently lost.
    max_capacity = len(template.pages) * 2
    if len(texts) > max_capacity:
        logger.warning(f"Not enough pages in base PDF. Have {len(texts)} invitees but only space for {max_capacity}. Truncating.")
        texts = texts[:max_capacity]

    for slot_start in range(0, len(texts), 2):
        page_idx = slot_start // 2
        if page_idx >= len(template.pages):
            break  # defensive; the truncation above should make this unreachable

        # Render this page's text onto an in-memory A4 overlay.
        overlay_buffer = io.BytesIO()
        overlay = canvas.Canvas(overlay_buffer, pagesize=A4)
        overlay.setFont("Helvetica", 12)
        overlay.drawString(x_pos, y_top_slot, texts[slot_start])
        if slot_start + 1 < len(texts):
            overlay.drawString(x_pos, y_bottom_slot, texts[slot_start + 1])
        overlay.save()
        overlay_buffer.seek(0)

        # Merge the rendered overlay onto the matching template page.
        stamped_page = template.pages[page_idx]
        stamped_page.merge_page(PdfReader(overlay_buffer).pages[0])
        writer.add_page(stamped_page)

    # Unused trailing template pages are intentionally discarded here.

    with open(output_pdf_path, "wb") as output_file:
        writer.write(output_file)

    logger.info(f"Successfully generated overlaid PDF at {output_pdf_path}")
|
||||
@@ -7,3 +7,7 @@ pandas==2.2.2
|
||||
weasyprint==62.1
|
||||
jinja2==3.1.4
|
||||
pydyf==0.10.0
|
||||
sqlalchemy==2.0.31
|
||||
requests==2.31.0
|
||||
reportlab==4.0.9
|
||||
PyPDF2==3.0.1
|
||||
|
||||