Files
Brancheneinstufung2/scrape_fotograf.py
2025-07-16 15:49:03 +00:00

251 lines
12 KiB
Python

import json
import os
import time
import csv
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException
# --- Konfiguration & Konstanten ---
CREDENTIALS_FILE = 'fotograf_credentials.json'
OUTPUT_DIR = 'output'
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_ohne_logins.csv')
LOGIN_URL = 'https://app.fotograf.de/login/login'
# --- Selektoren (FINALE, VOLLSTÄNDIGE VERSION) ---
SELECTORS = {
"cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"login_user": "#login-email",
"login_pass": "#login-password",
"login_button": "#login-submit",
"job_name": "h1",
"album_overview_rows": "//table/tbody/tr",
"album_overview_link": ".//td[2]//a",
"person_rows": "//div[contains(@class, 'border-legacy-silver-550') and .//span[text()='Logins']]",
"person_vorname": ".//span[text()='Vorname']/following-sibling::strong",
"person_logins": ".//span[text()='Logins']/following-sibling::strong",
"person_access_code_link": ".//a[contains(@data-qa-id, 'guest-access-banner-access-code')]",
"potential_buyer_link": "//a[contains(@href, '/config_customers/view_customer')]",
"quick_login_url": "//a[@id='quick-login-url']",
"buyer_email": "//span[contains(., '@')]"
}
def take_error_screenshot(driver, error_name):
"""Speichert einen Screenshot des aktuellen Browserfensters in den output-Ordner."""
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"error_{error_name}_{timestamp}.png"
filepath = os.path.join(OUTPUT_DIR, filename)
try:
driver.save_screenshot(filepath)
print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {filepath}")
except Exception as e:
print(f"!!! Konnte keinen Screenshot speichern: {e}")
def setup_driver():
"""Initialisiert und konfiguriert den Chrome WebDriver."""
print("Initialisiere Chrome WebDriver...")
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--window-size=1920,1200')
options.binary_location = '/usr/bin/google-chrome'
try:
driver = webdriver.Chrome(options=options)
return driver
except Exception as e:
print(f"Fehler bei der Initialisierung des WebDrivers: {e}")
return None
def load_all_credentials():
"""Lädt alle Anmeldedaten aus der JSON-Datei."""
try:
with open(CREDENTIALS_FILE, 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return None
def login(driver, username, password):
"""Führt den Login-Vorgang auf fotograf.de durch."""
print("Starte Login-Vorgang...")
try:
driver.get(LOGIN_URL)
wait = WebDriverWait(driver, 10)
try:
print("Suche nach Cookie-Banner...")
wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"]))).click()
print("Cookie-Banner akzeptiert.")
time.sleep(1)
except TimeoutException:
print("Kein Cookie-Banner gefunden, fahre fort.")
print("Fülle Anmeldeformular aus...")
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
print("Klicke auf Login...")
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
print("Warte auf die nächste Seite...")
wait.until(EC.url_contains('/config_dashboard/index'))
print("Login erfolgreich!")
return True
except Exception as e:
print(f"Login fehlgeschlagen. Grund: {e}")
take_error_screenshot(driver, "login_error")
return False
def process_full_job(driver, job_url):
"""Die finale, robuste Hauptverarbeitungslogik."""
wait = WebDriverWait(driver, 15)
print(f"\nVerarbeite Job-URL: {job_url}")
driver.get(job_url)
try:
job_name = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["job_name"]))).text
print(f"Auftragsname: '{job_name}'")
except TimeoutException:
print("Konnte den Auftragsnamen nicht finden.")
take_error_screenshot(driver, "job_name_not_found")
return []
job_id = job_url.split('/')[-1]
albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
print(f"Navigiere zur Alben-Übersicht: {albums_overview_url}")
driver.get(albums_overview_url)
albums_to_visit = []
try:
album_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_rows"])))
print(f"{len(album_rows)} Alben in der Übersicht gefunden.")
for row in album_rows:
try:
album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"])
albums_to_visit.append({"name": album_link.text, "url": album_link.get_attribute('href')})
except NoSuchElementException:
continue
print(f"{len(albums_to_visit)} gültige Album-Links gesammelt.")
except TimeoutException:
print("Konnte die Album-Liste nicht finden.")
take_error_screenshot(driver, "album_overview_error")
return []
final_results = []
for album in albums_to_visit:
print(f"\n--- Betrete Album: {album['name']} ---")
driver.get(album['url'])
try:
# Robuste Schleife, die den "Stale Element"-Fehler verhindert
num_persons = len(wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"]))))
print(f"{num_persons} Personen in diesem Album gefunden.")
for i in range(num_persons):
# In jeder Iteration die Elemente neu finden, um "Stale" zu vermeiden!
person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
person_row = person_rows[i]
try:
login_count_text = person_row.find_element(By.XPATH, SELECTORS["person_logins"]).text
login_count = int(login_count_text)
if login_count == 0:
vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text
print(f" --> ERFOLG: '{vorname}' mit 0 Logins gefunden!")
access_code_page_url = person_row.find_element(By.XPATH, SELECTORS["person_access_code_link"]).get_attribute('href')
driver.get(access_code_page_url)
print(f" Navigiere zur Kommunikations-Seite für '{vorname}'...")
# Alle Daten von dieser Seite extrahieren
schnell_login_url = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["quick_login_url"]))).get_attribute('href')
potential_buyer_element = wait.until(EC.element_to_be_clickable((By.XPATH, SELECTORS["potential_buyer_link"])))
kaeufer_name = potential_buyer_element.text
print(f" Käufer: '{kaeufer_name}', Schnell-Login: GEFUNDEN")
potential_buyer_element.click()
print(f" Navigiere zur Käufer-Detailseite...")
email = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["buyer_email"]))).text
print(f" FINALE ERFOLG: E-Mail gefunden: {email}")
final_results.append({
"Name des Kindes": vorname,
"Name Käufer": kaeufer_name,
"E-Mail-Adresse Käufer": email,
"Schnell Login URL": schnell_login_url
})
# Wichtig: Navigiere zur Album-Seite zurück, bevor die Schleife weitergeht
print(f" Kehre zurück zur Album-Übersicht '{album['name']}'...")
driver.get(album['url'])
# Warten, bis die Seite wieder bereit ist für die nächste Iteration
wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"])))
except (ValueError, NoSuchElementException, TimeoutException, StaleElementReferenceException) as e:
print(f" Fehler bei der Verarbeitung einer Person, überspringe: {e}")
continue
except TimeoutException:
print(f" Keine Personen-Daten im Album '{album['name']}' gefunden. Überspringe.")
take_error_screenshot(driver, f"album_{album['name']}_error")
continue
return final_results
def save_results_to_csv(results):
"""Speichert die gesammelten Daten in einer CSV-Datei."""
if not results:
print("\nKeine Daten zum Speichern vorhanden.")
return
os.makedirs(OUTPUT_DIR, exist_ok=True)
fieldnames = ["Name des Kindes", "Name Käufer", "E-Mail-Adresse Käufer", "Schnell Login URL"]
print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...")
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(results)
print("Speichern erfolgreich!")
def get_profile_choice():
"""Zeigt ein Menü der verfügbaren Profile und gibt die Auswahl des Benutzers zurück."""
all_credentials = load_all_credentials()
if not all_credentials: return None
profiles = list(all_credentials.keys())
print("\nBitte wähle das zu verwendende Profil:")
for i, p in enumerate(profiles): print(f" {i + 1}) {p}")
while True:
try:
c = int(input(f"Gib eine Zahl zwischen 1 und {len(profiles)} ein: "))
if 1 <= c <= len(profiles):
p_name = profiles[c - 1]
print(f"Profil '{p_name}' ausgewählt.")
return all_credentials[p_name]
else: print("Ungültige Auswahl.")
except ValueError: print("Ungültige Eingabe.")
def main():
"""Hauptfunktion des Skripts."""
print("--- Fotograf.de Scraper für Nutzer ohne Logins (FINALE VERSION) ---")
credentials = get_profile_choice()
if not credentials: return
job_url = input("Bitte gib die URL des zu bearbeitenden Fotoauftrags ein (Einstellungs-Seite): ")
if "fotograf.de/config_jobs_settings/index/" not in job_url:
print("Dies scheint keine gültige URL für die Auftragseinstellungen zu sein.")
return
driver = setup_driver()
if not driver: return
try:
if login(driver, credentials['username'], credentials['password']):
all_results = process_full_job(driver, job_url)
save_results_to_csv(all_results)
else:
print("Skript wird beendet, da der Login fehlgeschlagen ist.")
finally:
print("\nSkript beendet. Schließe WebDriver.")
if driver: driver.quit()
if __name__ == "__main__":
main()