From 1ca96f09b9e37b0ffbc0631e78b9e756c9861944 Mon Sep 17 00:00:00 2001 From: Floke Date: Thu, 17 Apr 2025 09:12:23 +0000 Subject: [PATCH] bugfix --- brancheneinstufung.py | 325 ++++++++++++++++++++++++++++++------------ 1 file changed, 236 insertions(+), 89 deletions(-) diff --git a/brancheneinstufung.py b/brancheneinstufung.py index e0da1eb0..4e4d8e63 100644 --- a/brancheneinstufung.py +++ b/brancheneinstufung.py @@ -1565,6 +1565,113 @@ def get_website_raw(url, max_length=1000, verify_cert=False): debug_print(f"Allgemeiner Fehler beim Scraping von {url}: {e}") return "k.A." +# Diese Funktion bleibt notwendig für den gebündelten OpenAI Call +@retry_on_failure +def summarize_batch_openai(tasks_data): + """ + Fasst eine Liste von Rohtexten in einem einzigen OpenAI API Call zusammen. + + Args: + tasks_data (list): Eine Liste von Dictionaries, jedes enthält: + {'row_num': int, 'raw_text': str} + + Returns: + dict: Ein Dictionary, das Zeilennummern auf ihre Zusammenfassungen mappt. + z.B. {2122: "Zusammenfassung A", 2123: "Zusammenfassung B"} + Bei Fehlern oder fehlenden Zusammenfassungen wird "k.A." verwendet. + """ + if not tasks_data: + return {} + + valid_tasks = [t for t in tasks_data if t.get("raw_text") and t["raw_text"] != "k.A." and t["raw_text"].strip()] + if not valid_tasks: + debug_print("Keine gültigen Rohtexte für Batch-Zusammenfassung gefunden.") + # Gib ein leeres Dict zurück, damit die aufrufende Funktion weiß, dass nichts zu tun war + # oder ein Dict mit k.A. für alle ursprünglichen Tasks? Besser letzteres. + return {t['row_num']: "k.A. (Kein gültiger Rohtext)" for t in tasks_data} + + + debug_print(f"Starte Batch-Zusammenfassung für {len(valid_tasks)} gültige Texte...") + + prompt_parts = [ + "Du bist ein KI-Assistent, der mehrere Website-Texte zusammenfasst.", + "Fasse jeden der folgenden Texte prägnant zusammen (max. 80-100 Wörter pro Text).", + "Konzentriere dich auf Haupttätigkeitsfeld, Produkte/Dienstleistungen und Zielgruppe.", + "Antworte ausschließlich mit den Ergebnissen, jede Zusammenfassung auf einer neuen Zeile, im folgenden Format:", + "RESULTAT : ", + "\n--- Texte zur Zusammenfassung ---" + ] + + text_block = "" + row_numbers_in_batch = [] + total_chars = 0 + # Reduziere das Limit, da die Antwort auch Platz braucht + max_chars_per_batch = 6000 # Vorsichtiger für Prompt + Antwort Tokens + + for task in valid_tasks: + row_num = task['row_num'] + raw_text = task['raw_text'] + # Kürzen nicht mehr nötig, da in get_website_raw schon passiert + # max_raw_length_per_item = 1000 + # if len(raw_text) > max_raw_length_per_item: + # raw_text = raw_text[:max_raw_length_per_item] + + entry_text = f"\n--- TEXT Zeile {row_num} ---\n{raw_text}\n--- ENDE TEXT Zeile {row_num} ---\n" + if total_chars + len(entry_text) > max_chars_per_batch: + debug_print(f"WARNUNG: Batch-Zeichenlimit ({max_chars_per_batch}) erreicht bei Zeile {row_num}. Dieser Text wird nicht in den Batch aufgenommen.") + continue + + text_block += entry_text + total_chars += len(entry_text) + row_numbers_in_batch.append(row_num) + + if not row_numbers_in_batch: + debug_print("Keine Texte im Batch nach Längenprüfung für OpenAI.") + # Gib k.A. für alle ursprünglichen Tasks zurück, für die wir einen Text hatten + return {t['row_num']: "k.A. (Batch-Limit erreicht)" for t in valid_tasks} + + prompt_parts.append(text_block) + prompt_parts.append("--- Ende der Texte ---") + prompt_parts.append("Bitte gib NUR die 'RESULTAT : ...' Zeilen zurück.") + final_prompt = "\n".join(prompt_parts) + + # OpenAI API Call + chat_response = call_openai_chat(final_prompt, temperature=0.2) + + # Antwort parsen + summaries = {row_num: "k.A. (Keine Antwort geparst)" for row_num in row_numbers_in_batch} + if chat_response: + lines = chat_response.strip().split('\n') + parsed_count = 0 + for line in lines: + match = re.match(r"RESULTAT (\d+): (.*)", line.strip()) + if match: + row_num = int(match.group(1)) + summary_text = match.group(2).strip() + if row_num in summaries: + summaries[row_num] = summary_text + parsed_count += 1 + + debug_print(f"Batch-Zusammenfassung: {parsed_count} von {len(row_numbers_in_batch)} Zusammenfassungen erfolgreich geparst.") + if parsed_count < len(row_numbers_in_batch): + debug_print(f"WARNUNG: Nicht alle Zusammenfassungen konnten geparst werden.") + debug_print(f"Komplette ChatGPT Antwort (Batch Summary):\n{chat_response}") + else: + debug_print("Fehler: Keine gültige Antwort von OpenAI für Batch-Zusammenfassung erhalten.") + + # Füge k.A. für Tasks hinzu, die ursprünglich gültigen Text hatten, aber evtl. wegen Limit nicht im Batch waren + for task in valid_tasks: + if task['row_num'] not in summaries: + summaries[task['row_num']] = "k.A. (Nicht im OpenAI-Batch enthalten)" + + # Füge k.A. für Tasks hinzu, die ungültigen Rohtext hatten + for task in tasks_data: + if task['row_num'] not in summaries: + summaries[task['row_num']] = "k.A. (Ungültiger Rohtext)" + + + return summaries + @retry_on_failure def scrape_website_details(url): """Extrahiert Title, Description, H1-H3 von einer Website.""" @@ -2211,21 +2318,29 @@ def _process_batch(sheet, batches, row_numbers): # Komplette Funktion process_website_batch (MIT Batched Google Sheet Updates) def process_website_batch(sheet_handler, start_row_index_in_sheet, end_row_index_in_sheet): """ - Batch-Prozess für Website-Scraping mit paralleler Verarbeitung. - Lädt Daten neu, prüft Timestamp AT, verarbeitet Websites parallel, - setzt AT + AP und sendet Sheet-Updates gebündelt. + Batch-Prozess für Website-Scraping mit gewünschtem Batch-Workflow: + 1. Sammle URLs für einen Verarbeitungs-Batch. + 2. Scrape Rohtexte parallel für diesen Batch. + 3. Sende gesammelte Rohtexte in einem Call an OpenAI. + 4. Schreibe alle Ergebnisse des Batches gebündelt ins Sheet. """ - debug_print(f"Starte Website-Scraping (Parallel Batch) für Zeilen {start_row_index_in_sheet} bis {end_row_index_in_sheet}...") + debug_print(f"Starte Website-Scraping (Batch Workflow) für Zeilen {start_row_index_in_sheet} bis {end_row_index_in_sheet}...") + # --- Konfiguration --- + PROCESSING_BATCH_SIZE = 15 # Wie viele Websites auf einmal holen/verarbeiten? + OPENAI_BATCH_SIZE_LIMIT = 8 # Wie viele Texte max. pro OpenAI Call (wegen Token Limits) + MAX_SCRAPING_WORKERS = 10 # Wie viele Threads für paralleles Scraping + + # --- Lade Initialdaten --- if not sheet_handler.load_data(): - debug_print("FEHLER beim Laden der Daten in process_website_batch.") + debug_print("FEHLER beim Laden der initialen Daten in process_website_batch.") return all_data = sheet_handler.get_all_data_with_headers() if not all_data or len(all_data) <= 5: - debug_print("FEHLER/WARNUNG: Keine Daten zum Verarbeiten in process_website_batch gefunden.") + debug_print("FEHLER/WARNUNG: Keine Daten zum Verarbeiten gefunden.") return - # Indizes und Buchstaben holen (wie zuvor) + # --- Indizes und Spaltenbuchstaben --- timestamp_col_key = "Website Scrape Timestamp" timestamp_col_index = COLUMN_MAP.get(timestamp_col_key) website_col_idx = COLUMN_MAP.get("CRM Website") @@ -2240,42 +2355,27 @@ def process_website_batch(sheet_handler, start_row_index_in_sheet, end_row_index summary_col_letter = sheet_handler._get_col_letter(summary_col_idx + 1) version_col_letter = sheet_handler._get_col_letter(version_col_idx + 1) - # --- Konfiguration für Parallelisierung --- - MAX_WORKERS = 10 # Anzahl gleichzeitiger Threads für Scraping/Summarization - # Semaphore, um OpenAI Calls zu limitieren (z.B. max 5 gleichzeitig) - openai_semaphore = threading.Semaphore(5) - - # --- Worker Funktion --- - def scrape_and_summarize_task(task_data): - row_num = task_data['row_num'] - url = task_data['url'] - debug_print(f"Task Zeile {row_num}: Starte Verarbeitung für {url}") + # --- Worker-Funktion nur für Scraping --- + def scrape_raw_text_task(task_info): + row_num = task_info['row_num'] + url = task_info['url'] raw_text = "k.A." - summary = "k.A." error = None try: - raw_text = get_website_raw(url) # Holt den Text - - # Limitiere OpenAI Calls mit Semaphore - with openai_semaphore: - debug_print(f"Task Zeile {row_num}: Rufe OpenAI für Zusammenfassung auf...") - # Füge hier einen kleinen Sleep hinzu, falls Rate-Limits auftreten - # time.sleep(0.5) # Optional: kleine Pause vor jedem Call - summary = summarize_website_content(raw_text) # Fasst zusammen - debug_print(f"Task Zeile {row_num}: OpenAI Zusammenfassung erhalten.") - + # debug_print(f"Scraping Task Zeile {row_num}: Hole {url}...") # Kann sehr laut werden + raw_text = get_website_raw(url) except Exception as e: - error = f"Fehler bei Verarbeitung Zeile {row_num}: {e}" + error = f"Fehler beim Scraping Zeile {row_num}: {e}" debug_print(error) - return {"row_num": row_num, "raw_text": raw_text, "summary": summary, "error": error} + return {"row_num": row_num, "raw_text": raw_text, "error": error} - # --- Hauptverarbeitungsschleife --- - all_sheet_updates = [] - processed_count = 0 - skipped_count = 0 - skipped_url_count = 0 - error_count = 0 - tasks_to_process = [] # Sammle Aufgaben für einen Thread-Pool-Batch + # --- Hauptverarbeitung --- + tasks_for_processing_batch = [] # Sammelt Tasks für einen kompletten Verarbeitungsbatch + all_sheet_updates = [] # Sammelt *alle* Sheet-Updates über mehrere Batches hinweg + processed_total_count = 0 + skipped_total_count = 0 + skipped_url_total_count = 0 + error_total_count = 0 for i in range(start_row_index_in_sheet, end_row_index_in_sheet + 1): row_index_in_list = i - 1 @@ -2288,71 +2388,118 @@ def process_website_batch(sheet_handler, start_row_index_in_sheet, end_row_index should_skip = True if should_skip: - skipped_count += 1 + skipped_total_count += 1 continue - # Prüfung auf gültige URL + # Gültige URL Prüfung website_url = row[website_col_idx] if len(row) > website_col_idx else "" if not website_url or website_url.strip().lower() == "k.a.": - skipped_url_count += 1 + skipped_url_total_count += 1 continue - # Füge Aufgabe zur Liste hinzu - tasks_to_process.append({"row_num": i, "url": website_url}) + # Aufgabe zum aktuellen Verarbeitungsbatch hinzufügen + tasks_for_processing_batch.append({"row_num": i, "url": website_url}) - # --- Parallele Ausführung --- - debug_print(f"Starte parallele Verarbeitung für {len(tasks_to_process)} Websites mit max. {MAX_WORKERS} Workern...") - results = [] - with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - # `map` behält die Reihenfolge der Ergebnisse bei (entsprechend tasks_to_process) - future_to_task = {executor.submit(scrape_and_summarize_task, task): task for task in tasks_to_process} - for future in concurrent.futures.as_completed(future_to_task): - task = future_to_task[future] - try: - result = future.result() - results.append(result) - if result["error"]: - error_count += 1 + # --- Verarbeitungs-Batch ausführen, wenn voll oder letzte Zeile --- + if len(tasks_for_processing_batch) >= PROCESSING_BATCH_SIZE or i == end_row_index_in_sheet: + if tasks_for_processing_batch: + debug_print(f"\n--- Starte Verarbeitungs-Batch für {len(tasks_for_processing_batch)} Zeilen (Start: {tasks_for_processing_batch[0]['row_num']}, Ende: {tasks_for_processing_batch[-1]['row_num']}) ---") + + # --- Schritt 2: Rohtexte parallel scrapen --- + scraping_results_dict = {} # {row_num: {'raw_text': '...', 'error': None}, ...} + debug_print(f" Scrape {len(tasks_for_processing_batch)} Websites parallel (max {MAX_SCRAPING_WORKERS} worker)...") + with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_SCRAPING_WORKERS) as executor: + future_to_task = {executor.submit(scrape_raw_text_task, task): task for task in tasks_for_processing_batch} + for future in concurrent.futures.as_completed(future_to_task): + task = future_to_task[future] + try: + result = future.result() + scraping_results_dict[result['row_num']] = {"raw_text": result['raw_text'], "error": result['error']} + except Exception as exc: + row_num = task['row_num'] + err_msg = f"Generischer Fehler in Scraping Task für Zeile {row_num}: {exc}" + debug_print(err_msg) + scraping_results_dict[row_num] = {"raw_text": "k.A.", "error": err_msg} + error_total_count +=1 + + debug_print(f" Scraping für Batch beendet.") + + # --- Schritt 3: OpenAI Batch(es) vorbereiten und ausführen --- + openai_tasks = [] + tasks_without_valid_text = [] + for task_info in tasks_for_processing_batch: + row_num = task_info['row_num'] + scrape_result = scraping_results_dict.get(row_num) + if scrape_result and not scrape_result.get('error') and scrape_result.get('raw_text', 'k.A.') not in ['k.A.', 'k.A. (Nur Cookie-Banner erkannt)'] and str(scrape_result.get('raw_text')).strip(): + openai_tasks.append({'row_num': row_num, 'raw_text': scrape_result['raw_text']}) + else: + # Speichere Zeilen ohne gültigen Text für späteres Update + tasks_without_valid_text.append({'row_num': row_num, 'raw_text': scrape_result.get('raw_text', 'k.A.') if scrape_result else 'k.A. (Scraping fehlgeschlagen)'}) + + + all_summaries = {} # Sammelt Ergebnisse aller OpenAI Batches für diesen Verarbeitungsbatch + openai_task_batches = [openai_tasks[j:j + OPENAI_BATCH_SIZE_LIMIT] for j in range(0, len(openai_tasks), OPENAI_BATCH_SIZE_LIMIT)] + + if openai_task_batches: + debug_print(f" Bereite {len(openai_task_batches)} OpenAI Batch(es) vor (Größe max. {OPENAI_BATCH_SIZE_LIMIT})...") + for num, openai_batch in enumerate(openai_task_batches): + debug_print(f" Verarbeite OpenAI Batch {num+1}/{len(openai_task_batches)}...") + summaries_result = summarize_batch_openai(openai_batch) + all_summaries.update(summaries_result) + time.sleep(1) # Kurze Pause zwischen OpenAI Batches + debug_print(f" OpenAI Batch-Verarbeitung beendet.") else: - processed_count +=1 # Zähle nur erfolgreiche - except Exception as exc: - row_num = task['row_num'] - debug_print(f"Fehler in Task für Zeile {row_num}: {exc}") - results.append({"row_num": row_num, "raw_text": "k.A.", "summary": "k.A.", "error": str(exc)}) - error_count += 1 + debug_print(" Keine gültigen Rohtexte für OpenAI Batch-Verarbeitung vorhanden.") - debug_print(f"Parallele Verarbeitung abgeschlossen. {processed_count} erfolgreich, {error_count} mit Fehlern.") - # --- Ergebnisse verarbeiten und Sheet Updates vorbereiten --- - if results: - debug_print("Bereite Sheet-Updates vor...") - current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Ein Timestamp für den gesamten Batch - current_version = Config.VERSION - for res in results: - i = res["row_num"] - # Füge Updates auch bei Fehlern hinzu, um Timestamp zu setzen? Oder nur bei Erfolg? - # Hier: Setze Timestamp auch bei Fehler, aber mit k.A. Werten - raw_text = res.get("raw_text", "k.A.") - summary = res.get("summary", "k.A.") - # Wenn ein Fehler aufgetreten ist, schreibe ggf. den Fehler in die Zusammenfassung? - # if res.get("error"): - # summary = f"FEHLER: {res['error'][:200]}" # Kürze Fehlermeldung + # --- Schritt 4: Sheet Updates für den kompletten Verarbeitungs-Batch vorbereiten --- + current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + current_version = Config.VERSION + batch_sheet_updates = [] # Updates nur für diesen Verarbeitungsbatch - all_sheet_updates.append({'range': f'{rohtext_col_letter}{i}', 'values': [[raw_text]]}) - all_sheet_updates.append({'range': f'{summary_col_letter}{i}', 'values': [[summary]]}) - all_sheet_updates.append({'range': f'{ts_col_letter}{i}', 'values': [[current_timestamp]]}) # AT Timestamp - all_sheet_updates.append({'range': f'{version_col_letter}{i}', 'values': [[current_version]]}) # AP Version + for task_info in tasks_for_processing_batch: + row_num = task_info['row_num'] + raw_text_res = scraping_results_dict.get(row_num, {"raw_text": "k.A. (Fehler)"})["raw_text"] + summary_res = all_summaries.get(row_num, "k.A. (Keine Zusammenf.)") - # --- Ein einziger Batch-Update-Call für alle Ergebnisse --- - if all_sheet_updates: - debug_print(f"Sende Batch-Update für {len(results)} verarbeitete Zeilen ({len(all_sheet_updates)} Zellen)...") - success = sheet_handler.batch_update_cells(all_sheet_updates) - if success: - debug_print(f"Sheet-Update für verarbeitete Zeilen erfolgreich.") - else: - debug_print(f"FEHLER beim finalen Sheet-Update.") + row_updates = [ + {'range': f'{rohtext_col_letter}{row_num}', 'values': [[raw_text_res]]}, + {'range': f'{summary_col_letter}{row_num}', 'values': [[summary_res]]}, + {'range': f'{ts_col_letter}{row_num}', 'values': [[current_timestamp]]}, # AT Timestamp + {'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]} # AP Version + ] + batch_sheet_updates.extend(row_updates) + processed_total_count += 1 # Zähle alle Zeilen, die diesen Punkt erreichen - debug_print(f"Website-Scraping (Parallel Batch) abgeschlossen. {processed_count} erfolgreich verarbeitet, {error_count} Fehler, {skipped_count} Zeilen wg. Timestamp übersprungen, {skipped_url_count} Zeilen ohne URL übersprungen.") + # Füge Updates dieses Verarbeitungs-Batches zur Gesamtliste hinzu + all_sheet_updates.extend(batch_sheet_updates) + debug_print(f" {len(batch_sheet_updates)} Sheet-Updates für diesen Batch vorbereitet.") + + # Sheet-Updates gebündelt senden (optional, könnte auch am Ende erfolgen) + # Hier: Senden nach jedem Verarbeitungs-Batch (Kompromiss) + if all_sheet_updates: + debug_print(f" Sende {len(all_sheet_updates)} Sheet-Updates für abgeschlossenen Batch...") + success = sheet_handler.batch_update_cells(all_sheet_updates) + if success: + debug_print(f" Sheet-Update für Batch bis Zeile {i} erfolgreich.") + else: + debug_print(f" FEHLER beim Sheet-Update für Batch bis Zeile {i}.") + all_sheet_updates = [] # Liste für den nächsten großen Sheet-Update-Batch leeren + + # Verarbeitungs-Batch leeren + tasks_for_processing_batch = [] + # Kurze Pause nach Verarbeitung eines kompletten Batches + time.sleep(2) + + # --- Ende der if tasks_for_processing_batch --- + # --- Ende der for-Schleife über alle Zeilen --- + + # Ggf. letzten, nicht gesendeten Sheet-Update Batch senden (sollte nicht nötig sein, wenn nach jedem Batch gesendet wird) + # if all_sheet_updates: + # debug_print(f"Sende LETZTES Sheet-Update für {len(all_sheet_updates)} Zellen...") + # sheet_handler.batch_update_cells(all_sheet_updates) + + debug_print(f"Website-Scraping (Batch Workflow) abgeschlossen. {processed_total_count} Zeilen verarbeitet (inkl. Fehler), {error_total_count} Fehler aufgetreten, {skipped_count} Zeilen wg. Timestamp übersprungen, {skipped_url_count} Zeilen ohne URL übersprungen.") # Komplette Funktion process_branch_batch (prüft jetzt Timestamp AO mit erzwungenem Debugging) # Komplette Funktion process_branch_batch (MIT Korrektur und Prüfung auf AO)