"""Scraper for the fotograf.de photographer back office.

Two interactive modes are offered by ``main``:

1. Reminder list: find access codes with at most one login and no purchase,
   look up the associated buyer's e-mail address and write one CSV row per
   e-mail address.
2. Statistics: count, per album, how many children exist, how many have at
   least one purchased photo and how many have every buyable photo purchased.
"""
import csv
import html
import json
import math
import os
import re
import time
from datetime import datetime

from selenium import webdriver
from selenium.common.exceptions import (
    InvalidArgumentException,  # kept from the original import list (unused here)
    NoSuchElementException,
    StaleElementReferenceException,
    TimeoutException,
)
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# --- Configuration & constants ---
CREDENTIALS_FILE = 'fotograf_credentials.json'
OUTPUT_DIR = 'output'
LOGIN_URL = 'https://app.fotograf.de/login/login'
CHROME_BINARY = '/usr/bin/google-chrome'
# Number of access codes the album page lists per pagination page.
ACCESS_CODES_PER_PAGE = 20
# How often the buyer-detail lookup is retried after a stale-element error.
DETAIL_RETRIES = 3

# --- Selectors ---
SELECTORS = {
    "cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
    "login_user": "#login-email",
    "login_pass": "#login-password",
    "login_button": "#login-submit",
    "job_name": "h1",
    "album_overview_rows": "//table/tbody/tr",
    "album_overview_link": ".//td[2]//a",
    "access_code_count": "//span[text()='Zugangscodes']/following-sibling::strong",
    "person_rows": "//div[contains(@class, 'border-legacy-silver-550') and .//span[text()='Logins']]",
    "person_vorname": ".//span[text()='Vorname']/following-sibling::strong",
    "person_logins": ".//span[text()='Logins']/following-sibling::strong",
    "person_access_code_link": ".//a[contains(@data-qa-id, 'guest-access-banner-access-code')]",
    # Selectors used for the statistics count
    "person_all_photos": ".//div[@data-key]",
    "person_purchased_photos": ".//div[@data-key and .//img[@alt='Bestellungen mit diesem Foto']]",
    # Identifies the access card, which is subtracted from the buyable photos
    "person_access_card_photo": ".//div[@data-key and contains(@class, 'opacity-50')]",
    "potential_buyer_link": "//a[contains(@href, '/config_customers/view_customer')]",
    "quick_login_url": "//a[@id='quick-login-url']",
    "buyer_email": "//span[contains(., '@')]"
}


