Files
Brancheneinstufung2/scrape_fotograf.py
Floke d2d775f41c Anpassung für mehrfach vorkommende E-Mails
ERweiterung die das Mehrfache vorkommen von E-Mails löst.
2025-07-17 09:10:24 +00:00

340 lines
16 KiB
Python

import json
import os
import time
import csv
import math
import re # Modul für reguläre Ausdrücke
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException, InvalidArgumentException
# --- Konfiguration & Konstanten ---
CREDENTIALS_FILE = 'fotograf_credentials.json'
OUTPUT_DIR = 'output'
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'nutzer_mit_wenig_logins.csv')
LOGIN_URL = 'https://app.fotograf.de/login/login'
# --- Selektoren (unverändert) ---
SELECTORS = {
"cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"login_user": "#login-email",
"login_pass": "#login-password",
"login_button": "#login-submit",
"job_name": "h1",
"album_overview_rows": "//table/tbody/tr",
"album_overview_link": ".//td[2]//a",
"access_code_count": "//span[text()='Zugangscodes']/following-sibling::strong",
"person_rows": "//div[contains(@class, 'border-legacy-silver-550') and .//span[text()='Logins']]",
"person_vorname": ".//span[text()='Vorname']/following-sibling::strong",
"person_logins": ".//span[text()='Logins']/following-sibling::strong",
"person_access_code_link": ".//a[contains(@data-qa-id, 'guest-access-banner-access-code')]",
"potential_buyer_link": "//a[contains(@href, '/config_customers/view_customer')]",
"quick_login_url": "//a[@id='quick-login-url']",
"buyer_email": "//span[contains(., '@')]"
}
# --- Hilfsfunktionen (unverändert) ---
def take_error_screenshot(driver, error_name):
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"error_{error_name}_{timestamp}.png"
filepath = os.path.join(OUTPUT_DIR, filename)
try:
driver.save_screenshot(filepath)
print(f"!!! Fehler aufgetreten. Screenshot gespeichert unter: {filepath}")
except Exception as e:
print(f"!!! Konnte keinen Screenshot speichern: {e}")
def setup_driver():
print("Initialisiere Chrome WebDriver...")
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--window-size=1920,1200')
options.binary_location = '/usr/bin/google-chrome'
try:
driver = webdriver.Chrome(options=options)
return driver
except Exception as e:
print(f"Fehler bei der Initialisierung des WebDrivers: {e}")
return None
def load_all_credentials():
try:
with open(CREDENTIALS_FILE, 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return None
def login(driver, username, password):
print("Starte Login-Vorgang...")
try:
driver.get(LOGIN_URL)
wait = WebDriverWait(driver, 10)
try:
print("Suche nach Cookie-Banner...")
wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, SELECTORS["cookie_accept_button"]))).click()
print("Cookie-Banner akzeptiert.")
time.sleep(1)
except TimeoutException:
print("Kein Cookie-Banner gefunden, fahre fort.")
print("Fülle Anmeldeformular aus...")
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["login_user"]))).send_keys(username)
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_pass"]).send_keys(password)
print("Klicke auf Login...")
driver.find_element(By.CSS_SELECTOR, SELECTORS["login_button"]).click()
print("Warte auf die nächste Seite...")
wait.until(EC.url_contains('/config_dashboard/index'))
print("Login erfolgreich!")
return True
except Exception as e:
print(f"Login fehlgeschlagen. Grund: {e}")
take_error_screenshot(driver, "login_error")
return False
# --- Hauptlogik (unverändert) ---
def process_full_job(driver, job_url):
wait = WebDriverWait(driver, 15)
try:
job_id_match = re.search(r'/(\d+)', job_url)
if not job_id_match:
raise ValueError("Konnte keine numerische Job-ID finden.")
job_id = job_id_match.group(1)
# Wir konstruieren die Einstellungs-URL, um den Job-Namen zu holen
settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
except (AttributeError, IndexError, ValueError) as e:
print(f"!!! FEHLER: Konnte keine Job-ID aus der URL '{job_url}' extrahieren. Grund: {e}")
return []
print(f"\nVerarbeite Job-ID: {job_id}")
driver.get(settings_url)
try:
job_name = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, SELECTORS["job_name"]))).text
print(f"Auftragsname: '{job_name}'")
except TimeoutException:
print("Konnte den Auftragsnamen nicht finden.")
take_error_screenshot(driver, "job_name_not_found")
return []
albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
print(f"Navigiere zur Alben-Übersicht: {albums_overview_url}")
driver.get(albums_overview_url)
albums_to_visit = []
try:
album_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_rows"])))
print(f"{len(album_rows)} Alben in der Übersicht gefunden.")
for row in album_rows:
try:
album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"])
albums_to_visit.append({"name": album_link.text, "url": album_link.get_attribute('href')})
except NoSuchElementException:
continue
print(f"{len(albums_to_visit)} gültige Album-Links gesammelt.")
except TimeoutException:
print("Konnte die Album-Liste nicht finden.")
take_error_screenshot(driver, "album_overview_error")
return []
final_results = []
for album in albums_to_visit:
print(f"\n--- Betrete Album: {album['name']} ---")
driver.get(album['url'])
try:
total_codes_text = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["access_code_count"]))).text
num_pages = math.ceil(int(total_codes_text) / 20)
print(f"Album hat {total_codes_text} Zugangscodes auf {num_pages} Seite(n).")
for page_num in range(1, num_pages + 1):
current_page_url = album['url']
if page_num > 1:
current_page_url += f"?page_guest_accesses={page_num}"
print(f" Verarbeite Seite {page_num}...")
driver.get(current_page_url)
num_persons = len(wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"]))))
print(f" {num_persons} Personen auf dieser Seite gefunden.")
for i in range(num_persons):
person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
person_row = person_rows[i]
login_count_text = person_row.find_element(By.XPATH, SELECTORS["person_logins"]).text
if int(login_count_text) <= 1:
vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text
print(f" --> ERFOLG: '{vorname}' mit {login_count_text} Login(s) gefunden!")
access_code_page_url = person_row.find_element(By.XPATH, SELECTORS["person_access_code_link"]).get_attribute('href')
driver.get(access_code_page_url)
print(f" Navigiere zur Kommunikations-Seite für '{vorname}'...")
for attempt in range(3):
try:
wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["quick_login_url"])))
schnell_login_url = driver.find_element(By.XPATH, SELECTORS["quick_login_url"]).get_attribute('href')
potential_buyer_element = driver.find_element(By.XPATH, SELECTORS["potential_buyer_link"])
kaeufer_name = potential_buyer_element.text
print(f" Käufer: '{kaeufer_name}', Schnell-Login: GEFUNDEN")
potential_buyer_element.click()
print(f" Navigiere zur Käufer-Detailseite...")
email = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["buyer_email"]))).text
print(f" FINALE ERFOLG: E-Mail gefunden: {email}")
final_results.append({
"Name des Kindes": vorname,
"Name Käufer": kaeufer_name,
"E-Mail-Adresse Käufer": email,
"Schnell Login URL": schnell_login_url
})
break
except StaleElementReferenceException:
print(f" Timing-Fehler, Versuch {attempt + 1}/3...")
time.sleep(1)
if attempt == 2: raise
except TimeoutException:
print(f" Timeout beim Warten auf Details für '{vorname}'.")
take_error_screenshot(driver, f"timeout_error_{vorname}")
break
print(f" Kehre zurück zur Album-Seite {page_num}...")
driver.get(current_page_url)
wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"])))
except TimeoutException:
print(f" Keine Personen-Daten im Album '{album['name']}' gefunden. Überspringe.")
take_error_screenshot(driver, f"album_{album['name']}_error")
continue
return final_results
def aggregate_results_by_email(results):
"""
Nimmt die Liste der rohen Ergebnisse und fasst sie pro E-Mail-Adresse zusammen.
Erstellt kombinierte Felder für Kindernamen und HTML-Links.
"""
print("\nBeginne mit der Aggregation der Ergebnisse pro E-Mail-Adresse...")
aggregated_data = {}
for result in results:
email = result['E-Mail-Adresse Käufer']
if email not in aggregated_data:
# Erster Eintrag für diese E-Mail
aggregated_data[email] = {
'Name Käufer': result['Name Käufer'],
'E-Mail-Adresse Käufer': email,
'Kindernamen_list': [result['Name des Kindes']],
'LinksHTML_list': [f'<a href="{result["Schnell Login URL"]}">Fotos von {result["Name des Kindes"]}</a>']
}
else:
# Weiteres Kind für eine bereits bekannte E-Mail hinzufügen
aggregated_data[email]['Kindernamen_list'].append(result['Name des Kindes'])
aggregated_data[email]['LinksHTML_list'].append(f'<a href="{result["Schnell Login URL"]}">Fotos von {result["Name des Kindes"]}</a>')
# Umwandlung des Dictionaries in die finale Listenform für die CSV
final_list = []
for email, data in aggregated_data.items():
final_list.append({
'Name Käufer': data['Name Käufer'],
'E-Mail-Adresse Käufer': email,
'Kindernamen': ' und '.join(data['Kindernamen_list']),
'LinksHTML': '<br><br>'.join(data['LinksHTML_list'])
})
print(f"Aggregation abgeschlossen. {len(results)} Roh-Einträge zu {len(final_list)} einzigartigen E-Mails zusammengefasst.")
return final_list
def save_aggregated_results_to_csv(results):
"""Speichert die aggregierten Daten in einer CSV-Datei."""
if not results:
print("\nKeine Daten zum Speichern vorhanden.")
return
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Neue Spaltennamen für die aggregierte Datei
fieldnames = ["Name Käufer", "E-Mail-Adresse Käufer", "Kindernamen", "LinksHTML"]
print(f"\nSpeichere {len(results)} aggregierte Ergebnisse in '{OUTPUT_FILE}'...")
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(results)
print("Speichern erfolgreich!")
def save_results_to_csv(results):
if not results:
print("\nKeine Daten zum Speichern vorhanden.")
return
os.makedirs(OUTPUT_DIR, exist_ok=True)
fieldnames = ["Name des Kindes", "Name Käufer", "E-Mail-Adresse Käufer", "Schnell Login URL"]
print(f"\nSpeichere {len(results)} Ergebnisse in '{OUTPUT_FILE}'...")
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(results)
print("Speichern erfolgreich!")
def get_profile_choice():
all_credentials = load_all_credentials()
if not all_credentials: return None
profiles = list(all_credentials.keys())
print("\nBitte wähle das zu verwendende Profil:")
for i, p in enumerate(profiles): print(f" {i + 1}) {p}")
while True:
try:
c = int(input(f"Gib eine Zahl zwischen 1 und {len(profiles)} ein: "))
if 1 <= c <= len(profiles):
p_name = profiles[c - 1]
print(f"Profil '{p_name}' ausgewählt.")
return all_credentials[p_name]
else: print("Ungültige Auswahl.")
except ValueError: print("Ungültige Eingabe.")
# --- Finale, korrigierte main-Funktion ---
def main():
print("--- Fotograf.de Scraper (mit Datenaggregation) ---")
credentials = get_profile_choice()
if not credentials: return
job_url_raw = input("Bitte gib eine beliebige URL des zu bearbeitenden Fotoauftrags ein: ")
match = re.search(r'(https?://[^\s]+)', job_url_raw)
if not match:
print("Keine gültige URL in der Eingabe gefunden.")
return
job_url = match.group(1).strip()
if "fotograf.de/config_jobs_" not in job_url or not re.search(r'/\d+', job_url):
print("Dies scheint keine gültige URL für einen Fotoauftrag zu sein.")
return
driver = setup_driver()
if not driver: return
try:
if login(driver, credentials['username'], credentials['password']):
# Schritt 1: Rohe Daten sammeln
raw_results = process_full_job(driver, job_url)
# Schritt 2: Daten aggregieren
aggregated_results = aggregate_results_by_email(raw_results)
# Schritt 3: Aggregierte Daten speichern
save_aggregated_results_to_csv(aggregated_results)
else:
print("Skript wird beendet, da der Login fehlgeschlagen ist.")
finally:
print("\nSkript beendet. Schließe WebDriver.")
if driver: driver.quit()
if __name__ == "__main__":
# Hier werden alle Funktionen (inklusive der eingeklappten) benötigt
# Fügen Sie hier den vollständigen Code der Hilfsfunktionen ein.
pass