diff --git a/brancheneinstufung.py b/brancheneinstufung.py index 2bfde7a2..e0da1eb0 100644 --- a/brancheneinstufung.py +++ b/brancheneinstufung.py @@ -34,6 +34,8 @@ from sklearn.tree import DecisionTreeClassifier, export_text from sklearn.metrics import accuracy_score, classification_report, confusion_matrix import json # Zum Speichern der Muster als JSON import pickle # Zum Speichern des trainierten Modells und Imputers +import concurrent.futures +import threading # Optional: tiktoken für Token-Zählung (Modus 8) try: @@ -2209,11 +2211,11 @@ def _process_batch(sheet, batches, row_numbers): # Komplette Funktion process_website_batch (MIT Batched Google Sheet Updates) def process_website_batch(sheet_handler, start_row_index_in_sheet, end_row_index_in_sheet): """ - Batch-Prozess für Website-Scraping. Lädt Daten neu, prüft für jede Zeile - im Bereich, ob Timestamp AT bereits gesetzt ist und überspringt diese ggf. - Setzt AT + AP für bearbeitete Zeilen. Sendet Updates in Batches an Google Sheets. + Batch-Prozess für Website-Scraping mit paralleler Verarbeitung. + Lädt Daten neu, prüft Timestamp AT, verarbeitet Websites parallel, + setzt AT + AP und sendet Sheet-Updates gebündelt. """ - debug_print(f"Starte Website-Scraping (Batch) für Zeilen {start_row_index_in_sheet} bis {end_row_index_in_sheet}...") + debug_print(f"Starte Website-Scraping (Parallel Batch) für Zeilen {start_row_index_in_sheet} bis {end_row_index_in_sheet}...") if not sheet_handler.load_data(): debug_print("FEHLER beim Laden der Daten in process_website_batch.") @@ -2223,107 +2225,134 @@ def process_website_batch(sheet_handler, start_row_index_in_sheet, end_row_index debug_print("FEHLER/WARNUNG: Keine Daten zum Verarbeiten in process_website_batch gefunden.") return - # Hole Indizes und Spaltenbuchstaben (wie zuvor) + # Indizes und Buchstaben holen (wie zuvor) timestamp_col_key = "Website Scrape Timestamp" timestamp_col_index = COLUMN_MAP.get(timestamp_col_key) website_col_idx = COLUMN_MAP.get("CRM Website") rohtext_col_idx = COLUMN_MAP.get("Website Rohtext") summary_col_idx = COLUMN_MAP.get("Website Zusammenfassung") version_col_idx = COLUMN_MAP.get("Version") - if None in [timestamp_col_index, website_col_idx, rohtext_col_idx, summary_col_idx, version_col_idx]: debug_print(f"FEHLER: Mindestens ein benötigter Spaltenindex für process_website_batch fehlt in COLUMN_MAP.") return - ts_col_letter = sheet_handler._get_col_letter(timestamp_col_index + 1) rohtext_col_letter = sheet_handler._get_col_letter(rohtext_col_idx + 1) summary_col_letter = sheet_handler._get_col_letter(summary_col_idx + 1) version_col_letter = sheet_handler._get_col_letter(version_col_idx + 1) - # --- NEU: Liste für gesammelte Updates --- - all_sheet_updates = [] - rows_in_current_batch = 0 - update_batch_size = Config.BATCH_SIZE * 4 # Da wir 4 Zellen pro Zeile updaten, Batch entsprechend anpassen? Oder einfach 50? - # Nehmen wir eine feste Anzahl von Zeilen, deren Updates wir sammeln - update_batch_row_limit = 50 # Sammle Updates für 50 Zeilen, bevor gesendet wird + # --- Konfiguration für Parallelisierung --- + MAX_WORKERS = 10 # Anzahl gleichzeitiger Threads für Scraping/Summarization + # Semaphore, um OpenAI Calls zu limitieren (z.B. max 5 gleichzeitig) + openai_semaphore = threading.Semaphore(5) + # --- Worker Funktion --- + def scrape_and_summarize_task(task_data): + row_num = task_data['row_num'] + url = task_data['url'] + debug_print(f"Task Zeile {row_num}: Starte Verarbeitung für {url}") + raw_text = "k.A." + summary = "k.A." + error = None + try: + raw_text = get_website_raw(url) # Holt den Text + + # Limitiere OpenAI Calls mit Semaphore + with openai_semaphore: + debug_print(f"Task Zeile {row_num}: Rufe OpenAI für Zusammenfassung auf...") + # Füge hier einen kleinen Sleep hinzu, falls Rate-Limits auftreten + # time.sleep(0.5) # Optional: kleine Pause vor jedem Call + summary = summarize_website_content(raw_text) # Fasst zusammen + debug_print(f"Task Zeile {row_num}: OpenAI Zusammenfassung erhalten.") + + except Exception as e: + error = f"Fehler bei Verarbeitung Zeile {row_num}: {e}" + debug_print(error) + return {"row_num": row_num, "raw_text": raw_text, "summary": summary, "error": error} + + # --- Hauptverarbeitungsschleife --- + all_sheet_updates = [] processed_count = 0 skipped_count = 0 skipped_url_count = 0 + error_count = 0 + tasks_to_process = [] # Sammle Aufgaben für einen Thread-Pool-Batch for i in range(start_row_index_in_sheet, end_row_index_in_sheet + 1): row_index_in_list = i - 1 if row_index_in_list >= len(all_data): continue row = all_data[row_index_in_list] - # --- Timestamp-Prüfung (AT) --- + # Timestamp-Prüfung (AT) should_skip = False - if len(row) > timestamp_col_index: - if str(row[timestamp_col_index]).strip(): - should_skip = True - # Optional: Debugging der Prüfung reduzieren - # if i % 100 == 0: debug_print(f"Zeile {i} (Website Check): Prüfe TS {ts_col_letter}. Überspringen? -> {should_skip}") + if len(row) > timestamp_col_index and str(row[timestamp_col_index]).strip(): + should_skip = True if should_skip: skipped_count += 1 continue - else: - # --- Verarbeitung (wenn nicht übersprungen) --- - website_url = row[website_col_idx] if len(row) > website_col_idx else "" - if not website_url or website_url.strip().lower() == "k.a.": - skipped_url_count += 1 - continue - # Scrapen und Zusammenfassen - raw_text = get_website_raw(website_url) - summary = summarize_website_content(raw_text) - processed_count += 1 + # Prüfung auf gültige URL + website_url = row[website_col_idx] if len(row) > website_col_idx else "" + if not website_url or website_url.strip().lower() == "k.a.": + skipped_url_count += 1 + continue - # Einzelne Updates für diese Zeile vorbereiten - current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - current_version = Config.VERSION - row_updates = [ - {'range': f'{rohtext_col_letter}{i}', 'values': [[raw_text]]}, - {'range': f'{summary_col_letter}{i}', 'values': [[summary]]}, - {'range': f'{ts_col_letter}{i}', 'values': [[current_timestamp]]}, # AT Timestamp - {'range': f'{version_col_letter}{i}', 'values': [[current_version]]} # AP Version - ] + # Füge Aufgabe zur Liste hinzu + tasks_to_process.append({"row_num": i, "url": website_url}) - # --- NEU: Updates sammeln --- - all_sheet_updates.extend(row_updates) - rows_in_current_batch += 1 + # --- Parallele Ausführung --- + debug_print(f"Starte parallele Verarbeitung für {len(tasks_to_process)} Websites mit max. {MAX_WORKERS} Workern...") + results = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + # `map` behält die Reihenfolge der Ergebnisse bei (entsprechend tasks_to_process) + future_to_task = {executor.submit(scrape_and_summarize_task, task): task for task in tasks_to_process} + for future in concurrent.futures.as_completed(future_to_task): + task = future_to_task[future] + try: + result = future.result() + results.append(result) + if result["error"]: + error_count += 1 + else: + processed_count +=1 # Zähle nur erfolgreiche + except Exception as exc: + row_num = task['row_num'] + debug_print(f"Fehler in Task für Zeile {row_num}: {exc}") + results.append({"row_num": row_num, "raw_text": "k.A.", "summary": "k.A.", "error": str(exc)}) + error_count += 1 - # --- NEU: Batch senden, wenn Limit erreicht oder letzte Zeile --- - # Prüfe, ob die Anzahl der *Zeilen*, deren Updates gesammelt wurden, das Limit erreicht - if rows_in_current_batch >= update_batch_row_limit or i == end_row_index_in_sheet: - if all_sheet_updates: # Nur senden, wenn Updates vorhanden sind - debug_print(f"Sende Batch-Update für {rows_in_current_batch} Zeilen (insgesamt {len(all_sheet_updates)} Zellen)...") - success = sheet_handler.batch_update_cells(all_sheet_updates) - if success: - debug_print(f"Sheet-Update für Zeilen bis {i} erfolgreich.") - else: - debug_print(f"FEHLER beim Sheet-Update für Zeilen bis {i}.") - # Optional: Hier überlegen, ob man abbricht oder weitermacht + debug_print(f"Parallele Verarbeitung abgeschlossen. {processed_count} erfolgreich, {error_count} mit Fehlern.") - # Liste und Zähler zurücksetzen - all_sheet_updates = [] - rows_in_current_batch = 0 + # --- Ergebnisse verarbeiten und Sheet Updates vorbereiten --- + if results: + debug_print("Bereite Sheet-Updates vor...") + current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Ein Timestamp für den gesamten Batch + current_version = Config.VERSION + for res in results: + i = res["row_num"] + # Füge Updates auch bei Fehlern hinzu, um Timestamp zu setzen? Oder nur bei Erfolg? + # Hier: Setze Timestamp auch bei Fehler, aber mit k.A. Werten + raw_text = res.get("raw_text", "k.A.") + summary = res.get("summary", "k.A.") + # Wenn ein Fehler aufgetreten ist, schreibe ggf. den Fehler in die Zusammenfassung? + # if res.get("error"): + # summary = f"FEHLER: {res['error'][:200]}" # Kürze Fehlermeldung - # Kurze Pause nach jeder Website-Verarbeitung (oder nur nach Batch-Update?) - # Besser hier lassen, um APIs nicht zu überlasten - time.sleep(Config.RETRY_DELAY) - # --- Ende ELSE-Block --- + all_sheet_updates.append({'range': f'{rohtext_col_letter}{i}', 'values': [[raw_text]]}) + all_sheet_updates.append({'range': f'{summary_col_letter}{i}', 'values': [[summary]]}) + all_sheet_updates.append({'range': f'{ts_col_letter}{i}', 'values': [[current_timestamp]]}) # AT Timestamp + all_sheet_updates.append({'range': f'{version_col_letter}{i}', 'values': [[current_version]]}) # AP Version - # --- NEU: Ggf. letzte gesammelte Updates senden --- - if all_sheet_updates: - debug_print(f"Sende letztes Batch-Update für {rows_in_current_batch} Zeilen ({len(all_sheet_updates)} Zellen)...") - success = sheet_handler.batch_update_cells(all_sheet_updates) - if success: - debug_print(f"Letztes Sheet-Update erfolgreich.") - else: - debug_print(f"FEHLER beim letzten Sheet-Update.") + # --- Ein einziger Batch-Update-Call für alle Ergebnisse --- + if all_sheet_updates: + debug_print(f"Sende Batch-Update für {len(results)} verarbeitete Zeilen ({len(all_sheet_updates)} Zellen)...") + success = sheet_handler.batch_update_cells(all_sheet_updates) + if success: + debug_print(f"Sheet-Update für verarbeitete Zeilen erfolgreich.") + else: + debug_print(f"FEHLER beim finalen Sheet-Update.") - debug_print(f"Website-Scraping (Batch) abgeschlossen. {processed_count} Websites gescraped, {skipped_count} Zeilen wg. Timestamp übersprungen, {skipped_url_count} Zeilen ohne URL übersprungen.") + debug_print(f"Website-Scraping (Parallel Batch) abgeschlossen. {processed_count} erfolgreich verarbeitet, {error_count} Fehler, {skipped_count} Zeilen wg. Timestamp übersprungen, {skipped_url_count} Zeilen ohne URL übersprungen.") # Komplette Funktion process_branch_batch (prüft jetzt Timestamp AO mit erzwungenem Debugging) # Komplette Funktion process_branch_batch (MIT Korrektur und Prüfung auf AO)