def take_error_screenshot(driver, error_name):
    """Save a timestamped screenshot to OUTPUT_DIR on a best-effort basis.

    ``error_name`` becomes part of the file name; characters that are not
    word characters, dots or hyphens are replaced by ``_`` because callers
    pass scraped text (a child's first name). Failures are reported on
    stdout and never raised, since this runs inside error handlers.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_name = re.sub(r'[^\w.-]+', '_', str(error_name)) or 'unknown'
    filepath = os.path.join(OUTPUT_DIR, f"error_{safe_name}_{timestamp}.png")
    try:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        driver.save_screenshot(filepath)
        print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {filepath}")
    except Exception as e:
        print(f"!!! Konnte keinen Screenshot speichern: {e}")


def setup_driver():
    """Create a headless Chrome WebDriver; return None if it cannot be started."""
    print("Initialisiere Chrome WebDriver...")
    options = Options()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--window-size=1920,1200')
    # Only pin the binary when it exists; otherwise let Selenium locate Chrome.
    if os.path.exists(CHROME_BINARY):
        options.binary_location = CHROME_BINARY
    try:
        return webdriver.Chrome(options=options)
    except Exception as e:
        print(f"Fehler bei der Initialisierung des WebDrivers: {e}")
        return None


def load_all_credentials():
    """Load the profile mapping from CREDENTIALS_FILE.

    Returns the parsed JSON object (a dict), or None when the file is
    missing, unreadable, not valid JSON, or its root is not an object.
    The reason is printed so that the caller's silent exit is explainable.
    """
    try:
        with open(CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except OSError as e:
        print(f"Zugangsdaten-Datei '{CREDENTIALS_FILE}' konnte nicht gelesen werden: {e}")
        return None
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        print(f"Zugangsdaten-Datei '{CREDENTIALS_FILE}' ist kein gültiges JSON: {e}")
        return None
    if not isinstance(data, dict):
        print(f"Zugangsdaten-Datei '{CREDENTIALS_FILE}' muss ein JSON-Objekt mit Profilen enthalten.")
        return None
    return data


def login(driver, username, password):
    """Log in at LOGIN_URL; return True once the dashboard URL is reached.

    A cookie banner is accepted if it becomes clickable within the wait
    timeout. Any failure is printed, a screenshot is taken and False is
    returned.
    """
    print("Starte Login-Vorgang...")
    try:
        driver.get(LOGIN_URL)
        wait = WebDriverWait(driver, 10)
        try:
            print("Suche nach Cookie-Banner...")
            wait.until(EC.element_to_be_clickable(
                (By.CSS_SELECTOR, SELECTORS["cookie_accept_button"]))).click()
            print("Cookie-Banner akzeptiert.")
            time.sleep(1)
        except TimeoutException:
            print("Kein Cookie-Banner gefunden, fahre fort.")
        print("Fülle Anmeldeformular aus...")
        wait.until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
        driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
        print("Klicke auf Login...")
        driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
        print("Warte auf die nächste Seite...")
        wait.until(EC.url_contains('/config_dashboard/index'))
        print("Login erfolgreich!")
        return True
    except Exception as e:
        print(f"Login fehlgeschlagen. Grund: {e}")
        take_error_screenshot(driver, "login_error")
        return False


def _extract_job_id(job_url):
    """Return the first all-digit run that follows a '/' in job_url, or None."""
    if not isinstance(job_url, str):
        return None
    match = re.search(r'/(\d+)', job_url)
    return match.group(1) if match else None


def _collect_album_links(driver, wait, verbose=False):
    """Collect ``{"name", "url"}`` dicts from the album overview table.

    Rows without a link in the second column, or whose link has no href,
    are skipped. Raises TimeoutException if no table row appears.
    """
    album_rows = wait.until(EC.presence_of_all_elements_located(
        (By.XPATH, SELECTORS["album_overview_rows"])))
    if verbose:
        print(f"{len(album_rows)} Alben in der Übersicht gefunden.")
    albums = []
    for row in album_rows:
        try:
            album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"])
        except NoSuchElementException:
            continue
        url = album_link.get_attribute('href')
        if url:
            albums.append({"name": album_link.text, "url": url})
    if verbose:
        print(f"{len(albums)} gültige Album-Links gesammelt.")
    return albums


def _album_page_url(album_url, page_num):
    """Return the album URL for a 1-based pagination page.

    Page 1 is the plain album URL. Later pages add ``page_guest_accesses``,
    using ``&`` when the URL already carries a query string.
    """
    if page_num <= 1:
        return album_url
    separator = '&' if '?' in album_url else '?'
    return f"{album_url}{separator}page_guest_accesses={page_num}"


def _read_page_count(wait):
    """Read the access-code counter of the open album page.

    Returns ``(total_codes_text, num_pages)``. Raises TimeoutException if
    the counter never becomes visible and ValueError if it is not numeric.
    """
    total_codes_text = wait.until(EC.visibility_of_element_located(
        (By.XPATH, SELECTORS["access_code_count"]))).text
    num_pages = math.ceil(int(total_codes_text) / ACCESS_CODES_PER_PAGE)
    return total_codes_text, num_pages


def _inspect_person_row(person_row):
    """Decide whether a person row qualifies for a reminder.

    Returns ``(vorname, access_code_page_url)`` when the person has at most
    one login, no purchased photo and an access-code link; otherwise None.
    Raises ValueError for a non-numeric login count and
    NoSuchElementException when a required element is missing.
    """
    login_count_text = person_row.find_element(By.XPATH, SELECTORS["person_logins"]).text
    if int(login_count_text) > 1:
        return None
    vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text
    try:
        photo_container = person_row.find_element(By.XPATH, "./following-sibling::div[1]")
    except NoSuchElementException:
        # No photo container follows the row, so there is nothing purchased.
        purchase_icons = []
    else:
        purchase_icons = photo_container.find_elements(
            By.XPATH, SELECTORS["person_purchased_photos"])
    if purchase_icons:
        print(f" --> INFO: '{vorname}' hat bereits gekauft. Überspringe.")
        return None
    print(f" --> ERFOLG: '{vorname}' mit {login_count_text} Login(s) gefunden (und kein Kauf).")
    access_code_page_url = person_row.find_element(
        By.XPATH, SELECTORS["person_access_code_link"]).get_attribute('href')
    if not access_code_page_url:
        print(f" Kein Zugangscode-Link für '{vorname}' gefunden. Überspringe.")
        return None
    return vorname, access_code_page_url


def _fetch_buyer_details(driver, wait, access_code_page_url, vorname):
    """Open the access-code page and read buyer name, e-mail and quick-login URL.

    Returns the result dict for one child, or None when the details cannot
    be read. Stale-element errors are retried up to DETAIL_RETRIES times;
    every attempt reloads the access-code page so that a retry does not
    start from the buyer page reached by an earlier click.
    """
    for attempt in range(DETAIL_RETRIES):
        try:
            driver.get(access_code_page_url)
            quick_login = wait.until(EC.visibility_of_element_located(
                (By.XPATH, SELECTORS["quick_login_url"])))
            schnell_login_url = quick_login.get_attribute('href')
            potential_buyer_element = driver.find_element(
                By.XPATH, SELECTORS["potential_buyer_link"])
            kaeufer_name = potential_buyer_element.text
            print(f" Käufer: '{kaeufer_name}', Schnell-Login: GEFUNDEN")
            potential_buyer_element.click()
            print(f" Navigiere zur Käufer-Detailseite...")
            email = wait.until(EC.visibility_of_element_located(
                (By.XPATH, SELECTORS["buyer_email"]))).text
            print(f" FINALE ERFOLG: E-Mail gefunden: {email}")
            return {
                "Name des Kindes": vorname,
                "Name Käufer": kaeufer_name,
                "E-Mail-Adresse Käufer": email,
                "Schnell Login URL": schnell_login_url
            }
        except StaleElementReferenceException:
            print(f" Timing-Fehler, Versuch {attempt + 1}/{DETAIL_RETRIES}...")
            if attempt + 1 < DETAIL_RETRIES:
                time.sleep(1)
        except TimeoutException:
            print(f" Timeout beim Warten auf Details für '{vorname}'.")
            take_error_screenshot(driver, f"timeout_error_{vorname}")
            return None
        except NoSuchElementException:
            print(f" Kein Käufer-Link für '{vorname}' gefunden.")
            take_error_screenshot(driver, f"missing_buyer_{vorname}")
            return None
    # All retries hit a stale element: give up on this person only.
    print(f" Details für '{vorname}' konnten nicht gelesen werden. Überspringe.")
    take_error_screenshot(driver, f"stale_error_{vorname}")
    return None


def _process_reminder_page(driver, wait, page_url, page_num, results):
    """Scan one album page and append a result dict per qualifying person.

    ``results`` is appended to in place so that entries collected before a
    TimeoutException (which propagates to the caller) are not lost.
    """
    num_persons = len(wait.until(EC.presence_of_all_elements_located(
        (By.XPATH, SELECTORS["person_rows"]))))
    print(f" {num_persons} Personen auf dieser Seite gefunden.")
    # Index-based on purpose: the page is reloaded after every detail lookup,
    # so the rows are located again on each iteration.
    for i in range(num_persons):
        person_rows = wait.until(EC.presence_of_all_elements_located(
            (By.XPATH, SELECTORS["person_rows"])))
        if i >= len(person_rows):
            print(f" Seite enthält nur noch {len(person_rows)} Personen. Breche Seite ab.")
            break
        try:
            candidate = _inspect_person_row(person_rows[i])
        except (NoSuchElementException, StaleElementReferenceException, ValueError) as e:
            print(f" Person {i + 1} konnte nicht ausgewertet werden: {e}")
            continue
        if candidate is None:
            continue
        vorname, access_code_page_url = candidate
        print(f" Navigiere zur Kommunikations-Seite für '{vorname}'...")
        entry = _fetch_buyer_details(driver, wait, access_code_page_url, vorname)
        if entry is not None:
            results.append(entry)
        print(f" Kehre zurück zur Album-Seite {page_num}...")
        driver.get(page_url)
        wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"])))


def process_reminder_mode(driver, job_url):
    """Mode 1: collect buyers who have not purchased yet.

    Walks every album of the job and every pagination page of its access
    codes. For each person with at most one login and no purchased photo
    the buyer's name, e-mail address and quick-login URL are looked up.

    Returns a list of dicts with the keys "Name des Kindes", "Name Käufer",
    "E-Mail-Adresse Käufer" and "Schnell Login URL". The list is empty when
    the job ID, the job name or the album list cannot be determined.
    """
    wait = WebDriverWait(driver, 15)
    job_id = _extract_job_id(job_url)
    if job_id is None:
        print(f"!!! FEHLER: Konnte keine Job-ID aus der URL '{job_url}' extrahieren. "
              f"Grund: Konnte keine numerische Job-ID finden.")
        return []
    settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
    print(f"\nVerarbeite Job-ID: {job_id}")
    driver.get(settings_url)
    try:
        job_name = wait.until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, SELECTORS["job_name"]))).text
        print(f"Auftragsname: '{job_name}'")
    except TimeoutException:
        print("Konnte den Auftragsnamen nicht finden.")
        return []

    albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
    print(f"Navigiere zur Alben-Übersicht: {albums_overview_url}")
    driver.get(albums_overview_url)
    try:
        albums_to_visit = _collect_album_links(driver, wait, verbose=True)
    except TimeoutException:
        print("Konnte die Album-Liste nicht finden.")
        return []

    final_results = []
    for album in albums_to_visit:
        print(f"\n--- Betrete Album: {album['name']} ---")
        driver.get(album['url'])
        try:
            total_codes_text, num_pages = _read_page_count(wait)
            print(f"Album hat {total_codes_text} Zugangscodes auf {num_pages} Seite(n).")
            for page_num in range(1, num_pages + 1):
                current_page_url = _album_page_url(album['url'], page_num)
                print(f" Verarbeite Seite {page_num}...")
                driver.get(current_page_url)
                _process_reminder_page(driver, wait, current_page_url, page_num, final_results)
        except TimeoutException:
            print(f" Keine Personen-Daten im Album '{album['name']}' gefunden. Überspringe.")
            continue
        except ValueError as e:
            # A non-numeric access-code counter must not abort the other albums.
            print(f" Anzahl der Zugangscodes im Album '{album['name']}' nicht lesbar: {e}. Überspringe.")
            continue
    return final_results


def _join_names(names):
    """Join names as 'A', 'A und B' or 'A, B und C'."""
    if len(names) > 1:
        return ', '.join(names[:-1]) + ' und ' + names[-1]
    return ''.join(names)


def aggregate_results_by_email(results):
    """Merge the raw per-child results into one row per buyer e-mail address.

    Each output dict has the keys "Name Käufer" (first word of the buyer
    name of the first entry for that address), "E-Mail-Adresse Käufer",
    "Kindernamen" (names joined with commas and "und") and "LinksHTML".
    A child named "Familie" is shown as "Familienbilder".
    """
    print("\nBeginne mit der Aggregation der Ergebnisse pro E-Mail-Adresse...")
    aggregated_data = {}
    for result in results:
        email = result['E-Mail-Adresse Käufer']
        child_name = ("Familienbilder" if result['Name des Kindes'] == "Familie"
                      else result['Name des Kindes'])
        # NOTE(review): the source showed only the bare text 'Fotos von <name>'
        # and a raw line break as separator, so the quick-login URL was never
        # used. The anchor markup and the <br> separator below are a
        # reconstruction - confirm against the mail template.
        label = html.escape(f'Fotos von {child_name}', quote=False)
        quick_login_url = result.get('Schnell Login URL')
        if quick_login_url:
            safe_url = html.escape(quick_login_url, quote=True)
            html_link = f'<a href="{safe_url}">{label}</a>'
        else:
            html_link = label
        if email not in aggregated_data:
            aggregated_data[email] = {
                'Name Käufer': result['Name Käufer'].split(' ')[0],
                'Kindernamen_list': [],
                'LinksHTML_list': [],
            }
        aggregated_data[email]['Kindernamen_list'].append(child_name)
        aggregated_data[email]['LinksHTML_list'].append(html_link)

    final_list = [
        {
            'Name Käufer': data['Name Käufer'],
            'E-Mail-Adresse Käufer': email,
            'Kindernamen': _join_names(data['Kindernamen_list']),
            'LinksHTML': '<br><br>'.join(data['LinksHTML_list']),
        }
        for email, data in aggregated_data.items()
    ]
    print(f"Aggregation abgeschlossen. {len(results)} Roh-Einträge zu "
          f"{len(final_list)} einzigartigen E-Mails zusammengefasst.")
    return final_list


def _write_csv(output_file, fieldnames, rows):
    """Write rows (dicts keyed by fieldnames) as a UTF-8 CSV with a header."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


