# File: Brancheneinstufung2/scrape_fotograf.py
# Snapshot: 2025-07-16 14:53:23 +00:00
# Size: 238 lines, 9.7 KiB
# Language: Python

import json
import os
import time
import csv
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# --- Configuration & constants ---
# JSON file mapping a profile name to its login credentials.
CREDENTIALS_FILE = 'fotograf_credentials.json'
# Directory that receives the CSV export and any error screenshots.
OUTPUT_DIR = 'output'
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_ohne_logins.csv')
LOGIN_URL = 'https://app.fotograf.de/login/login'

# --- Selectors ---
# CSS selectors, keyed by the role the matched element plays in the scraper.
SELECTORS = dict(
    # Login page
    cookie_accept_button="#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
    login_user="#login-email",
    login_pass="#login-password",
    login_button="#login-submit",
    # Job settings page
    job_name="h1",
    # Album overview: one row per album, queried relative to the row element
    album_rows="div:has(a[href*='/config_jobs_photos/gallery/'])",
    album_link="a[href*='/config_jobs_photos/gallery/']",
    login_count="div:nth-child(7)",
    # Album detail / buyer page
    # NOTE(review): ':contains()' is a jQuery extension, not standard CSS;
    # verify that the browser accepts this selector.
    buyer_link="a.block:has(span:contains('Käufer'))",
    buyer_email="div.flex:nth-of-type(4) span",
)
def take_error_screenshot(driver, error_name):
    """Save a timestamped PNG screenshot of the current browser state.

    The file is written to OUTPUT_DIR as
    ``error_<error_name>_<YYYYmmdd_HHMMSS>.png``. This helper is called from
    error handlers, so every failure is reported on stdout instead of being
    raised; a broken screenshot must not mask the error that triggered it.

    Args:
        driver: Object exposing ``save_screenshot(path)`` (a Selenium WebDriver).
        error_name: Label identifying the error. Callers also pass free text
            such as album names, so the label is sanitised before it becomes
            part of the file name.
    """
    # Replace anything that is not alphanumeric, '-' or '_' so that labels
    # containing '/', spaces or quotes cannot produce an invalid or nested path.
    safe_name = "".join(
        ch if ch.isalnum() or ch in "-_" else "_" for ch in str(error_name)
    )
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filepath = os.path.join(OUTPUT_DIR, f"error_{safe_name}_{timestamp}.png")
    try:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        saved = driver.save_screenshot(filepath)
    except Exception as e:
        print(f"!!! Konnte keinen Screenshot speichern: {e}")
        return
    # Selenium signals an I/O failure by returning False rather than raising.
    if saved is False:
        print(f"!!! Konnte keinen Screenshot speichern: {filepath}")
        return
    print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {filepath}")
def setup_driver():
    """Start a headless Chrome WebDriver.

    Returns:
        The WebDriver instance, or None if Chrome could not be started
        (the error is printed).
    """
    print("Initialisiere Chrome WebDriver...")
    chrome_options = Options()
    chrome_flags = (
        '--headless',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--window-size=1920,1200',
    )
    for flag in chrome_flags:
        chrome_options.add_argument(flag)
    chrome_options.binary_location = '/usr/bin/google-chrome'

    try:
        return webdriver.Chrome(options=chrome_options)
    except Exception as err:
        print(f"Fehler bei der Initialisierung des WebDrivers: {err}")
        return None
