Files
Brancheneinstufung2/scrape_fotograf.py
2025-07-16 15:10:50 +00:00

236 lines
10 KiB
Python

import json
import os
import time
import csv
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, InvalidSelectorException
# --- Konfiguration & Konstanten ---
CREDENTIALS_FILE = 'fotograf_credentials.json'
OUTPUT_DIR = 'output'
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_ohne_logins.csv')
LOGIN_URL = 'https://app.fotograf.de/login/login'
# --- Selektoren (FINALE, KORREKTE VERSION) ---
SELECTORS = {
"cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"login_user": "#login-email",
"login_pass": "#login-password",
"login_button": "#login-submit",
"job_name": "h1",
# Album-Übersicht (basierend auf Ihrem XPath, der <table> enthielt)
"album_overview_rows": "//table/tbody/tr",
"album_overview_link": ".//td[2]//a", # Link ist in der 2. Spalte
"album_overview_logins": ".//td[7]", # Logins sind in der 7. Spalte
# Einzelpersonen-Ansicht (innerhalb eines Albums)
"person_rows": "//section[.//h3[contains(., 'Einzelfotos')]]//table/tbody/tr",
"person_vorname": ".//td[4]",
"person_logins": ".//td[6]",
"person_buyer_link": ".//td[7]//a",
# Käufer-Detailseite
"buyer_email": "//span[contains(., '@')]"
}
def take_error_screenshot(driver, error_name):
# ... (Funktion bleibt unverändert) ...
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"error_{error_name}_{timestamp}.png"
filepath = os.path.join(OUTPUT_DIR, filename)
try:
driver.save_screenshot(filepath)
print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {filepath}")
except Exception as e:
print(f"!!! Konnte keinen Screenshot speichern: {e}")
def setup_driver():
# ... (Funktion bleibt unverändert) ...
print("Initialisiere Chrome WebDriver...")
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--window-size=1920,1200')
options.binary_location = '/usr/bin/google-chrome'
try:
driver = webdriver.Chrome(options=options)
return driver
except Exception as e:
print(f"Fehler bei der Initialisierung des WebDrivers: {e}")
return None
def login(driver, username, password):
# ... (Funktion bleibt unverändert) ...
print("Starte Login-Vorgang...")
try:
driver.get(LOGIN_URL)
wait = WebDriverWait(driver, 10)
try:
print("Suche nach Cookie-Banner...")
cookie_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"])))
cookie_button.click()
print("Cookie-Banner akzeptiert.")
time.sleep(1)
except TimeoutException:
print("Kein Cookie-Banner gefunden, fahre fort.")
print("Fülle Anmeldeformular aus...")
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
print("Klicke auf Login...")
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
print("Warte auf die nächste Seite...")
wait.until(EC.url_contains('/config_dashboard/index'))
print("Login erfolgreich!")
return True
except Exception as e:
print(f"Login fehlgeschlagen. Grund: {e}")
take_error_screenshot(driver, "login_error")
return False
# Die finale, vollständige Logik
def process_full_job(driver, job_url):
wait = WebDriverWait(driver, 15)
# 1. Job-Namen holen
print(f"\nVerarbeite Job-URL: {job_url}")
driver.get(job_url)
try:
job_name = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["job_name"]))).text
print(f"Auftragsname: '{job_name}'")
except TimeoutException:
print("Konnte den Auftragsnamen nicht finden.")
take_error_screenshot(driver, "job_name_not_found")
return []
# 2. Alle Album-Links von der Übersichtsseite sammeln
job_id = job_url.split('/')[-1]
albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
print(f"Navigiere zur Alben-Übersicht: {albums_overview_url}")
driver.get(albums_overview_url)
albums_to_visit = []
try:
album_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_rows"])))
print(f"{len(album_rows)} Alben in der Übersicht gefunden.")
for row in album_rows:
album_name = row.find_element(By.XPATH, SELECTORS["album_overview_link"]).text
album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"]).get_attribute('href')
albums_to_visit.append({"name": album_name, "url": album_link})
print(f"Sammeln der Album-Links abgeschlossen.")
except TimeoutException:
print("Konnte die Album-Liste nicht finden.")
take_error_screenshot(driver, "album_overview_error")
return []
# 3. Jedes Album besuchen und die Personen mit 0 Logins finden
final_results = []
for album in albums_to_visit:
print(f"\n--- Betrete Album: {album['name']} ---")
driver.get(album['url'])
try:
person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
print(f"{len(person_rows)} Personen in diesem Album gefunden.")
for person_row in person_rows:
try:
login_count = int(person_row.find_element(By.XPATH, SELECTORS["person_logins"]).text)
if login_count == 0:
vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text
print(f" --> ERFOLG: '{vorname}' mit 0 Logins gefunden!")
buyer_link_element = person_row.find_element(By.XPATH, SELECTORS["person_buyer_link"])
buyer_page_url = buyer_link_element.get_attribute('href')
# Temporär eine neue Seite öffnen, um die E-Mail zu holen
# (man könnte auch die buyer_page_url speichern und später abarbeiten)
current_window = driver.current_window_handle
driver.execute_script("window.open(arguments[0]);", buyer_page_url)
driver.switch_to.window(driver.window_handles[-1])
email = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["buyer_email"]))).text
print(f" E-Mail gefunden: {email}")
final_results.append({
"Auftragsname": job_name,
"Album": album['name'],
"Kind Vorname": vorname,
"Käufer E-Mail": email
})
# Tab schließen und zurückkehren
driver.close()
driver.switch_to.window(current_window)
except (ValueError, NoSuchElementException):
# Ignoriere Zeilen, die nicht dem Format entsprechen
continue
except TimeoutException:
print(f" Keine Personen-Tabelle im Album '{album['name']}' gefunden. Überspringe.")
take_error_screenshot(driver, f"album_{album['name']}_error")
continue
return final_results
# ... (Rest des Skripts: save_results_to_csv, get_profile_choice, etc. bleiben gleich) ...
def save_results_to_csv(results):
if not results:
print("\nKeine Daten zum Speichern vorhanden.")
return
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Feldnamen an die neue Struktur anpassen
fieldnames = ["Auftragsname", "Album", "Kind Vorname", "Käufer E-Mail"]
print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...")
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(results)
print("Speichern erfolgreich!")
def get_profile_choice():
all_credentials = load_all_credentials()
if not all_credentials: return None
profiles = list(all_credentials.keys())
print("\nBitte wähle das zu verwendende Profil:")
for i, p in enumerate(profiles): print(f" {i + 1}) {p}")
while True:
try:
c = int(input(f"Gib eine Zahl zwischen 1 und {len(profiles)} ein: "))
if 1 <= c <= len(profiles):
p_name = profiles[c - 1]
print(f"Profil '{p_name}' ausgewählt.")
return all_credentials[p_name]
else: print("Ungültige Auswahl.")
except ValueError: print("Ungültige Eingabe.")
def main():
print("--- Fotograf.de Scraper für Nutzer ohne Logins (FINALE VERSION) ---")
credentials = get_profile_choice()
if not credentials: return
job_url = input("Bitte gib die URL des zu bearbeitenden Fotoauftrags ein (Einstellungs-Seite): ")
if "fotograf.de/config_jobs_settings/index/" not in job_url:
print("Dies scheint keine gültige URL für die Auftragseinstellungen zu sein.")
return
driver = setup_driver()
if not driver: return
try:
if login(driver, credentials['username'], credentials['password']):
all_results = process_full_job(driver, job_url)
save_results_to_csv(all_results)
else:
print("Skript wird beendet, da der Login fehlgeschlagen ist.")
finally:
print("\nSkript beendet. Schließe WebDriver.")
if driver: driver.quit()
if __name__ == "__main__":
main()