This commit is contained in:
2025-04-17 09:02:42 +00:00
parent e8b5944e8f
commit b955ee1862

View File

@@ -34,6 +34,8 @@ from sklearn.tree import DecisionTreeClassifier, export_text
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import json # Zum Speichern der Muster als JSON
import pickle # Zum Speichern des trainierten Modells und Imputers
import concurrent.futures
import threading
# Optional: tiktoken für Token-Zählung (Modus 8)
try:
@@ -2209,11 +2211,11 @@ def _process_batch(sheet, batches, row_numbers):
# Komplette Funktion process_website_batch (MIT Batched Google Sheet Updates)
def process_website_batch(sheet_handler, start_row_index_in_sheet, end_row_index_in_sheet):
"""
Batch-Prozess für Website-Scraping. Lädt Daten neu, prüft für jede Zeile
im Bereich, ob Timestamp AT bereits gesetzt ist und überspringt diese ggf.
Setzt AT + AP für bearbeitete Zeilen. Sendet Updates in Batches an Google Sheets.
Batch-Prozess für Website-Scraping mit paralleler Verarbeitung.
Lädt Daten neu, prüft Timestamp AT, verarbeitet Websites parallel,
setzt AT + AP und sendet Sheet-Updates gebündelt.
"""
debug_print(f"Starte Website-Scraping (Batch) für Zeilen {start_row_index_in_sheet} bis {end_row_index_in_sheet}...")
debug_print(f"Starte Website-Scraping (Parallel Batch) für Zeilen {start_row_index_in_sheet} bis {end_row_index_in_sheet}...")
if not sheet_handler.load_data():
debug_print("FEHLER beim Laden der Daten in process_website_batch.")
@@ -2223,107 +2225,134 @@ def process_website_batch(sheet_handler, start_row_index_in_sheet, end_row_index
debug_print("FEHLER/WARNUNG: Keine Daten zum Verarbeiten in process_website_batch gefunden.")
return
# Hole Indizes und Spaltenbuchstaben (wie zuvor)
# Indizes und Buchstaben holen (wie zuvor)
timestamp_col_key = "Website Scrape Timestamp"
timestamp_col_index = COLUMN_MAP.get(timestamp_col_key)
website_col_idx = COLUMN_MAP.get("CRM Website")
rohtext_col_idx = COLUMN_MAP.get("Website Rohtext")
summary_col_idx = COLUMN_MAP.get("Website Zusammenfassung")
version_col_idx = COLUMN_MAP.get("Version")
if None in [timestamp_col_index, website_col_idx, rohtext_col_idx, summary_col_idx, version_col_idx]:
debug_print(f"FEHLER: Mindestens ein benötigter Spaltenindex für process_website_batch fehlt in COLUMN_MAP.")
return
ts_col_letter = sheet_handler._get_col_letter(timestamp_col_index + 1)
rohtext_col_letter = sheet_handler._get_col_letter(rohtext_col_idx + 1)
summary_col_letter = sheet_handler._get_col_letter(summary_col_idx + 1)
version_col_letter = sheet_handler._get_col_letter(version_col_idx + 1)
# --- NEU: Liste für gesammelte Updates ---
all_sheet_updates = []
rows_in_current_batch = 0
update_batch_size = Config.BATCH_SIZE * 4 # Da wir 4 Zellen pro Zeile updaten, Batch entsprechend anpassen? Oder einfach 50?
# Nehmen wir eine feste Anzahl von Zeilen, deren Updates wir sammeln
update_batch_row_limit = 50 # Sammle Updates für 50 Zeilen, bevor gesendet wird
# --- Konfiguration für Parallelisierung ---
MAX_WORKERS = 10 # Anzahl gleichzeitiger Threads für Scraping/Summarization
# Semaphore, um OpenAI Calls zu limitieren (z.B. max 5 gleichzeitig)
openai_semaphore = threading.Semaphore(5)
# --- Worker Funktion ---
def scrape_and_summarize_task(task_data):
row_num = task_data['row_num']
url = task_data['url']
debug_print(f"Task Zeile {row_num}: Starte Verarbeitung für {url}")
raw_text = "k.A."
summary = "k.A."
error = None
try:
raw_text = get_website_raw(url) # Holt den Text
# Limitiere OpenAI Calls mit Semaphore
with openai_semaphore:
debug_print(f"Task Zeile {row_num}: Rufe OpenAI für Zusammenfassung auf...")
# Füge hier einen kleinen Sleep hinzu, falls Rate-Limits auftreten
# time.sleep(0.5) # Optional: kleine Pause vor jedem Call
summary = summarize_website_content(raw_text) # Fasst zusammen
debug_print(f"Task Zeile {row_num}: OpenAI Zusammenfassung erhalten.")
except Exception as e:
error = f"Fehler bei Verarbeitung Zeile {row_num}: {e}"
debug_print(error)
return {"row_num": row_num, "raw_text": raw_text, "summary": summary, "error": error}
# --- Hauptverarbeitungsschleife ---
all_sheet_updates = []
processed_count = 0
skipped_count = 0
skipped_url_count = 0
error_count = 0
tasks_to_process = [] # Sammle Aufgaben für einen Thread-Pool-Batch
for i in range(start_row_index_in_sheet, end_row_index_in_sheet + 1):
row_index_in_list = i - 1
if row_index_in_list >= len(all_data): continue
row = all_data[row_index_in_list]
# --- Timestamp-Prüfung (AT) ---
# Timestamp-Prüfung (AT)
should_skip = False
if len(row) > timestamp_col_index:
if str(row[timestamp_col_index]).strip():
should_skip = True
# Optional: Debugging der Prüfung reduzieren
# if i % 100 == 0: debug_print(f"Zeile {i} (Website Check): Prüfe TS {ts_col_letter}. Überspringen? -> {should_skip}")
if len(row) > timestamp_col_index and str(row[timestamp_col_index]).strip():
should_skip = True
if should_skip:
skipped_count += 1
continue
else:
# --- Verarbeitung (wenn nicht übersprungen) ---
website_url = row[website_col_idx] if len(row) > website_col_idx else ""
if not website_url or website_url.strip().lower() == "k.a.":
skipped_url_count += 1
continue
# Scrapen und Zusammenfassen
raw_text = get_website_raw(website_url)
summary = summarize_website_content(raw_text)
processed_count += 1
# Prüfung auf gültige URL
website_url = row[website_col_idx] if len(row) > website_col_idx else ""
if not website_url or website_url.strip().lower() == "k.a.":
skipped_url_count += 1
continue
# Einzelne Updates für diese Zeile vorbereiten
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
current_version = Config.VERSION
row_updates = [
{'range': f'{rohtext_col_letter}{i}', 'values': [[raw_text]]},
{'range': f'{summary_col_letter}{i}', 'values': [[summary]]},
{'range': f'{ts_col_letter}{i}', 'values': [[current_timestamp]]}, # AT Timestamp
{'range': f'{version_col_letter}{i}', 'values': [[current_version]]} # AP Version
]
# Füge Aufgabe zur Liste hinzu
tasks_to_process.append({"row_num": i, "url": website_url})
# --- NEU: Updates sammeln ---
all_sheet_updates.extend(row_updates)
rows_in_current_batch += 1
# --- Parallele Ausführung ---
debug_print(f"Starte parallele Verarbeitung für {len(tasks_to_process)} Websites mit max. {MAX_WORKERS} Workern...")
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# `map` behält die Reihenfolge der Ergebnisse bei (entsprechend tasks_to_process)
future_to_task = {executor.submit(scrape_and_summarize_task, task): task for task in tasks_to_process}
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future]
try:
result = future.result()
results.append(result)
if result["error"]:
error_count += 1
else:
processed_count +=1 # Zähle nur erfolgreiche
except Exception as exc:
row_num = task['row_num']
debug_print(f"Fehler in Task für Zeile {row_num}: {exc}")
results.append({"row_num": row_num, "raw_text": "k.A.", "summary": "k.A.", "error": str(exc)})
error_count += 1
# --- NEU: Batch senden, wenn Limit erreicht oder letzte Zeile ---
# Prüfe, ob die Anzahl der *Zeilen*, deren Updates gesammelt wurden, das Limit erreicht
if rows_in_current_batch >= update_batch_row_limit or i == end_row_index_in_sheet:
if all_sheet_updates: # Nur senden, wenn Updates vorhanden sind
debug_print(f"Sende Batch-Update für {rows_in_current_batch} Zeilen (insgesamt {len(all_sheet_updates)} Zellen)...")
success = sheet_handler.batch_update_cells(all_sheet_updates)
if success:
debug_print(f"Sheet-Update für Zeilen bis {i} erfolgreich.")
else:
debug_print(f"FEHLER beim Sheet-Update für Zeilen bis {i}.")
# Optional: Hier überlegen, ob man abbricht oder weitermacht
debug_print(f"Parallele Verarbeitung abgeschlossen. {processed_count} erfolgreich, {error_count} mit Fehlern.")
# Liste und Zähler zurücksetzen
all_sheet_updates = []
rows_in_current_batch = 0
# --- Ergebnisse verarbeiten und Sheet Updates vorbereiten ---
if results:
debug_print("Bereite Sheet-Updates vor...")
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Ein Timestamp für den gesamten Batch
current_version = Config.VERSION
for res in results:
i = res["row_num"]
# Füge Updates auch bei Fehlern hinzu, um Timestamp zu setzen? Oder nur bei Erfolg?
# Hier: Setze Timestamp auch bei Fehler, aber mit k.A. Werten
raw_text = res.get("raw_text", "k.A.")
summary = res.get("summary", "k.A.")
# Wenn ein Fehler aufgetreten ist, schreibe ggf. den Fehler in die Zusammenfassung?
# if res.get("error"):
# summary = f"FEHLER: {res['error'][:200]}" # Kürze Fehlermeldung
# Kurze Pause nach jeder Website-Verarbeitung (oder nur nach Batch-Update?)
# Besser hier lassen, um APIs nicht zu überlasten
time.sleep(Config.RETRY_DELAY)
# --- Ende ELSE-Block ---
all_sheet_updates.append({'range': f'{rohtext_col_letter}{i}', 'values': [[raw_text]]})
all_sheet_updates.append({'range': f'{summary_col_letter}{i}', 'values': [[summary]]})
all_sheet_updates.append({'range': f'{ts_col_letter}{i}', 'values': [[current_timestamp]]}) # AT Timestamp
all_sheet_updates.append({'range': f'{version_col_letter}{i}', 'values': [[current_version]]}) # AP Version
# --- NEU: Ggf. letzte gesammelte Updates senden ---
if all_sheet_updates:
debug_print(f"Sende letztes Batch-Update für {rows_in_current_batch} Zeilen ({len(all_sheet_updates)} Zellen)...")
success = sheet_handler.batch_update_cells(all_sheet_updates)
if success:
debug_print(f"Letztes Sheet-Update erfolgreich.")
else:
debug_print(f"FEHLER beim letzten Sheet-Update.")
# --- Ein einziger Batch-Update-Call für alle Ergebnisse ---
if all_sheet_updates:
debug_print(f"Sende Batch-Update für {len(results)} verarbeitete Zeilen ({len(all_sheet_updates)} Zellen)...")
success = sheet_handler.batch_update_cells(all_sheet_updates)
if success:
debug_print(f"Sheet-Update für verarbeitete Zeilen erfolgreich.")
else:
debug_print(f"FEHLER beim finalen Sheet-Update.")
debug_print(f"Website-Scraping (Batch) abgeschlossen. {processed_count} Websites gescraped, {skipped_count} Zeilen wg. Timestamp übersprungen, {skipped_url_count} Zeilen ohne URL übersprungen.")
debug_print(f"Website-Scraping (Parallel Batch) abgeschlossen. {processed_count} erfolgreich verarbeitet, {error_count} Fehler, {skipped_count} Zeilen wg. Timestamp übersprungen, {skipped_url_count} Zeilen ohne URL übersprungen.")
# Komplette Funktion process_branch_batch (prüft jetzt Timestamp AO mit erzwungenem Debugging)
# Komplette Funktion process_branch_batch (MIT Korrektur und Prüfung auf AO)