def load_all_credentials():
    """Load all login profiles from CREDENTIALS_FILE.

    Returns:
        The parsed JSON object (a dict keyed by profile name), or None if the
        file is missing, is not valid UTF-8 JSON, or does not contain a JSON
        object at the top level. The reason is printed in each failure case.
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the platform default.
        with open(CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
            credentials = json.load(f)
    except FileNotFoundError:
        print(f"Zugangsdaten-Datei '{CREDENTIALS_FILE}' wurde nicht gefunden.")
        return None
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        print(f"Zugangsdaten-Datei '{CREDENTIALS_FILE}' ist kein gültiges JSON: {e}")
        return None
    # Callers look profiles up by name, so anything but an object is unusable.
    if not isinstance(credentials, dict):
        print(f"Zugangsdaten-Datei '{CREDENTIALS_FILE}' muss ein JSON-Objekt enthalten.")
        return None
    return credentials
def _accept_cookie_banner(wait):
    """Click the cookie consent button if it becomes clickable in time."""
    print("Suche nach Cookie-Banner...")
    try:
        consent_button = wait.until(
            EC.element_to_be_clickable(
                (By.CSS_SELECTOR, SELECTORS["cookie_accept_button"])
            )
        )
        consent_button.click()
        print("Cookie-Banner akzeptiert.")
        # Brief pause after the click before touching the form.
        time.sleep(1)
    except TimeoutException:
        print("Kein Cookie-Banner gefunden, fahre fort.")


def login(driver, username, password):
    """Log in at LOGIN_URL and wait for the dashboard URL.

    Args:
        driver: Selenium WebDriver to drive.
        username: Value typed into the e-mail field.
        password: Value typed into the password field.

    Returns:
        True once the URL contains '/config_dashboard/index'; False on a
        timeout or any other error (a screenshot is taken in both cases).
    """
    print("Starte Login-Vorgang...")
    try:
        driver.get(LOGIN_URL)
        wait = WebDriverWait(driver, 10)
        _accept_cookie_banner(wait)

        print("Fülle Anmeldeformular aus...")
        user_field = wait.until(
            EC.visibility_of_element_located(
                (By.CSS_SELECTOR, SELECTORS["login_user"])
            )
        )
        user_field.send_keys(username)
        password_field = driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"])
        password_field.send_keys(password)

        print("Klicke auf Login...")
        submit_button = driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"])
        submit_button.click()

        print("Warte auf die nächste Seite...")
        wait.until(EC.url_contains('/config_dashboard/index'))
        print("Login erfolgreich!")
    except TimeoutException:
        print("Login fehlgeschlagen.")
        take_error_screenshot(driver, "login_timeout")
        return False
    except Exception as err:
        print(f"Ein unerwarteter Fehler beim Login: {err}")
        take_error_screenshot(driver, "login_unexpected")
        return False
    else:
        return True
# XPath replacement for SELECTORS["buyer_link"]. That CSS selector relies on
# ':contains()', a jQuery extension that browsers reject as an invalid
# selector, so the buyer link could never be located through it.
_BUYER_LINK_XPATH = (
    "//a[contains(concat(' ', normalize-space(@class), ' '), ' block ')]"
    "[.//span[contains(normalize-space(.), 'Käufer')]]"
)


def _extract_job_id(job_url):
    """Return the last path segment of job_url, ignoring query, fragment,
    surrounding whitespace and trailing slashes."""
    path = job_url.strip().split('#', 1)[0].split('?', 1)[0]
    return path.rstrip('/').rsplit('/', 1)[-1]


def _collect_albums_without_logins(driver, wait):
    """Scan the album overview currently loaded in driver.

    Returns a list of ``{"child_name", "album_detail_url"}`` dicts for rows
    whose login count is 0, or None if no album rows appeared in time.
    """
    albums = []
    try:
        # NOTE(review): 'div:has(...)' also matches every ancestor div of an
        # album link, not only the row itself; such extra matches are expected
        # to fail the login-count parsing below and be skipped - confirm.
        album_rows = wait.until(
            EC.presence_of_all_elements_located(
                (By.CSS_SELECTOR, SELECTORS["album_rows"])
            )
        )
        print(f"{len(album_rows)} Alben gefunden. Prüfe auf Logins...")
        for i, row in enumerate(album_rows):
            print(f"\n--- Analysiere Zeile {i+1} ---")
            try:
                row_html = row.get_attribute('outerHTML') or ''
                print(f"DEBUG (HTML-Ausschnitt): {row_html[:400]}...")
                login_count_text = row.find_element(
                    By.CSS_SELECTOR, SELECTORS["login_count"]
                ).text.strip()
                print(f"DEBUG (Gefundener Login-Text): '{login_count_text}'")
                if int(login_count_text) != 0:
                    print(" --> INFO: Album wird übersprungen (Logins > 0).")
                    continue
                album_link_element = row.find_element(
                    By.CSS_SELECTOR, SELECTORS["album_link"]
                )
                child_name = album_link_element.text
                albums.append({
                    "child_name": child_name,
                    "album_detail_url": album_link_element.get_attribute('href'),
                })
                print(f" --> ERFOLG: Album '{child_name}' mit 0 Logins zur Verarbeitung hinzugefügt.")
            except (NoSuchElementException, ValueError) as e:
                # A row without a numeric login count is skipped, not fatal.
                print(f" --> FEHLER: Konnte Zeile nicht verarbeiten. Grund: {e}")
    except TimeoutException:
        print("Keine Alben auf der Seite gefunden oder Timeout beim Warten.")
        take_error_screenshot(driver, "album_list_timeout")
        return None
    return albums


def process_job(driver, job_url):
    """Collect buyer contact data for all albums of a job that have 0 logins.

    Reads the job name from the job's settings page, scans the album overview
    for albums with a login count of 0, then visits each such album and its
    buyer page to read the buyer's name and e-mail address.

    Args:
        driver: Logged-in Selenium WebDriver.
        job_url: URL of the job; its last path segment is used as the job id.

    Returns:
        A list of dicts with the keys "Auftragsname", "Kind Vorname",
        "Käufer Name" and "Käufer E-Mail". The list is empty if the job id,
        the job name or the album list cannot be determined. Albums whose
        detail lookup fails are skipped after a screenshot is taken.
    """
    print(f"\nVerarbeite Job-URL: {job_url}")
    job_id = _extract_job_id(job_url)
    if not job_id:
        print("Konnte keine Auftrags-ID aus der URL ermitteln.")
        return []
    albums_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
    settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"

    driver.get(settings_url)
    wait = WebDriverWait(driver, 15)
    try:
        job_name = wait.until(
            EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["job_name"]))
        ).text
        print(f"Auftragsname: '{job_name}'")
    except TimeoutException:
        print("Konnte den Auftragsnamen nicht finden.")
        take_error_screenshot(driver, "job_name_not_found")
        return []

    print(f"Navigiere zur Alben-Übersicht: {albums_url}")
    driver.get(albums_url)
    # All row data is extracted before navigating away, so no element
    # reference is used after the page changes.
    albums_to_process = _collect_albums_without_logins(driver, wait)
    if albums_to_process is None:
        return []

    results = []
    print(f"\nVerarbeite {len(albums_to_process)} Alben mit 0 Logins im Detail...")
    for album in albums_to_process:
        try:
            print(f" Rufe Detailseite für '{album['child_name']}' auf...")
            driver.get(album["album_detail_url"])
            buyer_link_element = wait.until(
                EC.visibility_of_element_located((By.XPATH, _BUYER_LINK_XPATH))
            )
            buyer_name = buyer_link_element.text.replace('Käufer ', '').strip()
            buyer_page_url = buyer_link_element.get_attribute('href')
            print(f" Käufer gefunden: '{buyer_name}'. Rufe Käuferseite auf...")
            driver.get(buyer_page_url)
            buyer_email = wait.until(
                EC.visibility_of_element_located(
                    (By.CSS_SELECTOR, SELECTORS["buyer_email"])
                )
            ).text
            print(f" E-Mail gefunden: {buyer_email}")
            results.append({
                "Auftragsname": job_name,
                "Kind Vorname": album["child_name"],
                "Käufer Name": buyer_name,
                "Käufer E-Mail": buyer_email,
            })
            # Short pause between albums.
            time.sleep(1)
        except TimeoutException:
            print(f" Fehler: Timeout bei '{album['child_name']}'.")
            take_error_screenshot(driver, f"detail_page_timeout_{album['child_name']}")
        except Exception as e:
            print(f" Unerwarteter Fehler bei '{album['child_name']}': {e}")
            take_error_screenshot(driver, f"detail_page_unexpected_{album['child_name']}")
    return results
def save_results_to_csv(results):
    """Write the result rows to OUTPUT_FILE as a UTF-8 CSV file.

    The column order is taken from the keys of the first row. Nothing is
    written when results is empty.

    Args:
        results: List of dicts, one per CSV row.
    """
    if not results:
        print("\nKeine Daten zum Speichern vorhanden.")
        return

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    row_count = len(results)
    print(f"\nSpeichere {row_count} Ergebnisse in '{OUTPUT_FILE}'...")

    with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as csv_file:
        column_names = results[0].keys()
        csv_writer = csv.DictWriter(csv_file, fieldnames=column_names)
        csv_writer.writeheader()
        for record in results:
            csv_writer.writerow(record)

    print("Speichern erfolgreich!")
def get_profile_choice():
    """Ask the user which credential profile to use.

    Lists the profile names numbered from 1 and prompts until a valid number
    is entered.

    Returns:
        The credentials stored under the chosen profile, or None when no
        credentials could be loaded.
    """
    all_credentials = load_all_credentials()
    if not all_credentials:
        return None

    profile_names = list(all_credentials.keys())
    print("\nBitte wähle das zu verwendende Profil:")
    for number, profile_name in enumerate(profile_names, start=1):
        print(f" {number}) {profile_name}")

    while True:
        answer = input(f"Gib eine Zahl zwischen 1 und {len(profile_names)} ein: ")
        try:
            selection = int(answer)
        except ValueError:
            print("Ungültige Eingabe.")
            continue
        if selection < 1 or selection > len(profile_names):
            print("Ungültige Auswahl.")
            continue
        chosen = profile_names[selection - 1]
        print(f"Profil '{chosen}' ausgewählt.")
        return all_credentials[chosen]
def main():
    """Run the scraper interactively.

    Asks for a credential profile and a job URL, logs in, collects the buyers
    of all albums without logins and writes them to the CSV file. The
    WebDriver is always closed once it has been started.
    """
    print("--- Fotograf.de Scraper für Nutzer ohne Logins ---")
    credentials = get_profile_choice()
    if not credentials:
        return

    job_url = input("Bitte gib die URL des zu bearbeitenden Fotoauftrags ein: ")
    url_looks_valid = "fotograf.de/config_jobs_settings/index/" in job_url
    if not url_looks_valid:
        print("Dies scheint keine gültige URL zu sein.")
        return

    driver = setup_driver()
    if not driver:
        return

    try:
        logged_in = login(driver, credentials['username'], credentials['password'])
        if not logged_in:
            print("Skript wird beendet, da der Login fehlgeschlagen ist.")
        else:
            save_results_to_csv(process_job(driver, job_url))
    finally:
        print("\nSkript beendet. Schließe WebDriver.")
        driver.quit()


if __name__ == "__main__":
    main()