scrape_fotograf.py aktualisiert

This commit is contained in:
2025-07-16 15:52:46 +00:00
parent fd53dcf065
commit 5e21ae6dc1

View File

@@ -16,7 +16,7 @@ OUTPUT_DIR = 'output'
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_ohne_logins.csv') OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_ohne_logins.csv')
LOGIN_URL = 'https://app.fotograf.de/login/login' LOGIN_URL = 'https://app.fotograf.de/login/login'
# --- Selektoren (FINALE, VOLLSTÄNDIGE VERSION) --- # --- Selektoren (unverändert) ---
SELECTORS = { SELECTORS = {
"cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll", "cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"login_user": "#login-email", "login_user": "#login-email",
@@ -35,7 +35,6 @@ SELECTORS = {
} }
def take_error_screenshot(driver, error_name): def take_error_screenshot(driver, error_name):
"""Speichert einen Screenshot des aktuellen Browserfensters in den output-Ordner."""
os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"error_{error_name}_{timestamp}.png" filename = f"error_{error_name}_{timestamp}.png"
@@ -47,7 +46,6 @@ def take_error_screenshot(driver, error_name):
print(f"!!! Konnte keinen Screenshot speichern: {e}") print(f"!!! Konnte keinen Screenshot speichern: {e}")
def setup_driver(): def setup_driver():
"""Initialisiert und konfiguriert den Chrome WebDriver."""
print("Initialisiere Chrome WebDriver...") print("Initialisiere Chrome WebDriver...")
options = Options() options = Options()
options.add_argument('--headless') options.add_argument('--headless')
@@ -63,7 +61,6 @@ def setup_driver():
return None return None
def load_all_credentials(): def load_all_credentials():
"""Lädt alle Anmeldedaten aus der JSON-Datei."""
try: try:
with open(CREDENTIALS_FILE, 'r') as f: with open(CREDENTIALS_FILE, 'r') as f:
return json.load(f) return json.load(f)
@@ -71,7 +68,6 @@ def load_all_credentials():
return None return None
def login(driver, username, password): def login(driver, username, password):
"""Führt den Login-Vorgang auf fotograf.de durch."""
print("Starte Login-Vorgang...") print("Starte Login-Vorgang...")
try: try:
driver.get(LOGIN_URL) driver.get(LOGIN_URL)
@@ -98,7 +94,6 @@ def login(driver, username, password):
return False return False
def process_full_job(driver, job_url): def process_full_job(driver, job_url):
"""Die finale, robuste Hauptverarbeitungslogik."""
wait = WebDriverWait(driver, 15) wait = WebDriverWait(driver, 15)
print(f"\nVerarbeite Job-URL: {job_url}") print(f"\nVerarbeite Job-URL: {job_url}")
@@ -137,7 +132,7 @@ def process_full_job(driver, job_url):
print(f"\n--- Betrete Album: {album['name']} ---") print(f"\n--- Betrete Album: {album['name']} ---")
driver.get(album['url']) driver.get(album['url'])
try: try:
# Robuste Schleife, die den "Stale Element"-Fehler verhindert # GEÄNDERT: Robuste Schleife, die den Stale-Element-Fehler verhindert
num_persons = len(wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))) num_persons = len(wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"]))))
print(f"{num_persons} Personen in diesem Album gefunden.") print(f"{num_persons} Personen in diesem Album gefunden.")
@@ -146,19 +141,18 @@ def process_full_job(driver, job_url):
person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"]))) person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
person_row = person_rows[i] person_row = person_rows[i]
try: # Führe Aktionen nur aus, wenn Logins 0 sind. So vermeiden wir unnötige Navigation.
login_count_text = person_row.find_element(By.XPATH, SELECTORS["person_logins"]).text login_count_text = person_row.find_element(By.XPATH, SELECTORS["person_logins"]).text
login_count = int(login_count_text) if int(login_count_text) == 0:
vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text
print(f" --> ERFOLG: '{vorname}' mit 0 Logins gefunden!")
if login_count == 0: access_code_page_url = person_row.find_element(By.XPATH, SELECTORS["person_access_code_link"]).get_attribute('href')
vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text driver.get(access_code_page_url)
print(f" --> ERFOLG: '{vorname}' mit 0 Logins gefunden!") print(f" Navigiere zur Kommunikations-Seite für '{vorname}'...")
access_code_page_url = person_row.find_element(By.XPATH, SELECTORS["person_access_code_link"]).get_attribute('href') try:
driver.get(access_code_page_url) # GEZIELTER TRY-BLOCK für die Interaktion auf der neuen Seite
print(f" Navigiere zur Kommunikations-Seite für '{vorname}'...")
# Alle Daten von dieser Seite extrahieren
schnell_login_url = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["quick_login_url"]))).get_attribute('href') schnell_login_url = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["quick_login_url"]))).get_attribute('href')
potential_buyer_element = wait.until(EC.element_to_be_clickable((By.XPATH, SELECTORS["potential_buyer_link"]))) potential_buyer_element = wait.until(EC.element_to_be_clickable((By.XPATH, SELECTORS["potential_buyer_link"])))
kaeufer_name = potential_buyer_element.text kaeufer_name = potential_buyer_element.text
@@ -177,15 +171,19 @@ def process_full_job(driver, job_url):
"E-Mail-Adresse Käufer": email, "E-Mail-Adresse Käufer": email,
"Schnell Login URL": schnell_login_url "Schnell Login URL": schnell_login_url
}) })
except (TimeoutException, StaleElementReferenceException) as e:
# Wichtig: Navigiere zur Album-Seite zurück, bevor die Schleife weitergeht print(f" FEHLER beim Verarbeiten der Detailseite für '{vorname}'. Überspringe. Grund: {e}")
print(f" Kehre zurück zur Album-Übersicht '{album['name']}'...") take_error_screenshot(driver, f"detail_page_error_{vorname}")
# Wichtig: Trotz Fehler zurückkehren, um die Schleife nicht zu sprengen
driver.get(album['url']) driver.get(album['url'])
# Warten, bis die Seite wieder bereit ist für die nächste Iteration
wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"]))) wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"])))
except (ValueError, NoSuchElementException, TimeoutException, StaleElementReferenceException) as e: continue
print(f" Fehler bei der Verarbeitung einer Person, überspringe: {e}")
continue # Wichtig: Navigiere zur Album-Seite zurück, bevor die Schleife weitergeht
print(f" Kehre zurück zur Album-Übersicht '{album['name']}'...")
driver.get(album['url'])
# Warten, bis die Seite wieder bereit ist für die nächste Iteration
wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"])))
except TimeoutException: except TimeoutException:
print(f" Keine Personen-Daten im Album '{album['name']}' gefunden. Überspringe.") print(f" Keine Personen-Daten im Album '{album['name']}' gefunden. Überspringe.")
take_error_screenshot(driver, f"album_{album['name']}_error") take_error_screenshot(driver, f"album_{album['name']}_error")
@@ -194,7 +192,6 @@ def process_full_job(driver, job_url):
return final_results return final_results
def save_results_to_csv(results): def save_results_to_csv(results):
"""Speichert die gesammelten Daten in einer CSV-Datei."""
if not results: if not results:
print("\nKeine Daten zum Speichern vorhanden.") print("\nKeine Daten zum Speichern vorhanden.")
return return
@@ -208,7 +205,6 @@ def save_results_to_csv(results):
print("Speichern erfolgreich!") print("Speichern erfolgreich!")
def get_profile_choice(): def get_profile_choice():
"""Zeigt ein Menü der verfügbaren Profile und gibt die Auswahl des Benutzers zurück."""
all_credentials = load_all_credentials() all_credentials = load_all_credentials()
if not all_credentials: return None if not all_credentials: return None
profiles = list(all_credentials.keys()) profiles = list(all_credentials.keys())
@@ -225,7 +221,6 @@ def get_profile_choice():
except ValueError: print("Ungültige Eingabe.") except ValueError: print("Ungültige Eingabe.")
def main(): def main():
"""Hauptfunktion des Skripts."""
print("--- Fotograf.de Scraper für Nutzer ohne Logins (FINALE VERSION) ---") print("--- Fotograf.de Scraper für Nutzer ohne Logins (FINALE VERSION) ---")
credentials = get_profile_choice() credentials = get_profile_choice()
if not credentials: return if not credentials: return