def save_aggregated_results_to_csv(results):
    """Write the aggregated reminder rows to 'supermailer_fertige_liste.csv' in OUTPUT_DIR.

    Nothing is written when results is empty.
    """
    if not results:
        print("\nKeine Daten zum Speichern vorhanden.")
        return
    output_file = os.path.join(OUTPUT_DIR, 'supermailer_fertige_liste.csv')
    fieldnames = ["Name Käufer", "E-Mail-Adresse Käufer", "Kindernamen", "LinksHTML"]
    print(f"\nSpeichere {len(results)} aggregierte Ergebnisse in '{output_file}'...")
    _write_csv(output_file, fieldnames, results)
    print("Speichern erfolgreich!")


# --- Mode 2: statistics ---
def process_statistics_mode(driver, job_url):
    """Mode 2: count children and purchases per album.

    Returns a list with one dict per album, keyed "Album",
    "Kinder insgesamt", "Kinder mit Käufen" and
    "Kinder (Alle Bilder gekauft)". An album whose evaluation raises is
    reported and skipped. The list is empty when the job ID or the album
    list cannot be determined.
    """
    wait = WebDriverWait(driver, 15)
    job_id = _extract_job_id(job_url)
    if job_id is None:
        print(f"!!! FEHLER: Konnte keine Job-ID aus der URL '{job_url}' extrahieren.")
        return []
    albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
    print(f"Navigiere zur Alben-Übersicht: {albums_overview_url}")
    driver.get(albums_overview_url)
    try:
        albums_to_visit = _collect_album_links(driver, wait)
    except TimeoutException:
        print("Konnte die Album-Liste nicht finden.")
        return []

    statistics = []
    print("\n--- STATISTIK-AUSWERTUNG ---")
    for album in albums_to_visit:
        print(f"\nAlbum: {album['name']}")
        driver.get(album['url'])
        try:
            _, num_pages = _read_page_count(wait)
            total_children_in_album = 0
            children_with_purchase = 0
            children_with_all_purchased = 0
            for page_num in range(1, num_pages + 1):
                # Page 1 is already open from the driver.get above.
                if page_num > 1:
                    driver.get(_album_page_url(album['url'], page_num))
                person_rows = wait.until(EC.presence_of_all_elements_located(
                    (By.XPATH, SELECTORS["person_rows"])))
                for person_row in person_rows:
                    total_children_in_album += 1
                    try:
                        photo_container = person_row.find_element(
                            By.XPATH, "./following-sibling::div[1]")
                    except NoSuchElementException:
                        # Counted as a child, but without photos to evaluate.
                        continue
                    num_total_photos = len(photo_container.find_elements(
                        By.XPATH, SELECTORS["person_all_photos"]))
                    num_purchased_photos = len(photo_container.find_elements(
                        By.XPATH, SELECTORS["person_purchased_photos"]))
                    num_access_cards = len(photo_container.find_elements(
                        By.XPATH, SELECTORS["person_access_card_photo"]))
                    # The access card sits among the photos but is not buyable.
                    buyable_photos = num_total_photos - num_access_cards
                    if num_purchased_photos > 0:
                        children_with_purchase += 1
                    if buyable_photos > 0 and buyable_photos == num_purchased_photos:
                        children_with_all_purchased += 1
            print(f" - Kinder insgesamt: {total_children_in_album}")
            print(f" - Kinder mit (mind. 1) Kauf: {children_with_purchase}")
            print(f" - Kinder (Alle Bilder gekauft): {children_with_all_purchased}")
            statistics.append({
                "Album": album['name'],
                "Kinder insgesamt": total_children_in_album,
                "Kinder mit Käufen": children_with_purchase,
                "Kinder (Alle Bilder gekauft)": children_with_all_purchased
            })
        except Exception as e:
            print(f" Fehler bei der Auswertung dieses Albums: {e}")
            continue
    return statistics


