diff --git a/brancheneinstufung.py b/brancheneinstufung.py
index 516b7406..5cf854b7 100644
--- a/brancheneinstufung.py
+++ b/brancheneinstufung.py
@@ -2835,94 +2835,94 @@ class DataProcessor:
         logging.info(f"Verarbeite Sheet-Zeilen {start_sheet_row} bis {end_sheet_row} für Website Scraping (Batch).")

-    # Worker-Funktion für Scraping (Kann global bleiben oder private statische Methode)
-    # Bleibt global, da sie keine self benötigt.
-    def scrape_raw_text_task(task_info):
-        row_num = task_info['row_num']; url = task_info['url']; raw_text = "k.A."; error = None
-        try: raw_text = get_website_raw(url) # Annahme: get_website_raw ist global mit Retry
-        except Exception as e: error = f"Scraping Fehler Zeile {row_num}: {e}"; logging.error(error) # Logge Fehler im Worker
-        return {"row_num": row_num, "raw_text": raw_text, "error": error}
+        # Worker-Funktion für Scraping (Kann global bleiben oder private statische Methode)
+        # Bleibt global, da sie keine self benötigt.
+        def scrape_raw_text_task(task_info):
+            row_num = task_info['row_num']; url = task_info['url']; raw_text = "k.A."; error = None
+            try: raw_text = get_website_raw(url) # Annahme: get_website_raw ist global mit Retry
+            except Exception as e: error = f"Scraping Fehler Zeile {row_num}: {e}"; logging.error(error) # Logge Fehler im Worker
+            return {"row_num": row_num, "raw_text": raw_text, "error": error}

-    tasks_for_processing_batch = []
-    all_sheet_updates = []
-    processed_count = 0 # Zählt Zeilen, für die Task erstellt wird
-    skipped_url_count = 0
+        tasks_for_processing_batch = []
+        all_sheet_updates = []
+        processed_count = 0 # Zählt Zeilen, für die Task erstellt wird
+        skipped_url_count = 0

-    processing_batch_size = Config.PROCESSING_BATCH_SIZE
-    max_scraping_workers = Config.MAX_SCRAPING_WORKERS
+        processing_batch_size = Config.PROCESSING_BATCH_SIZE
+        max_scraping_workers = Config.MAX_SCRAPING_WORKERS

-    for i in range(start_sheet_row, end_sheet_row + 1):
-        row_index_in_list = i - 1
-        row = all_data[row_index_in_list]
+        for i in range(start_sheet_row, end_sheet_row + 1):
+            row_index_in_list = i - 1
+            row = all_data[row_index_in_list]

-        # URL Prüfung (immer nötig, auch wenn AT fehlt)
-        website_url = row[website_col_idx] if len(row) > website_col_idx else ""
-        if not website_url or website_url.strip().lower() == "k.A.":
-            skipped_url_count += 1
-            continue
+            # URL Prüfung (immer nötig, auch wenn AT fehlt)
+            website_url = row[website_col_idx] if len(row) > website_col_idx else ""
+            if not website_url or website_url.strip().lower() == "k.A.":
+                skipped_url_count += 1
+                continue

-        # Kein AT Timestamp -> Task erstellen
-        tasks_for_processing_batch.append({"row_num": i, "url": website_url})
-        processed_count += 1
+            # Kein AT Timestamp -> Task erstellen
+            tasks_for_processing_batch.append({"row_num": i, "url": website_url})
+            processed_count += 1

-        # Verarbeitungs-Batch ausführen
-        if len(tasks_for_processing_batch) >= processing_batch_size or i == end_sheet_row:
-            if tasks_for_processing_batch:
-                batch_start_row = tasks_for_processing_batch[0]['row_num']
-                batch_end_row = tasks_for_processing_batch[-1]['row_num']
-                batch_task_count = len(tasks_for_processing_batch)
-                logging.info(f"\n--- Starte Scraping-Batch ({batch_task_count} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---")
+            # Verarbeitungs-Batch ausführen
+            if len(tasks_for_processing_batch) >= processing_batch_size or i == end_sheet_row:
+                if tasks_for_processing_batch:
+                    batch_start_row = tasks_for_processing_batch[0]['row_num']
+                    batch_end_row = tasks_for_processing_batch[-1]['row_num']
+                    batch_task_count = len(tasks_for_processing_batch)
+                    logging.info(f"\n--- Starte Scraping-Batch ({batch_task_count} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---")

-                scraping_results = {} # {'row_num': raw_text}
-                batch_error_count = 0
-                logging.info(f" Scrape {batch_task_count} Websites parallel (max {max_scraping_workers} worker)...")
-                with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
-                    future_to_task = {executor.submit(scrape_raw_text_task, task): task for task in tasks_for_processing_batch}
-                    for future in concurrent.futures.as_completed(future_to_task):
-                        task = future_to_task[future]
-                        try:
-                            result = future.result()
-                            scraping_results[result['row_num']] = result['raw_text']
-                            if result['error']: batch_error_count += 1
-                        except Exception as exc:
-                            row_num = task['row_num']
-                            err_msg = f"Generischer Fehler Scraping Task Zeile {row_num}: {exc}"
-                            logging.error(err_msg)
-                            scraping_results[row_num] = "k.A. (Fehler)"
-                            batch_error_count += 1
+                    scraping_results = {} # {'row_num': raw_text}
+                    batch_error_count = 0
+                    logging.info(f" Scrape {batch_task_count} Websites parallel (max {max_scraping_workers} worker)...")
+                    with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
+                        future_to_task = {executor.submit(scrape_raw_text_task, task): task for task in tasks_for_processing_batch}
+                        for future in concurrent.futures.as_completed(future_to_task):
+                            task = future_to_task[future]
+                            try:
+                                result = future.result()
+                                scraping_results[result['row_num']] = result['raw_text']
+                                if result['error']: batch_error_count += 1
+                            except Exception as exc:
+                                row_num = task['row_num']
+                                err_msg = f"Generischer Fehler Scraping Task Zeile {row_num}: {exc}"
+                                logging.error(err_msg)
+                                scraping_results[row_num] = "k.A. (Fehler)"
+                                batch_error_count += 1

-                logging.info(f" Scraping für Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).")
+                    logging.info(f" Scraping für Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).")

-                # Sheet Updates vorbereiten (AR und AT)
-                if scraping_results:
-                    current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                    batch_sheet_updates = []
-                    for row_num, raw_text_res in scraping_results.items():
-                        batch_sheet_updates.extend([
-                            {'range': f'{rohtext_col_letter}{row_num}', 'values': [[raw_text_res]]},
-                            {'range': f'{ts_col_letter}{row_num}', 'values': [[current_timestamp]]} # Setze AT Timestamp
-                        ])
-                    all_sheet_updates.extend(batch_sheet_updates)
+                    # Sheet Updates vorbereiten (AR und AT)
+                    if scraping_results:
+                        current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+                        batch_sheet_updates = []
+                        for row_num, raw_text_res in scraping_results.items():
+                            batch_sheet_updates.extend([
+                                {'range': f'{rohtext_col_letter}{row_num}', 'values': [[raw_text_res]]},
+                                {'range': f'{ts_col_letter}{row_num}', 'values': [[current_timestamp]]} # Setze AT Timestamp
+                            ])
+                        all_sheet_updates.extend(batch_sheet_updates)

-                # Sheet Updates senden für diesen Batch
-                if all_sheet_updates:
-                    logging.info(f" Sende Sheet-Update für {len(all_sheet_updates)} Zellen für Batch {batch_start_row}-{batch_end_row}...")
-                    success = self.sheet_handler.batch_update_cells(all_sheet_updates)
-                    if success: logging.info(f" Sheet-Update erfolgreich.")
-                    else: logging.error(f" FEHLER beim Sheet-Update.")
-                    all_sheet_updates = [] # Zurücksetzen nach Senden
+                    # Sheet Updates senden für diesen Batch
+                    if all_sheet_updates:
+                        logging.info(f" Sende Sheet-Update für {len(all_sheet_updates)} Zellen für Batch {batch_start_row}-{batch_end_row}...")
+                        success = self.sheet_handler.batch_update_cells(all_sheet_updates)
+                        if success: logging.info(f" Sheet-Update erfolgreich.")
+                        else: logging.error(f" FEHLER beim Sheet-Update.")
+                        all_sheet_updates = [] # Zurücksetzen nach Senden

-                # Pause nach jedem Batch
-                logging.debug(" Warte nach Batch...")
-                time.sleep(Config.RETRY_DELAY)
+                    # Pause nach jedem Batch
+                    logging.debug(" Warte nach Batch...")
+                    time.sleep(Config.RETRY_DELAY)

-    # Finaler Sheet Update Batch senden (falls Reste übrig)
-    if all_sheet_updates:
-        logging.info(f"Sende finalen Sheet-Update ({len(all_sheet_updates)} Zellen)...")
-        self.sheet_handler.batch_update_cells(all_sheet_updates)
+        # Finaler Sheet Update Batch senden (falls Reste übrig)
+        if all_sheet_updates:
+            logging.info(f"Sende finalen Sheet-Update ({len(all_sheet_updates)} Zellen)...")
+            self.sheet_handler.batch_update_cells(all_sheet_updates)

-    logging.info(f"Website-Scraping Batch abgeschlossen. {processed_count} Tasks erstellt, {skipped_url_count} Zeilen ohne URL übersprungen.")
+        logging.info(f"Website-Scraping Batch abgeschlossen. {processed_count} Tasks erstellt, {skipped_url_count} Zeilen ohne URL übersprungen.")

     # process_summarization_batch Methode
@@ -3270,7 +3270,7 @@ class DataProcessor:
         except KeyError as e: logging.critical(f"FEHLER: Benötigte Spalte '{e}' fehlt."); return
         except Exception as e: logging.critical(f"FEHLER beim Holen der Spaltenbuchstaben: {e}"); return

-for i, row in enumerate(data_rows): # <= for-Schleife beginnt hier
+        for i, row in enumerate(data_rows): # <= for-Schleife beginnt hier
             row_num_in_sheet = i + header_rows + 1

             if limit is not None and rows_processed_count >= limit:
@@ -3307,7 +3307,7 @@ for i, row in enumerate(data_rows): # <= for-Schleife beginnt hier

         # <= Hier endet die for-Schleife. Die folgenden Blöcke müssen auf dieser Ebene (derselben wie for) eingerückt sein.
-if updates: # <= DIESER BLOCK muss auf derselben Ebene wie die for-Schleife beginnen
+        if updates: # <= DIESER BLOCK muss auf derselben Ebene wie die for-Schleife beginnen
             logging.info(f"Sende Batch-Update für {len(updates)} Zellen ({rows_processed_count} Zeilen geprüft)...")
             success = self.sheet_handler.batch_update_cells(updates)
             if success:
@@ -3315,10 +3315,10 @@ if updates: # <= DIESER BLOCK muss auf derselben Ebene wie die for-Schleife begi
             else: # <= Dieses else gehört zum if success:
                 logging.error(f"FEHLER beim Batch-Update.")
-else: # <= DIESER BLOCK gehört zum if updates:
+        else: # <= DIESER BLOCK gehört zum if updates:
             logging.info("Keine fehlenden Websites gefunden oder keine Updates nötig.")

-logging.info(f"Modus 'website_lookup' abgeschlossen. {rows_processed_count} Zeilen geprüft.") # <= Diese Zeile gehört zur Methode, auf derselben Ebene wie das if updates:
+        logging.info(f"Modus 'website_lookup' abgeschlossen. {rows_processed_count} Zeilen geprüft.") # <= Diese Zeile gehört zur Methode, auf derselben Ebene wie das if updates:

     # process_find_wiki_serp Methode
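Reviewer note: for reference, a minimal, self-contained sketch of the batch-scraping pattern that the first hunk re-indents into the method body. This is an illustration only, not code from this PR: `get_website_raw` is replaced by a stub, and `MAX_SCRAPING_WORKERS` stands in for `Config.MAX_SCRAPING_WORKERS`.

```python
import concurrent.futures
import logging

logging.basicConfig(level=logging.INFO)

MAX_SCRAPING_WORKERS = 5  # stand-in for Config.MAX_SCRAPING_WORKERS


def get_website_raw(url: str) -> str:
    """Stub for the project's retry-wrapped scraper (assumption, not part of this diff)."""
    return f"<raw text of {url}>"


def scrape_raw_text_task(task_info: dict) -> dict:
    """Worker: scrape one URL and never raise - errors travel inside the result dict."""
    row_num, url = task_info["row_num"], task_info["url"]
    raw_text, error = "k.A.", None
    try:
        raw_text = get_website_raw(url)
    except Exception as exc:
        error = f"Scraping Fehler Zeile {row_num}: {exc}"
        logging.error(error)
    return {"row_num": row_num, "raw_text": raw_text, "error": error}


def scrape_batch(tasks: list[dict]) -> dict[int, str]:
    """Run one batch of scrape tasks in parallel and map sheet row -> raw text."""
    results: dict[int, str] = {}
    errors = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_SCRAPING_WORKERS) as executor:
        future_to_task = {executor.submit(scrape_raw_text_task, t): t for t in tasks}
        for future in concurrent.futures.as_completed(future_to_task):
            task = future_to_task[future]
            try:
                result = future.result()
                results[result["row_num"]] = result["raw_text"]
                if result["error"]:
                    errors += 1
            except Exception as exc:  # defensive: the worker itself should not raise
                logging.error(f"Generischer Fehler Scraping Task Zeile {task['row_num']}: {exc}")
                results[task["row_num"]] = "k.A. (Fehler)"
                errors += 1
    logging.info(f"Batch beendet: {len(results)} Ergebnisse, {errors} Fehler.")
    return results


if __name__ == "__main__":
    demo_tasks = [{"row_num": i, "url": f"https://example.com/{i}"} for i in range(3, 8)]
    print(scrape_batch(demo_tasks))
```

Keeping the worker non-raising (errors returned in the result dict) is what lets the `as_completed` loop treat every future uniformly; the extra `except` around `future.result()` is purely defensive, mirroring the hunk above.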
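A second sketch covers the sheet-update half of the same hunk: assembling the `{'range': ..., 'values': [[...]]}` payload that is handed to `sheet_handler.batch_update_cells`. The AR/AT column letters follow the hunk's own comments; the gspread call in the trailing comment is an assumption about what the handler might wrap, not something this diff shows.

```python
from datetime import datetime


def build_sheet_updates(scraping_results: dict[int, str],
                        rohtext_col_letter: str = "AR",
                        ts_col_letter: str = "AT") -> list[dict]:
    """Two cells per scraped row: the raw text (AR) and the scrape timestamp (AT)."""
    current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    updates: list[dict] = []
    for row_num, raw_text in scraping_results.items():
        updates.extend([
            {"range": f"{rohtext_col_letter}{row_num}", "values": [[raw_text]]},
            {"range": f"{ts_col_letter}{row_num}", "values": [[current_timestamp]]},
        ])
    return updates


if __name__ == "__main__":
    payload = build_sheet_updates({5: "Beispiel-Rohtext", 6: "k.A."})
    print(payload)
    # If sheet_handler is backed by gspread, a payload of this shape could be
    # forwarded as-is, e.g. worksheet.batch_update(payload) -- an assumption;
    # the handler's real implementation is not part of this diff.
```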