From 9b5c6683cef4b60b9fe37b7e966be378c9ee52f9 Mon Sep 17 00:00:00 2001 From: Floke Date: Thu, 17 Jul 2025 09:16:10 +0000 Subject: [PATCH] scrape_fotograf.py aktualisiert --- scrape_fotograf.py | 44 ++++---------------------------------------- 1 file changed, 4 insertions(+), 40 deletions(-) diff --git a/scrape_fotograf.py b/scrape_fotograf.py index a33c7bdd..d8358d89 100644 --- a/scrape_fotograf.py +++ b/scrape_fotograf.py @@ -3,7 +3,7 @@ import os import time import csv import math -import re # Modul für reguläre Ausdrücke +import re from datetime import datetime from selenium import webdriver from selenium.webdriver.chrome.options import Options @@ -15,10 +15,10 @@ from selenium.common.exceptions import TimeoutException, NoSuchElementException, # --- Konfiguration & Konstanten --- CREDENTIALS_FILE = 'fotograf_credentials.json' OUTPUT_DIR = 'output' -OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_mit_wenig_logins.csv') +OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'supermailer_fertige_liste.csv') LOGIN_URL = 'https://app.fotograf.de/login/login' -# --- Selektoren (unverändert) --- +# --- Selektoren --- SELECTORS = { "cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll", "login_user": "#login-email", @@ -37,7 +37,6 @@ SELECTORS = { "buyer_email": "//span[contains(., '@')]" } -# --- Hilfsfunktionen (unverändert) --- def take_error_screenshot(driver, error_name): os.makedirs(OUTPUT_DIR, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") @@ -97,7 +96,6 @@ def login(driver, username, password): take_error_screenshot(driver, "login_error") return False -# --- Hauptlogik (unverändert) --- def process_full_job(driver, job_url): wait = WebDriverWait(driver, 15) @@ -106,8 +104,6 @@ def process_full_job(driver, job_url): if not job_id_match: raise ValueError("Konnte keine numerische Job-ID finden.") job_id = job_id_match.group(1) - - # Wir konstruieren die Einstellungs-URL, um den Job-Namen zu holen settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}" except (AttributeError, IndexError, ValueError) as e: print(f"!!! FEHLER: Konnte keine Job-ID aus der URL '{job_url}' extrahieren. Grund: {e}") @@ -219,18 +215,11 @@ def process_full_job(driver, job_url): return final_results def aggregate_results_by_email(results): - """ - Nimmt die Liste der rohen Ergebnisse und fasst sie pro E-Mail-Adresse zusammen. - Erstellt kombinierte Felder für Kindernamen und HTML-Links. - """ print("\nBeginne mit der Aggregation der Ergebnisse pro E-Mail-Adresse...") aggregated_data = {} - for result in results: email = result['E-Mail-Adresse Käufer'] - if email not in aggregated_data: - # Erster Eintrag für diese E-Mail aggregated_data[email] = { 'Name Käufer': result['Name Käufer'], 'E-Mail-Adresse Käufer': email, @@ -238,11 +227,8 @@ def aggregate_results_by_email(results): 'LinksHTML_list': [f'Fotos von {result["Name des Kindes"]}'] } else: - # Weiteres Kind für eine bereits bekannte E-Mail hinzufügen aggregated_data[email]['Kindernamen_list'].append(result['Name des Kindes']) aggregated_data[email]['LinksHTML_list'].append(f'Fotos von {result["Name des Kindes"]}') - - # Umwandlung des Dictionaries in die finale Listenform für die CSV final_list = [] for email, data in aggregated_data.items(): final_list.append({ @@ -251,17 +237,14 @@ def aggregate_results_by_email(results): 'Kindernamen': ' und '.join(data['Kindernamen_list']), 'LinksHTML': '

'.join(data['LinksHTML_list']) }) - print(f"Aggregation abgeschlossen. {len(results)} Roh-Einträge zu {len(final_list)} einzigartigen E-Mails zusammengefasst.") return final_list def save_aggregated_results_to_csv(results): - """Speichert die aggregierten Daten in einer CSV-Datei.""" if not results: print("\nKeine Daten zum Speichern vorhanden.") return os.makedirs(OUTPUT_DIR, exist_ok=True) - # Neue Spaltennamen für die aggregierte Datei fieldnames = ["Name Käufer", "E-Mail-Adresse Käufer", "Kindernamen", "LinksHTML"] print(f"\nSpeichere {len(results)} aggregierte Ergebnisse in '{OUTPUT_FILE}'...") with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f: @@ -270,19 +253,6 @@ def save_aggregated_results_to_csv(results): writer.writerows(results) print("Speichern erfolgreich!") -def save_results_to_csv(results): - if not results: - print("\nKeine Daten zum Speichern vorhanden.") - return - os.makedirs(OUTPUT_DIR, exist_ok=True) - fieldnames = ["Name des Kindes", "Name Käufer", "E-Mail-Adresse Käufer", "Schnell Login URL"] - print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...") - with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f: - writer = csv.DictWriter(f, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(results) - print("Speichern erfolgreich!") - def get_profile_choice(): all_credentials = load_all_credentials() if not all_credentials: return None @@ -299,7 +269,6 @@ def get_profile_choice(): else: print("Ungültige Auswahl.") except ValueError: print("Ungültige Eingabe.") -# --- Finale, korrigierte main-Funktion --- def main(): print("--- Fotograf.de Scraper (mit Datenaggregation) ---") credentials = get_profile_choice() @@ -322,11 +291,8 @@ def main(): try: if login(driver, credentials['username'], credentials['password']): - # Schritt 1: Rohe Daten sammeln raw_results = process_full_job(driver, job_url) - # Schritt 2: Daten aggregieren aggregated_results = aggregate_results_by_email(raw_results) - # Schritt 3: Aggregierte Daten speichern save_aggregated_results_to_csv(aggregated_results) else: print("Skript wird beendet, da der Login fehlgeschlagen ist.") @@ -335,6 +301,4 @@ def main(): if driver: driver.quit() if __name__ == "__main__": - # Hier werden alle Funktionen (inklusive der eingeklappten) benötigt - # Fügen Sie hier den vollständigen Code der Hilfsfunktionen ein. - pass \ No newline at end of file + main() \ No newline at end of file