diff --git a/scrape_fotograf.py b/scrape_fotograf.py
index 9e60a0ad..d976ab4a 100644
--- a/scrape_fotograf.py
+++ b/scrape_fotograf.py
@@ -15,10 +15,9 @@ from selenium.common.exceptions import TimeoutException, NoSuchElementException,
# --- Configuration & constants ---
CREDENTIALS_FILE = 'fotograf_credentials.json'
OUTPUT_DIR = 'output'
-OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'supermailer_fertige_liste.csv')
LOGIN_URL = 'https://app.fotograf.de/login/login'
-# --- Selectors (FINAL, COMPLETE VERSION) ---
+# --- Selectors ---
SELECTORS = {
"cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"login_user": "#login-email",
@@ -32,14 +31,16 @@ SELECTORS = {
"person_vorname": ".//span[text()='Vorname']/following-sibling::strong",
"person_logins": ".//span[text()='Logins']/following-sibling::strong",
"person_access_code_link": ".//a[contains(@data-qa-id, 'guest-access-banner-access-code')]",
-    # NEW: selector to check whether a purchase was made
- "purchase_icon": ".//img[@alt='Bestellungen mit diesem Foto']",
+    # Selectors for the statistics counts
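+    # Assumption: every photo tile is rendered as a div carrying a data-key
+    # attribute; purchased tiles additionally contain the order icon (the img
+    # with alt 'Bestellungen mit diesem Foto').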
+ "person_all_photos": ".//div[@data-key]",
+ "person_purchased_photos": ".//div[@data-key and .//img[@alt='Bestellungen mit diesem Foto']]",
"potential_buyer_link": "//a[contains(@href, '/config_customers/view_customer')]",
"quick_login_url": "//a[@id='quick-login-url']",
"buyer_email": "//span[contains(., '@')]"
}
def take_error_screenshot(driver, error_name):
+    """Saves a screenshot of the current browser window to the output folder."""
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"error_{error_name}_{timestamp}.png"
@@ -51,6 +52,7 @@ def take_error_screenshot(driver, error_name):
print(f"!!! Konnte keinen Screenshot speichern: {e}")
def setup_driver():
+    """Initializes and configures the Chrome WebDriver."""
print("Initialisiere Chrome WebDriver...")
options = Options()
options.add_argument('--headless')
@@ -66,6 +68,7 @@ def setup_driver():
return None
def load_all_credentials():
+    """Loads all credentials from the JSON file."""
try:
with open(CREDENTIALS_FILE, 'r') as f:
return json.load(f)
@@ -73,6 +76,7 @@ def load_all_credentials():
return None
def login(driver, username, password):
+    """Performs the login on fotograf.de."""
print("Starte Login-Vorgang...")
try:
driver.get(LOGIN_URL)
@@ -98,16 +102,17 @@ def login(driver, username, password):
take_error_screenshot(driver, "login_error")
return False
-def process_full_job(driver, job_url):
+# --- Mode 1: building the e-mail list ---
+def process_reminder_mode(driver, job_url):
+    """Collects the data for the e-mail reminder campaign."""
wait = WebDriverWait(driver, 15)
try:
job_id_match = re.search(r'/(\d+)', job_url)
- if not job_id_match:
- raise ValueError("Konnte keine numerische Job-ID finden.")
+ if not job_id_match: raise ValueError("Konnte keine numerische Job-ID finden.")
job_id = job_id_match.group(1)
settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
- except (AttributeError, IndexError, ValueError) as e:
+ except Exception as e:
print(f"!!! FEHLER: Konnte keine Job-ID aus der URL '{job_url}' extrahieren. Grund: {e}")
return []
@@ -118,7 +123,6 @@ def process_full_job(driver, job_url):
print(f"Auftragsname: '{job_name}'")
except TimeoutException:
print("Konnte den Auftragsnamen nicht finden.")
- take_error_screenshot(driver, "job_name_not_found")
return []
albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
@@ -138,7 +142,6 @@ def process_full_job(driver, job_url):
print(f"{len(albums_to_visit)} gültige Album-Links gesammelt.")
except TimeoutException:
print("Konnte die Album-Liste nicht finden.")
- take_error_screenshot(driver, "album_overview_error")
return []
final_results = []
@@ -152,8 +155,7 @@ def process_full_job(driver, job_url):
for page_num in range(1, num_pages + 1):
current_page_url = album['url']
- if page_num > 1:
- current_page_url += f"?page_guest_accesses={page_num}"
+ if page_num > 1: current_page_url += f"?page_guest_accesses={page_num}"
print(f" Verarbeite Seite {page_num}...")
driver.get(current_page_url)
@@ -170,19 +172,14 @@ def process_full_job(driver, job_url):
if int(login_count_text) <= 1:
vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text
-                    # --- NEW PURCHASE CHECK ---
try:
-                        # Find the photo container that follows the info block
photo_container = person_row.find_element(By.XPATH, "./following-sibling::div[1]")
-                        # Check whether a cart icon exists inside it
- purchase_icons = photo_container.find_elements(By.XPATH, SELECTORS["purchase_icon"])
+ purchase_icons = photo_container.find_elements(By.XPATH, SELECTORS["person_purchased_photos"])
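+                        # Any tile containing the order icon means this person has already ordered.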
if len(purchase_icons) > 0:
print(f" --> INFO: '{vorname}' hat bereits gekauft. Überspringe.")
-                            continue  # Skip to the next person
+ continue
except NoSuchElementException:
-                        # No photo container found, so no purchase either
pass
-                    # --- END PURCHASE CHECK ---
print(f" --> ERFOLG: '{vorname}' mit {login_count_text} Login(s) gefunden (und kein Kauf).")
@@ -226,51 +223,157 @@ def process_full_job(driver, job_url):
wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"])))
except TimeoutException:
print(f" Keine Personen-Daten im Album '{album['name']}' gefunden. Überspringe.")
- take_error_screenshot(driver, f"album_{album['name']}_error")
continue
return final_results
def aggregate_results_by_email(results):
+    """Aggregates the results per e-mail address."""
print("\nBeginne mit der Aggregation der Ergebnisse pro E-Mail-Adresse...")
aggregated_data = {}
for result in results:
email = result['E-Mail-Adresse Käufer']
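+        # Entries recorded as "Familie" are relabeled "Familienbilder" for the name and link text.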
+ child_name = "Familienbilder" if result['Name des Kindes'] == "Familie" else result['Name des Kindes']
+ html_link = f'Fotos von {child_name}'
if email not in aggregated_data:
aggregated_data[email] = {
- 'Name Käufer': result['Name Käufer'],
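+            # Keep only the buyer's first name (everything before the first space).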
+ 'Name Käufer': result['Name Käufer'].split(' ')[0],
'E-Mail-Adresse Käufer': email,
- 'Kindernamen_list': [result['Name des Kindes']],
- 'LinksHTML_list': [f'Fotos von {result["Name des Kindes"]}']
+ 'Kindernamen_list': [child_name],
+ 'LinksHTML_list': [html_link]
}
else:
- aggregated_data[email]['Kindernamen_list'].append(result['Name des Kindes'])
- aggregated_data[email]['LinksHTML_list'].append(f'Fotos von {result["Name des Kindes"]}')
+ aggregated_data[email]['Kindernamen_list'].append(child_name)
+ aggregated_data[email]['LinksHTML_list'].append(html_link)
+
final_list = []
for email, data in aggregated_data.items():
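+        # Join the names German-style: "A und B" for two, "A, B und C" for three or more.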
+ names_list = data['Kindernamen_list']
+ if len(names_list) > 2:
+ kindernamen_str = ', '.join(names_list[:-1]) + ' und ' + names_list[-1]
+ else:
+ kindernamen_str = ' und '.join(names_list)
final_list.append({
'Name Käufer': data['Name Käufer'],
'E-Mail-Adresse Käufer': email,
- 'Kindernamen': ' und '.join(data['Kindernamen_list']),
+ 'Kindernamen': kindernamen_str,
            'LinksHTML': '<br>'.join(data['LinksHTML_list'])
})
print(f"Aggregation abgeschlossen. {len(results)} Roh-Einträge zu {len(final_list)} einzigartigen E-Mails zusammengefasst.")
return final_list
def save_aggregated_results_to_csv(results):
+    """Saves the aggregated data for Supermailer."""
if not results:
print("\nKeine Daten zum Speichern vorhanden.")
return
+
+ output_file = os.path.join(OUTPUT_DIR, 'supermailer_fertige_liste.csv')
os.makedirs(OUTPUT_DIR, exist_ok=True)
fieldnames = ["Name Käufer", "E-Mail-Adresse Käufer", "Kindernamen", "LinksHTML"]
- print(f"\nSpeichere {len(results)} aggregierte Ergebnisse in '{OUTPUT_FILE}'...")
- with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
+ print(f"\nSpeichere {len(results)} aggregierte Ergebnisse in '{output_file}'...")
+ with open(output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(results)
print("Speichern erfolgreich!")
+# --- Mode 2: statistics evaluation ---
+def process_statistics_mode(driver, job_url):
+    """Collects and prints statistics per album."""
+ wait = WebDriverWait(driver, 15)
+
+ try:
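+        # Grab the first numeric path segment of the URL, e.g. ".../index/12345" -> "12345".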
+ job_id = re.search(r'/(\d+)', job_url).group(1)
+ except Exception:
+ print(f"!!! FEHLER: Konnte keine Job-ID aus der URL '{job_url}' extrahieren.")
+ return []
+
+ albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
+ print(f"Navigiere zur Alben-Übersicht: {albums_overview_url}")
+ driver.get(albums_overview_url)
+
+ albums_to_visit = []
+ try:
+ album_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_rows"])))
+ for row in album_rows:
+ try:
+ album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"])
+ albums_to_visit.append({"name": album_link.text, "url": album_link.get_attribute('href')})
+ except NoSuchElementException: continue
+ except TimeoutException:
+ print("Konnte die Album-Liste nicht finden.")
+ return []
+
+ statistics = []
+ print("\n--- STATISTIK-AUSWERTUNG ---")
+ for album in albums_to_visit:
+ print(f"\nAlbum: {album['name']}")
+ driver.get(album['url'])
+ try:
+ total_codes_text = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["access_code_count"]))).text
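+            # Assumption: the guest-access list paginates at 20 entries per page.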
+ num_pages = math.ceil(int(total_codes_text) / 20)
+
+ total_children_in_album = 0
+ children_with_purchase = 0
+ children_with_all_purchased = 0
+
+ for page_num in range(1, num_pages + 1):
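+                # Page 1 is already loaded; later pages are reached via the page_guest_accesses parameter.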
+ if page_num > 1: driver.get(album['url'] + f"?page_guest_accesses={page_num}")
+
+ person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
+
+ for person_row in person_rows:
+ total_children_in_album += 1
+ try:
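+                        # The photo tiles are assumed to sit in the div immediately following the info row.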
+ photo_container = person_row.find_element(By.XPATH, "./following-sibling::div[1]")
+
+ num_total_photos = len(photo_container.find_elements(By.XPATH, SELECTORS["person_all_photos"]))
+ num_purchased_photos = len(photo_container.find_elements(By.XPATH, SELECTORS["person_purchased_photos"]))
+
+ if num_purchased_photos > 0:
+ children_with_purchase += 1
+
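+                        # "All purchased" requires at least one photo and the order icon on every tile.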
+ if num_total_photos > 0 and num_total_photos == num_purchased_photos:
+ children_with_all_purchased += 1
+ except NoSuchElementException:
+ continue
+
+ print(f" - Kinder insgesamt: {total_children_in_album}")
+ print(f" - Kinder mit (mind. 1) Kauf: {children_with_purchase}")
+ print(f" - Kinder (Alle Bilder gekauft): {children_with_all_purchased}")
+ statistics.append({
+ "Album": album['name'],
+ "Kinder insgesamt": total_children_in_album,
+ "Kinder mit Käufen": children_with_purchase,
+ "Kinder (Alle Bilder gekauft)": children_with_all_purchased
+ })
+
+ except Exception as e:
+ print(f" Fehler bei der Auswertung dieses Albums: {e}")
+ continue
+
+ return statistics
+
+def save_statistics_to_csv(results):
+    """Saves the statistics to a CSV file."""
+ if not results:
+ print("\nKeine Statistikdaten zum Speichern vorhanden.")
+ return
+
+ output_file = os.path.join(OUTPUT_DIR, 'job_statistik.csv')
+ os.makedirs(OUTPUT_DIR, exist_ok=True)
+ fieldnames = ["Album", "Kinder insgesamt", "Kinder mit Käufen", "Kinder (Alle Bilder gekauft)"]
+ print(f"\nSpeichere Statistik für {len(results)} Alben in '{output_file}'...")
+ with open(output_file, 'w', newline='', encoding='utf-8') as f:
+ writer = csv.DictWriter(f, fieldnames=fieldnames)
+ writer.writeheader()
+ writer.writerows(results)
+ print("Speichern erfolgreich!")
+
+# --- Main logic ---
def get_profile_choice():
+    """Shows a menu for selecting a profile."""
all_credentials = load_all_credentials()
if not all_credentials: return None
profiles = list(all_credentials.keys())
@@ -287,11 +390,18 @@ def get_profile_choice():
except ValueError: print("Ungültige Eingabe.")
def main():
- print("--- Fotograf.de Scraper (mit Datenaggregation) ---")
+    """Main function of the script."""
+ print("--- Fotograf.de Scraper (v3.1 - The Analyst) ---")
+
+ while True:
+ mode = input("Bitte Modus wählen:\n 1) E-Mail-Liste erstellen\n 2) Statistik auswerten\nWahl: ")
+ if mode in ['1', '2']: break
+ else: print("Ungültige Eingabe.")
+
credentials = get_profile_choice()
if not credentials: return
    job_url_raw = input("Bitte gib eine beliebige URL des zu bearbeitenden Fotoauftrags ein: ")
match = re.search(r'(https?://[^\s]+)', job_url_raw)
if not match:
@@ -308,9 +418,13 @@ def main():
try:
if login(driver, credentials['username'], credentials['password']):
- raw_results = process_full_job(driver, job_url)
- aggregated_results = aggregate_results_by_email(raw_results)
- save_aggregated_results_to_csv(aggregated_results)
+ if mode == '1':
+ raw_results = process_reminder_mode(driver, job_url)
+ aggregated_results = aggregate_results_by_email(raw_results)
+ save_aggregated_results_to_csv(aggregated_results)
+ elif mode == '2':
+ stats_results = process_statistics_mode(driver, job_url)
+ save_statistics_to_csv(stats_results)
else:
print("Skript wird beendet, da der Login fehlgeschlagen ist.")
finally: