Files
Brancheneinstufung2/scrape_fotograf.py
Floke d8d3d1509e Letzte Version war Super
Hat gut funktioniert, jetzt versuchen wir noch die Pagination zu ergänzen
2025-07-16 18:51:29 +00:00

269 lines
12 KiB
Python

import json
import os
import time
import csv
import math
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException, InvalidArgumentException
# --- Konfiguration & Konstanten ---
CREDENTIALS_FILE = 'fotograf_credentials.json'
OUTPUT_DIR = 'output'
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_ohne_logins.csv')
LOGIN_URL = 'https://app.fotograf.de/login/login'
# --- Selektoren (FINALE, VOLLSTÄNDIGE VERSION) ---
SELECTORS = {
"cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"login_user": "#login-email",
"login_pass": "#login-password",
"login_button": "#login-submit",
"job_name": "h1",
"album_overview_rows": "//table/tbody/tr",
"album_overview_link": ".//td[2]//a",
# NEU: Selector für die Gesamtzahl der Zugangscodes
"access_code_count": "//span[text()='Zugangscodes']/following-sibling::strong",
"person_rows": "//div[contains(@class, 'border-legacy-silver-550') and .//span[text()='Logins']]",
"person_vorname": ".//span[text()='Vorname']/following-sibling::strong",
"person_logins": ".//span[text()='Logins']/following-sibling::strong",
"person_access_code_link": ".//a[contains(@data-qa-id, 'guest-access-banner-access-code')]",
"potential_buyer_link": "//a[contains(@href, '/config_customers/view_customer')]",
"quick_login_url": "//a[@id='quick-login-url']",
"buyer_email": "//span[contains(., '@')]"
}
def take_error_screenshot(driver, error_name):
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"error_{error_name}_{timestamp}.png"
filepath = os.path.join(OUTPUT_DIR, filename)
try:
driver.save_screenshot(filepath)
print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {filepath}")
except Exception as e:
print(f"!!! Konnte keinen Screenshot speichern: {e}")
def setup_driver():
print("Initialisiere Chrome WebDriver...")
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--window-size=1920,1200')
options.binary_location = '/usr/bin/google-chrome'
try:
driver = webdriver.Chrome(options=options)
return driver
except Exception as e:
print(f"Fehler bei der Initialisierung des WebDrivers: {e}")
return None
def load_all_credentials():
try:
with open(CREDENTIALS_FILE, 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return None
def login(driver, username, password):
print("Starte Login-Vorgang...")
try:
driver.get(LOGIN_URL)
wait = WebDriverWait(driver, 10)
try:
print("Suche nach Cookie-Banner...")
wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"]))).click()
print("Cookie-Banner akzeptiert.")
time.sleep(1)
except TimeoutException:
print("Kein Cookie-Banner gefunden, fahre fort.")
print("Fülle Anmeldeformular aus...")
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
print("Klicke auf Login...")
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
print("Warte auf die nächste Seite...")
wait.until(EC.url_contains('/config_dashboard/index'))
print("Login erfolgreich!")
return True
except Exception as e:
print(f"Login fehlgeschlagen. Grund: {e}")
take_error_screenshot(driver, "login_error")
return False
def process_full_job(driver, job_url):
wait = WebDriverWait(driver, 15)
print(f"\nVerarbeite Job-URL: {job_url}")
try:
driver.get(job_url)
except InvalidArgumentException:
print(f"!!! FEHLER: Die URL '{job_url}' wurde von Selenium als ungültig angesehen.")
return []
try:
job_name = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["job_name"]))).text
print(f"Auftragsname: '{job_name}'")
except TimeoutException:
print("Konnte den Auftragsnamen nicht finden.")
take_error_screenshot(driver, "job_name_not_found")
return []
job_id = job_url.split('/')[-1]
albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
print(f"Navigiere zur Alben-Übersicht: {albums_overview_url}")
driver.get(albums_overview_url)
albums_to_visit = []
try:
album_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_rows"])))
print(f"{len(album_rows)} Alben in der Übersicht gefunden.")
for row in album_rows:
try:
album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"])
albums_to_visit.append({"name": album_link.text, "url": album_link.get_attribute('href')})
except NoSuchElementException:
continue
print(f"{len(albums_to_visit)} gültige Album-Links gesammelt.")
except TimeoutException:
print("Konnte die Album-Liste nicht finden.")
take_error_screenshot(driver, "album_overview_error")
return []
final_results = []
for album in albums_to_visit:
print(f"\n--- Betrete Album: {album['name']} ---")
driver.get(album['url'])
try:
# NEU: Pagination-Logik
total_codes_text = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["access_code_count"]))).text
num_pages = math.ceil(int(total_codes_text) / 20)
print(f"Album hat {total_codes_text} Zugangscodes auf {num_pages} Seite(n).")
for page_num in range(1, num_pages + 1):
current_page_url = album['url']
if page_num > 1:
current_page_url += f"?page_guest_accesses={page_num}"
print(f" Verarbeite Seite {page_num}...")
driver.get(current_page_url)
num_persons = len(wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"]))))
print(f" {num_persons} Personen auf dieser Seite gefunden.")
for i in range(num_persons):
person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
person_row = person_rows[i]
login_count_text = person_row.find_element(By.XPATH, SELECTORS["person_logins"]).text
if int(login_count_text) == 0:
vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text
print(f" --> ERFOLG: '{vorname}' mit 0 Logins gefunden!")
access_code_page_url = person_row.find_element(By.XPATH, SELECTORS["person_access_code_link"]).get_attribute('href')
driver.get(access_code_page_url)
print(f" Navigiere zur Kommunikations-Seite für '{vorname}'...")
for attempt in range(3):
try:
wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["quick_login_url"])))
schnell_login_url = driver.find_element(By.XPATH, SELECTORS["quick_login_url"]).get_attribute('href')
potential_buyer_element = driver.find_element(By.XPATH, SELECTORS["potential_buyer_link"])
kaeufer_name = potential_buyer_element.text
print(f" Käufer: '{kaeufer_name}', Schnell-Login: GEFUNDEN")
potential_buyer_element.click()
print(f" Navigiere zur Käufer-Detailseite...")
email = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["buyer_email"]))).text
print(f" FINALE ERFOLG: E-Mail gefunden: {email}")
final_results.append({
"Name des Kindes": vorname,
"Name Käufer": kaeufer_name,
"E-Mail-Adresse Käufer": email,
"Schnell Login URL": schnell_login_url
})
break
except StaleElementReferenceException:
print(f" Timing-Fehler, Versuch {attempt + 1}/3...")
time.sleep(1)
if attempt == 2: raise
except TimeoutException:
print(f" Timeout beim Warten auf Details für '{vorname}'.")
take_error_screenshot(driver, f"timeout_error_{vorname}")
break
print(f" Kehre zurück zur Album-Seite {page_num}...")
driver.get(current_page_url)
wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"])))
except TimeoutException:
print(f" Keine Personen-Daten im Album '{album['name']}' gefunden. Überspringe.")
take_error_screenshot(driver, f"album_{album['name']}_error")
continue
return final_results
def save_results_to_csv(results):
if not results:
print("\nKeine Daten zum Speichern vorhanden.")
return
os.makedirs(OUTPUT_DIR, exist_ok=True)
fieldnames = ["Name des Kindes", "Name Käufer", "E-Mail-Adresse Käufer", "Schnell Login URL"]
print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...")
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(results)
print("Speichern erfolgreich!")
def get_profile_choice():
all_credentials = load_all_credentials()
if not all_credentials: return None
profiles = list(all_credentials.keys())
print("\nBitte wähle das zu verwendende Profil:")
for i, p in enumerate(profiles): print(f" {i + 1}) {p}")
while True:
try:
c = int(input(f"Gib eine Zahl zwischen 1 und {len(profiles)} ein: "))
if 1 <= c <= len(profiles):
p_name = profiles[c - 1]
print(f"Profil '{p_name}' ausgewählt.")
return all_credentials[p_name]
else: print("Ungültige Auswahl.")
except ValueError: print("Ungültige Eingabe.")
def main():
print("--- Fotograf.de Scraper für Nutzer ohne Logins (FINALE VERSION) ---")
credentials = get_profile_choice()
if not credentials: return
job_url_raw = input("Bitte gib die URL des zu bearbeitenden Fotoauftrags ein (Einstellungs-Seite): ")
job_url_cleaned = job_url_raw.replace("\x1b[200~", "").replace("\x1b[201~", "")
job_url = job_url_cleaned.strip()
if "fotograf.de/config_jobs_settings/index/" not in job_url:
print("Dies scheint keine gültige URL für die Auftragseinstellungen zu sein.")
return
driver = setup_driver()
if not driver: return
try:
if login(driver, credentials['username'], credentials['password']):
all_results = process_full_job(driver, job_url)
save_results_to_csv(all_results)
else:
print("Skript wird beendet, da der Login fehlgeschlagen ist.")
finally:
print("\nSkript beendet. Schließe WebDriver.")
if driver: driver.quit()
if __name__ == "__main__":
main()