"""Scraper for fotograf.de that lists album persons whose access code was never used.

The script logs in with a profile from a local credentials file, walks every
album of one photo job, and for each person row whose "Logins" counter is 0
collects the child's first name, the potential buyer's name and e-mail address
and the quick-login URL. The collected rows are written to a CSV file.
"""
import csv
import json
import os
import re
import time
from datetime import datetime
from urllib.parse import urlparse

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException

# --- Configuration & constants ---
CREDENTIALS_FILE = 'fotograf_credentials.json'
OUTPUT_DIR = 'output'
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_ohne_logins.csv')
LOGIN_URL = 'https://app.fotograf.de/login/login'

# How often the detail extraction is retried after a stale-element error.
MAX_DETAIL_ATTEMPTS = 3

# --- Selectors ---
# Keys starting with "cookie_", "login_" and "job_name" are CSS selectors;
# all others are XPath expressions (see the By.* argument at each use site).
SELECTORS = {
    "cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
    "login_user": "#login-email",
    "login_pass": "#login-password",
    "login_button": "#login-submit",
    "job_name": "h1",
    "album_overview_rows": "//table/tbody/tr",
    "album_overview_link": ".//td[2]//a",
    "person_rows": "//div[contains(@class, 'border-legacy-silver-550') and .//span[text()='Logins']]",
    "person_vorname": ".//span[text()='Vorname']/following-sibling::strong",
    "person_logins": ".//span[text()='Logins']/following-sibling::strong",
    "person_access_code_link": ".//a[contains(@data-qa-id, 'guest-access-banner-access-code')]",
    "potential_buyer_link": "//a[contains(@href, '/config_customers/view_customer')]",
    "quick_login_url": "//a[@id='quick-login-url']",
    "buyer_email": "//span[contains(., '@')]"
}


