278 lines
9.7 KiB
Python
278 lines
9.7 KiB
Python
import os
|
|
import requests
|
|
import io
|
|
import datetime
|
|
from reportlab.pdfgen import canvas
|
|
from reportlab.lib.pagesizes import A4
|
|
from reportlab.pdfbase import pdfmetrics
|
|
from reportlab.pdfbase.ttfonts import TTFont
|
|
from PyPDF2 import PdfReader, PdfWriter
|
|
import logging
|
|
|
|
logger = logging.getLogger("qr-card-generator")
|
|
|
|
def get_calendly_event_types(api_token: str):
|
|
"""
|
|
Fetches available event types for the current user.
|
|
"""
|
|
headers = {
|
|
'Authorization': f'Bearer {api_token}',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
# 1. Get current user info
|
|
user_url = "https://api.calendly.com/users/me"
|
|
user_response = requests.get(user_url, headers=headers)
|
|
if not user_response.ok:
|
|
raise Exception(f"Calendly API Error: {user_response.status_code}")
|
|
|
|
user_data = user_response.json()
|
|
user_uri = user_data['resource']['uri']
|
|
|
|
# 2. Get event types
|
|
event_types_url = "https://api.calendly.com/event_types"
|
|
params = {
|
|
'user': user_uri
|
|
}
|
|
|
|
types_response = requests.get(event_types_url, headers=headers, params=params)
|
|
if not types_response.ok:
|
|
raise Exception(f"Calendly API Error: {types_response.status_code}")
|
|
|
|
types_data = types_response.json()
|
|
return types_data['collection']
|
|
|
|
def get_calendly_events_raw(api_token: str, start_time: str = None, end_time: str = None, event_type_name: str = None):
|
|
"""
|
|
Debug function to fetch raw Calendly data without formatting.
|
|
"""
|
|
headers = {
|
|
'Authorization': f'Bearer {api_token}',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
# Defaults: 2024 to +2 years (broad range to ensure we capture all relevant upcoming)
|
|
if not start_time:
|
|
start_time = "2024-01-01T00:00:00Z"
|
|
if not end_time:
|
|
end_time = (datetime.datetime.utcnow() + datetime.timedelta(days=730)).isoformat() + "Z"
|
|
|
|
# 1. Get current user info to get the user URI
|
|
user_url = "https://api.calendly.com/users/me"
|
|
user_response = requests.get(user_url, headers=headers)
|
|
if not user_response.ok:
|
|
raise Exception(f"Calendly API Error: {user_response.status_code}")
|
|
|
|
user_data = user_response.json()
|
|
user_uri = user_data['resource']['uri']
|
|
|
|
# 2. Get events for the user
|
|
events_url = "https://api.calendly.com/scheduled_events"
|
|
params = {
|
|
'user': user_uri,
|
|
'status': 'active',
|
|
'min_start_time': start_time,
|
|
'max_start_time': end_time,
|
|
'count': 100
|
|
}
|
|
|
|
all_events = []
|
|
url = events_url
|
|
while url:
|
|
response = requests.get(url, headers=headers, params=params)
|
|
if not response.ok:
|
|
raise Exception(f"Calendly API Error: {response.status_code}")
|
|
|
|
data = response.json()
|
|
all_events.extend(data.get('collection', []))
|
|
|
|
pagination = data.get('pagination', {})
|
|
next_page_token = pagination.get('next_page_token')
|
|
|
|
if next_page_token:
|
|
params = {'user': user_uri, 'count': 100, 'page_token': next_page_token, 'status': 'active'}
|
|
else:
|
|
url = None
|
|
|
|
raw_results = []
|
|
|
|
# 3. Get invitees
|
|
for event in all_events:
|
|
event_name = event.get('name', '')
|
|
# Filter by event type if provided
|
|
if event_type_name and event_type_name.lower() not in event_name.lower():
|
|
continue
|
|
|
|
event_uri = event['uri']
|
|
event_uuid = event_uri.split('/')[-1]
|
|
invitees_url = f"https://api.calendly.com/scheduled_events/{event_uuid}/invitees"
|
|
|
|
invitees_response = requests.get(invitees_url, headers=headers)
|
|
if not invitees_response.ok:
|
|
continue
|
|
|
|
invitees_data = invitees_response.json()
|
|
|
|
for invitee in invitees_data['collection']:
|
|
raw_results.append({
|
|
"event_name": event_name,
|
|
"start_time": event['start_time'],
|
|
"invitee_name": invitee['name'],
|
|
"invitee_email": invitee['email'],
|
|
"questions_and_answers": invitee.get('questions_and_answers', [])
|
|
})
|
|
|
|
return raw_results
|
|
|
|
def get_calendly_events(api_token: str, start_time: str = None, end_time: str = None, event_type_name: str = None):
|
|
"""
|
|
Fetches events from Calendly API for the current user within a time range.
|
|
"""
|
|
from zoneinfo import ZoneInfo
|
|
|
|
raw_data = get_calendly_events_raw(api_token, start_time, end_time, event_type_name)
|
|
formatted_data = []
|
|
|
|
for item in raw_data:
|
|
# Parse start time from UTC
|
|
start_dt = datetime.datetime.fromisoformat(item['start_time'].replace('Z', '+00:00'))
|
|
# Convert to Europe/Berlin (CET/CEST)
|
|
start_dt = start_dt.astimezone(ZoneInfo("Europe/Berlin"))
|
|
# Format as HH:MM
|
|
time_str = start_dt.strftime('%H:%M')
|
|
|
|
name = item['invitee_name']
|
|
|
|
# Extract specific answers from the Calendly form
|
|
num_children = ""
|
|
additional_notes = ""
|
|
has_consent = False
|
|
questions_and_answers = item.get('questions_and_answers', [])
|
|
|
|
for q_a in questions_and_answers:
|
|
q_text = q_a.get('question', '').lower()
|
|
a_text = q_a.get('answer', '')
|
|
|
|
if "wie viele kinder" in q_text:
|
|
num_children = a_text
|
|
elif "nachricht" in q_text or "anmerkung" in q_text:
|
|
additional_notes = a_text
|
|
elif "schöne bilder" in q_text and "website veröffentlichen" in q_text:
|
|
if "ja, gerne" in a_text.lower():
|
|
has_consent = True
|
|
|
|
# Construct the final string: "[☑] Name, X Kinder // HH:MM Uhr (Notes)"
|
|
# matching: Halime Türe, 1 Kind // 12:00 Uhr
|
|
final_text = ""
|
|
if has_consent:
|
|
# We use a placeholder character or string that we will handle in overlay_text_on_pdf
|
|
# because standard Helvetica doesn't support Unicode checkbox.
|
|
final_text += "☑ "
|
|
|
|
final_text += f"{name}"
|
|
if num_children:
|
|
final_text += f", {num_children}"
|
|
|
|
final_text += f" // {time_str} Uhr"
|
|
|
|
if additional_notes:
|
|
final_text += f" ({additional_notes})"
|
|
|
|
formatted_data.append(final_text)
|
|
|
|
logger.info(f"Processed {len(formatted_data)} invitees.")
|
|
return formatted_data
|
|
|
|
|
|
def overlay_text_on_pdf(base_pdf_path: str, output_pdf_path: str, texts: list):
|
|
"""
|
|
Target:
|
|
Element 1: X: 72mm, Y: 22mm + 9mm = 31mm
|
|
Element 2: X: 72mm, Y: 171mm + 9mm = 180mm
|
|
"""
|
|
|
|
# Convert mm to points (1 mm = 2.83465 points)
|
|
mm_to_pt = 2.83465
|
|
|
|
# A4 dimensions in points (approx 595.27 x 841.89)
|
|
page_width, page_height = A4
|
|
|
|
# User coordinates are from top-left.
|
|
# ReportLab uses bottom-left as (0,0).
|
|
# Element 1 (Top): X = 72mm, Y = 31mm (from top) -> Y = page_height - 31mm
|
|
# Element 2 (Bottom): X = 72mm, Y = 180mm (from top) -> Y = page_height - 180mm
|
|
|
|
x_pos = 72 * mm_to_pt
|
|
y_pos_1 = page_height - (31 * mm_to_pt)
|
|
y_pos_2 = page_height - (180 * mm_to_pt)
|
|
|
|
reader = PdfReader(base_pdf_path)
|
|
writer = PdfWriter()
|
|
|
|
total_pages = len(reader.pages)
|
|
max_capacity = total_pages * 2
|
|
|
|
if len(texts) > max_capacity:
|
|
logger.warning(f"Not enough pages in base PDF. Have {len(texts)} invitees but only space for {max_capacity}. Truncating.")
|
|
texts = texts[:max_capacity]
|
|
|
|
# We need to process pairs of texts for each page
|
|
text_pairs = [texts[i:i+2] for i in range(0, len(texts), 2)]
|
|
|
|
for page_idx, pair in enumerate(text_pairs):
|
|
if page_idx >= total_pages:
|
|
break # Safety first
|
|
|
|
# Create a new blank page in memory to draw the text
|
|
packet = io.BytesIO()
|
|
can = canvas.Canvas(packet, pagesize=A4)
|
|
|
|
# Draw the text.
|
|
# We handle the "☑" character manually since Helvetica might not support it.
|
|
def draw_text_with_checkbox(can, x, y, text):
|
|
current_x = x
|
|
if text.startswith("☑ "):
|
|
# Draw a checkbox manually
|
|
size = 10
|
|
# Draw box (baseline adjustment to align with text)
|
|
can.rect(x, y - 1, size, size)
|
|
# Draw checkmark
|
|
can.setLineWidth(1.5)
|
|
can.line(x + 2, y + 3, x + 4.5, y + 0.5)
|
|
can.line(x + 4.5, y + 0.5, x + 8.5, y + 7)
|
|
can.setLineWidth(1)
|
|
|
|
# Move text X position to the right
|
|
current_x += size + 5
|
|
text = text[2:] # Remove the "☑ " from string
|
|
|
|
can.setFont("Helvetica", 12)
|
|
can.drawString(current_x, y, text)
|
|
|
|
if len(pair) > 0:
|
|
draw_text_with_checkbox(can, x_pos, y_pos_1, pair[0])
|
|
if len(pair) > 1:
|
|
draw_text_with_checkbox(can, x_pos, y_pos_2, pair[1])
|
|
|
|
can.save()
|
|
packet.seek(0)
|
|
|
|
# Read the text PDF we just created
|
|
new_pdf = PdfReader(packet)
|
|
text_page = new_pdf.pages[0]
|
|
|
|
# Get the specific page from the original PDF
|
|
page_to_merge = reader.pages[page_idx]
|
|
page_to_merge.merge_page(text_page)
|
|
|
|
writer.add_page(page_to_merge)
|
|
|
|
# If there are pages left in the base PDF that we didn't use, append them too?
|
|
# Usually you'd want to keep them or discard them. We'll discard unused pages for now
|
|
# to avoid empty cards, or you can change this loop to include them.
|
|
|
|
with open(output_pdf_path, "wb") as output_file:
|
|
writer.write(output_file)
|
|
|
|
logger.info(f"Successfully generated overlaid PDF at {output_pdf_path}")
|