scrape_fotograf.py aktualisiert

This commit is contained in:
2025-07-16 15:49:03 +00:00
parent 8a42c27942
commit 8e5de724cf

View File

@@ -16,7 +16,7 @@ OUTPUT_DIR = 'output'
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_ohne_logins.csv')
LOGIN_URL = 'https://app.fotograf.de/login/login'
# --- Selektoren (unverändert) ---
# --- Selektoren (FINALE, VOLLSTÄNDIGE VERSION) ---
SELECTORS = {
"cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"login_user": "#login-email",
@@ -30,11 +30,12 @@ SELECTORS = {
"person_logins": ".//span[text()='Logins']/following-sibling::strong",
"person_access_code_link": ".//a[contains(@data-qa-id, 'guest-access-banner-access-code')]",
"potential_buyer_link": "//a[contains(@href, '/config_customers/view_customer')]",
"buyer_email": "//span[contains(., '@')]"
"quick_login_url": "//a[@id='quick-login-url']",
"buyer_email": "//span[contains(., '@')]"
}
# ... (Funktionen take_error_screenshot, setup_driver, load_all_credentials, login bleiben unverändert) ...
def take_error_screenshot(driver, error_name):
"""Speichert einen Screenshot des aktuellen Browserfensters in den output-Ordner."""
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"error_{error_name}_{timestamp}.png"
@@ -46,6 +47,7 @@ def take_error_screenshot(driver, error_name):
print(f"!!! Konnte keinen Screenshot speichern: {e}")
def setup_driver():
"""Initialisiert und konfiguriert den Chrome WebDriver."""
print("Initialisiere Chrome WebDriver...")
options = Options()
options.add_argument('--headless')
@@ -61,6 +63,7 @@ def setup_driver():
return None
def load_all_credentials():
"""Lädt alle Anmeldedaten aus der JSON-Datei."""
try:
with open(CREDENTIALS_FILE, 'r') as f:
return json.load(f)
@@ -68,6 +71,7 @@ def load_all_credentials():
return None
def login(driver, username, password):
"""Führt den Login-Vorgang auf fotograf.de durch."""
print("Starte Login-Vorgang...")
try:
driver.get(LOGIN_URL)
@@ -93,8 +97,8 @@ def login(driver, username, password):
take_error_screenshot(driver, "login_error")
return False
def process_full_job(driver, job_url):
"""Die finale, robuste Hauptverarbeitungslogik."""
wait = WebDriverWait(driver, 15)
print(f"\nVerarbeite Job-URL: {job_url}")
@@ -133,12 +137,19 @@ def process_full_job(driver, job_url):
print(f"\n--- Betrete Album: {album['name']} ---")
driver.get(album['url'])
try:
person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
print(f"{len(person_rows)} Personen in diesem Album gefunden.")
# Robuste Schleife, die den "Stale Element"-Fehler verhindert
num_persons = len(wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"]))))
print(f"{num_persons} Personen in diesem Album gefunden.")
for person_row in person_rows:
for i in range(num_persons):
# In jeder Iteration die Elemente neu finden, um "Stale" zu vermeiden!
person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
person_row = person_rows[i]
try:
login_count = int(person_row.find_element(By.XPATH, SELECTORS["person_logins"]).text)
login_count_text = person_row.find_element(By.XPATH, SELECTORS["person_logins"]).text
login_count = int(login_count_text)
if login_count == 0:
vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text
print(f" --> ERFOLG: '{vorname}' mit 0 Logins gefunden!")
@@ -147,34 +158,33 @@ def process_full_job(driver, job_url):
driver.get(access_code_page_url)
print(f" Navigiere zur Kommunikations-Seite für '{vorname}'...")
# ----- NEUER, ROBUSTER RETRY-BLOCK -----
# Dieser Block versucht den Klick bis zu 3 Mal, falls ein StaleElement-Fehler auftritt.
for attempt in range(3):
try:
wait.until(EC.element_to_be_clickable((By.XPATH, SELECTORS["potential_buyer_link"]))).click()
print(f" Navigiere zur Käufer-Detailseite...")
break # Erfolgreich, also Schleife verlassen
except StaleElementReferenceException:
print(f" Timing-Fehler (StaleElement), Versuch {attempt + 1}/3...")
time.sleep(0.5) # Kurze Pause vor dem nächsten Versuch
else:
# Wird ausgeführt, wenn die Schleife ohne 'break' endet
print(" Konnte Käufer-Link auch nach mehreren Versuchen nicht klicken.")
raise # Den letzten Fehler erneut auslösen, um das Problem zu signalisieren
# ----- ENDE DES RETRY-BLOCKS -----
# Alle Daten von dieser Seite extrahieren
schnell_login_url = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["quick_login_url"]))).get_attribute('href')
potential_buyer_element = wait.until(EC.element_to_be_clickable((By.XPATH, SELECTORS["potential_buyer_link"])))
kaeufer_name = potential_buyer_element.text
print(f" Käufer: '{kaeufer_name}', Schnell-Login: GEFUNDEN")
potential_buyer_element.click()
print(f" Navigiere zur Käufer-Detailseite...")
email = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["buyer_email"]))).text
print(f" FINALE ERFOLG: E-Mail gefunden: {email}")
final_results.append({
"Auftragsname": job_name,
"Album": album['name'],
"Kind Vorname": vorname,
"Käufer E-Mail": email
"Name des Kindes": vorname,
"Name Käufer": kaeufer_name,
"E-Mail-Adresse Käufer": email,
"Schnell Login URL": schnell_login_url
})
driver.get(album['url']) # Zurück zur Album-Detailseite
except (ValueError, NoSuchElementException, TimeoutException):
# Wichtig: Navigiere zur Album-Seite zurück, bevor die Schleife weitergeht
print(f" Kehre zurück zur Album-Übersicht '{album['name']}'...")
driver.get(album['url'])
# Warten, bis die Seite wieder bereit ist für die nächste Iteration
wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"])))
except (ValueError, NoSuchElementException, TimeoutException, StaleElementReferenceException) as e:
print(f" Fehler bei der Verarbeitung einer Person, überspringe: {e}")
continue
except TimeoutException:
print(f" Keine Personen-Daten im Album '{album['name']}' gefunden. Überspringe.")
@@ -183,13 +193,13 @@ def process_full_job(driver, job_url):
return final_results
# ... (Funktionen save_results_to_csv, get_profile_choice, main bleiben unverändert) ...
def save_results_to_csv(results):
"""Speichert die gesammelten Daten in einer CSV-Datei."""
if not results:
print("\nKeine Daten zum Speichern vorhanden.")
return
os.makedirs(OUTPUT_DIR, exist_ok=True)
fieldnames = ["Auftragsname", "Album", "Kind Vorname", "Käufer E-Mail"]
fieldnames = ["Name des Kindes", "Name Käufer", "E-Mail-Adresse Käufer", "Schnell Login URL"]
print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...")
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
@@ -198,6 +208,7 @@ def save_results_to_csv(results):
print("Speichern erfolgreich!")
def get_profile_choice():
"""Zeigt ein Menü der verfügbaren Profile und gibt die Auswahl des Benutzers zurück."""
all_credentials = load_all_credentials()
if not all_credentials: return None
profiles = list(all_credentials.keys())
@@ -214,6 +225,7 @@ def get_profile_choice():
except ValueError: print("Ungültige Eingabe.")
def main():
"""Hauptfunktion des Skripts."""
print("--- Fotograf.de Scraper für Nutzer ohne Logins (FINALE VERSION) ---")
credentials = get_profile_choice()
if not credentials: return