def save_statistics_to_csv(results):
    """Write the per-album statistics to 'job_statistik.csv' in OUTPUT_DIR.

    Nothing is written when results is empty.
    """
    if not results:
        print("\nKeine Statistikdaten zum Speichern vorhanden.")
        return
    output_file = os.path.join(OUTPUT_DIR, 'job_statistik.csv')
    fieldnames = ["Album", "Kinder insgesamt", "Kinder mit Käufen", "Kinder (Alle Bilder gekauft)"]
    print(f"\nSpeichere Statistik für {len(results)} Alben in '{output_file}'...")
    _write_csv(output_file, fieldnames, results)
    print("Speichern erfolgreich!")


def get_profile_choice():
    """Let the user pick a profile; return its credentials or None.

    None is returned when no credentials could be loaded or the file
    contains no profile.
    """
    all_credentials = load_all_credentials()
    if not all_credentials:
        print("Keine Profile verfügbar.")
        return None
    profiles = list(all_credentials)
    print("\nBitte wähle das zu verwendende Profil:")
    for i, p in enumerate(profiles):
        print(f" {i + 1}) {p}")
    while True:
        try:
            c = int(input(f"Gib eine Zahl zwischen 1 und {len(profiles)} ein: "))
        except ValueError:
            print("Ungültige Eingabe.")
            continue
        if 1 <= c <= len(profiles):
            p_name = profiles[c - 1]
            print(f"Profil '{p_name}' ausgewählt.")
            return all_credentials[p_name]
        print("Ungültige Auswahl.")


