From 1fde11db9afcfcd903cd77ad9cfc10a4220cd1a0 Mon Sep 17 00:00:00 2001
From: Floke
Date: Thu, 24 Apr 2025 18:00:24 +0000
Subject: [PATCH] bugfix

---
 brancheneinstufung.py | 252 +++++++++++++++++++++++++++++++++---------
 1 file changed, 200 insertions(+), 52 deletions(-)

diff --git a/brancheneinstufung.py b/brancheneinstufung.py
index 867fd153..cf31767d 100644
--- a/brancheneinstufung.py
+++ b/brancheneinstufung.py
@@ -4073,67 +4073,215 @@ class DataProcessor:
             f"{processed_count} Zeilen in Batches verarbeitet."
         )

-    # process_website_batch Methode
     def process_website_batch(self, limit=None):
-        """
-        Batch-Prozess NUR für Website-Scraping (Rohtext AR, Timestamp AT).
-        Findet Startzeile ab erster Zelle mit leerem AT.
-        """
-        logging.info(f"Starte Website-Scraping Batch. Limit: {limit if limit is not None else 'Unbegrenzt'}")
-        if not self.sheet_handler.load_data(): return logging.error("FEHLER beim Laden der Daten.")
-        all_data = self.sheet_handler.get_all_data_with_headers(); header_rows = 5
-        if not all_data or len(all_data) <= header_rows: return logging.warning("Keine Daten gefunden.")
+        """
+        Batch-Prozess NUR für Website-Scraping (Rohtext AR, Timestamp AT).
+        Findet Startzeile ab erster Zelle mit leerem AT.
+        """
+        logging.info(
+            f"Starte Website-Scraping Batch. "
+            f"Limit: {limit if limit is not None else 'Unbegrenzt'}"
+        )
+        if not self.sheet_handler.load_data():
+            return logging.error("FEHLER beim Laden der Daten.")

-        rohtext_col_key = "Website Rohtext"; rohtext_col_index = COLUMN_MAP.get(rohtext_col_key)
-        website_col_idx = COLUMN_MAP.get("CRM Website"); version_col_idx = COLUMN_MAP.get("Version")
-        timestamp_col_key = "Website Scrape Timestamp"; timestamp_col_index = COLUMN_MAP.get(timestamp_col_key)
-        if None in [rohtext_col_index, website_col_idx, version_col_idx, timestamp_col_index]: return logging.critical(f"FEHLER: Benötigte Indizes fehlen.");
-        rohtext_col_letter = self.sheet_handler._get_col_letter(rohtext_col_index + 1); version_col_letter = self.sheet_handler._get_col_letter(version_col_idx + 1); ts_col_letter = self.sheet_handler._get_col_letter(timestamp_col_index + 1)
+        all_data = self.sheet_handler.get_all_data_with_headers()
+        header_rows = 5
+        if not all_data or len(all_data) <= header_rows:
+            return logging.warning("Keine Daten gefunden.")

-        start_data_index = self.sheet_handler.get_start_row_index(check_column_key=timestamp_col_key, min_sheet_row=header_rows + 1); if start_data_index == -1: return logging.error(f"FEHLER bei Startzeilensuche."); if start_data_index >= len(self.sheet_handler.get_data()): logging.info("Alle Zeilen mit Timestamp gefüllt. Nichts zu tun."); return
+        # Spalten-Indizes holen
+        rohtext_col_key = "Website Rohtext"
+        rohtext_col_index = COLUMN_MAP.get(rohtext_col_key)
+        website_col_idx = COLUMN_MAP.get("CRM Website")
+        version_col_idx = COLUMN_MAP.get("Version")
+        timestamp_col_key = "Website Scrape Timestamp"
+        timestamp_col_index = COLUMN_MAP.get(timestamp_col_key)

-        start_sheet_row = start_data_index + header_rows + 1; total_sheet_rows = len(all_data); end_sheet_row = total_sheet_rows
-        if limit is not None and limit >= 0: end_sheet_row = min(start_sheet_row + limit - 1, total_sheet_rows); if limit == 0: logging.info("Limit 0."); return
-        if start_sheet_row > end_sheet_row: logging.warning("Start nach Ende (Limit)."); return
-        logging.info(f"Verarbeite Sheet-Zeilen {start_sheet_row} bis {end_sheet_row} für Website Scraping (Batch).")
+        # Prüfen, dass alle Indizes da sind
+        if None in [rohtext_col_index, website_col_idx, version_col_idx, timestamp_col_index]:
+            return logging.critical("FEHLER: Benötigte Indizes fehlen.")

-        # Worker-Funktion für Scraping (Globale Helferfunktion)
-        def scrape_raw_text_task(task_info): # Needs access to get_website_raw (global)
-            row_num = task_info['row_num']; url = task_info['url']; raw_text = "k.A."; error = None
-            try: raw_text = get_website_raw(url); # Annahme: get_website_raw ist global mit Retry
-            except Exception as e: error = f"Scraping Fehler Zeile {row_num}: {e}"; logging.error(error);
-            return {"row_num": row_num, "raw_text": raw_text, "error": error}
+        # Spaltenbuchstaben ermitteln
+        rohtext_col_letter = self.sheet_handler._get_col_letter(rohtext_col_index + 1)
+        version_col_letter = self.sheet_handler._get_col_letter(version_col_idx + 1)
+        ts_col_letter = self.sheet_handler._get_col_letter(timestamp_col_index + 1)

-        tasks_for_processing_batch = []; all_sheet_updates = []; processed_count = 0; skipped_url_count = 0
-        processing_batch_size = Config.PROCESSING_BATCH_SIZE; max_scraping_workers = Config.MAX_SCRAPING_WORKERS;
+        # Erste zu bearbeitende Zeile finden (erstes leeres Timestamp-Feld)
+        start_data_index = self.sheet_handler.get_start_row_index(
+            check_column_key=timestamp_col_key,
+            min_sheet_row=header_rows + 1
+        )
+        if start_data_index == -1:
+            return logging.error("FEHLER bei Startzeilensuche.")
+        if start_data_index >= len(self.sheet_handler.get_data()):
+            logging.info("Alle Zeilen mit Timestamp gefüllt. Nichts zu tun.")
+            return

-        for i in range(start_sheet_row, end_sheet_row + 1):
-            row_index_in_list = i - 1; row = all_data[row_index_in_list]
-            website_url = row[website_col_idx] if len(row) > website_col_idx else ""; if not website_url or website_url.strip().lower() == "k.A.": skipped_url_count += 1; continue
-            tasks_for_processing_batch.append({"row_num": i, "url": website_url}); processed_count += 1
+        start_sheet_row = start_data_index + header_rows + 1
+        total_sheet_rows = len(all_data)
+        end_sheet_row = total_sheet_rows

-            if len(tasks_for_processing_batch) >= processing_batch_size or i == end_sheet_row:
-                if tasks_for_processing_batch:
-                    batch_start_row = tasks_for_processing_batch[0]['row_num']; batch_end_row = tasks_for_processing_batch[-1]['row_num']; batch_task_count = len(tasks_for_processing_batch)
-                    logging.info(f"\n--- Starte Scraping-Batch ({batch_task_count} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---")
-                    scraping_results = {}; batch_error_count = 0; logging.info(f" Scrape {batch_task_count} Websites parallel (max {max_scraping_workers} worker)...")
-                    with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
-                        future_to_task = {executor.submit(scrape_raw_text_task, task): task for task in tasks_for_processing_batch}
-                        for future in concurrent.futures.as_completed(future_to_task):
-                            task = future_to_task[future]; try: result = future.result(); scraping_results[result['row_num']] = result['raw_text']; if result['error']: batch_error_count += 1;
-                            except Exception as exc: row_num = task['row_num']; err_msg = f"Generischer Fehler Scraping Task Zeile {row_num}: {exc}"; logging.error(err_msg); scraping_results[row_num] = "k.A. (Fehler)"; batch_error_count += 1;
-                    logging.info(f" Scraping für Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).")
-                    if scraping_results:
-                        current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S"); batch_sheet_updates = [];
-                        for row_num, raw_text_res in scraping_results.items():
-                            batch_sheet_updates.extend([ {'range': f'{rohtext_col_letter}{row_num}', 'values': [[raw_text_res]]}, {'range': f'{ts_col_letter}{row_num}', 'values': [[current_timestamp]]} ])
-                        all_sheet_updates.extend(batch_sheet_updates);
-                    if all_sheet_updates: logging.info(f" Sende Sheet-Update für {len(all_sheet_updates)} Zellen für Batch {batch_start_row}-{batch_end_row}..."); success = self.sheet_handler.batch_update_cells(all_sheet_updates); if success: logging.info(f" Sheet-Update erfolgreich."); else: logging.error(f" FEHLER beim Sheet-Update."); all_sheet_updates = [];
-                    logging.debug(" Warte nach Batch..."); time.sleep(Config.RETRY_DELAY);
-                    tasks_for_processing_batch = [];
+        # Limit auswerten
+        if limit is not None and limit >= 0:
+            end_sheet_row = min(start_sheet_row + limit - 1, total_sheet_rows)
+            if limit == 0:
+                logging.info("Limit 0.")
+                return
+        if start_sheet_row > end_sheet_row:
+            logging.warning("Start nach Ende (Limit).")
+            return

-        if all_sheet_updates: logging.info(f"Sende finalen Sheet-Update ({len(all_sheet_updates)} Zellen)..."); self.sheet_handler.batch_update_cells(all_sheet_updates);
-        logging.info(f"Website-Scraping Batch abgeschlossen. {processed_count} Tasks erstellt, {skipped_url_count} Zeilen ohne URL übersprungen.")
+        logging.info(
+            f"Verarbeite Sheet-Zeilen {start_sheet_row} bis {end_sheet_row} "
+            "für Website-Scraping (Batch)."
+        )
+
+        # Helfer-Funktion fürs parallele Scraping
+        def scrape_raw_text_task(task_info):
+            row_num = task_info["row_num"]
+            url = task_info["url"]
+            raw_text = "k.A."
+            error = None
+            try:
+                raw_text = get_website_raw(url)
+            except Exception as e:
+                error = f"Scraping Fehler Zeile {row_num}: {e}"
+                logging.error(error)
+            return {
+                "row_num": row_num,
+                "raw_text": raw_text,
+                "error": error
+            }
+
+        tasks_for_processing_batch = []
+        all_sheet_updates = []
+        processed_count = 0
+        skipped_url_count = 0
+        processing_batch_size = Config.PROCESSING_BATCH_SIZE
+        max_scraping_workers = Config.MAX_SCRAPING_WORKERS
+
+        # Durch alle Zeilen iterieren
+        for i in range(start_sheet_row, end_sheet_row + 1):
+            row_index_in_list = i - 1
+            row = all_data[row_index_in_list]
+
+            # URL auslesen und überspringen, falls k.A.
+            website_url = ""
+            if len(row) > website_col_idx:
+                website_url = row[website_col_idx]
+
+            if not website_url or website_url.strip().lower() == "k.a.":
+                skipped_url_count += 1
+                continue
+
+            # Job anlegen
+            tasks_for_processing_batch.append({
+                "row_num": i,
+                "url": website_url
+            })
+            processed_count += 1
+
+            # Batch abarbeiten, wenn voll oder am Ende
+            if (len(tasks_for_processing_batch) >= processing_batch_size
+                    or i == end_sheet_row):
+
+                if tasks_for_processing_batch:
+                    batch_start_row = tasks_for_processing_batch[0]["row_num"]
+                    batch_end_row = tasks_for_processing_batch[-1]["row_num"]
+                    batch_task_count = len(tasks_for_processing_batch)
+
+                    logging.info(
+                        f"--- Starte Scraping-Batch ({batch_task_count} Tasks, "
+                        f"Zeilen {batch_start_row}-{batch_end_row}) ---"
+                    )
+                    logging.info(
+                        f"Scrape {batch_task_count} Websites parallel "
+                        f"(max {max_scraping_workers} workers)..."
+                    )
+
+                    scraping_results = {}
+                    batch_error_count = 0
+
+                    with concurrent.futures.ThreadPoolExecutor(
+                        max_workers=max_scraping_workers
+                    ) as executor:
+                        future_to_task = {
+                            executor.submit(scrape_raw_text_task, task): task
+                            for task in tasks_for_processing_batch
+                        }
+                        for future in concurrent.futures.as_completed(future_to_task):
+                            task = future_to_task[future]
+                            try:
+                                result = future.result()
+                                scraping_results[result["row_num"]] = result["raw_text"]
+                                if result["error"]:
+                                    batch_error_count += 1
+                            except Exception as exc:
+                                row_num = task["row_num"]
+                                err_msg = (
+                                    f"Generischer Fehler Scraping Task Zeile {row_num}: {exc}"
+                                )
+                                logging.error(err_msg)
+                                scraping_results[row_num] = "k.A. (Fehler)"
+                                batch_error_count += 1
+
+                    logging.info(
+                        f"Scraping für Batch beendet: "
+                        f"{len(scraping_results)} Ergebnisse, "
+                        f"{batch_error_count} Fehler."
+                    )
+
+                    # Updates vorbereiten
+                    if scraping_results:
+                        current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                        batch_sheet_updates = []
+                        for row_num, raw_text_res in scraping_results.items():
+                            batch_sheet_updates.extend([
+                                {
+                                    "range": f"{rohtext_col_letter}{row_num}",
+                                    "values": [[raw_text_res]]
+                                },
+                                {
+                                    "range": f"{ts_col_letter}{row_num}",
+                                    "values": [[current_timestamp]]
+                                }
+                            ])
+                        all_sheet_updates.extend(batch_sheet_updates)
+
+                    # Batch-Update senden
+                    if all_sheet_updates:
+                        logging.info(
+                            f"Sende Sheet-Update für "
+                            f"{len(all_sheet_updates)} Zellen "
+                            f"(Batch {batch_start_row}-{batch_end_row})..."
+                        )
+                        success = self.sheet_handler.batch_update_cells(all_sheet_updates)
+                        if success:
+                            logging.info("Sheet-Update erfolgreich.")
+                        else:
+                            logging.error("FEHLER beim Sheet-Update.")
+                        all_sheet_updates = []
+
+                    logging.debug("Warte nach Batch…")
+                    time.sleep(Config.RETRY_DELAY)
+
+                # nächste Runde vorbereiten
+                tasks_for_processing_batch = []
+
+        # finaler Update-Push, falls noch Reste da sind
+        if all_sheet_updates:
+            logging.info(
+                f"Sende finalen Sheet-Update "
+                f"({len(all_sheet_updates)} Zellen)…"
+            )
+            self.sheet_handler.batch_update_cells(all_sheet_updates)
+
+        logging.info(
+            f"Website-Scraping Batch abgeschlossen. "
+            f"{processed_count} Tasks erstellt, "
+            f"{skipped_url_count} Zeilen ohne URL übersprungen."
        )

     # process_summarization_batch Methode
     # Kopieren Sie die Logik aus Ihrer globalen process_website_summarization_batch Funktion hierher und passen Sie sie an self an.
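
The trailing comment asks for the logic of the global process_website_summarization_batch function to be ported into DataProcessor. A minimal sketch of what that port could look like, reusing only the sheet-handler calls already visible in this patch; summarize_text() and the "Website Zusammenfassung" column key are placeholders and not part of the patch:

    # Sketch of a possible DataProcessor method; summarize_text() and the
    # "Website Zusammenfassung" column key are hypothetical placeholders.
    def process_summarization_batch(self, limit=None):
        if not self.sheet_handler.load_data():
            return logging.error("FEHLER beim Laden der Daten.")

        all_data = self.sheet_handler.get_all_data_with_headers()
        header_rows = 5
        rohtext_col_index = COLUMN_MAP.get("Website Rohtext")
        summary_col_index = COLUMN_MAP.get("Website Zusammenfassung")  # hypothetical key
        if None in [rohtext_col_index, summary_col_index]:
            return logging.critical("FEHLER: Benötigte Indizes fehlen.")
        summary_col_letter = self.sheet_handler._get_col_letter(summary_col_index + 1)

        # Same start-row logic as the scraping batch: first row whose target cell is empty.
        start_data_index = self.sheet_handler.get_start_row_index(
            check_column_key="Website Zusammenfassung",  # hypothetical key
            min_sheet_row=header_rows + 1
        )
        if start_data_index == -1:
            return logging.error("FEHLER bei Startzeilensuche.")

        start_sheet_row = start_data_index + header_rows + 1
        end_sheet_row = len(all_data)
        if limit is not None and limit > 0:
            end_sheet_row = min(start_sheet_row + limit - 1, end_sheet_row)

        updates = []
        for i in range(start_sheet_row, end_sheet_row + 1):
            row = all_data[i - 1]
            raw_text = row[rohtext_col_index] if len(row) > rohtext_col_index else ""
            if not raw_text or raw_text.strip().lower().startswith("k.a."):
                continue
            summary = summarize_text(raw_text)  # hypothetical helper
            updates.append({"range": f"{summary_col_letter}{i}", "values": [[summary]]})

        if updates:
            self.sheet_handler.batch_update_cells(updates)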