import json import os import time import csv from datetime import datetime from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException, InvalidSelectorException # --- Konfiguration & Konstanten --- CREDENTIALS_FILE = 'fotograf_credentials.json' OUTPUT_DIR = 'output' OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_ohne_logins.csv') LOGIN_URL = 'https://app.fotograf.de/login/login' # --- Selektoren --- # Diese werden im nächsten Schritt korrigiert. Für diesen Test sind sie teilweise irrelevant. SELECTORS = { "cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll", "login_user": "#login-email", "login_pass": "#login-password", "login_button": "#login-submit", "job_name": "h1", # Platzhalter, wird im nächsten Schritt korrigiert "album_rows": "//table/tbody/tr", "album_link": ".//td[2]//a", "login_count": ".//td[7]", # Diese sind für den Test relevant "buyer_link": "//a[contains(., 'Käufer')]", # Robusterer XPath "buyer_email": "//span[contains(., '@')]" # Robusterer XPath } def take_error_screenshot(driver, error_name): os.makedirs(OUTPUT_DIR, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"error_{error_name}_{timestamp}.png" filepath = os.path.join(OUTPUT_DIR, filename) try: driver.save_screenshot(filepath) print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {filepath}") except Exception as e: print(f"!!! Konnte keinen Screenshot speichern: {e}") def setup_driver(): print("Initialisiere Chrome WebDriver...") options = Options() options.add_argument('--headless') options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') options.add_argument('--window-size=1920,1200') options.binary_location = '/usr/bin/google-chrome' try: driver = webdriver.Chrome(options=options) return driver except Exception as e: print(f"Fehler bei der Initialisierung des WebDrivers: {e}") return None def load_all_credentials(): try: with open(CREDENTIALS_FILE, 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return None def login(driver, username, password): print("Starte Login-Vorgang...") try: driver.get(LOGIN_URL) wait = WebDriverWait(driver, 10) try: print("Suche nach Cookie-Banner...") cookie_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"]))) cookie_button.click() print("Cookie-Banner akzeptiert.") time.sleep(1) except TimeoutException: print("Kein Cookie-Banner gefunden, fahre fort.") print("Fülle Anmeldeformular aus...") wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username) driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password) print("Klicke auf Login...") driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click() print("Warte auf die nächste Seite...") wait.until(EC.url_contains('/config_dashboard/index')) print("Login erfolgreich!") return True except Exception as e: print(f"Login fehlgeschlagen. Grund: {e}") take_error_screenshot(driver, "login_error") return False # NEU: Eine dedizierte Funktion, um nur die Details einer Album-URL zu verarbeiten def process_single_album_details(driver, album_url, job_name): """Ruft eine Album-Detailseite auf und extrahiert die Käufer-Infos.""" results = [] wait = WebDriverWait(driver, 15) try: print(f"\n--> TEST: Rufe Album-Detailseite direkt auf: {album_url}") driver.get(album_url) print("Suche nach Käufer-Link...") buyer_link_element = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["buyer_link"]))) buyer_name = buyer_link_element.text.replace('Käufer ', '').strip() buyer_page_url = buyer_link_element.get_attribute('href') print(f" Käufer gefunden: '{buyer_name}'. Rufe Käuferseite auf...") driver.get(buyer_page_url) print("Suche nach E-Mail-Adresse...") buyer_email = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["buyer_email"]))).text print(f" E-Mail gefunden: {buyer_email}") results.append({ "Auftragsname": job_name, "Kind Vorname": "Kobolde (Test)", # Platzhalter, da wir die Liste nicht scannen "Käufer Name": buyer_name, "Käufer E-Mail": buyer_email, }) print("\n--> ERFOLG: Käufer-Daten erfolgreich extrahiert!") except Exception as e: print(f"--> FEHLER: Konnte Käufer-Details nicht extrahieren. Grund: {e}") take_error_screenshot(driver, "detail_page_error") return results def save_results_to_csv(results): if not results: print("\nKeine Daten zum Speichern vorhanden.") return os.makedirs(OUTPUT_DIR, exist_ok=True) print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...") with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=results[0].keys()) writer.writeheader() writer.writerows(results) print("Speichern erfolgreich!") def get_profile_choice(): all_credentials = load_all_credentials() if not all_credentials: return None profiles = list(all_credentials.keys()) print("\nBitte wähle das zu verwendende Profil:") for i, p in enumerate(profiles): print(f" {i + 1}) {p}") while True: try: c = int(input(f"Gib eine Zahl zwischen 1 und {len(profiles)} ein: ")) if 1 <= c <= len(profiles): p_name = profiles[c - 1] print(f"Profil '{p_name}' ausgewählt.") return all_credentials[p_name] else: print("Ungültige Auswahl.") except ValueError: print("Ungültige Eingabe.") # GEÄNDERT: Die main-Funktion führt jetzt den direkten Test aus def main(): print("--- Fotograf.de Scraper für Nutzer ohne Logins (DIREKTER ALBUM-TEST) ---") credentials = get_profile_choice() if not credentials: return # Die Job-URL wird nur noch für den Namen des Auftrags benötigt job_settings_url = "https://app.fotograf.de/config_jobs_settings/index/216142382" # Die von Ihnen bereitgestellte Test-URL test_album_url = "https://app.fotograf.de/config_jobs_photos/gallery/216142382/6137045" driver = setup_driver() if not driver: return try: if login(driver, credentials['username'], credentials['password']): # Job-Namen holen print(f"Rufe Job-Seite für Auftragsnamen auf: {job_settings_url}") driver.get(job_settings_url) job_name = WebDriverWait(driver, 10).until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["job_name"]))).text print(f"Auftragsname gefunden: '{job_name}'") # Führe den Test für das einzelne Album aus all_results = process_single_album_details(driver, test_album_url, job_name) save_results_to_csv(all_results) else: print("Skript wird beendet, da der Login fehlgeschlagen ist.") finally: print("\nSkript beendet. Schließe WebDriver.") if driver: driver.quit() if __name__ == "__main__": main()