240 lines
10 KiB
Python
240 lines
10 KiB
Python
import json
|
|
import os
|
|
import time
|
|
import csv
|
|
from datetime import datetime
|
|
from selenium import webdriver
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import TimeoutException, NoSuchElementException
|
|
|
|
# --- Konfiguration & Konstanten ---
|
|
CREDENTIALS_FILE = 'fotograf_credentials.json'
|
|
OUTPUT_DIR = 'output'
|
|
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_ohne_logins.csv')
|
|
LOGIN_URL = 'https://app.fotograf.de/login/login'
|
|
|
|
# --- Selektoren ---
|
|
# FINALE, KORREKTE VERSION mit präzisem Album-Row-Selector
|
|
SELECTORS = {
|
|
"cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
|
|
"login_user": "#login-email",
|
|
"login_pass": "#login-password",
|
|
"login_button": "#login-submit",
|
|
"job_name": "h1",
|
|
# NEU: Findet die Tabellen-Kopfzeile und wählt dann alle folgenden Zeilen als Geschwister aus.
|
|
"album_rows": "div:has(> div:contains('Fotos insgesamt')) ~ div",
|
|
"album_link": "a[href*='/config_jobs_photos/gallery/']",
|
|
# Dieser Selector sollte jetzt im korrekten Zeilen-Kontext funktionieren.
|
|
"login_count": "div:nth-child(7)",
|
|
"buyer_link": "a.block:has(span:contains('Käufer'))",
|
|
"buyer_email": "div.flex:nth-of-type(4) span"
|
|
}
|
|
|
|
def take_error_screenshot(driver, error_name):
|
|
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
filename = f"error_{error_name}_{timestamp}.png"
|
|
filepath = os.path.join(OUTPUT_DIR, filename)
|
|
try:
|
|
driver.save_screenshot(filepath)
|
|
print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {filepath}")
|
|
except Exception as e:
|
|
print(f"!!! Konnte keinen Screenshot speichern: {e}")
|
|
|
|
def setup_driver():
|
|
print("Initialisiere Chrome WebDriver...")
|
|
options = Options()
|
|
options.add_argument('--headless')
|
|
options.add_argument('--no-sandbox')
|
|
options.add_argument('--disable-dev-shm-usage')
|
|
options.add_argument('--window-size=1920,1200')
|
|
options.binary_location = '/usr/bin/google-chrome'
|
|
|
|
try:
|
|
driver = webdriver.Chrome(options=options)
|
|
return driver
|
|
except Exception as e:
|
|
print(f"Fehler bei der Initialisierung des WebDrivers: {e}")
|
|
return None
|
|
|
|
def load_all_credentials():
|
|
try:
|
|
with open(CREDENTIALS_FILE, 'r') as f:
|
|
return json.load(f)
|
|
except (FileNotFoundError, json.JSONDecodeError):
|
|
return None
|
|
|
|
def login(driver, username, password):
|
|
print("Starte Login-Vorgang...")
|
|
try:
|
|
driver.get(LOGIN_URL)
|
|
wait = WebDriverWait(driver, 10)
|
|
try:
|
|
print("Suche nach Cookie-Banner...")
|
|
cookie_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"])))
|
|
cookie_button.click()
|
|
print("Cookie-Banner akzeptiert.")
|
|
time.sleep(1)
|
|
except TimeoutException:
|
|
print("Kein Cookie-Banner gefunden, fahre fort.")
|
|
print("Fülle Anmeldeformular aus...")
|
|
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
|
|
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
|
|
print("Klicke auf Login...")
|
|
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
|
|
print("Warte auf die nächste Seite...")
|
|
wait.until(EC.url_contains('/config_dashboard/index'))
|
|
print("Login erfolgreich!")
|
|
return True
|
|
except TimeoutException:
|
|
print("Login fehlgeschlagen.")
|
|
take_error_screenshot(driver, "login_timeout")
|
|
return False
|
|
except Exception as e:
|
|
print(f"Ein unerwarteter Fehler beim Login: {e}")
|
|
take_error_screenshot(driver, "login_unexpected")
|
|
return False
|
|
|
|
def process_job(driver, job_url):
|
|
print(f"\nVerarbeite Job-URL: {job_url}")
|
|
job_id = job_url.split('/')[-1]
|
|
albums_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
|
|
settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
|
|
|
|
driver.get(settings_url)
|
|
wait = WebDriverWait(driver, 15)
|
|
|
|
try:
|
|
job_name = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["job_name"]))).text
|
|
print(f"Auftragsname: '{job_name}'")
|
|
except TimeoutException:
|
|
print("Konnte den Auftragsnamen nicht finden.")
|
|
take_error_screenshot(driver, "job_name_not_found")
|
|
return []
|
|
|
|
print(f"Navigiere zur Alben-Übersicht: {albums_url}")
|
|
driver.get(albums_url)
|
|
|
|
albums_to_process = []
|
|
try:
|
|
# Warten, bis die Kopfzeile da ist, um sicherzustellen, dass die Tabelle geladen ist
|
|
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div:contains('Fotos insgesamt')")))
|
|
time.sleep(1) # Kurze extra Wartezeit, falls die Zeilen nach der Kopfzeile nachladen
|
|
|
|
album_rows = driver.find_elements(By.CSS_SELECTOR, SELECTORS["album_rows"])
|
|
print(f"{len(album_rows)} Album-Zeilen gefunden. Prüfe auf Logins...")
|
|
|
|
for i, row in enumerate(album_rows):
|
|
print(f"\n--- Analysiere Zeile {i+1} ---")
|
|
try:
|
|
row_html = row.get_attribute('outerHTML')
|
|
print(f"DEBUG (HTML-Ausschnitt): {row_html[:400]}...")
|
|
|
|
login_count_element = row.find_element(By.CSS_SELECTOR, SELECTORS["login_count"])
|
|
login_count_text = login_count_element.text.strip()
|
|
|
|
print(f"DEBUG (Gefundener Login-Text): '{login_count_text}'")
|
|
|
|
if int(login_count_text) == 0:
|
|
album_link_element = row.find_element(By.CSS_SELECTOR, SELECTORS["album_link"])
|
|
child_name = album_link_element.text
|
|
album_link = album_link_element.get_attribute('href')
|
|
|
|
albums_to_process.append({
|
|
"child_name": child_name,
|
|
"album_detail_url": album_link
|
|
})
|
|
print(f" --> ERFOLG: Album '{child_name}' mit 0 Logins zur Verarbeitung hinzugefügt.")
|
|
else:
|
|
print(f" --> INFO: Album wird übersprungen (Logins > 0).")
|
|
|
|
except (NoSuchElementException, ValueError) as e:
|
|
print(f" --> FEHLER: Konnte Zeile nicht verarbeiten. Grund: {e}")
|
|
|
|
except TimeoutException:
|
|
print("Die Album-Tabelle wurde nicht gefunden.")
|
|
take_error_screenshot(driver, "album_list_timeout")
|
|
return []
|
|
|
|
results = []
|
|
print(f"\nVerarbeite {len(albums_to_process)} Alben mit 0 Logins im Detail...")
|
|
for album in albums_to_process:
|
|
try:
|
|
print(f" Rufe Detailseite für '{album['child_name']}' auf...")
|
|
driver.get(album["album_detail_url"])
|
|
buyer_link_element = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["buyer_link"])))
|
|
buyer_name = buyer_link_element.text.replace('Käufer ', '').strip()
|
|
buyer_page_url = buyer_link_element.get_attribute('href')
|
|
print(f" Käufer gefunden: '{buyer_name}'. Rufe Käuferseite auf...")
|
|
driver.get(buyer_page_url)
|
|
buyer_email = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["buyer_email"]))).text
|
|
print(f" E-Mail gefunden: {buyer_email}")
|
|
results.append({
|
|
"Auftragsname": job_name,
|
|
"Kind Vorname": album["child_name"],
|
|
"Käufer Name": buyer_name,
|
|
"Käufer E-Mail": buyer_email,
|
|
})
|
|
time.sleep(1)
|
|
except TimeoutException:
|
|
print(f" Fehler: Timeout bei '{album['child_name']}'.")
|
|
take_error_screenshot(driver, f"detail_page_timeout_{album['child_name']}")
|
|
except Exception as e:
|
|
print(f" Unerwarteter Fehler bei '{album['child_name']}': {e}")
|
|
take_error_screenshot(driver, f"detail_page_unexpected_{album['child_name']}")
|
|
|
|
return results
|
|
|
|
def save_results_to_csv(results):
|
|
if not results:
|
|
print("\nKeine Daten zum Speichern vorhanden.")
|
|
return
|
|
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...")
|
|
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
|
|
writer = csv.DictWriter(f, fieldnames=results[0].keys())
|
|
writer.writeheader()
|
|
writer.writerows(results)
|
|
print("Speichern erfolgreich!")
|
|
|
|
def get_profile_choice():
|
|
all_credentials = load_all_credentials()
|
|
if not all_credentials: return None
|
|
profiles = list(all_credentials.keys())
|
|
print("\nBitte wähle das zu verwendende Profil:")
|
|
for i, p in enumerate(profiles): print(f" {i + 1}) {p}")
|
|
while True:
|
|
try:
|
|
c = int(input(f"Gib eine Zahl zwischen 1 und {len(profiles)} ein: "))
|
|
if 1 <= c <= len(profiles):
|
|
p_name = profiles[c - 1]
|
|
print(f"Profil '{p_name}' ausgewählt.")
|
|
return all_credentials[p_name]
|
|
else: print("Ungültige Auswahl.")
|
|
except ValueError: print("Ungültige Eingabe.")
|
|
|
|
def main():
|
|
print("--- Fotograf.de Scraper für Nutzer ohne Logins ---")
|
|
credentials = get_profile_choice()
|
|
if not credentials: return
|
|
job_url = input("Bitte gib die URL des zu bearbeitenden Fotoauftrags ein: ")
|
|
if "fotograf.de/config_jobs_settings/index/" not in job_url:
|
|
print("Dies scheint keine gültige URL zu sein.")
|
|
return
|
|
driver = setup_driver()
|
|
if not driver: return
|
|
try:
|
|
if login(driver, credentials['username'], credentials['password']):
|
|
all_results = process_job(driver, job_url)
|
|
save_results_to_csv(all_results)
|
|
else:
|
|
print("Skript wird beendet, da der Login fehlgeschlagen ist.")
|
|
finally:
|
|
print("\nSkript beendet. Schließe WebDriver.")
|
|
if driver: driver.quit()
|
|
|
|
if __name__ == "__main__":
|
|
main() |