def main():
    """Interactive entry point: choose mode and profile, then scrape one job."""
    print("--- Fotograf.de Scraper (v3.2 - The Master Analyst) ---")
    while True:
        mode = input("Bitte Modus wählen:\n 1) E-Mail-Liste erstellen\n 2) Statistik auswerten\nWahl: ")
        if mode in ['1', '2']:
            break
        print("Ungültige Eingabe.")

    credentials = get_profile_choice()
    if not credentials:
        return
    # Check the profile before starting Chrome, not via a KeyError afterwards.
    if not isinstance(credentials, dict) or any(
            key not in credentials for key in ('username', 'password')):
        print("Das gewählte Profil enthält keinen Benutzernamen oder kein Passwort.")
        return

    job_url_raw = input("Bitte eine beliebige URL des zu bearbeitenden Fotoauftrags ein: ")
    match = re.search(r'(https?://[^\s]+)', job_url_raw)
    if not match:
        print("Keine gültige URL in der Eingabe gefunden.")
        return
    job_url = match.group(1).strip()
    if "fotograf.de/config_jobs_" not in job_url or not re.search(r'/\d+', job_url):
        print("Dies scheint keine gültige URL für einen Fotoauftrag zu sein.")
        return

    driver = setup_driver()
    if not driver:
        return
    try:
        if login(driver, credentials['username'], credentials['password']):
            if mode == '1':
                raw_results = process_reminder_mode(driver, job_url)
                aggregated_results = aggregate_results_by_email(raw_results)
                save_aggregated_results_to_csv(aggregated_results)
            elif mode == '2':
                stats_results = process_statistics_mode(driver, job_url)
                save_statistics_to_csv(stats_results)
        else:
            print("Skript wird beendet, da der Login fehlgeschlagen ist.")
    finally:
        print("\nSkript beendet. Schließe WebDriver.")
        driver.quit()


if __name__ == "__main__":
    main()