183 lines
6.9 KiB
Python
183 lines
6.9 KiB
Python
import pandas as pd
|
|
import os
|
|
import logging
|
|
from jinja2 import Environment, FileSystemLoader
|
|
from collections import defaultdict
|
|
from main import get_berlin_now_str, get_logo_base64
|
|
from weasyprint import HTML
|
|
|
|
# Module-level logger used by the functions in this file. The name is the fixed
# string "fotograf-scraper" rather than __name__.
# NOTE(review): presumably this matches the logger configured by the main
# application so that records end up in the same handlers -- confirm.
logger = logging.getLogger("fotograf-scraper")
|
|
|
|
def _read_siblings_csv(csv_path: str) -> pd.DataFrame:
    """Read the CSV, trying ';' then ',' as UTF-8 and finally ';' as Latin-1."""
    for sep in (";", ","):
        try:
            # Probe a few rows first; a single-column result is taken as a
            # sign that this separator does not match the file.
            probe = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig", nrows=5)
            if len(probe.columns) > 1:
                return pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig")
        except Exception:
            # This separator/encoding attempt failed; try the next one.
            continue

    try:
        return pd.read_csv(csv_path, sep=";", encoding="latin1")
    except Exception as err:
        # Same exception type and message as before, now with the cause chained.
        raise Exception("CSV konnte nicht gelesen werden.") from err


def _first_column(columns, predicate):
    """Return the first column whose lower-cased name satisfies *predicate*, else None."""
    return next((col for col in columns if predicate(col.lower())), None)


def _build_calendly_time_map(calendly_events) -> dict:
    """Map invitee e-mail (lower-cased, stripped) to a 'dd.mm. HH:MM' Berlin-time string."""
    from zoneinfo import ZoneInfo
    import datetime

    berlin = ZoneInfo("Europe/Berlin")
    time_by_email = {}
    for event in calendly_events or []:
        try:
            # fromisoformat() on older Python versions rejects a trailing 'Z'.
            start_dt = datetime.datetime.fromisoformat(event["start_time"].replace("Z", "+00:00"))
            start_dt = start_dt.astimezone(berlin)
            time_by_email[event["invitee_email"].lower().strip()] = start_dt.strftime("%d.%m. %H:%M")
        except Exception as err:
            # Best effort: one malformed event must not abort the whole PDF,
            # but it should not vanish without a trace either.
            logger.warning("Skipping unusable Calendly event: %r", err)
    return time_by_email


def _collect_sibling_families(df: pd.DataFrame, email_col: str, calendly_events) -> list:
    """Group rows by e-mail and describe every address that occurs on more than one row."""
    columns = df.columns
    group_col = _first_column(columns, lambda name: name in ("gruppe", "klasse", "group", "class"))
    lastname_col = _first_column(columns, lambda name: "nachname" in name)
    firstname_col = _first_column(columns, lambda name: "vorname" in name)

    # The parentheses matter: without them `and` binds tighter than `or`, so
    # every column containing "familie" (e.g. "Familienname") matched and the
    # fallback below could never be reached.
    wunsch_col = _first_column(
        columns,
        lambda name: ("familie" in name or "geschwister" in name) and "fotos" in name,
    )
    if wunsch_col is None:
        wunsch_col = _first_column(columns, lambda name: "familie / geschwister" in name)

    calendly_map = _build_calendly_time_map(calendly_events)

    # Empty cells become "" so the str() conversions below never yield "nan".
    df = df.fillna("")

    rows_by_email = defaultdict(list)
    for _, row in df.iterrows():
        email = str(row[email_col]).strip().lower()
        if "@" in email:
            rows_by_email[email].append(row)

    families = []
    for email, rows in rows_by_email.items():
        if len(rows) < 2:
            # Only one child registered under this address: not a sibling family.
            continue

        family_last_name = str(rows[0][lastname_col]).strip() if lastname_col else "Unbekannt"

        children = [
            {
                "vorname": str(r[firstname_col]).strip() if firstname_col else "",
                "gruppe": str(r[group_col]).strip() if group_col else "",
            }
            for r in rows
        ]

        # The wish counts as soon as any one of the family's rows expresses it.
        fotograf_wunsch = False
        if wunsch_col:
            for r in rows:
                val = str(r[wunsch_col]).lower()
                if "ja" in val or "familien" in val or "geschwister" in val:
                    fotograf_wunsch = True
                    break

        families.append({
            "nachname": family_last_name,
            "children": children,
            "fotograf_wunsch": fotograf_wunsch,
            "calendly_time": calendly_map.get(email),
        })

    families.sort(key=lambda fam: fam["nachname"])
    return families


