From 0eb7279740171fc47aeefbf587684c45ebd8f43c Mon Sep 17 00:00:00 2001 From: Floke Date: Thu, 17 Jul 2025 11:03:28 +0000 Subject: [PATCH] =?UTF-8?q?Statistikauswertung=20erg=C3=A4nzt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Minimale Anpassungen zur Verbesserung (Vorname Separat, keine zwei und hintereinander) Ergänzung Statistikmodul --- scrape_fotograf.py | 180 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 147 insertions(+), 33 deletions(-) diff --git a/scrape_fotograf.py b/scrape_fotograf.py index 9e60a0ad..d976ab4a 100644 --- a/scrape_fotograf.py +++ b/scrape_fotograf.py @@ -15,10 +15,9 @@ from selenium.common.exceptions import TimeoutException, NoSuchElementException, # --- Konfiguration & Konstanten --- CREDENTIALS_FILE = 'fotograf_credentials.json' OUTPUT_DIR = 'output' -OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'supermailer_fertige_liste.csv') LOGIN_URL = 'https://app.fotograf.de/login/login' -# --- Selektoren (FINALE, VOLLSTÄNDIGE VERSION) --- +# --- Selektoren --- SELECTORS = { "cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll", "login_user": "#login-email", @@ -32,14 +31,16 @@ SELECTORS = { "person_vorname": ".//span[text()='Vorname']/following-sibling::strong", "person_logins": ".//span[text()='Logins']/following-sibling::strong", "person_access_code_link": ".//a[contains(@data-qa-id, 'guest-access-banner-access-code')]", - # NEU: Selector zur Überprüfung, ob ein Kauf getätigt wurde - "purchase_icon": ".//img[@alt='Bestellungen mit diesem Foto']", + # Selektoren für die Statistik-Zählung + "person_all_photos": ".//div[@data-key]", + "person_purchased_photos": ".//div[@data-key and .//img[@alt='Bestellungen mit diesem Foto']]", "potential_buyer_link": "//a[contains(@href, '/config_customers/view_customer')]", "quick_login_url": "//a[@id='quick-login-url']", "buyer_email": "//span[contains(., '@')]" } def take_error_screenshot(driver, error_name): + """Speichert 
einen Screenshot des aktuellen Browserfensters in den output-Ordner.""" os.makedirs(OUTPUT_DIR, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"error_{error_name}_{timestamp}.png" @@ -51,6 +52,7 @@ def take_error_screenshot(driver, error_name): print(f"!!! Konnte keinen Screenshot speichern: {e}") def setup_driver(): + """Initialisiert und konfiguriert den Chrome WebDriver.""" print("Initialisiere Chrome WebDriver...") options = Options() options.add_argument('--headless') @@ -66,6 +68,7 @@ def setup_driver(): return None def load_all_credentials(): + """Lädt alle Anmeldedaten aus der JSON-Datei.""" try: with open(CREDENTIALS_FILE, 'r') as f: return json.load(f) @@ -73,6 +76,7 @@ def load_all_credentials(): return None def login(driver, username, password): + """Führt den Login-Vorgang auf fotograf.de durch.""" print("Starte Login-Vorgang...") try: driver.get(LOGIN_URL) @@ -98,16 +102,17 @@ def login(driver, username, password): take_error_screenshot(driver, "login_error") return False -def process_full_job(driver, job_url): +# --- Modus 1: E-Mail-Listen-Erstellung --- +def process_reminder_mode(driver, job_url): + """Sammelt Daten für die E-Mail-Erinnerungskampagne.""" wait = WebDriverWait(driver, 15) try: job_id_match = re.search(r'/(\d+)', job_url) - if not job_id_match: - raise ValueError("Konnte keine numerische Job-ID finden.") + if not job_id_match: raise ValueError("Konnte keine numerische Job-ID finden.") job_id = job_id_match.group(1) settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}" - except (AttributeError, IndexError, ValueError) as e: + except Exception as e: print(f"!!! FEHLER: Konnte keine Job-ID aus der URL '{job_url}' extrahieren. 
Grund: {e}") return [] @@ -118,7 +123,6 @@ def process_full_job(driver, job_url): print(f"Auftragsname: '{job_name}'") except TimeoutException: print("Konnte den Auftragsnamen nicht finden.") - take_error_screenshot(driver, "job_name_not_found") return [] albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}" @@ -138,7 +142,6 @@ def process_full_job(driver, job_url): print(f"{len(albums_to_visit)} gültige Album-Links gesammelt.") except TimeoutException: print("Konnte die Album-Liste nicht finden.") - take_error_screenshot(driver, "album_overview_error") return [] final_results = [] @@ -152,8 +155,7 @@ def process_full_job(driver, job_url): for page_num in range(1, num_pages + 1): current_page_url = album['url'] - if page_num > 1: - current_page_url += f"?page_guest_accesses={page_num}" + if page_num > 1: current_page_url += f"?page_guest_accesses={page_num}" print(f" Verarbeite Seite {page_num}...") driver.get(current_page_url) @@ -170,19 +172,14 @@ def process_full_job(driver, job_url): if int(login_count_text) <= 1: vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text - # --- NEUE KAUF-PRÜFUNG --- try: - # Finde den Foto-Container, der dem Info-Block folgt photo_container = person_row.find_element(By.XPATH, "./following-sibling::div[1]") - # Prüfe, ob darin ein Warenkorb-Icon existiert - purchase_icons = photo_container.find_elements(By.XPATH, SELECTORS["purchase_icon"]) + purchase_icons = photo_container.find_elements(By.XPATH, SELECTORS["person_purchased_photos"]) if len(purchase_icons) > 0: print(f" --> INFO: '{vorname}' hat bereits gekauft. 
Überspringe.") - continue # Springe zur nächsten Person + continue except NoSuchElementException: - # Kein Foto-Container gefunden, also auch kein Kauf pass - # --- ENDE KAUF-PRÜFUNG --- print(f" --> ERFOLG: '{vorname}' mit {login_count_text} Login(s) gefunden (und kein Kauf).") @@ -226,51 +223,157 @@ def process_full_job(driver, job_url): wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"]))) except TimeoutException: print(f" Keine Personen-Daten im Album '{album['name']}' gefunden. Überspringe.") - take_error_screenshot(driver, f"album_{album['name']}_error") continue return final_results def aggregate_results_by_email(results): + """Fasst Ergebnisse pro E-Mail-Adresse zusammen.""" print("\nBeginne mit der Aggregation der Ergebnisse pro E-Mail-Adresse...") aggregated_data = {} for result in results: email = result['E-Mail-Adresse Käufer'] + child_name = "Familienbilder" if result['Name des Kindes'] == "Familie" else result['Name des Kindes'] + html_link = f'Fotos von {child_name}' if email not in aggregated_data: aggregated_data[email] = { - 'Name Käufer': result['Name Käufer'], + 'Name Käufer': result['Name Käufer'].split(' ')[0], 'E-Mail-Adresse Käufer': email, - 'Kindernamen_list': [result['Name des Kindes']], - 'LinksHTML_list': [f'Fotos von {result["Name des Kindes"]}'] + 'Kindernamen_list': [child_name], + 'LinksHTML_list': [html_link] } else: - aggregated_data[email]['Kindernamen_list'].append(result['Name des Kindes']) - aggregated_data[email]['LinksHTML_list'].append(f'Fotos von {result["Name des Kindes"]}') + aggregated_data[email]['Kindernamen_list'].append(child_name) + aggregated_data[email]['LinksHTML_list'].append(html_link) + final_list = [] for email, data in aggregated_data.items(): + names_list = data['Kindernamen_list'] + if len(names_list) > 2: + kindernamen_str = ', '.join(names_list[:-1]) + ' und ' + names_list[-1] + else: + kindernamen_str = ' und '.join(names_list) final_list.append({ 'Name Käufer': data['Name 
Käufer'], 'E-Mail-Adresse Käufer': email, - 'Kindernamen': ' und '.join(data['Kindernamen_list']), + 'Kindernamen': kindernamen_str, 'LinksHTML': '

'.join(data['LinksHTML_list']) }) print(f"Aggregation abgeschlossen. {len(results)} Roh-Einträge zu {len(final_list)} einzigartigen E-Mails zusammengefasst.") return final_list def save_aggregated_results_to_csv(results): + """Speichert die aggregierten Daten für Supermailer.""" if not results: print("\nKeine Daten zum Speichern vorhanden.") return + + output_file = os.path.join(OUTPUT_DIR, 'supermailer_fertige_liste.csv') os.makedirs(OUTPUT_DIR, exist_ok=True) fieldnames = ["Name Käufer", "E-Mail-Adresse Käufer", "Kindernamen", "LinksHTML"] - print(f"\nSpeichere {len(results)} aggregierte Ergebnisse in '{OUTPUT_FILE}'...") - with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f: + print(f"\nSpeichere {len(results)} aggregierte Ergebnisse in '{output_file}'...") + with open(output_file, 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(results) print("Speichern erfolgreich!") +# --- Modus 2: Statistik-Auswertung --- +def process_statistics_mode(driver, job_url): + """Sammelt und druckt Statistiken pro Album.""" + wait = WebDriverWait(driver, 15) + + try: + job_id = re.search(r'/(\d+)', job_url).group(1) + except Exception: + print(f"!!! 
FEHLER: Konnte keine Job-ID aus der URL '{job_url}' extrahieren.") + return [] + + albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}" + print(f"Navigiere zur Alben-Übersicht: {albums_overview_url}") + driver.get(albums_overview_url) + + albums_to_visit = [] + try: + album_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_rows"]))) + for row in album_rows: + try: + album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"]) + albums_to_visit.append({"name": album_link.text, "url": album_link.get_attribute('href')}) + except NoSuchElementException: continue + except TimeoutException: + print("Konnte die Album-Liste nicht finden.") + return [] + + statistics = [] + print("\n--- STATISTIK-AUSWERTUNG ---") + for album in albums_to_visit: + print(f"\nAlbum: {album['name']}") + driver.get(album['url']) + try: + total_codes_text = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["access_code_count"]))).text + num_pages = math.ceil(int(total_codes_text) / 20) + + total_children_in_album = 0 + children_with_purchase = 0 + children_with_all_purchased = 0 + + for page_num in range(1, num_pages + 1): + if page_num > 1: driver.get(album['url'] + f"?page_guest_accesses={page_num}") + + person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"]))) + + for person_row in person_rows: + total_children_in_album += 1 + try: + photo_container = person_row.find_element(By.XPATH, "./following-sibling::div[1]") + + num_total_photos = len(photo_container.find_elements(By.XPATH, SELECTORS["person_all_photos"])) + num_purchased_photos = len(photo_container.find_elements(By.XPATH, SELECTORS["person_purchased_photos"])) + + if num_purchased_photos > 0: + children_with_purchase += 1 + + if num_total_photos > 0 and num_total_photos == num_purchased_photos: + children_with_all_purchased += 1 + except NoSuchElementException: + continue + + print(f" - Kinder 
insgesamt: {total_children_in_album}") + print(f" - Kinder mit (mind. 1) Kauf: {children_with_purchase}") + print(f" - Kinder (Alle Bilder gekauft): {children_with_all_purchased}") + statistics.append({ + "Album": album['name'], + "Kinder insgesamt": total_children_in_album, + "Kinder mit Käufen": children_with_purchase, + "Kinder (Alle Bilder gekauft)": children_with_all_purchased + }) + + except Exception as e: + print(f" Fehler bei der Auswertung dieses Albums: {e}") + continue + + return statistics + +def save_statistics_to_csv(results): + """Speichert die Statistik-Daten in einer CSV-Datei.""" + if not results: + print("\nKeine Statistikdaten zum Speichern vorhanden.") + return + + output_file = os.path.join(OUTPUT_DIR, 'job_statistik.csv') + os.makedirs(OUTPUT_DIR, exist_ok=True) + fieldnames = ["Album", "Kinder insgesamt", "Kinder mit Käufen", "Kinder (Alle Bilder gekauft)"] + print(f"\nSpeichere Statistik für {len(results)} Alben in '{output_file}'...") + with open(output_file, 'w', newline='', encoding='utf-8') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(results) + print("Speichern erfolgreich!") + +# --- Haupt-Logik --- def get_profile_choice(): + """Zeigt ein Menü zur Profilauswahl.""" all_credentials = load_all_credentials() if not all_credentials: return None profiles = list(all_credentials.keys()) @@ -287,11 +390,18 @@ def get_profile_choice(): except ValueError: print("Ungültige Eingabe.") def main(): - print("--- Fotograf.de Scraper (mit Datenaggregation) ---") + """Hauptfunktion des Skripts.""" + print("--- Fotograf.de Scraper (v3.1 - The Analyst) ---") + + while True: + mode = input("Bitte Modus wählen:\n 1) E-Mail-Liste erstellen\n 2) Statistik auswerten\nWahl: ") + if mode in ['1', '2']: break + else: print("Ungültige Eingabe.") + credentials = get_profile_choice() if not credentials: return - job_url_raw = input("Bitte gib eine beliebige URL des zu bearbeitenden Fotoauftrags ein: ") + 
job_url_raw = input("Bitte gib eine beliebige URL des zu bearbeitenden Fotoauftrags ein: ") match = re.search(r'(https?://[^\s]+)', job_url_raw) if not match: @@ -308,9 +418,13 @@ def main(): try: if login(driver, credentials['username'], credentials['password']): - raw_results = process_full_job(driver, job_url) - aggregated_results = aggregate_results_by_email(raw_results) - save_aggregated_results_to_csv(aggregated_results) + if mode == '1': + raw_results = process_reminder_mode(driver, job_url) + aggregated_results = aggregate_results_by_email(raw_results) + save_aggregated_results_to_csv(aggregated_results) + elif mode == '2': + stats_results = process_statistics_mode(driver, job_url) + save_statistics_to_csv(stats_results) else: print("Skript wird beendet, da der Login fehlgeschlagen ist.") finally: