Statistikauswertung ergänzt

Minimale Anpassungen zur Verbesserung (Vorname separat, keine zwei „und“ hintereinander)
Ergänzung Statistikmodul
This commit is contained in:
2025-07-17 11:03:28 +00:00
parent 18ffe86eda
commit 0eb7279740

View File

@@ -15,10 +15,9 @@ from selenium.common.exceptions import TimeoutException, NoSuchElementException,
# --- Konfiguration & Konstanten ---
CREDENTIALS_FILE = 'fotograf_credentials.json'
OUTPUT_DIR = 'output'
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'supermailer_fertige_liste.csv')
LOGIN_URL = 'https://app.fotograf.de/login/login'
# --- Selektoren (FINALE, VOLLSTÄNDIGE VERSION) ---
# --- Selektoren ---
SELECTORS = {
"cookie_accept_button": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"login_user": "#login-email",
@@ -32,14 +31,16 @@ SELECTORS = {
"person_vorname": ".//span[text()='Vorname']/following-sibling::strong",
"person_logins": ".//span[text()='Logins']/following-sibling::strong",
"person_access_code_link": ".//a[contains(@data-qa-id, 'guest-access-banner-access-code')]",
# NEU: Selector zur Überprüfung, ob ein Kauf getätigt wurde
"purchase_icon": ".//img[@alt='Bestellungen mit diesem Foto']",
# Selektoren für die Statistik-Zählung
"person_all_photos": ".//div[@data-key]",
"person_purchased_photos": ".//div[@data-key and .//img[@alt='Bestellungen mit diesem Foto']]",
"potential_buyer_link": "//a[contains(@href, '/config_customers/view_customer')]",
"quick_login_url": "//a[@id='quick-login-url']",
"buyer_email": "//span[contains(., '@')]"
}
def take_error_screenshot(driver, error_name):
"""Speichert einen Screenshot des aktuellen Browserfensters in den output-Ordner."""
os.makedirs(OUTPUT_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"error_{error_name}_{timestamp}.png"
@@ -51,6 +52,7 @@ def take_error_screenshot(driver, error_name):
print(f"!!! Konnte keinen Screenshot speichern: {e}")
def setup_driver():
"""Initialisiert und konfiguriert den Chrome WebDriver."""
print("Initialisiere Chrome WebDriver...")
options = Options()
options.add_argument('--headless')
@@ -66,6 +68,7 @@ def setup_driver():
return None
def load_all_credentials():
"""Lädt alle Anmeldedaten aus der JSON-Datei."""
try:
with open(CREDENTIALS_FILE, 'r') as f:
return json.load(f)
@@ -73,6 +76,7 @@ def load_all_credentials():
return None
def login(driver, username, password):
"""Führt den Login-Vorgang auf fotograf.de durch."""
print("Starte Login-Vorgang...")
try:
driver.get(LOGIN_URL)
@@ -98,16 +102,17 @@ def login(driver, username, password):
take_error_screenshot(driver, "login_error")
return False
def process_full_job(driver, job_url):
# --- Modus 1: E-Mail-Listen-Erstellung ---
def process_reminder_mode(driver, job_url):
"""Sammelt Daten für die E-Mail-Erinnerungskampagne."""
wait = WebDriverWait(driver, 15)
try:
job_id_match = re.search(r'/(\d+)', job_url)
if not job_id_match:
raise ValueError("Konnte keine numerische Job-ID finden.")
if not job_id_match: raise ValueError("Konnte keine numerische Job-ID finden.")
job_id = job_id_match.group(1)
settings_url = f"https://app.fotograf.de/config_jobs_settings/index/{job_id}"
except (AttributeError, IndexError, ValueError) as e:
except Exception as e:
print(f"!!! FEHLER: Konnte keine Job-ID aus der URL '{job_url}' extrahieren. Grund: {e}")
return []
@@ -118,7 +123,6 @@ def process_full_job(driver, job_url):
print(f"Auftragsname: '{job_name}'")
except TimeoutException:
print("Konnte den Auftragsnamen nicht finden.")
take_error_screenshot(driver, "job_name_not_found")
return []
albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
@@ -138,7 +142,6 @@ def process_full_job(driver, job_url):
print(f"{len(albums_to_visit)} gültige Album-Links gesammelt.")
except TimeoutException:
print("Konnte die Album-Liste nicht finden.")
take_error_screenshot(driver, "album_overview_error")
return []
final_results = []
@@ -152,8 +155,7 @@ def process_full_job(driver, job_url):
for page_num in range(1, num_pages + 1):
current_page_url = album['url']
if page_num > 1:
current_page_url += f"?page_guest_accesses={page_num}"
if page_num > 1: current_page_url += f"?page_guest_accesses={page_num}"
print(f" Verarbeite Seite {page_num}...")
driver.get(current_page_url)
@@ -170,19 +172,14 @@ def process_full_job(driver, job_url):
if int(login_count_text) <= 1:
vorname = person_row.find_element(By.XPATH, SELECTORS["person_vorname"]).text
# --- NEUE KAUF-PRÜFUNG ---
try:
# Finde den Foto-Container, der dem Info-Block folgt
photo_container = person_row.find_element(By.XPATH, "./following-sibling::div[1]")
# Prüfe, ob darin ein Warenkorb-Icon existiert
purchase_icons = photo_container.find_elements(By.XPATH, SELECTORS["purchase_icon"])
purchase_icons = photo_container.find_elements(By.XPATH, SELECTORS["person_purchased_photos"])
if len(purchase_icons) > 0:
print(f" --> INFO: '{vorname}' hat bereits gekauft. Überspringe.")
continue # Springe zur nächsten Person
continue
except NoSuchElementException:
# Kein Foto-Container gefunden, also auch kein Kauf
pass
# --- ENDE KAUF-PRÜFUNG ---
print(f" --> ERFOLG: '{vorname}' mit {login_count_text} Login(s) gefunden (und kein Kauf).")
@@ -226,51 +223,157 @@ def process_full_job(driver, job_url):
wait.until(EC.presence_of_element_located((By.XPATH, SELECTORS["person_rows"])))
except TimeoutException:
print(f" Keine Personen-Daten im Album '{album['name']}' gefunden. Überspringe.")
take_error_screenshot(driver, f"album_{album['name']}_error")
continue
return final_results
def aggregate_results_by_email(results):
    """Group raw per-child results into one record per buyer e-mail address.

    Each raw entry carries the buyer's name/e-mail, the child's name and a
    quick-login URL. Entries sharing an e-mail address are merged: children
    are collected into a readable list ("A, B und C") and the login links
    are joined as HTML anchors separated by <br><br>.

    Args:
        results: list of dicts with keys 'Name Käufer',
            'E-Mail-Adresse Käufer', 'Name des Kindes', 'Schnell Login URL'.

    Returns:
        List of dicts with keys 'Name Käufer' (first name only),
        'E-Mail-Adresse Käufer', 'Kindernamen', 'LinksHTML'.
    """
    print("\nBeginne mit der Aggregation der Ergebnisse pro E-Mail-Adresse...")
    aggregated_data = {}
    for result in results:
        email = result['E-Mail-Adresse Käufer']
        # "Familie" is a placeholder row, not a child's name — relabel it.
        child_name = "Familienbilder" if result['Name des Kindes'] == "Familie" else result['Name des Kindes']
        html_link = f'<a href="{result["Schnell Login URL"]}">Fotos von {child_name}</a>'
        if email not in aggregated_data:
            aggregated_data[email] = {
                # Only the first name is used for the salutation.
                'Name Käufer': result['Name Käufer'].split(' ')[0],
                'E-Mail-Adresse Käufer': email,
                'Kindernamen_list': [child_name],
                'LinksHTML_list': [html_link]
            }
        else:
            aggregated_data[email]['Kindernamen_list'].append(child_name)
            aggregated_data[email]['LinksHTML_list'].append(html_link)
    final_list = []
    for email, data in aggregated_data.items():
        names_list = data['Kindernamen_list']
        # Natural German enumeration: commas between all but the last pair,
        # so "A und B und C" becomes "A, B und C".
        if len(names_list) > 2:
            kindernamen_str = ', '.join(names_list[:-1]) + ' und ' + names_list[-1]
        else:
            kindernamen_str = ' und '.join(names_list)
        final_list.append({
            'Name Käufer': data['Name Käufer'],
            'E-Mail-Adresse Käufer': email,
            'Kindernamen': kindernamen_str,
            'LinksHTML': '<br><br>'.join(data['LinksHTML_list'])
        })
    print(f"Aggregation abgeschlossen. {len(results)} Roh-Einträge zu {len(final_list)} einzigartigen E-Mails zusammengefasst.")
    return final_list
def save_aggregated_results_to_csv(results):
    """Write the aggregated buyer records to a Supermailer-compatible CSV.

    Writes to OUTPUT_DIR/supermailer_fertige_liste.csv (UTF-8, header row).
    Does nothing but print a notice when *results* is empty.

    Args:
        results: list of dicts as produced by aggregate_results_by_email.
    """
    if not results:
        print("\nKeine Daten zum Speichern vorhanden.")
        return
    output_file = os.path.join(OUTPUT_DIR, 'supermailer_fertige_liste.csv')
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    fieldnames = ["Name Käufer", "E-Mail-Adresse Käufer", "Kindernamen", "LinksHTML"]
    print(f"\nSpeichere {len(results)} aggregierte Ergebnisse in '{output_file}'...")
    # newline='' is required so csv handles line endings itself.
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)
    print("Speichern erfolgreich!")
# --- Modus 2: Statistik-Auswertung ---
def process_statistics_mode(driver, job_url):
    """Collect per-album purchase statistics for a photo job.

    Navigates to the job's album overview, visits every album and counts,
    per guest-access row (one row per child): the total number of children,
    how many bought at least one photo, and how many bought every photo in
    their container.

    Args:
        driver: logged-in Selenium WebDriver.
        job_url: any URL of the job; the numeric job ID is extracted from it.

    Returns:
        List of per-album dicts suitable for save_statistics_to_csv;
        empty list if the job ID or the album list cannot be determined.
    """
    wait = WebDriverWait(driver, 15)
    # fotograf.de paginates guest accesses in chunks of this many rows.
    page_size = 20
    try:
        # re.search returns None for URLs without a numeric ID, which
        # raises AttributeError on .group() — caught here.
        job_id = re.search(r'/(\d+)', job_url).group(1)
    except Exception:
        print(f"!!! FEHLER: Konnte keine Job-ID aus der URL '{job_url}' extrahieren.")
        return []
    albums_overview_url = f"https://app.fotograf.de/config_jobs_photos/index/{job_id}"
    print(f"Navigiere zur Alben-Übersicht: {albums_overview_url}")
    driver.get(albums_overview_url)
    albums_to_visit = []
    try:
        album_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["album_overview_rows"])))
        for row in album_rows:
            try:
                album_link = row.find_element(By.XPATH, SELECTORS["album_overview_link"])
                albums_to_visit.append({"name": album_link.text, "url": album_link.get_attribute('href')})
            except NoSuchElementException:
                # Row without a link (header/placeholder) — skip it.
                continue
    except TimeoutException:
        print("Konnte die Album-Liste nicht finden.")
        return []
    statistics = []
    print("\n--- STATISTIK-AUSWERTUNG ---")
    for album in albums_to_visit:
        print(f"\nAlbum: {album['name']}")
        driver.get(album['url'])
        try:
            total_codes_text = wait.until(EC.visibility_of_element_located((By.XPATH, SELECTORS["access_code_count"]))).text
            num_pages = math.ceil(int(total_codes_text) / page_size)
            total_children_in_album = 0
            children_with_purchase = 0
            children_with_all_purchased = 0
            for page_num in range(1, num_pages + 1):
                # Page 1 is already loaded; later pages use a query parameter.
                if page_num > 1:
                    driver.get(album['url'] + f"?page_guest_accesses={page_num}")
                person_rows = wait.until(EC.presence_of_all_elements_located((By.XPATH, SELECTORS["person_rows"])))
                for person_row in person_rows:
                    total_children_in_album += 1
                    try:
                        # The photo grid is the sibling <div> directly after the info row.
                        photo_container = person_row.find_element(By.XPATH, "./following-sibling::div[1]")
                        num_total_photos = len(photo_container.find_elements(By.XPATH, SELECTORS["person_all_photos"]))
                        num_purchased_photos = len(photo_container.find_elements(By.XPATH, SELECTORS["person_purchased_photos"]))
                        if num_purchased_photos > 0:
                            children_with_purchase += 1
                        # "All purchased" only counts when photos exist at all.
                        if num_total_photos > 0 and num_total_photos == num_purchased_photos:
                            children_with_all_purchased += 1
                    except NoSuchElementException:
                        # No photo container: child still counts in the total.
                        continue
            print(f" - Kinder insgesamt: {total_children_in_album}")
            print(f" - Kinder mit (mind. 1) Kauf: {children_with_purchase}")
            print(f" - Kinder (Alle Bilder gekauft): {children_with_all_purchased}")
            statistics.append({
                "Album": album['name'],
                "Kinder insgesamt": total_children_in_album,
                "Kinder mit Käufen": children_with_purchase,
                "Kinder (Alle Bilder gekauft)": children_with_all_purchased
            })
        except Exception as e:
            # Best-effort: a broken album must not abort the whole evaluation.
            print(f" Fehler bei der Auswertung dieses Albums: {e}")
            continue
    return statistics
def save_statistics_to_csv(results):
    """Write the per-album statistics to OUTPUT_DIR/job_statistik.csv (UTF-8)."""
    if not results:
        print("\nKeine Statistikdaten zum Speichern vorhanden.")
        return
    # Column order matches the dicts built by process_statistics_mode.
    columns = ["Album", "Kinder insgesamt", "Kinder mit Käufen", "Kinder (Alle Bilder gekauft)"]
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    output_file = os.path.join(OUTPUT_DIR, 'job_statistik.csv')
    print(f"\nSpeichere Statistik für {len(results)} Alben in '{output_file}'...")
    # newline='' lets the csv module control line endings itself.
    with open(output_file, 'w', newline='', encoding='utf-8') as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows(results)
    print("Speichern erfolgreich!")
# --- Haupt-Logik ---
def get_profile_choice():
"""Zeigt ein Menü zur Profilauswahl."""
all_credentials = load_all_credentials()
if not all_credentials: return None
profiles = list(all_credentials.keys())
@@ -287,11 +390,18 @@ def get_profile_choice():
except ValueError: print("Ungültige Eingabe.")
def main():
print("--- Fotograf.de Scraper (mit Datenaggregation) ---")
"""Hauptfunktion des Skripts."""
print("--- Fotograf.de Scraper (v3.1 - The Analyst) ---")
while True:
mode = input("Bitte Modus wählen:\n 1) E-Mail-Liste erstellen\n 2) Statistik auswerten\nWahl: ")
if mode in ['1', '2']: break
else: print("Ungültige Eingabe.")
credentials = get_profile_choice()
if not credentials: return
job_url_raw = input("Bitte gib eine beliebige URL des zu bearbeitenden Fotoauftrags ein: ")
job_url_raw = input("Bitte eine beliebige URL des zu bearbeitenden Fotoauftrags ein: ")
match = re.search(r'(https?://[^\s]+)', job_url_raw)
if not match:
@@ -308,9 +418,13 @@ def main():
try:
if login(driver, credentials['username'], credentials['password']):
raw_results = process_full_job(driver, job_url)
aggregated_results = aggregate_results_by_email(raw_results)
save_aggregated_results_to_csv(aggregated_results)
if mode == '1':
raw_results = process_reminder_mode(driver, job_url)
aggregated_results = aggregate_results_by_email(raw_results)
save_aggregated_results_to_csv(aggregated_results)
elif mode == '2':
stats_results = process_statistics_mode(driver, job_url)
save_statistics_to_csv(stats_results)
else:
print("Skript wird beendet, da der Login fehlgeschlagen ist.")
finally: