Files
Brancheneinstufung2/scrape_fotograf.py
Floke 18ffe86eda Ergänzung um hat gekauft
Erweiterung, dass Käufer ausgeschlossen werden.
2025-07-17 10:31:40 +00:00

321 lines
15 KiB
Python

import json
import os
import time
import csv
import math
import re
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException, InvalidArgumentException
# --- Configuration & constants ---
CREDENTIALS_FILE = 'fotograf_credentials.json'  # JSON file: profile name -> credentials dict (read by load_all_credentials)
OUTPUT_DIR = 'output'  # also used for error screenshots
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'supermailer_fertige_liste.csv')
LOGIN_URL = 'https://app.fotograf.de/login/login'
# --- Selectors (final, complete version) ---
# CSS selectors (login/cookie elements) and XPath expressions (everything else)
# used by the scraping functions below.
SELECTORS = {
    "cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
    "login_user": "#login-email",
    "login_pass": "#login-password",
    "login_button": "#login-submit",
    "job_name": "h1",
    "album_overview_rows": "//table/tbody/tr",
    "album_overview_link": ".//td[2]//a",
    "access_code_count": "//span[text()='Zugangscodes']/following-sibling::strong",
    "person_rows": "//div[contains(@class, 'border-legacy-silver-550') and .//span[text()='Logins']]",
    "person_vorname": ".//span[text()='Vorname']/following-sibling::strong",
    "person_logins": ".//span[text()='Logins']/following-sibling::strong",
    "person_access_code_link": ".//a[contains(@data-qa-id, 'guest-access-banner-access-code')]",
    # NEW: selector used to check whether a purchase was already made
    "purchase_icon": ".//img[@alt='Bestellungen mit diesem Foto']",
    "potential_buyer_link": "//a[contains(@href, '/config_customers/view_customer')]",
    "quick_login_url": "//a[@id='quick-login-url']",
    "buyer_email": "//span[contains(., '@')]"
}
def take_error_screenshot(driver, error_name):
    """Save a timestamped screenshot into OUTPUT_DIR (best effort).

    Screenshot failures are only printed, never raised, so error
    reporting cannot itself crash the scraper.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    target = os.path.join(OUTPUT_DIR, f"error_{error_name}_{stamp}.png")
    try:
        driver.save_screenshot(target)
        print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {target}")
    except Exception as exc:
        print(f"!!! Konnte keinen Screenshot speichern: {exc}")
def setup_driver():
    """Create a headless Chrome WebDriver.

    Returns the driver instance, or None when Chrome could not be started
    (the error is printed for the operator).
    """
    print("Initialisiere Chrome WebDriver...")
    opts = Options()
    for flag in ('--headless', '--no-sandbox', '--disable-dev-shm-usage',
                 '--window-size=1920,1200'):
        opts.add_argument(flag)
    opts.binary_location = '/usr/bin/google-chrome'
    try:
        return webdriver.Chrome(options=opts)
    except Exception as exc:
        print(f"Fehler bei der Initialisierung des WebDrivers: {exc}")
        return None
def load_all_credentials():
    """Load all saved login profiles from CREDENTIALS_FILE.

    Returns the parsed JSON object (profile name -> credentials dict),
    or None when the file is missing or contains invalid JSON.
    """
    try:
        # Explicit UTF-8: the profiles file may contain umlauts, and the
        # platform default encoding (e.g. cp1252 on Windows) would mangle
        # them or raise a decode error.
        with open(CREDENTIALS_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        # Missing or corrupt credentials are signalled to the caller via None.
        return None
def login(driver, username, password):
    """Log into fotograf.de with the given credentials.

    Dismisses the cookie banner if present, submits the login form and
    waits for the dashboard URL. Returns True on success; on any failure
    a screenshot is saved and False is returned.
    """
    print("Starte Login-Vorgang...")
    try:
        driver.get(LOGIN_URL)
        waiter = WebDriverWait(driver, 10)
        # The cookie banner is optional — its absence is not an error.
        try:
            print("Suche nach Cookie-Banner...")
            banner = waiter.until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"])))
            banner.click()
            print("Cookie-Banner akzeptiert.")
            time.sleep(1)
        except TimeoutException:
            print("Kein Cookie-Banner gefunden, fahre fort.")
        print("Fülle Anmeldeformular aus...")
        user_field = waiter.until(
            EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"])))
        user_field.send_keys(username)
        driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
        print("Klicke auf Login...")
        driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
        print("Warte auf die nächste Seite...")
        # Successful login redirects to the dashboard.
        waiter.until(EC.url_contains('/config_dashboard/index'))
        print("Login erfolgreich!")
        return True
    except Exception as exc:
        print(f"Login fehlgeschlagen. Grund: {exc}")
        take_error_screenshot(driver, "login_error")
        return False
def process_full_job(driver, job_url):
    """Scrape one photo job: visit every album and every pagination page,
    collect persons with at most one login who have NOT purchased yet, and
    gather buyer name, e-mail and quick-login URL for each of them.

    Returns a list of dicts with keys "Name des Kindes", "Name Käufer",
    "E-Mail-Adresse Käufer" and "Schnell Login URL"; an empty list when the
    job id, job name or album list cannot be determined.
    """
    wait = WebDriverWait(driver, 15)
    try:
        # The numeric job id is embedded in the URL path; all other URLs
        # for this job are derived from it.
        job_id_match = re.search(r'/(\d+)', job_url)
        if not job_id_match:
            raise ValueError("Konnte keine numerische Job-ID finden.")
        job_id = job_id_match.group(1)
        settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
    except (AttributeError, IndexError, ValueError) as e:
        print(f"!!! FEHLER: Konnte keine Job-ID aus der URL '{job_url}' extrahieren. Grund: {e}")
        return []
    print(f"\nVerarbeite Job-ID: {job_id}")
    driver.get(settings_url)
    try:
        job_name = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["job_name"]))).text
        print(f"Auftragsname: '{job_name}'")
    except TimeoutException:
        print("Konnte den Auftragsnamen nicht finden.")
        take_error_screenshot(driver, "job_name_not_found")
        return []
    albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
    print(f"Navigiere zur Alben-Übersicht: {albums_overview_url}")
    driver.get(albums_overview_url)
    albums_to_visit = []
    try:
        album_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_rows"])))
        print(f"{len(album_rows)} Alben in der Übersicht gefunden.")
        for row in album_rows:
            try:
                album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"])
                albums_to_visit.append({"name": album_link.text, "url": album_link.get_attribute('href')})
            except NoSuchElementException:
                # Table rows without a link cell are skipped.
                continue
        print(f"{len(albums_to_visit)} gültige Album-Links gesammelt.")
    except TimeoutException:
        print("Konnte die Album-Liste nicht finden.")
        take_error_screenshot(driver, "album_overview_error")
        return []
    final_results = []
    for album in albums_to_visit:
        print(f"\n--- Betrete Album: {album['name']} ---")
        driver.get(album['url'])
        try:
            total_codes_text = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["access_code_count"]))).text
            # The guest-access list is paginated with 20 entries per page.
            num_pages = math.ceil(int(total_codes_text) / 20)
            print(f"Album hat {total_codes_text} Zugangscodes auf {num_pages} Seite(n).")
            for page_num in range(1, num_pages + 1):
                current_page_url = album['url']
                if page_num > 1:
                    current_page_url += f"?page_guest_accesses={page_num}"
                print(f" Verarbeite Seite {page_num}...")
                driver.get(current_page_url)
                num_persons = len(wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"]))))
                print(f" {num_persons} Personen auf dieser Seite gefunden.")
                for i in range(num_persons):
                    # Re-query the person rows on every iteration: navigating
                    # away and back invalidates previously located elements.
                    person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
                    person_row = person_rows[i]
                    login_count_text = person_row.find_element(By.XPATH, SELECTORS["person_logins"]).text
                    # Only persons with at most one login are interesting.
                    if int(login_count_text) <= 1:
                        vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text
                        # --- PURCHASE CHECK ---
                        try:
                            # Locate the photo container that follows the info block
                            photo_container = person_row.find_element(By.XPATH, "./following-sibling::div[1]")
                            # Check whether it contains a shopping-cart icon
                            purchase_icons = photo_container.find_elements(By.XPATH, SELECTORS["purchase_icon"])
                            if len(purchase_icons) > 0:
                                print(f" --> INFO: '{vorname}' hat bereits gekauft. Überspringe.")
                                continue  # Skip to the next person
                        except NoSuchElementException:
                            # No photo container found, hence no purchase either
                            pass
                        # --- END PURCHASE CHECK ---
                        print(f" --> ERFOLG: '{vorname}' mit {login_count_text} Login(s) gefunden (und kein Kauf).")
                        access_code_page_url = person_row.find_element(By.XPATH, SELECTORS["person_access_code_link"]).get_attribute('href')
                        driver.get(access_code_page_url)
                        print(f" Navigiere zur Kommunikations-Seite für '{vorname}'...")
                        # Up to 3 attempts: the detail page re-renders and
                        # elements can go stale between lookup and use.
                        for attempt in range(3):
                            try:
                                wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["quick_login_url"])))
                                schnell_login_url = driver.find_element(By.XPATH, SELECTORS["quick_login_url"]).get_attribute('href')
                                potential_buyer_element = driver.find_element(By.XPATH, SELECTORS["potential_buyer_link"])
                                kaeufer_name = potential_buyer_element.text
                                print(f" Käufer: '{kaeufer_name}', Schnell-Login: GEFUNDEN")
                                potential_buyer_element.click()
                                print(f" Navigiere zur Käufer-Detailseite...")
                                email = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["buyer_email"]))).text
                                print(f" FINALE ERFOLG: E-Mail gefunden: {email}")
                                final_results.append({
                                    "Name des Kindes": vorname,
                                    "Name Käufer": kaeufer_name,
                                    "E-Mail-Adresse Käufer": email,
                                    "Schnell Login URL": schnell_login_url
                                })
                                break
                            except StaleElementReferenceException:
                                print(f" Timing-Fehler, Versuch {attempt + 1}/3...")
                                time.sleep(1)
                                # Last attempt failed as well: propagate the error.
                                if attempt == 2: raise
                            except TimeoutException:
                                # Detail page never rendered; record a screenshot and move on.
                                print(f" Timeout beim Warten auf Details für '{vorname}'.")
                                take_error_screenshot(driver, f"timeout_error_{vorname}")
                                break
                        print(f" Kehre zurück zur Album-Seite {page_num}...")
                        driver.get(current_page_url)
                        wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"])))
        except TimeoutException:
            print(f" Keine Personen-Daten im Album '{album['name']}' gefunden. Überspringe.")
            take_error_screenshot(driver, f"album_{album['name']}_error")
            continue
    return final_results
def aggregate_results_by_email(results):
    """Group the raw per-child rows by buyer e-mail address.

    Each unique e-mail yields one output row whose child names are joined
    with ' und ' and whose quick-login links are rendered as HTML anchors
    separated by '<br><br>'. The buyer name of the first occurrence wins.
    Insertion order of e-mail addresses is preserved.
    """
    print("\nBeginne mit der Aggregation der Ergebnisse pro E-Mail-Adresse...")
    grouped = {}
    for row in results:
        email = row['E-Mail-Adresse Käufer']
        # First occurrence of an e-mail creates the bucket; later rows only append.
        bucket = grouped.setdefault(email, {
            'Name Käufer': row['Name Käufer'],
            'E-Mail-Adresse Käufer': email,
            'Kindernamen_list': [],
            'LinksHTML_list': [],
        })
        bucket['Kindernamen_list'].append(row['Name des Kindes'])
        bucket['LinksHTML_list'].append(
            f'<a href="{row["Schnell Login URL"]}">Fotos von {row["Name des Kindes"]}</a>')
    final_list = [
        {
            'Name Käufer': data['Name Käufer'],
            'E-Mail-Adresse Käufer': email,
            'Kindernamen': ' und '.join(data['Kindernamen_list']),
            'LinksHTML': '<br><br>'.join(data['LinksHTML_list']),
        }
        for email, data in grouped.items()
    ]
    print(f"Aggregation abgeschlossen. {len(results)} Roh-Einträge zu {len(final_list)} einzigartigen E-Mails zusammengefasst.")
    return final_list
def save_aggregated_results_to_csv(results):
    """Write the aggregated buyer rows to OUTPUT_FILE as a UTF-8 CSV.

    Does nothing (beyond a notice) when the result list is empty.
    """
    if not results:
        print("\nKeine Daten zum Speichern vorhanden.")
        return
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    columns = ["Name Käufer", "E-Mail-Adresse Käufer", "Kindernamen", "LinksHTML"]
    print(f"\nSpeichere {len(results)} aggregierte Ergebnisse in '{OUTPUT_FILE}'...")
    # newline='' lets the csv module control line endings itself.
    with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows(results)
    print("Speichern erfolgreich!")
def get_profile_choice():
    """Interactively let the user pick one of the saved credential profiles.

    Returns the chosen profile's credentials dict, or None when no
    credentials could be loaded.
    """
    all_credentials = load_all_credentials()
    if not all_credentials:
        return None
    profiles = list(all_credentials.keys())
    print("\nBitte wähle das zu verwendende Profil:")
    for i, p in enumerate(profiles):
        print(f" {i + 1}) {p}")
    while True:
        raw = input(f"Gib eine Zahl zwischen 1 und {len(profiles)} ein: ")
        try:
            choice = int(raw)
        except ValueError:
            print("Ungültige Eingabe.")
            continue
        if not 1 <= choice <= len(profiles):
            print("Ungültige Auswahl.")
            continue
        selected = profiles[choice - 1]
        print(f"Profil '{selected}' ausgewählt.")
        return all_credentials[selected]
def main():
    """Entry point: pick a profile, validate the job URL, then scrape,
    aggregate and save the results; always shuts the WebDriver down."""
    print("--- Fotograf.de Scraper (mit Datenaggregation) ---")
    credentials = get_profile_choice()
    if not credentials:
        return
    raw_url_input = input("Bitte gib eine beliebige URL des zu bearbeitenden Fotoauftrags ein: ")
    url_match = re.search(r'(https?://[^\s]+)', raw_url_input)
    if url_match is None:
        print("Keine gültige URL in der Eingabe gefunden.")
        return
    job_url = url_match.group(1).strip()
    # Sanity check: must be a fotograf.de job URL containing a numeric id.
    if "fotograf.de/config_jobs_" not in job_url or not re.search(r'/\d+', job_url):
        print("Dies scheint keine gültige URL für einen Fotoauftrag zu sein.")
        return
    driver = setup_driver()
    if not driver:
        return
    try:
        if login(driver, credentials['username'], credentials['password']):
            raw_results = process_full_job(driver, job_url)
            save_aggregated_results_to_csv(aggregate_results_by_email(raw_results))
        else:
            print("Skript wird beendet, da der Login fehlgeschlagen ist.")
    finally:
        # Always release the browser, even after an unexpected exception.
        print("\nSkript beendet. Schließe WebDriver.")
        if driver:
            driver.quit()


if __name__ == "__main__":
    main()