[34288f42] Feature: Add 'Skip Calendly' option for siblings list generation

2026-05-04 06:53:32 +00:00
parent db94eca626
commit 991e338d67
3 changed files with 507 additions and 70 deletions


@@ -10,13 +10,14 @@ from weasyprint import HTML
import tempfile
import shutil
import time
import json
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from database import get_db, Job as DBJob, engine, Base, JobParticipant, SessionLocal
from database import get_db, Job as DBJob, engine, Base, JobParticipant, SessionLocal, ReminderHistory
import math
import uuid
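The `ReminderHistory` model this import pulls in is added in `database.py`, which is not part of the hunks shown here. A minimal sketch of what it presumably looks like, reconstructed from the attributes accessed further down (table name and column types are assumptions):

```python
# Hypothetical sketch of ReminderHistory in database.py; only the attribute
# names are taken from this diff (id, job_id, timestamp, recipient_count,
# max_logins, scheduled_time, recipients_json) -- everything else is assumed.
from datetime import datetime
from sqlalchemy import Column, DateTime, Integer, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()  # stands in for the project's database.Base

class ReminderHistory(Base):
    __tablename__ = "reminder_history"  # assumed table name

    id = Column(Integer, primary_key=True)
    job_id = Column(String, index=True)
    timestamp = Column(DateTime, default=datetime.utcnow)
    recipient_count = Column(Integer)
    max_logins = Column(Integer)
    scheduled_time = Column(String)   # "Sofort" or a scheduled timestamp string
    recipients_json = Column(Text)    # JSON-serialized recipient list
```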
@@ -116,6 +117,8 @@ SELECTORS = {
"job_row_shooting_type": ".//td[count(//th[contains(., 'Typ')]/preceding-sibling::th) + 1]",
"export_dropdown": "[data-qa-id='dropdown:export']",
"export_csv_link": "button[data-qa-id='button:csv']",
# --- Reminder & Quick Login Selectors ---
"person_access_code_link": ".//a[contains(@data-qa-id, 'guest-access-banner-access-code')]",
# --- Statistics Selectors ---
"album_overview_rows": "//table/tbody/tr",
"album_overview_link": ".//td[2]//a",
@@ -535,7 +538,7 @@ def process_statistics(task_id: str, job_id: str, account_type: str):
finally:
db.close()
def process_reminder_analysis(task_id: str, job_id: str, account_type: str):
def process_reminder_analysis(task_id: str, job_id: str, account_type: str, max_logins: int = 1, exclude_purchased_emails: bool = True):
logger.info(f"Task {task_id}: Starting fast reminder analysis for job {job_id}")
task_store[task_id] = {"status": "running", "progress": "Analysiere Datenbank-Einträge...", "result": None}
@@ -547,12 +550,25 @@ def process_reminder_analysis(task_id: str, job_id: str, account_type: str):
task_store[task_id] = {"status": "error", "progress": "Keine Daten vorhanden. Bitte erst oben auf 'Daten abgleichen' klicken."}
return
# Query DB for potential candidates (Logins <= 1 and No Orders)
# 1. Get emails that have ALREADY purchased anything (in ANY job we have in DB)
purchased_emails = set()
if exclude_purchased_emails:
from sqlalchemy import or_
# We look globally across the whole job_participants table
purchased_results = db.query(JobParticipant.email_eltern).filter(
or_(JobParticipant.has_orders == 1, JobParticipant.digital_package_ordered == 1),
JobParticipant.email_eltern != "",
JobParticipant.email_eltern.isnot(None)
).all()
purchased_emails = {r[0].lower() for r in purchased_results}
logger.info(f"Task {task_id}: Found {len(purchased_emails)} unique emails with existing purchases in DB to exclude.")
# 2. Query DB for potential candidates (Logins <= max_logins and No Orders)
candidates = db.query(JobParticipant).filter(
JobParticipant.job_id == job_id,
JobParticipant.has_orders == 0,
JobParticipant.logins <= 1,
JobParticipant.digital_package_ordered == 0,
JobParticipant.logins <= max_logins,
JobParticipant.email_eltern != "",
JobParticipant.email_eltern.isnot(None)
).all()
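The purchased-email exclusion above materializes a Python set and filters in the loop below; the same check could plausibly be pushed into the candidates query. A sketch of that variant, using only the models and the legacy SQLAlchemy query API already visible in this diff:

```python
from sqlalchemy import func, or_

# Sketch: exclude already-purchasing parents inside SQL. Passing the Query
# to notin_() makes SQLAlchemy emit a NOT IN (SELECT ...) subquery.
purchased_q = db.query(func.lower(JobParticipant.email_eltern)).filter(
    or_(JobParticipant.has_orders == 1,
        JobParticipant.digital_package_ordered == 1),
    JobParticipant.email_eltern != "",
    JobParticipant.email_eltern.isnot(None),
)

candidates = db.query(JobParticipant).filter(
    JobParticipant.job_id == job_id,
    JobParticipant.has_orders == 0,
    JobParticipant.digital_package_ordered == 0,
    JobParticipant.logins <= max_logins,
    JobParticipant.email_eltern != "",
    JobParticipant.email_eltern.isnot(None),
    func.lower(JobParticipant.email_eltern).notin_(purchased_q),
).all()
```

The set-based version in the commit has one advantage over this: the number of excluded emails can be counted and logged, as done above.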
@@ -560,15 +576,28 @@ def process_reminder_analysis(task_id: str, job_id: str, account_type: str):
if not candidates:
task_store[task_id] = {
"status": "completed",
"progress": "Keine passenden Empfänger (0-1 Logins, keine Bestellung) gefunden.",
"progress": f"Keine passenden Empfänger (0-{max_logins} Logins, keine Bestellung) gefunden.",
"result": []
}
return
# 3. Aggregate results by Email
aggregation = {}
missing_links_count = 0
for c in candidates:
email = c.email_eltern
email = c.email_eltern.lower()
# Skip if this email already has a purchase for ANOTHER child
if exclude_purchased_emails and email in purchased_emails:
continue
# STRICT LINK CHECK: If we don't have a scraped Quick Login URL, skip this child.
# We don't want to send broken /login/access/ links.
if not c.quick_login_url:
missing_links_count += 1
continue
if email not in aggregation:
aggregation[email] = {
"email": email,
@@ -583,9 +612,8 @@ def process_reminder_analysis(task_id: str, job_id: str, account_type: str):
if child_label and child_label not in aggregation[email]["children"]:
aggregation[email]["children"].append(child_label)
# Add Quick Login Link
link = f"https://www.kinderfotos-erding.de/a/{c.zugangscode}"
html_link = f'<a href="{link}">Fotos von {child_label}</a>'
# Add Quick Login Link (Guaranteed to exist here)
html_link = f'<a href="{c.quick_login_url}">Fotos von {child_label}</a>'
if html_link not in aggregation[email]["links"]:
aggregation[email]["links"].append(html_link)
@@ -603,9 +631,13 @@ def process_reminder_analysis(task_id: str, job_id: str, account_type: str):
"LinksHTML": links_html
})
progress_msg = f"Analyse fertig! {len(final_result)} Empfänger identifiziert."
if missing_links_count > 0:
progress_msg += f" (Hinweis: {missing_links_count} Kinder ignoriert, da Quick-Login-Link fehlt. Bitte vorher 'Daten abgleichen' drücken!)"
task_store[task_id] = {
"status": "completed",
"progress": f"Analyse fertig! {len(final_result)} Empfänger identifiziert.",
"progress": progress_msg,
"result": final_result
}
@@ -904,10 +936,16 @@ async def start_statistics(job_id: str, account_type: str, background_tasks: Bac
return {"task_id": task_id}
@app.post("/api/jobs/{job_id}/reminder-analysis")
async def start_reminder_analysis(job_id: str, account_type: str, background_tasks: BackgroundTasks):
logger.info(f"API Request: Start reminder analysis for job {job_id} ({account_type})")
async def start_reminder_analysis(
job_id: str,
account_type: str,
background_tasks: BackgroundTasks,
max_logins: int = 1,
exclude_purchased_emails: bool = True
):
logger.info(f"API Request: Start reminder analysis for job {job_id} ({account_type}, max_logins={max_logins}, exclude_purchased={exclude_purchased_emails})")
task_id = str(uuid.uuid4())
background_tasks.add_task(process_reminder_analysis, task_id, job_id, account_type)
background_tasks.add_task(process_reminder_analysis, task_id, job_id, account_type, max_logins, exclude_purchased_emails)
return {"task_id": task_id}
@app.get("/api/tasks/{task_id}/download-csv")
@@ -960,7 +998,7 @@ async def send_bulk_emails(request: BulkEmailRequest, db: Session = Depends(get_
"failed": failed_emails
}
def sync_participants(job_id: str, account_type: str, db: Session):
def sync_participants(job_id: str, account_type: str, db: Session, task_id: str = None):
logger.info(f"Syncing participants for job {job_id} ({account_type})")
username = os.getenv(f"{account_type.upper()}_USER")
password = os.getenv(f"{account_type.upper()}_PW")
@@ -972,6 +1010,7 @@ def sync_participants(job_id: str, account_type: str, db: Session):
raise Exception("Login failed.")
# Navigate to the Persons tab
if task_id: task_store[task_id]["progress"] = "Hole Teilnehmerliste (CSV)..."
job_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
driver.get(job_url)
wait = WebDriverWait(driver, 30)
@@ -1028,6 +1067,8 @@ def sync_participants(job_id: str, account_type: str, db: Session):
"Klasse": "gruppe"
}
if task_id: task_store[task_id]["progress"] = "Aktualisiere Datenbank..."
# Upsert into database
for _, row in df.iterrows():
code = str(row.get("Zugangscode (1)", "")).strip()
@@ -1067,6 +1108,7 @@ def sync_participants(job_id: str, account_type: str, db: Session):
# --- PHASE 2: Scrape Orders for Digital Packages (Price Magic) ---
try:
if task_id: task_store[task_id]["progress"] = "Suche nach digitalen Käufen (Price Magic)..."
orders_url = f"https://app.fotograf.de/config_jobs_orders/{job_id}/customer_orders"
logger.info(f"Navigating to orders page for price magic: {orders_url}")
driver.get(orders_url)
@@ -1127,11 +1169,168 @@ def sync_participants(job_id: str, account_type: str, db: Session):
except Exception as order_err:
logger.error(f"Failed to scrape orders for price magic: {order_err}")
# --- PHASE 3: Link Magic (Scrape Quick Login URLs) ---
try:
# Find candidates for reminders who don't have a link yet.
# We limit this to people with few logins (<= 5) and no orders.
link_candidates = db.query(JobParticipant).filter(
JobParticipant.job_id == job_id,
JobParticipant.has_orders == 0,
JobParticipant.logins <= 5,
JobParticipant.quick_login_url.is_(None)
).all()
if link_candidates:
if task_id: task_store[task_id]["progress"] = f"Sammle Login-Links für {len(link_candidates)} Personen (Link Magic)..."
logger.info(f"Link Magic: Identified {len(link_candidates)} candidates for link scraping.")
# Navigate to the Albums overview
albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
logger.info(f"Navigating to Albums overview: {albums_overview_url}")
driver.get(albums_overview_url)
# Find all album links
album_elements = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_link"])))
albums = [{"name": e.text, "url": e.get_attribute("href")} for e in album_elements]
codes_to_find = {c.zugangscode: c for c in link_candidates}
links_found = 0
for album in albums:
if not codes_to_find: break
logger.info(f"Searching for links in album: {album['name']}")
driver.get(album['url'])
try:
total_codes_text = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["access_code_count"]))).text
num_pages = math.ceil(int(total_codes_text) / 20)
for page_num in range(1, num_pages + 1):
if not codes_to_find: break
if page_num > 1:
driver.get(album['url'] + f"?page_guest_accesses={page_num}")
person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
# Map of codes on this page to their communication link
page_links = {}
for row in person_rows:
row_text = row.text
for code in list(codes_to_find.keys()):
if code in row_text:
try:
comm_link = row.find_element(By.XPATH, SELECTORS["person_access_code_link"]).get_attribute("href")
page_links[code] = comm_link
except Exception: pass  # row has no access-code link; skip it
# Now visit each communication page
for code, comm_link in page_links.items():
if code not in codes_to_find: continue
logger.debug(f"Scraping link for code {code}...")
if task_id: task_store[task_id]["progress"] = f"Hole Link {links_found+1} / {len(link_candidates)}..."
driver.get(comm_link)
try:
wait_short = WebDriverWait(driver, 5)
quick_link_el = wait_short.until(EC.presence_of_element_located((By.XPATH, SELECTORS["quick_login_url"])))
quick_link = quick_link_el.get_attribute("href")
# Update DB
codes_to_find[code].quick_login_url = quick_link
del codes_to_find[code]
links_found += 1
if links_found % 5 == 0: db.commit()
except Exception:
logger.warning(f"Could not find quick login link for {code}")
# Go back to album page if we visited communication pages
if page_links:
driver.get(album['url'] + (f"?page_guest_accesses={page_num}" if page_num > 1 else ""))
wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
except Exception as album_err:
logger.error(f"Error in album {album['name']}: {album_err}")
db.commit()
logger.info(f"Link Magic complete: Scraped {links_found} links.")
except Exception as link_err:
logger.error(f"Failed to scrape links: {link_err}")
return len(df)
finally:
driver.quit()
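Link Magic walks the guest-access list 20 entries per page via `?page_guest_accesses=N` and commits every five scraped links so a crash loses little work. The paging arithmetic from above, isolated (the page size of 20 is hardcoded from the UI):

```python
import math

def guest_access_pages(total_codes: int, page_size: int = 20) -> list:
    """Page numbers to visit, matching the loop in Link Magic above."""
    return list(range(1, math.ceil(total_codes / page_size) + 1))

# e.g. an album with 53 access codes is scraped in pages [1, 2, 3]
assert guest_access_pages(53) == [1, 2, 3]
```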
@app.get("/api/jobs/{job_id}/reminder-history")
async def get_reminder_history(job_id: str, db: Session = Depends(get_db)):
history = db.query(ReminderHistory).filter(ReminderHistory.job_id == job_id).order_by(ReminderHistory.timestamp.desc()).all()
return [
{
"id": h.id,
"timestamp": h.timestamp.isoformat(),
"recipient_count": h.recipient_count,
"max_logins": h.max_logins,
"scheduled_time": h.scheduled_time,
"recipients": json.loads(h.recipients_json) if h.recipients_json else []
}
for h in history
]
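The history endpoint then returns entries of this shape (values invented for illustration):

```python
# Illustrative response of GET /api/jobs/{job_id}/reminder-history:
[
    {
        "id": 7,
        "timestamp": "2026-05-04T06:53:32+00:00",
        "recipient_count": 42,
        "max_logins": 1,
        "scheduled_time": "Sofort",
        "recipients": [{"email": "eltern@example.com", "children": ["Mia"]}],
    }
]
```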
class SendReminderRequest(BaseModel):
emails: List[Dict[str, str]]
max_logins: int
scheduled_time: Optional[str] = None
recipients_data: List[Dict[str, Any]] # To store in history
@app.post("/api/jobs/{job_id}/reminder-send")
async def send_reminders(
job_id: str,
data: SendReminderRequest,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db)
):
logger.info(f"Sending {len(data.emails)} reminders for job {job_id}")
# Save to history
new_history = ReminderHistory(
job_id=job_id,
recipient_count=len(data.emails),
max_logins=data.max_logins,
recipients_json=json.dumps(data.recipients_data),
scheduled_time=data.scheduled_time or "Sofort"
)
db.add(new_history)
db.commit()
# Reuse delayed_send logic from publish_request_api if scheduled
if data.scheduled_time:
from publish_request_api import delayed_send
from database import SessionLocal
background_tasks.add_task(delayed_send, data.emails, data.scheduled_time, SessionLocal)
return {"status": "scheduled", "message": f"Versand für {data.scheduled_time} geplant."}
# Immediate send
service = GmailService(db)
success = 0
failed = []
for email_data in data.emails:
if service.send_email(email_data["to"], email_data["subject"], email_data["body"]):
success += 1
else:
failed.append(email_data["to"])
return {"status": "success", "success": success, "failed": failed}
@app.get("/api/jobs/{job_id}/login-distribution")
async def get_login_distribution(job_id: str, db: Session = Depends(get_db)):
from sqlalchemy import func
results = db.query(
JobParticipant.logins,
func.count(JobParticipant.id)
).filter(JobParticipant.job_id == job_id).group_by(JobParticipant.logins).order_by(JobParticipant.logins).all()
return [{"logins": r[0], "count": r[1]} for r in results]
@app.get("/api/jobs/{job_id}/fast-stats")
async def get_fast_stats(job_id: str, db: Session = Depends(get_db)):
@@ -1180,14 +1379,28 @@ async def get_fast_stats(job_id: str, db: Session = Depends(get_db)):
statistics.sort(key=lambda x: x["Album"])
return statistics
@app.post("/api/jobs/{job_id}/sync-participants")
async def sync_participants_api(job_id: str, account_type: str, db: Session = Depends(get_db)):
def process_sync_task(task_id: str, job_id: str, account_type: str):
logger.info(f"Task {task_id}: Starting background sync for job {job_id}")
task_store[task_id] = {"status": "running", "progress": "Starte Synchronisierung...", "result": None}
db = SessionLocal()
try:
count = sync_participants(job_id, account_type, db)
return {"status": "success", "count": count}
count = sync_participants(job_id, account_type, db, task_id)
task_store[task_id] = {
"status": "completed",
"progress": f"Abgleich fertig! {count} Personen synchronisiert.",
"result": count
}
except Exception as e:
logger.exception("Sync failed")
raise HTTPException(status_code=500, detail=str(e))
logger.exception(f"Unexpected error in sync task {task_id}")
task_store[task_id] = {"status": "error", "progress": f"Fehler: {str(e)}"}
finally:
db.close()
@app.post("/api/jobs/{job_id}/sync-participants")
async def sync_participants_api(job_id: str, account_type: str, background_tasks: BackgroundTasks):
task_id = str(uuid.uuid4())
background_tasks.add_task(process_sync_task, task_id, job_id, account_type)
return {"task_id": task_id}
@app.get("/api/jobs/{job_id}/generate-pdf")
async def generate_pdf(job_id: str, account_type: str, db: Session = Depends(get_db)):
@@ -1297,23 +1510,24 @@ async def generate_pdf(job_id: str, account_type: str, db: Session = Depends(get
@app.get("/api/jobs/{job_id}/siblings-list")
async def generate_siblings_list(job_id: str, account_type: str, event_type_name: str = "", db: Session = Depends(get_db)):
logger.info(f"API Request: Generate siblings list for job {job_id}")
logger.info(f"API Request: Generate siblings list for job {job_id}, event_type: {event_type_name}")
username = os.getenv(f"{account_type.upper()}_USER")
password = os.getenv(f"{account_type.upper()}_PW")
api_token = os.getenv("CALENDLY_TOKEN")
if not api_token:
raise HTTPException(status_code=400, detail="Calendly API token missing.")
# Get Calendly events
from qr_generator import get_calendly_events_raw
try:
# Fetch ALL events to ensure we don't miss siblings due to event name mismatches
calendly_events = get_calendly_events_raw(api_token, event_type_name=None)
logger.info(f"Fetched {len(calendly_events)} total events from Calendly for siblings check.")
except Exception as e:
logger.error(f"Error fetching Calendly events: {e}")
calendly_events = []
calendly_events = []
if event_type_name:
if not api_token:
logger.warning("Calendly API token missing, skipping Calendly check.")
else:
# Get Calendly events
from qr_generator import get_calendly_events_raw
try:
# Fetch ALL events to ensure we don't miss siblings due to event name mismatches
calendly_events = get_calendly_events_raw(api_token, event_type_name=None)
logger.info(f"Fetched {len(calendly_events)} total events from Calendly for siblings check.")
except Exception as e:
logger.error(f"Error fetching Calendly events: {e}")
with tempfile.TemporaryDirectory() as temp_dir:
logger.debug(f"Using temp directory: {temp_dir}")