This commit is contained in:
2025-04-24 18:00:24 +00:00
parent 7928c2dcd5
commit 1fde11db9a

View File

@@ -4073,67 +4073,215 @@ class DataProcessor:
f"{processed_count} Zeilen in Batches verarbeitet." f"{processed_count} Zeilen in Batches verarbeitet."
) )
# process_website_batch Methode
def process_website_batch(self, limit=None): def process_website_batch(self, limit=None):
""" """
Batch-Prozess NUR für Website-Scraping (Rohtext AR, Timestamp AT). Batch-Prozess NUR für Website-Scraping (Rohtext AR, Timestamp AT).
Findet Startzeile ab erster Zelle mit leerem AT. Findet Startzeile ab erster Zelle mit leerem AT.
""" """
logging.info(f"Starte Website-Scraping Batch. Limit: {limit if limit is not None else 'Unbegrenzt'}") logging.info(
if not self.sheet_handler.load_data(): return logging.error("FEHLER beim Laden der Daten.") f"Starte Website-Scraping Batch. "
all_data = self.sheet_handler.get_all_data_with_headers(); header_rows = 5 f"Limit: {limit if limit is not None else 'Unbegrenzt'}"
if not all_data or len(all_data) <= header_rows: return logging.warning("Keine Daten gefunden.") )
if not self.sheet_handler.load_data():
return logging.error("FEHLER beim Laden der Daten.")
rohtext_col_key = "Website Rohtext"; rohtext_col_index = COLUMN_MAP.get(rohtext_col_key) all_data = self.sheet_handler.get_all_data_with_headers()
website_col_idx = COLUMN_MAP.get("CRM Website"); version_col_idx = COLUMN_MAP.get("Version") header_rows = 5
timestamp_col_key = "Website Scrape Timestamp"; timestamp_col_index = COLUMN_MAP.get(timestamp_col_key) if not all_data or len(all_data) <= header_rows:
if None in [rohtext_col_index, website_col_idx, version_col_idx, timestamp_col_index]: return logging.critical(f"FEHLER: Benötigte Indizes fehlen."); return logging.warning("Keine Daten gefunden.")
rohtext_col_letter = self.sheet_handler._get_col_letter(rohtext_col_index + 1); version_col_letter = self.sheet_handler._get_col_letter(version_col_idx + 1); ts_col_letter = self.sheet_handler._get_col_letter(timestamp_col_index + 1)
start_data_index = self.sheet_handler.get_start_row_index(check_column_key=timestamp_col_key, min_sheet_row=header_rows + 1); if start_data_index == -1: return logging.error(f"FEHLER bei Startzeilensuche."); if start_data_index >= len(self.sheet_handler.get_data()): logging.info("Alle Zeilen mit Timestamp gefüllt. Nichts zu tun."); return # Spalten-Indizes holen
rohtext_col_key = "Website Rohtext"
rohtext_col_index = COLUMN_MAP.get(rohtext_col_key)
website_col_idx = COLUMN_MAP.get("CRM Website")
version_col_idx = COLUMN_MAP.get("Version")
timestamp_col_key = "Website Scrape Timestamp"
timestamp_col_index = COLUMN_MAP.get(timestamp_col_key)
start_sheet_row = start_data_index + header_rows + 1; total_sheet_rows = len(all_data); end_sheet_row = total_sheet_rows # Prüfen, dass alle Indizes da sind
if limit is not None and limit >= 0: end_sheet_row = min(start_sheet_row + limit - 1, total_sheet_rows); if limit == 0: logging.info("Limit 0."); return if None in [rohtext_col_index, website_col_idx, version_col_idx, timestamp_col_index]:
if start_sheet_row > end_sheet_row: logging.warning("Start nach Ende (Limit)."); return return logging.critical("FEHLER: Benötigte Indizes fehlen.")
logging.info(f"Verarbeite Sheet-Zeilen {start_sheet_row} bis {end_sheet_row} für Website Scraping (Batch).")
# Worker-Funktion für Scraping (Globale Helferfunktion) # Spaltenbuchstaben ermitteln
def scrape_raw_text_task(task_info): # Needs access to get_website_raw (global) rohtext_col_letter = self.sheet_handler._get_col_letter(rohtext_col_index + 1)
row_num = task_info['row_num']; url = task_info['url']; raw_text = "k.A."; error = None version_col_letter = self.sheet_handler._get_col_letter(version_col_idx + 1)
try: raw_text = get_website_raw(url); # Annahme: get_website_raw ist global mit Retry ts_col_letter = self.sheet_handler._get_col_letter(timestamp_col_index + 1)
except Exception as e: error = f"Scraping Fehler Zeile {row_num}: {e}"; logging.error(error);
return {"row_num": row_num, "raw_text": raw_text, "error": error}
tasks_for_processing_batch = []; all_sheet_updates = []; processed_count = 0; skipped_url_count = 0 # Erste zu bearbeitende Zeile finden (erstes leeres Timestamp-Feld)
processing_batch_size = Config.PROCESSING_BATCH_SIZE; max_scraping_workers = Config.MAX_SCRAPING_WORKERS; start_data_index = self.sheet_handler.get_start_row_index(
check_column_key=timestamp_col_key,
min_sheet_row=header_rows + 1
)
if start_data_index == -1:
return logging.error("FEHLER bei Startzeilensuche.")
if start_data_index >= len(self.sheet_handler.get_data()):
logging.info("Alle Zeilen mit Timestamp gefüllt. Nichts zu tun.")
return
for i in range(start_sheet_row, end_sheet_row + 1): start_sheet_row = start_data_index + header_rows + 1
row_index_in_list = i - 1; row = all_data[row_index_in_list] total_sheet_rows = len(all_data)
website_url = row[website_col_idx] if len(row) > website_col_idx else ""; if not website_url or website_url.strip().lower() == "k.A.": skipped_url_count += 1; continue end_sheet_row = total_sheet_rows
tasks_for_processing_batch.append({"row_num": i, "url": website_url}); processed_count += 1
if len(tasks_for_processing_batch) >= processing_batch_size or i == end_sheet_row: # Limit auswerten
if tasks_for_processing_batch: if limit is not None and limit >= 0:
batch_start_row = tasks_for_processing_batch[0]['row_num']; batch_end_row = tasks_for_processing_batch[-1]['row_num']; batch_task_count = len(tasks_for_processing_batch) end_sheet_row = min(start_sheet_row + limit - 1, total_sheet_rows)
logging.info(f"\n--- Starte Scraping-Batch ({batch_task_count} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") if limit == 0:
scraping_results = {}; batch_error_count = 0; logging.info(f" Scrape {batch_task_count} Websites parallel (max {max_scraping_workers} worker)...") logging.info("Limit 0.")
with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor: return
future_to_task = {executor.submit(scrape_raw_text_task, task): task for task in tasks_for_processing_batch} if start_sheet_row > end_sheet_row:
for future in concurrent.futures.as_completed(future_to_task): logging.warning("Start nach Ende (Limit).")
task = future_to_task[future]; try: result = future.result(); scraping_results[result['row_num']] = result['raw_text']; if result['error']: batch_error_count += 1; return
except Exception as exc: row_num = task['row_num']; err_msg = f"Generischer Fehler Scraping Task Zeile {row_num}: {exc}"; logging.error(err_msg); scraping_results[row_num] = "k.A. (Fehler)"; batch_error_count += 1;
logging.info(f" Scraping für Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).")
if scraping_results:
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S"); batch_sheet_updates = [];
for row_num, raw_text_res in scraping_results.items():
batch_sheet_updates.extend([ {'range': f'{rohtext_col_letter}{row_num}', 'values': [[raw_text_res]]}, {'range': f'{ts_col_letter}{row_num}', 'values': [[current_timestamp]]} ])
all_sheet_updates.extend(batch_sheet_updates);
if all_sheet_updates: logging.info(f" Sende Sheet-Update für {len(all_sheet_updates)} Zellen für Batch {batch_start_row}-{batch_end_row}..."); success = self.sheet_handler.batch_update_cells(all_sheet_updates); if success: logging.info(f" Sheet-Update erfolgreich."); else: logging.error(f" FEHLER beim Sheet-Update."); all_sheet_updates = [];
logging.debug(" Warte nach Batch..."); time.sleep(Config.RETRY_DELAY);
tasks_for_processing_batch = [];
if all_sheet_updates: logging.info(f"Sende finalen Sheet-Update ({len(all_sheet_updates)} Zellen)..."); self.sheet_handler.batch_update_cells(all_sheet_updates); logging.info(
logging.info(f"Website-Scraping Batch abgeschlossen. {processed_count} Tasks erstellt, {skipped_url_count} Zeilen ohne URL übersprungen.") f"Verarbeite Sheet-Zeilen {start_sheet_row} bis {end_sheet_row} "
"für Website-Scraping (Batch)."
)
# Helfer-Funktion fürs parallele Scraping
def scrape_raw_text_task(task_info):
row_num = task_info["row_num"]
url = task_info["url"]
raw_text = "k.A."
error = None
try:
raw_text = get_website_raw(url)
except Exception as e:
error = f"Scraping Fehler Zeile {row_num}: {e}"
logging.error(error)
return {
"row_num": row_num,
"raw_text": raw_text,
"error": error
}
tasks_for_processing_batch = []
all_sheet_updates = []
processed_count = 0
skipped_url_count = 0
processing_batch_size = Config.PROCESSING_BATCH_SIZE
max_scraping_workers = Config.MAX_SCRAPING_WORKERS
# Durch alle Zeilen iterieren
for i in range(start_sheet_row, end_sheet_row + 1):
row_index_in_list = i - 1
row = all_data[row_index_in_list]
# URL auslesen und überspringen, falls k.A.
website_url = ""
if len(row) > website_col_idx:
website_url = row[website_col_idx]
if not website_url or website_url.strip().lower() == "k.a.":
skipped_url_count += 1
continue
# Job anlegen
tasks_for_processing_batch.append({
"row_num": i,
"url": website_url
})
processed_count += 1
# Batch abarbeiten, wenn voll oder am Ende
if (len(tasks_for_processing_batch) >= processing_batch_size
or i == end_sheet_row):
if tasks_for_processing_batch:
batch_start_row = tasks_for_processing_batch[0]["row_num"]
batch_end_row = tasks_for_processing_batch[-1]["row_num"]
batch_task_count = len(tasks_for_processing_batch)
logging.info(
f"--- Starte Scraping-Batch ({batch_task_count} Tasks, "
f"Zeilen {batch_start_row}-{batch_end_row}) ---"
)
logging.info(
f"Scrape {batch_task_count} Websites parallel "
f"(max {max_scraping_workers} workers)..."
)
scraping_results = {}
batch_error_count = 0
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_scraping_workers
) as executor:
future_to_task = {
executor.submit(scrape_raw_text_task, task): task
for task in tasks_for_processing_batch
}
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future]
try:
result = future.result()
scraping_results[result["row_num"]] = result["raw_text"]
if result["error"]:
batch_error_count += 1
except Exception as exc:
row_num = task["row_num"]
err_msg = (
f"Generischer Fehler Scraping Task Zeile {row_num}: {exc}"
)
logging.error(err_msg)
scraping_results[row_num] = "k.A. (Fehler)"
batch_error_count += 1
logging.info(
f"Scraping für Batch beendet: "
f"{len(scraping_results)} Ergebnisse, "
f"{batch_error_count} Fehler."
)
# Updates vorbereiten
if scraping_results:
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
batch_sheet_updates = []
for row_num, raw_text_res in scraping_results.items():
batch_sheet_updates.extend([
{
"range": f"{rohtext_col_letter}{row_num}",
"values": [[raw_text_res]]
},
{
"range": f"{ts_col_letter}{row_num}",
"values": [[current_timestamp]]
}
])
all_sheet_updates.extend(batch_sheet_updates)
# Batch-Update senden
if all_sheet_updates:
logging.info(
f"Sende Sheet-Update für "
f"{len(all_sheet_updates)} Zellen "
f"(Batch {batch_start_row}-{batch_end_row})..."
)
success = self.sheet_handler.batch_update_cells(all_sheet_updates)
if success:
logging.info("Sheet-Update erfolgreich.")
else:
logging.error("FEHLER beim Sheet-Update.")
all_sheet_updates = []
logging.debug("Warte nach Batch…")
time.sleep(Config.RETRY_DELAY)
# nächste Runde vorbereiten
tasks_for_processing_batch = []
# finaler Update-Push, falls noch Reste da sind
if all_sheet_updates:
logging.info(
f"Sende finalen Sheet-Update "
f"({len(all_sheet_updates)} Zellen)…"
)
self.sheet_handler.batch_update_cells(all_sheet_updates)
logging.info(
f"Website-Scraping Batch abgeschlossen. "
f"{processed_count} Tasks erstellt, "
f"{skipped_url_count} Zeilen ohne URL übersprungen."
)
# process_summarization_batch Methode # process_summarization_batch Methode
# Kopieren Sie die Logik aus Ihrer globalen process_website_summarization_batch Funktion hierher und passen Sie sie an self an. # Kopieren Sie die Logik aus Ihrer globalen process_website_summarization_batch Funktion hierher und passen Sie sie an self an.