def generate_siblings_pdf_from_csv(csv_path: str, institution: str, calendly_events: list, list_type: str, output_path: str):
    """Render a PDF listing the sibling families found in a participant CSV.

    Rows are grouped by their stripped, lower-cased e-mail address; every
    address that appears on more than one row is treated as one family with
    several children. For each family the template receives the last name
    (taken from the first row, "Unbekannt" if no last-name column exists),
    the children's first names and groups, a flag telling whether any row
    asks for family/sibling photos, and the Calendly appointment time of the
    matching invitee e-mail (``None`` if there is none). Families are sorted
    by last name. If the CSV has no e-mail column a warning is logged and the
    PDF is rendered with an empty family list.

    Args:
        csv_path: Path of the CSV file to read.
        institution: Passed to the template as ``institution``; also logged.
        calendly_events: Iterable of mappings with ``start_time`` (ISO 8601
            string) and ``invitee_email`` keys; ``None`` is treated as empty.
            Events that cannot be processed are skipped with a warning.
        list_type: Currently not used by this function.
        output_path: File the PDF bytes are written to.

    Raises:
        Exception: If the CSV cannot be read with any supported
            separator/encoding combination.
    """
    logger.info(f"Generating Siblings PDF for {institution} from {csv_path}")

    df = _read_siblings_csv(csv_path)
    # Header cells may carry stray whitespace or literal quote characters.
    df.columns = df.columns.str.strip().str.replace('"', "")

    # "email" takes priority over "e-mail" when both spellings are present.
    email_col = _first_column(df.columns, lambda name: "email" in name)
    if not email_col:
        email_col = _first_column(df.columns, lambda name: "e-mail" in name)

    if not email_col:
        logger.warning("No email column found. Siblings logic cannot run.")
        families = []
    else:
        families = _collect_sibling_families(df, email_col, calendly_events)

    # NOTE(review): the Environment is created without autoescape, so CSV
    # values reach the HTML unescaped -- confirm the template escapes them.
    template_dir = os.path.join(os.path.dirname(__file__), "templates")
    env = Environment(loader=FileSystemLoader(template_dir))
    template = env.get_template("siblings_list.html")

    render_context = {
        "institution": institution,
        "current_time": get_berlin_now_str(),
        "logo_base64": get_logo_base64(),
        "families": families,
    }

    html_out = template.render(render_context)
    pdf = HTML(string=html_out).write_pdf()

    with open(output_path, "wb") as f:
        f.write(pdf)
    logger.info(f"Siblings PDF saved to {output_path}")
|
|
|
|
def get_sibling_families_from_csv(csv_path: str, calendly_events: list = None) -> list:
    """Return the sibling families in a CSV that have no Calendly booking yet.

    Rows are grouped by their stripped, lower-cased e-mail address; an
    address that occurs on more than one row counts as a sibling family.
    Families whose address appears as ``invitee_email`` in
    ``calendly_events`` are skipped (and logged), because they already have
    an appointment.

    Args:
        csv_path: Path of the CSV file to read. ';' and ',' are tried as
            separators with UTF-8, then ';' with Latin-1.
        calendly_events: Optional iterable of mappings with an
            ``invitee_email`` key. ``None``, a missing key, and an empty or
            ``None`` e-mail value are all tolerated.

    Returns:
        A list of ``{"nachname": <last name>}`` dicts sorted by last name.
        The last name is taken from the family's first row, or "Unbekannt"
        if the CSV has no last-name column. An empty list is returned when
        no e-mail column is found.

    Raises:
        Exception: If the CSV cannot be read with any supported
            separator/encoding combination.
    """
    df = None
    for sep in (";", ","):
        try:
            # Probe a few rows first; a single-column result is taken as a
            # sign that this separator does not match the file.
            probe = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig", nrows=5)
            if len(probe.columns) > 1:
                df = pd.read_csv(csv_path, sep=sep, encoding="utf-8-sig")
                break
        except Exception:
            # This separator/encoding attempt failed; try the next one.
            continue

    if df is None:
        try:
            df = pd.read_csv(csv_path, sep=";", encoding="latin1")
        except Exception as err:
            # Same exception type and message as before, now with the cause chained.
            raise Exception("CSV konnte nicht gelesen werden.") from err

    # Header cells may carry stray whitespace or literal quote characters.
    df.columns = df.columns.str.strip().str.replace('"', "")

    # "email" takes priority over "e-mail" when both spellings are present.
    email_col = next((c for c in df.columns if "email" in c.lower()), None)
    if not email_col:
        email_col = next((c for c in df.columns if "e-mail" in c.lower()), None)
    if not email_col:
        return []

    lastname_col = next((c for c in df.columns if "nachname" in c.lower()), None)

    # E-mail addresses that already have a booking. `or ""` guards against an
    # explicit None value, which .get()'s default does not cover.
    booked_emails = set()
    for event in calendly_events or []:
        booked = str(event.get("invitee_email") or "").lower().strip()
        if booked:
            booked_emails.add(booked)

    # Empty cells become "" so the str() conversions below never yield "nan".
    df = df.fillna("")

    rows_by_email = defaultdict(list)
    for _, row in df.iterrows():
        email = str(row[email_col]).strip().lower()
        if "@" in email:
            rows_by_email[email].append(row)

    families = []
    for email, rows in rows_by_email.items():
        if len(rows) < 2:
            # Only one child registered under this address: not a sibling family.
            continue

        if email in booked_emails:
            logger.info(f"Family {email} already has Calendly appointment, skipping QR card.")
            continue

        family_last_name = str(rows[0][lastname_col]).strip() if lastname_col else "Unbekannt"
        families.append({"nachname": family_last_name})

    families.sort(key=lambda fam: fam["nachname"])
    return families