def take_error_screenshot(driver, error_name):
    """Save a timestamped screenshot of the current page into OUTPUT_DIR.

    This is a best-effort diagnostic helper: it never raises.

    Args:
        driver: The active WebDriver.
        error_name: Free-form label for the file name. It may contain album or
            person names, so it is reduced to filesystem-safe characters.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Album/person names can contain '/', spaces, quotes etc., which would
    # otherwise produce an invalid or nested path.
    safe_name = re.sub(r'[^\w.-]+', '_', str(error_name)).strip('_') or 'unknown'
    filepath = os.path.join(OUTPUT_DIR, f"error_{safe_name}_{timestamp}.png")
    try:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        saved = driver.save_screenshot(filepath)
    except Exception as e:
        print(f"!!! Konnte keinen Screenshot speichern: {e}")
        return
    # Selenium reports a failed file write by returning False instead of raising.
    if saved is False:
        print(f"!!! Konnte keinen Screenshot speichern: {filepath}")
    else:
        print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {filepath}")


def setup_driver():
    """Create a headless Chrome WebDriver.

    Returns:
        The WebDriver instance, or None if Chrome could not be started.
    """
    print("Initialisiere Chrome WebDriver...")
    options = Options()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--window-size=1920,1200')
    options.binary_location = '/usr/bin/google-chrome'
    try:
        return webdriver.Chrome(options=options)
    except Exception as e:
        print(f"Fehler bei der Initialisierung des WebDrivers: {e}")
        return None


def load_all_credentials():
    """Load the profile dictionary from CREDENTIALS_FILE.

    Returns:
        The parsed JSON object (a dict of profiles), or None if the file is
        missing, unreadable, not valid JSON, or not a JSON object. The reason
        is printed so the script does not exit silently.
    """
    try:
        with open(CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Zugangsdaten-Datei '{CREDENTIALS_FILE}' wurde nicht gefunden.")
        return None
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f"Zugangsdaten-Datei '{CREDENTIALS_FILE}' konnte nicht gelesen werden: {e}")
        return None
    if not isinstance(data, dict):
        print(f"Zugangsdaten-Datei '{CREDENTIALS_FILE}' enthält kein JSON-Objekt mit Profilen.")
        return None
    return data


def login(driver, username, password):
    """Log in to fotograf.de through the web form.

    Accepts the cookie banner if one shows up, fills in the credentials and
    waits until the dashboard URL is reached.

    Returns:
        True on success, False on any failure (a screenshot is taken).
    """
    print("Starte Login-Vorgang...")
    try:
        driver.get(LOGIN_URL)
        wait = WebDriverWait(driver, 10)
        try:
            print("Suche nach Cookie-Banner...")
            wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"]))).click()
            print("Cookie-Banner akzeptiert.")
            time.sleep(1)
        except TimeoutException:
            print("Kein Cookie-Banner gefunden, fahre fort.")
        print("Fülle Anmeldeformular aus...")
        wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
        driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
        print("Klicke auf Login...")
        driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
        print("Warte auf die nächste Seite...")
        wait.until(EC.url_contains('/config_dashboard/index'))
        print("Login erfolgreich!")
        return True
    except Exception as e:
        print(f"Login fehlgeschlagen. Grund: {e}")
        take_error_screenshot(driver, "login_error")
        return False


def _extract_job_id(job_url):
    """Return the last path segment of job_url, ignoring query, fragment and trailing slashes."""
    path = urlparse(job_url.strip()).path
    return path.rstrip('/').rsplit('/', 1)[-1]


def _collect_album_links(driver, wait):
    """Return [{"name", "url"}, ...] for every album row on the current overview page."""
    albums_to_visit = []
    try:
        album_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_rows"])))
    except TimeoutException:
        print("Konnte die Album-Liste nicht finden.")
        take_error_screenshot(driver, "album_overview_error")
        return []
    print(f"{len(album_rows)} Alben in der Übersicht gefunden.")
    for row in album_rows:
        try:
            album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"])
            name = album_link.text
            url = album_link.get_attribute('href')
        except (NoSuchElementException, StaleElementReferenceException):
            # Rows without a link in the second column are not albums.
            continue
        if url:
            albums_to_visit.append({"name": name, "url": url})
    print(f"{len(albums_to_visit)} gültige Album-Links gesammelt.")
    return albums_to_visit


def _read_person_row(person_row):
    """Return (first_name, access_code_page_url) if the row shows 0 logins, else None.

    Raises:
        NoSuchElementException / StaleElementReferenceException: if the row
        does not contain the expected elements; the caller skips the row.
    """
    login_count_text = person_row.find_element(By.XPATH, SELECTORS["person_logins"]).text
    try:
        login_count = int(login_count_text.strip())
    except ValueError:
        print(f" Unerwarteter Login-Zähler '{login_count_text}'. Überspringe.")
        return None
    if login_count != 0:
        return None
    vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text
    print(f" --> ERFOLG: '{vorname}' mit 0 Logins gefunden!")
    access_code_page_url = person_row.find_element(
        By.XPATH, SELECTORS["person_access_code_link"]).get_attribute('href')
    if not access_code_page_url:
        print(f" Kein Zugangscode-Link für '{vorname}' gefunden. Überspringe.")
        return None
    return vorname, access_code_page_url


def _extract_buyer_details(driver, wait, vorname, access_code_page_url):
    """Open the access-code page of one person and return its result row, or None.

    The page is reloaded on every attempt so that a retry after a stale
    element always starts from the access-code page, even if the click on the
    buyer link had already navigated away.
    """
    print(f" Navigiere zur Kommunikations-Seite für '{vorname}'...")
    for attempt in range(MAX_DETAIL_ATTEMPTS):
        try:
            driver.get(access_code_page_url)
            # Wait explicitly until the page shows the details (the quick-login link).
            quick_login = wait.until(
                EC.visibility_of_element_located((By.XPATH, SELECTORS["quick_login_url"])))
            schnell_login_url = quick_login.get_attribute('href')
            potential_buyer_element = driver.find_element(By.XPATH, SELECTORS["potential_buyer_link"])
            kaeufer_name = potential_buyer_element.text
            print(f" Käufer: '{kaeufer_name}', Schnell-Login: GEFUNDEN")
            potential_buyer_element.click()
            print(f" Navigiere zur Käufer-Detailseite...")
            email = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["buyer_email"]))).text
            print(f" FINALE ERFOLG: E-Mail gefunden: {email}")
            return {
                "Name des Kindes": vorname,
                "Name Käufer": kaeufer_name,
                "E-Mail-Adresse Käufer": email,
                "Schnell Login URL": schnell_login_url
            }
        except StaleElementReferenceException:
            print(f" Timing-Fehler (StaleElement), Versuch {attempt + 1}/{MAX_DETAIL_ATTEMPTS}. Warte kurz...")
            if attempt == MAX_DETAIL_ATTEMPTS - 1:
                print(" Fehler war persistent, überspringe diese Person.")
                take_error_screenshot(driver, f"stale_error_{vorname}")
            else:
                time.sleep(1)  # give the page time to settle before retrying
        except NoSuchElementException:
            # No buyer link on the page: retrying cannot help.
            print(f" Kein Käufer-Link für '{vorname}' gefunden. Überspringe.")
            take_error_screenshot(driver, f"no_buyer_{vorname}")
            return None
        except TimeoutException:
            # Waiting longer makes no sense, so do not retry.
            print(f" Timeout beim Warten auf Details für '{vorname}'. Überspringe.")
            take_error_screenshot(driver, f"timeout_error_{vorname}")
            return None
    return None


def _process_album(driver, wait, album, final_results):
    """Visit one album and append a result row for every person with 0 logins."""
    print(f"\n--- Betrete Album: {album['name']} ---")
    driver.get(album['url'])
    rows_locator = (By.XPATH, SELECTORS["person_rows"])
    try:
        num_persons = len(wait.until(EC.presence_of_all_elements_located(rows_locator)))
        print(f"{num_persons} Personen in diesem Album gefunden.")
        # Index-based on purpose: every detail lookup navigates away, so the
        # row elements must be located again on each iteration.
        for i in range(num_persons):
            person_rows = wait.until(EC.presence_of_all_elements_located(rows_locator))
            if i >= len(person_rows):
                print(f" Album enthält nur noch {len(person_rows)} Zeilen. Beende Album.")
                break
            try:
                person = _read_person_row(person_rows[i])
            except (NoSuchElementException, StaleElementReferenceException) as e:
                print(f" Zeile {i + 1} konnte nicht gelesen werden ({type(e).__name__}). Überspringe.")
                continue
            if person is None:
                continue
            vorname, access_code_page_url = person
            record = _extract_buyer_details(driver, wait, vorname, access_code_page_url)
            if record is not None:
                final_results.append(record)
            # Return to the album overview for the next person.
            print(f" Kehre zurück zur Album-Übersicht '{album['name']}'...")
            driver.get(album['url'])
            wait.until(EC.presence_of_element_located(rows_locator))
    except TimeoutException:
        print(f" Keine Personen-Daten im Album '{album['name']}' gefunden. Überspringe.")
        take_error_screenshot(driver, f"album_{album['name']}_error")


def process_full_job(driver, job_url):
    """Collect buyer details for every person with 0 logins in all albums of a job.

    Args:
        driver: A WebDriver that is already logged in.
        job_url: URL of the job settings page; its last path segment is used
            as the job id.

    Returns:
        A list of dicts with the keys "Name des Kindes", "Name Käufer",
        "E-Mail-Adresse Käufer" and "Schnell Login URL". Empty if the job page
        or the album overview could not be read.
    """
    wait = WebDriverWait(driver, 15)
    job_url = job_url.strip()
    print(f"\nVerarbeite Job-URL: {job_url}")
    driver.get(job_url)
    try:
        job_name = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["job_name"]))).text
        print(f"Auftragsname: '{job_name}'")
    except TimeoutException:
        print("Konnte den Auftragsnamen nicht finden.")
        take_error_screenshot(driver, "job_name_not_found")
        return []

    job_id = _extract_job_id(job_url)
    if not job_id:
        print("Konnte keine Auftrags-ID aus der URL ermitteln.")
        return []
    albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
    print(f"Navigiere zur Alben-Übersicht: {albums_overview_url}")
    driver.get(albums_overview_url)

    final_results = []
    for album in _collect_album_links(driver, wait):
        _process_album(driver, wait, album, final_results)
    return final_results


def save_results_to_csv(results):
    """Write the result rows to OUTPUT_FILE; does nothing if results is empty."""
    if not results:
        print("\nKeine Daten zum Speichern vorhanden.")
        return
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    fieldnames = ["Name des Kindes", "Name Käufer", "E-Mail-Adresse Käufer", "Schnell Login URL"]
    print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...")
    with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)
    print("Speichern erfolgreich!")


def get_profile_choice():
    """Ask the user which credentials profile to use.

    Returns:
        The value stored under the chosen profile name, or None if no
        credentials could be loaded.
    """
    all_credentials = load_all_credentials()
    if not all_credentials:
        return None
    profiles = list(all_credentials)
    print("\nBitte wähle das zu verwendende Profil:")
    for i, p in enumerate(profiles):
        print(f" {i + 1}) {p}")
    while True:
        try:
            c = int(input(f"Gib eine Zahl zwischen 1 und {len(profiles)} ein: "))
        except ValueError:
            print("Ungültige Eingabe.")
            continue
        if 1 <= c <= len(profiles):
            p_name = profiles[c - 1]
            print(f"Profil '{p_name}' ausgewählt.")
            return all_credentials[p_name]
        print("Ungültige Auswahl.")


def main():
    """Interactive entry point: choose a profile, ask for the job URL, scrape, save."""
    print("--- Fotograf.de Scraper für Nutzer ohne Logins (FINALE VERSION) ---")
    credentials = get_profile_choice()
    if not credentials:
        return
    # Validate the profile before a browser is started.
    try:
        username = credentials['username']
        password = credentials['password']
    except (KeyError, TypeError):
        print("Das gewählte Profil enthält keinen 'username' und/oder kein 'password'.")
        return
    # Pasted URLs often carry surrounding whitespace.
    job_url = input("Bitte gib die URL des zu bearbeitenden Fotoauftrags ein (Einstellungs-Seite): ").strip()
    if "fotograf.de/config_jobs_settings/index/" not in job_url:
        print("Dies scheint keine gültige URL für die Auftragseinstellungen zu sein.")
        return
    driver = setup_driver()
    if not driver:
        return
    try:
        if login(driver, username, password):
            all_results = process_full_job(driver, job_url)
            save_results_to_csv(all_results)
        else:
            print("Skript wird beendet, da der Login fehlgeschlagen ist.")
    finally:
        print("\nSkript beendet. Schließe WebDriver.")
        driver.quit()


if __name__ == "__main__":
    main()