scrape_fotograf.py aktualisiert
This commit is contained in:
@@ -2,6 +2,7 @@ import json
|
||||
import os
|
||||
import time
|
||||
import csv
|
||||
from datetime import datetime
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from selenium.webdriver.common.by import By
|
||||
@@ -21,19 +22,32 @@ SELECTORS = {
|
||||
"login_pass": "#FotografPassword",
|
||||
"login_button": "button[type='submit']",
|
||||
"job_name": "h1",
|
||||
"album_rows": "div.table-row-group > div.table-row", # Container für jede Album-Zeile
|
||||
"album_rows": "div.table-row-group > div.table-row",
|
||||
"album_link": "a.text-legacy-azure-200",
|
||||
"child_name": "div:nth-of-type(4) strong",
|
||||
"login_count": "div:nth-of-type(6) strong",
|
||||
"buyer_link": "a.block:has(span:contains('Käufer'))", # Modernerer Selector
|
||||
"buyer_link": "a.block:has(span:contains('Käufer'))",
|
||||
"buyer_email": "div.flex:nth-of-type(4) span"
|
||||
}
|
||||
|
||||
# NEU: Hilfsfunktion für Screenshots bei Fehlern
|
||||
def take_error_screenshot(driver, error_name):
|
||||
"""Speichert einen Screenshot des aktuellen Browserfensters in den output-Ordner."""
|
||||
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"error_{error_name}_{timestamp}.png"
|
||||
filepath = os.path.join(OUTPUT_DIR, filename)
|
||||
try:
|
||||
driver.save_screenshot(filepath)
|
||||
print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {filepath}")
|
||||
except Exception as e:
|
||||
print(f"!!! Konnte keinen Screenshot speichern: {e}")
|
||||
|
||||
def setup_driver():
|
||||
"""Initialisiert und konfiguriert den Chrome WebDriver."""
|
||||
print("Initialisiere Chrome WebDriver...")
|
||||
options = Options()
|
||||
options.add_argument('--headless') # Headless für Docker-Umgebungen empfohlen
|
||||
options.add_argument('--headless')
|
||||
options.add_argument('--no-sandbox')
|
||||
options.add_argument('--disable-dev-shm-usage')
|
||||
options.add_argument('--window-size=1920,1200')
|
||||
@@ -44,20 +58,14 @@ def setup_driver():
|
||||
return driver
|
||||
except Exception as e:
|
||||
print(f"Fehler bei der Initialisierung des WebDrivers: {e}")
|
||||
print("Stelle sicher, dass Google Chrome und der passende Chromedriver im Docker-Image korrekt installiert sind.")
|
||||
return None
|
||||
|
||||
def load_credentials(profile_name):
|
||||
"""Lädt Anmeldedaten für ein bestimmtes Profil aus der JSON-Datei."""
|
||||
print(f"Lade Anmeldedaten für Profil: {profile_name}")
|
||||
# GEÄNDERT: Lädt alle Credentials, damit wir ein Menü anzeigen können
|
||||
def load_all_credentials():
|
||||
"""Lädt alle Anmeldedaten aus der JSON-Datei."""
|
||||
try:
|
||||
with open(CREDENTIALS_FILE, 'r') as f:
|
||||
all_credentials = json.load(f)
|
||||
if profile_name in all_credentials:
|
||||
return all_credentials[profile_name]
|
||||
else:
|
||||
print(f"Fehler: Profil '{profile_name}' nicht in {CREDENTIALS_FILE} gefunden.")
|
||||
return None
|
||||
return json.load(f)
|
||||
except FileNotFoundError:
|
||||
print(f"Fehler: {CREDENTIALS_FILE} nicht gefunden.")
|
||||
return None
|
||||
@@ -72,28 +80,25 @@ def login(driver, username, password):
|
||||
driver.get(LOGIN_URL)
|
||||
wait = WebDriverWait(driver, 15)
|
||||
|
||||
# Warten und Anmeldedaten eingeben
|
||||
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
|
||||
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
|
||||
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
|
||||
|
||||
# Warten auf erfolgreichen Login (z.B. durch prüfen der URL oder eines Dashboard-Elements)
|
||||
wait.until(EC.url_contains('/config_dashboard/index'))
|
||||
print("Login erfolgreich!")
|
||||
return True
|
||||
except TimeoutException:
|
||||
print("Login fehlgeschlagen. Timeout beim Warten auf Elemente oder die nächste Seite.")
|
||||
take_error_screenshot(driver, "login_timeout") # NEU: Screenshot bei Fehler
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"Ein unerwarteter Fehler beim Login ist aufgetreten: {e}")
|
||||
take_error_screenshot(driver, "login_unexpected") # NEU: Screenshot bei Fehler
|
||||
return False
|
||||
|
||||
def process_job(driver, job_url):
|
||||
"""Verarbeitet einen einzelnen Fotoauftrag."""
|
||||
print(f"\nVerarbeite Job-URL: {job_url}")
|
||||
|
||||
# 1. Zur Job-Seite navigieren und Job-Namen ausgeben
|
||||
# Die URL muss zur Alben-Übersicht führen. Wir leiten sie aus der Job-URL ab.
|
||||
job_id = job_url.split('/')[-1]
|
||||
albums_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
|
||||
settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
|
||||
@@ -106,42 +111,32 @@ def process_job(driver, job_url):
|
||||
print(f"Auftragsname: '{job_name}'")
|
||||
except TimeoutException:
|
||||
print("Konnte den Auftragsnamen nicht finden. Überprüfe den Selector oder die Seite.")
|
||||
take_error_screenshot(driver, "job_name_not_found") # NEU: Screenshot bei Fehler
|
||||
return []
|
||||
|
||||
# 2. Zur Alben-Übersicht navigieren
|
||||
print(f"Navigiere zur Alben-Übersicht: {albums_url}")
|
||||
driver.get(albums_url)
|
||||
|
||||
# 3. Alben mit 0 Logins identifizieren und deren Daten sammeln
|
||||
albums_to_process = []
|
||||
try:
|
||||
# Warten, bis die Album-Zeilen geladen sind
|
||||
album_rows = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, SELECTORS["album_rows"])))
|
||||
print(f"{len(album_rows)} Alben gefunden. Prüfe auf Logins...")
|
||||
|
||||
for row in album_rows:
|
||||
try:
|
||||
login_count_text = row.find_element(By.CSS_SELECTOR, SELECTORS["login_count"]).text
|
||||
if int(login_count_text) == 0:
|
||||
if int(row.find_element(By.CSS_SELECTOR, SELECTORS["login_count"]).text) == 0:
|
||||
child_name = row.find_element(By.CSS_SELECTOR, SELECTORS["child_name"]).text
|
||||
album_link = row.find_element(By.CSS_SELECTOR, SELECTORS["album_link"]).get_attribute('href')
|
||||
|
||||
albums_to_process.append({
|
||||
"child_name": child_name,
|
||||
"album_detail_url": album_link
|
||||
})
|
||||
print(f" -> Gefunden: Kind '{child_name}' mit 0 Logins. Link wird zur späteren Verarbeitung gespeichert.")
|
||||
|
||||
except (NoSuchElementException, ValueError) as e:
|
||||
# Ignoriere Zeilen, die nicht dem erwarteten Format entsprechen (z.B. Kopfzeile)
|
||||
# print(f" -> Zeile übersprungen, konnte Daten nicht extrahieren: {e}")
|
||||
albums_to_process.append({"child_name": child_name, "album_detail_url": album_link})
|
||||
print(f" -> Gefunden: Kind '{child_name}' mit 0 Logins.")
|
||||
except (NoSuchElementException, ValueError):
|
||||
pass
|
||||
|
||||
except TimeoutException:
|
||||
print("Keine Alben auf der Seite gefunden oder Timeout beim Warten.")
|
||||
take_error_screenshot(driver, "album_list_timeout") # NEU: Screenshot bei Fehler
|
||||
return []
|
||||
|
||||
# 4. Käuferdetails für die identifizierten Alben abrufen
|
||||
results = []
|
||||
print(f"\nVerarbeite {len(albums_to_process)} Alben mit 0 Logins im Detail...")
|
||||
for album in albums_to_process:
|
||||
@@ -149,7 +144,6 @@ def process_job(driver, job_url):
|
||||
print(f" Rufe Detailseite für '{album['child_name']}' auf...")
|
||||
driver.get(album["album_detail_url"])
|
||||
|
||||
# Käufer-Link finden und aufrufen
|
||||
buyer_link_element = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["buyer_link"])))
|
||||
buyer_name = buyer_link_element.text.replace('Käufer ', '').strip()
|
||||
buyer_page_url = buyer_link_element.get_attribute('href')
|
||||
@@ -157,7 +151,6 @@ def process_job(driver, job_url):
|
||||
print(f" Käufer gefunden: '{buyer_name}'. Rufe Käuferseite auf...")
|
||||
driver.get(buyer_page_url)
|
||||
|
||||
# E-Mail-Adresse extrahieren
|
||||
buyer_email = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["buyer_email"]))).text
|
||||
print(f" E-Mail gefunden: {buyer_email}")
|
||||
|
||||
@@ -167,12 +160,14 @@ def process_job(driver, job_url):
|
||||
"Käufer Name": buyer_name,
|
||||
"Käufer E-Mail": buyer_email,
|
||||
})
|
||||
time.sleep(1) # Kurze Pause, um die Server nicht zu überlasten
|
||||
time.sleep(1)
|
||||
|
||||
except TimeoutException:
|
||||
print(f" Fehler: Timeout beim Verarbeiten der Detailseite für '{album['child_name']}'. Element nicht gefunden.")
|
||||
print(f" Fehler: Timeout beim Verarbeiten der Detailseite für '{album['child_name']}'.")
|
||||
take_error_screenshot(driver, f"detail_page_timeout_{album['child_name']}") # NEU: Screenshot bei Fehler
|
||||
except Exception as e:
|
||||
print(f" Ein unerwarteter Fehler bei der Detailverarbeitung für '{album['child_name']}' ist aufgetreten: {e}")
|
||||
print(f" Ein unerwarteter Fehler bei '{album['child_name']}': {e}")
|
||||
take_error_screenshot(driver, f"detail_page_unexpected_{album['child_name']}") # NEU: Screenshot bei Fehler
|
||||
|
||||
return results
|
||||
|
||||
@@ -181,35 +176,51 @@ def save_results_to_csv(results):
|
||||
if not results:
|
||||
print("\nKeine Daten zum Speichern vorhanden.")
|
||||
return
|
||||
|
||||
print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...")
|
||||
|
||||
# Sicherstellen, dass der output-Ordner existiert
|
||||
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
||||
|
||||
# CSV schreiben
|
||||
print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...")
|
||||
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
|
||||
writer = csv.DictWriter(f, fieldnames=results[0].keys())
|
||||
writer.writeheader()
|
||||
writer.writerows(results)
|
||||
|
||||
print("Speichern erfolgreich!")
|
||||
|
||||
# NEU: Funktion zur Profilauswahl
|
||||
def get_profile_choice():
|
||||
"""Zeigt ein Menü der verfügbaren Profile und gibt die Auswahl des Benutzers zurück."""
|
||||
all_credentials = load_all_credentials()
|
||||
if not all_credentials:
|
||||
return None
|
||||
|
||||
profiles = list(all_credentials.keys())
|
||||
print("\nBitte wähle das zu verwendende Profil:")
|
||||
for i, profile_name in enumerate(profiles):
|
||||
print(f" {i + 1}) {profile_name}")
|
||||
|
||||
while True:
|
||||
try:
|
||||
choice = int(input(f"Gib eine Zahl zwischen 1 und {len(profiles)} ein: "))
|
||||
if 1 <= choice <= len(profiles):
|
||||
selected_profile_name = profiles[choice - 1]
|
||||
print(f"Profil '{selected_profile_name}' ausgewählt.")
|
||||
return all_credentials[selected_profile_name]
|
||||
else:
|
||||
print("Ungültige Auswahl. Bitte versuche es erneut.")
|
||||
except ValueError:
|
||||
print("Ungültige Eingabe. Bitte gib eine Zahl ein.")
|
||||
|
||||
|
||||
def main():
|
||||
"""Hauptfunktion des Skripts."""
|
||||
print("--- Fotograf.de Scraper für Nutzer ohne Logins ---")
|
||||
|
||||
# Profil abfragen
|
||||
profile_name = input("Bitte gib den Namen des zu verwendenden Profils ein (z.B. 'Kindergarten'): ")
|
||||
credentials = load_credentials(profile_name)
|
||||
# GEÄNDERT: Profilauswahl über Menü
|
||||
credentials = get_profile_choice()
|
||||
if not credentials:
|
||||
return
|
||||
|
||||
# Job-URL abfragen
|
||||
job_url = input("Bitte gib die URL des zu bearbeitenden Fotoauftrags ein: ")
|
||||
if "fotograf.de" not in job_url:
|
||||
print("Dies scheint keine gültige fotograf.de URL zu sein.")
|
||||
if "fotograf.de/config_jobs_settings/index/" not in job_url:
|
||||
print("Dies scheint keine gültige URL für die Auftragseinstellungen zu sein.")
|
||||
return
|
||||
|
||||
driver = setup_driver()
|
||||
@@ -218,16 +229,15 @@ def main():
|
||||
|
||||
try:
|
||||
if login(driver, credentials['username'], credentials['password']):
|
||||
# Verarbeitung starten
|
||||
all_results = process_job(driver, job_url)
|
||||
# Ergebnisse speichern
|
||||
save_results_to_csv(all_results)
|
||||
else:
|
||||
print("Skript wird beendet, da der Login fehlgeschlagen ist.")
|
||||
|
||||
finally:
|
||||
print("\nSkript beendet. Schließe WebDriver.")
|
||||
driver.quit()
|
||||
if driver:
|
||||
driver.quit()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user