scrape_fotograf.py aktualisiert

This commit is contained in:
2025-07-16 12:04:37 +00:00
parent 549e3dc77e
commit 4449f87ac4

View File

@@ -2,6 +2,7 @@ import json
import os import os
import time import time
import csv import csv
from datetime import datetime
from selenium import webdriver from selenium import webdriver
from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By from selenium.webdriver.common.by import By
@@ -21,19 +22,32 @@ SELECTORS = {
"login_pass": "#FotografPassword", "login_pass": "#FotografPassword",
"login_button": "button[type='submit']", "login_button": "button[type='submit']",
"job_name": "h1", "job_name": "h1",
"album_rows": "div.table-row-group > div.table-row", # Container für jede Album-Zeile "album_rows": "div.table-row-group > div.table-row",
"album_link": "a.text-legacy-azure-200", "album_link": "a.text-legacy-azure-200",
"child_name": "div:nth-of-type(4) strong", "child_name": "div:nth-of-type(4) strong",
"login_count": "div:nth-of-type(6) strong", "login_count": "div:nth-of-type(6) strong",
"buyer_link": "a.block:has(span:contains('Käufer'))", # Modernerer Selector "buyer_link": "a.block:has(span:contains('Käufer'))",
"buyer_email": "div.flex:nth-of-type(4) span" "buyer_email": "div.flex:nth-of-type(4) span"
} }
# NEU: Hilfsfunktion für Screenshots bei Fehlern
def take_error_screenshot(driver, error_name):
"""Speichert einen Screenshot des aktuellen Browserfensters in den output-Ordner."""
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"error_{error_name}_{timestamp}.png"
filepath = os.path.join(OUTPUT_DIR, filename)
try:
driver.save_screenshot(filepath)
print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {filepath}")
except Exception as e:
print(f"!!! Konnte keinen Screenshot speichern: {e}")
def setup_driver(): def setup_driver():
"""Initialisiert und konfiguriert den Chrome WebDriver.""" """Initialisiert und konfiguriert den Chrome WebDriver."""
print("Initialisiere Chrome WebDriver...") print("Initialisiere Chrome WebDriver...")
options = Options() options = Options()
options.add_argument('--headless') # Headless für Docker-Umgebungen empfohlen options.add_argument('--headless')
options.add_argument('--no-sandbox') options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage') options.add_argument('--disable-dev-shm-usage')
options.add_argument('--window-size=1920,1200') options.add_argument('--window-size=1920,1200')
@@ -44,20 +58,14 @@ def setup_driver():
return driver return driver
except Exception as e: except Exception as e:
print(f"Fehler bei der Initialisierung des WebDrivers: {e}") print(f"Fehler bei der Initialisierung des WebDrivers: {e}")
print("Stelle sicher, dass Google Chrome und der passende Chromedriver im Docker-Image korrekt installiert sind.")
return None return None
def load_credentials(profile_name): # GEÄNDERT: Lädt alle Credentials, damit wir ein Menü anzeigen können
"""Lädt Anmeldedaten für ein bestimmtes Profil aus der JSON-Datei.""" def load_all_credentials():
print(f"Lade Anmeldedaten für Profil: {profile_name}") """Lädt alle Anmeldedaten aus der JSON-Datei."""
try: try:
with open(CREDENTIALS_FILE, 'r') as f: with open(CREDENTIALS_FILE, 'r') as f:
all_credentials = json.load(f) return json.load(f)
if profile_name in all_credentials:
return all_credentials[profile_name]
else:
print(f"Fehler: Profil '{profile_name}' nicht in {CREDENTIALS_FILE} gefunden.")
return None
except FileNotFoundError: except FileNotFoundError:
print(f"Fehler: {CREDENTIALS_FILE} nicht gefunden.") print(f"Fehler: {CREDENTIALS_FILE} nicht gefunden.")
return None return None
@@ -72,28 +80,25 @@ def login(driver, username, password):
driver.get(LOGIN_URL) driver.get(LOGIN_URL)
wait = WebDriverWait(driver, 15) wait = WebDriverWait(driver, 15)
# Warten und Anmeldedaten eingeben
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username) wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password) driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click() driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
# Warten auf erfolgreichen Login (z.B. durch prüfen der URL oder eines Dashboard-Elements)
wait.until(EC.url_contains('/config_dashboard/index')) wait.until(EC.url_contains('/config_dashboard/index'))
print("Login erfolgreich!") print("Login erfolgreich!")
return True return True
except TimeoutException: except TimeoutException:
print("Login fehlgeschlagen. Timeout beim Warten auf Elemente oder die nächste Seite.") print("Login fehlgeschlagen. Timeout beim Warten auf Elemente oder die nächste Seite.")
take_error_screenshot(driver, "login_timeout") # NEU: Screenshot bei Fehler
return False return False
except Exception as e: except Exception as e:
print(f"Ein unerwarteter Fehler beim Login ist aufgetreten: {e}") print(f"Ein unerwarteter Fehler beim Login ist aufgetreten: {e}")
take_error_screenshot(driver, "login_unexpected") # NEU: Screenshot bei Fehler
return False return False
def process_job(driver, job_url): def process_job(driver, job_url):
"""Verarbeitet einen einzelnen Fotoauftrag.""" """Verarbeitet einen einzelnen Fotoauftrag."""
print(f"\nVerarbeite Job-URL: {job_url}") print(f"\nVerarbeite Job-URL: {job_url}")
# 1. Zur Job-Seite navigieren und Job-Namen ausgeben
# Die URL muss zur Alben-Übersicht führen. Wir leiten sie aus der Job-URL ab.
job_id = job_url.split('/')[-1] job_id = job_url.split('/')[-1]
albums_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}" albums_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}" settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
@@ -106,42 +111,32 @@ def process_job(driver, job_url):
print(f"Auftragsname: '{job_name}'") print(f"Auftragsname: '{job_name}'")
except TimeoutException: except TimeoutException:
print("Konnte den Auftragsnamen nicht finden. Überprüfe den Selector oder die Seite.") print("Konnte den Auftragsnamen nicht finden. Überprüfe den Selector oder die Seite.")
take_error_screenshot(driver, "job_name_not_found") # NEU: Screenshot bei Fehler
return [] return []
# 2. Zur Alben-Übersicht navigieren
print(f"Navigiere zur Alben-Übersicht: {albums_url}") print(f"Navigiere zur Alben-Übersicht: {albums_url}")
driver.get(albums_url) driver.get(albums_url)
# 3. Alben mit 0 Logins identifizieren und deren Daten sammeln
albums_to_process = [] albums_to_process = []
try: try:
# Warten, bis die Album-Zeilen geladen sind
album_rows = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, SELECTORS["album_rows"]))) album_rows = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, SELECTORS["album_rows"])))
print(f"{len(album_rows)} Alben gefunden. Prüfe auf Logins...") print(f"{len(album_rows)} Alben gefunden. Prüfe auf Logins...")
for row in album_rows: for row in album_rows:
try: try:
login_count_text = row.find_element(By.CSS_SELECTOR, SELECTORS["login_count"]).text if int(row.find_element(By.CSS_SELECTOR, SELECTORS["login_count"]).text) == 0:
if int(login_count_text) == 0:
child_name = row.find_element(By.CSS_SELECTOR, SELECTORS["child_name"]).text child_name = row.find_element(By.CSS_SELECTOR, SELECTORS["child_name"]).text
album_link = row.find_element(By.CSS_SELECTOR, SELECTORS["album_link"]).get_attribute('href') album_link = row.find_element(By.CSS_SELECTOR, SELECTORS["album_link"]).get_attribute('href')
albums_to_process.append({"child_name": child_name, "album_detail_url": album_link})
albums_to_process.append({ print(f" -> Gefunden: Kind '{child_name}' mit 0 Logins.")
"child_name": child_name, except (NoSuchElementException, ValueError):
"album_detail_url": album_link
})
print(f" -> Gefunden: Kind '{child_name}' mit 0 Logins. Link wird zur späteren Verarbeitung gespeichert.")
except (NoSuchElementException, ValueError) as e:
# Ignoriere Zeilen, die nicht dem erwarteten Format entsprechen (z.B. Kopfzeile)
# print(f" -> Zeile übersprungen, konnte Daten nicht extrahieren: {e}")
pass pass
except TimeoutException: except TimeoutException:
print("Keine Alben auf der Seite gefunden oder Timeout beim Warten.") print("Keine Alben auf der Seite gefunden oder Timeout beim Warten.")
take_error_screenshot(driver, "album_list_timeout") # NEU: Screenshot bei Fehler
return [] return []
# 4. Käuferdetails für die identifizierten Alben abrufen
results = [] results = []
print(f"\nVerarbeite {len(albums_to_process)} Alben mit 0 Logins im Detail...") print(f"\nVerarbeite {len(albums_to_process)} Alben mit 0 Logins im Detail...")
for album in albums_to_process: for album in albums_to_process:
@@ -149,7 +144,6 @@ def process_job(driver, job_url):
print(f" Rufe Detailseite für '{album['child_name']}' auf...") print(f" Rufe Detailseite für '{album['child_name']}' auf...")
driver.get(album["album_detail_url"]) driver.get(album["album_detail_url"])
# Käufer-Link finden und aufrufen
buyer_link_element = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["buyer_link"]))) buyer_link_element = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["buyer_link"])))
buyer_name = buyer_link_element.text.replace('Käufer ', '').strip() buyer_name = buyer_link_element.text.replace('Käufer ', '').strip()
buyer_page_url = buyer_link_element.get_attribute('href') buyer_page_url = buyer_link_element.get_attribute('href')
@@ -157,7 +151,6 @@ def process_job(driver, job_url):
print(f" Käufer gefunden: '{buyer_name}'. Rufe Käuferseite auf...") print(f" Käufer gefunden: '{buyer_name}'. Rufe Käuferseite auf...")
driver.get(buyer_page_url) driver.get(buyer_page_url)
# E-Mail-Adresse extrahieren
buyer_email = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["buyer_email"]))).text buyer_email = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["buyer_email"]))).text
print(f" E-Mail gefunden: {buyer_email}") print(f" E-Mail gefunden: {buyer_email}")
@@ -167,12 +160,14 @@ def process_job(driver, job_url):
"Käufer Name": buyer_name, "Käufer Name": buyer_name,
"Käufer E-Mail": buyer_email, "Käufer E-Mail": buyer_email,
}) })
time.sleep(1) # Kurze Pause, um die Server nicht zu überlasten time.sleep(1)
except TimeoutException: except TimeoutException:
print(f" Fehler: Timeout beim Verarbeiten der Detailseite für '{album['child_name']}'. Element nicht gefunden.") print(f" Fehler: Timeout beim Verarbeiten der Detailseite für '{album['child_name']}'.")
take_error_screenshot(driver, f"detail_page_timeout_{album['child_name']}") # NEU: Screenshot bei Fehler
except Exception as e: except Exception as e:
print(f" Ein unerwarteter Fehler bei der Detailverarbeitung für '{album['child_name']}' ist aufgetreten: {e}") print(f" Ein unerwarteter Fehler bei '{album['child_name']}': {e}")
take_error_screenshot(driver, f"detail_page_unexpected_{album['child_name']}") # NEU: Screenshot bei Fehler
return results return results
@@ -181,35 +176,51 @@ def save_results_to_csv(results):
if not results: if not results:
print("\nKeine Daten zum Speichern vorhanden.") print("\nKeine Daten zum Speichern vorhanden.")
return return
print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...")
# Sicherstellen, dass der output-Ordner existiert
os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True)
print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...")
# CSV schreiben
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f: with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=results[0].keys()) writer = csv.DictWriter(f, fieldnames=results[0].keys())
writer.writeheader() writer.writeheader()
writer.writerows(results) writer.writerows(results)
print("Speichern erfolgreich!") print("Speichern erfolgreich!")
# NEU: Funktion zur Profilauswahl
def get_profile_choice():
"""Zeigt ein Menü der verfügbaren Profile und gibt die Auswahl des Benutzers zurück."""
all_credentials = load_all_credentials()
if not all_credentials:
return None
profiles = list(all_credentials.keys())
print("\nBitte wähle das zu verwendende Profil:")
for i, profile_name in enumerate(profiles):
print(f" {i + 1}) {profile_name}")
while True:
try:
choice = int(input(f"Gib eine Zahl zwischen 1 und {len(profiles)} ein: "))
if 1 <= choice <= len(profiles):
selected_profile_name = profiles[choice - 1]
print(f"Profil '{selected_profile_name}' ausgewählt.")
return all_credentials[selected_profile_name]
else:
print("Ungültige Auswahl. Bitte versuche es erneut.")
except ValueError:
print("Ungültige Eingabe. Bitte gib eine Zahl ein.")
def main(): def main():
"""Hauptfunktion des Skripts.""" """Hauptfunktion des Skripts."""
print("--- Fotograf.de Scraper für Nutzer ohne Logins ---") print("--- Fotograf.de Scraper für Nutzer ohne Logins ---")
# Profil abfragen # GEÄNDERT: Profilauswahl über Menü
profile_name = input("Bitte gib den Namen des zu verwendenden Profils ein (z.B. 'Kindergarten'): ") credentials = get_profile_choice()
credentials = load_credentials(profile_name)
if not credentials: if not credentials:
return return
# Job-URL abfragen
job_url = input("Bitte gib die URL des zu bearbeitenden Fotoauftrags ein: ") job_url = input("Bitte gib die URL des zu bearbeitenden Fotoauftrags ein: ")
if "fotograf.de" not in job_url: if "fotograf.de/config_jobs_settings/index/" not in job_url:
print("Dies scheint keine gültige fotograf.de URL zu sein.") print("Dies scheint keine gültige URL für die Auftragseinstellungen zu sein.")
return return
driver = setup_driver() driver = setup_driver()
@@ -218,16 +229,15 @@ def main():
try: try:
if login(driver, credentials['username'], credentials['password']): if login(driver, credentials['username'], credentials['password']):
# Verarbeitung starten
all_results = process_job(driver, job_url) all_results = process_job(driver, job_url)
# Ergebnisse speichern
save_results_to_csv(all_results) save_results_to_csv(all_results)
else: else:
print("Skript wird beendet, da der Login fehlgeschlagen ist.") print("Skript wird beendet, da der Login fehlgeschlagen ist.")
finally: finally:
print("\nSkript beendet. Schließe WebDriver.") print("\nSkript beendet. Schließe WebDriver.")
driver.quit() if driver:
driver.quit()
if __name__ == "__main__": if __name__ == "__main__":
main() main()