Files
Brancheneinstufung2/scrape_fotograf.py
2025-07-16 13:24:48 +00:00

260 lines
11 KiB
Python

import json
import os
import time
import csv
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# --- Konfiguration & Konstanten ---
CREDENTIALS_FILE = 'fotograf_credentials.json'
OUTPUT_DIR = 'output'
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_ohne_logins.csv')
LOGIN_URL = 'https://app.fotograf.de/login/login'
# --- Selektoren (zentral verwaltet für einfache Anpassung) ---
# GEÄNDERT: Bessere, spezifischere Selektoren
SELECTORS = {
"cookie_accept_button": "//button[normalize-space()='Allow all']", # NEU: Robuster XPath für den Cookie-Button
"login_user": "#FotografEmail",
"login_pass": "#FotografPassword",
"login_button": "#login-submit", # GEÄNDERT: Stabiler Selector über die ID
"job_name": "h1",
"album_rows": "div.table-row-group > div.table-row",
"album_link": "a.text-legacy-azure-200",
"child_name": "div:nth-of-type(4) strong",
"login_count": "div:nth-of-type(6) strong",
"buyer_link": "a.block:has(span:contains('Käufer'))",
"buyer_email": "div.flex:nth-of-type(4) span"
}
# NEU: Hilfsfunktion für Screenshots bei Fehlern
def take_error_screenshot(driver, error_name):
"""Speichert einen Screenshot des aktuellen Browserfensters in den output-Ordner."""
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"error_{error_name}_{timestamp}.png"
filepath = os.path.join(OUTPUT_DIR, filename)
try:
driver.save_screenshot(filepath)
print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {filepath}")
except Exception as e:
print(f"!!! Konnte keinen Screenshot speichern: {e}")
def setup_driver():
"""Initialisiert und konfiguriert den Chrome WebDriver."""
print("Initialisiere Chrome WebDriver...")
options = Options()
# Aktivieren Sie die nächste Zeile für die Fehlersuche, um zu sehen, was passiert
# options.add_argument('--headless=new')
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--window-size=1920,1200')
options.binary_location = '/usr/bin/google-chrome'
try:
driver = webdriver.Chrome(options=options)
return driver
except Exception as e:
print(f"Fehler bei der Initialisierung des WebDrivers: {e}")
return None
def load_all_credentials():
"""Lädt alle Anmeldedaten aus der JSON-Datei."""
try:
with open(CREDENTIALS_FILE, 'r') as f:
return json.load(f)
except FileNotFoundError:
print(f"Fehler: {CREDENTIALS_FILE} nicht gefunden.")
return None
except json.JSONDecodeError:
print(f"Fehler: {CREDENTIALS_FILE} ist keine gültige JSON-Datei.")
return None
# GEÄNDERT: Die Login-Funktion klickt jetzt den Cookie-Banner weg
def login(driver, username, password):
"""Führt den Login-Vorgang auf fotograf.de durch."""
print("Starte Login-Vorgang...")
try:
driver.get(LOGIN_URL)
wait = WebDriverWait(driver, 10) # 10 Sekunden sollten reichen
# NEU: Auf den Cookie-Banner warten und ihn akzeptieren
try:
print("Suche nach Cookie-Banner...")
# Wir verwenden hier XPath, da es für Textsuche sehr gut geeignet ist.
cookie_button = wait.until(EC.element_to_be_clickable((By.XPATH, SELECTORS["cookie_accept_button"])))
cookie_button.click()
print("Cookie-Banner akzeptiert.")
time.sleep(1) # Kurze Pause, damit der Banner verschwindet
except TimeoutException:
# Wenn der Banner nicht erscheint (z.B. bei späteren Besuchen), ist das auch OK.
print("Kein Cookie-Banner gefunden, fahre fort.")
# Warten und Anmeldedaten eingeben
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
# Warten auf erfolgreichen Login
wait.until(EC.url_contains('/config_dashboard/index'))
print("Login erfolgreich!")
return True
except TimeoutException:
print("Login fehlgeschlagen. Timeout beim Warten auf Elemente oder die nächste Seite.")
take_error_screenshot(driver, "login_timeout")
return False
except Exception as e:
print(f"Ein unerwarteter Fehler beim Login ist aufgetreten: {e}")
take_error_screenshot(driver, "login_unexpected")
return False
# ... (der Rest des Skripts bleibt unverändert) ...
def process_job(driver, job_url):
"""Verarbeitet einen einzelnen Fotoauftrag."""
print(f"\nVerarbeite Job-URL: {job_url}")
job_id = job_url.split('/')[-1]
albums_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
driver.get(settings_url)
wait = WebDriverWait(driver, 15)
try:
job_name = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["job_name"]))).text
print(f"Auftragsname: '{job_name}'")
except TimeoutException:
print("Konnte den Auftragsnamen nicht finden. Überprüfe den Selector oder die Seite.")
take_error_screenshot(driver, "job_name_not_found")
return []
print(f"Navigiere zur Alben-Übersicht: {albums_url}")
driver.get(albums_url)
albums_to_process = []
try:
album_rows = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, SELECTORS["album_rows"])))
print(f"{len(album_rows)} Alben gefunden. Prüfe auf Logins...")
for row in album_rows:
try:
if int(row.find_element(By.CSS_SELECTOR, SELECTORS["login_count"]).text) == 0:
child_name = row.find_element(By.CSS_SELECTOR, SELECTORS["child_name"]).text
album_link = row.find_element(By.CSS_SELECTOR, SELECTORS["album_link"]).get_attribute('href')
albums_to_process.append({"child_name": child_name, "album_detail_url": album_link})
print(f" -> Gefunden: Kind '{child_name}' mit 0 Logins.")
except (NoSuchElementException, ValueError):
pass
except TimeoutException:
print("Keine Alben auf der Seite gefunden oder Timeout beim Warten.")
take_error_screenshot(driver, "album_list_timeout")
return []
results = []
print(f"\nVerarbeite {len(albums_to_process)} Alben mit 0 Logins im Detail...")
for album in albums_to_process:
try:
print(f" Rufe Detailseite für '{album['child_name']}' auf...")
driver.get(album["album_detail_url"])
buyer_link_element = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["buyer_link"])))
buyer_name = buyer_link_element.text.replace('Käufer ', '').strip()
buyer_page_url = buyer_link_element.get_attribute('href')
print(f" Käufer gefunden: '{buyer_name}'. Rufe Käuferseite auf...")
driver.get(buyer_page_url)
buyer_email = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["buyer_email"]))).text
print(f" E-Mail gefunden: {buyer_email}")
results.append({
"Auftragsname": job_name,
"Kind Vorname": album["child_name"],
"Käufer Name": buyer_name,
"Käufer E-Mail": buyer_email,
})
time.sleep(1)
except TimeoutException:
print(f" Fehler: Timeout beim Verarbeiten der Detailseite für '{album['child_name']}'.")
take_error_screenshot(driver, f"detail_page_timeout_{album['child_name']}")
except Exception as e:
print(f" Ein unerwarteter Fehler bei '{album['child_name']}': {e}")
take_error_screenshot(driver, f"detail_page_unexpected_{album['child_name']}")
return results
def save_results_to_csv(results):
"""Speichert die gesammelten Daten in einer CSV-Datei."""
if not results:
print("\nKeine Daten zum Speichern vorhanden.")
return
os.makedirs(OUTPUT_DIR, exist_ok=True)
print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...")
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=results[0].keys())
writer.writeheader()
writer.writerows(results)
print("Speichern erfolgreich!")
def get_profile_choice():
"""Zeigt ein Menü der verfügbaren Profile und gibt die Auswahl des Benutzers zurück."""
all_credentials = load_all_credentials()
if not all_credentials:
return None
profiles = list(all_credentials.keys())
print("\nBitte wähle das zu verwendende Profil:")
for i, profile_name in enumerate(profiles):
print(f" {i + 1}) {profile_name}")
while True:
try:
choice = int(input(f"Gib eine Zahl zwischen 1 und {len(profiles)} ein: "))
if 1 <= choice <= len(profiles):
selected_profile_name = profiles[choice - 1]
print(f"Profil '{selected_profile_name}' ausgewählt.")
return all_credentials[selected_profile_name]
else:
print("Ungültige Auswahl. Bitte versuche es erneut.")
except ValueError:
print("Ungültige Eingabe. Bitte gib eine Zahl ein.")
def main():
"""Hauptfunktion des Skripts."""
print("--- Fotograf.de Scraper für Nutzer ohne Logins ---")
credentials = get_profile_choice()
if not credentials:
return
job_url = input("Bitte gib die URL des zu bearbeitenden Fotoauftrags ein: ")
if "fotograf.de/config_jobs_settings/index/" not in job_url:
print("Dies scheint keine gültige URL für die Auftragseinstellungen zu sein.")
return
driver = setup_driver()
if not driver:
return
try:
if login(driver, credentials['username'], credentials['password']):
all_results = process_job(driver, job_url)
save_results_to_csv(all_results)
else:
print("Skript wird beendet, da der Login fehlgeschlagen ist.")
finally:
print("\nSkript beendet. Schließe WebDriver.")
if driver:
driver.quit()
if __name__ == "__main__":
main()