This commit is contained in:
2025-04-24 14:39:50 +00:00
parent c36d2cf300
commit 42de5dee50


@@ -2835,94 +2835,94 @@ class DataProcessor:
logging.info(f"Verarbeite Sheet-Zeilen {start_sheet_row} bis {end_sheet_row} für Website Scraping (Batch).")
# Worker function for scraping (could stay global or become a private static method;
# it stays global since it does not need self).
def scrape_raw_text_task(task_info):
    row_num = task_info['row_num']
    url = task_info['url']
    raw_text = "k.A."  # "k.A." ("keine Angabe") is the sheet's n/a marker
    error = None
    try:
        raw_text = get_website_raw(url)  # assumption: get_website_raw is a global helper with retry logic
    except Exception as e:
        error = f"Scraping error in row {row_num}: {e}"
        logging.error(error)  # log the error inside the worker
    return {"row_num": row_num, "raw_text": raw_text, "error": error}
tasks_for_processing_batch = []
all_sheet_updates = []
processed_count = 0  # counts rows for which a task is created
skipped_url_count = 0
processing_batch_size = Config.PROCESSING_BATCH_SIZE
max_scraping_workers = Config.MAX_SCRAPING_WORKERS
for i in range(start_sheet_row, end_sheet_row + 1):
    row_index_in_list = i - 1
    row = all_data[row_index_in_list]
    # URL check (always needed, even if the AT timestamp is missing)
    website_url = row[website_col_idx] if len(row) > website_col_idx else ""
    if not website_url or website_url.strip().lower() == "k.a.":  # compare lowercased; "k.A." would never match after .lower()
        skipped_url_count += 1
        continue
    # No AT timestamp -> create a task
    tasks_for_processing_batch.append({"row_num": i, "url": website_url})
    processed_count += 1
    # Execute the processing batch
    if len(tasks_for_processing_batch) >= processing_batch_size or i == end_sheet_row:
        if tasks_for_processing_batch:
            batch_start_row = tasks_for_processing_batch[0]['row_num']
            batch_end_row = tasks_for_processing_batch[-1]['row_num']
            batch_task_count = len(tasks_for_processing_batch)
            logging.info(f"\n--- Starting scraping batch ({batch_task_count} tasks, rows {batch_start_row}-{batch_end_row}) ---")
            scraping_results = {}  # {row_num: raw_text}
            batch_error_count = 0
            logging.info(f"  Scraping {batch_task_count} websites in parallel (max {max_scraping_workers} workers)...")
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
                future_to_task = {executor.submit(scrape_raw_text_task, task): task for task in tasks_for_processing_batch}
                for future in concurrent.futures.as_completed(future_to_task):
                    task = future_to_task[future]
                    try:
                        result = future.result()
                        scraping_results[result['row_num']] = result['raw_text']
                        if result['error']:
                            batch_error_count += 1
                    except Exception as exc:
                        row_num = task['row_num']
                        err_msg = f"Unexpected error in scraping task for row {row_num}: {exc}"
                        logging.error(err_msg)
                        scraping_results[row_num] = "k.A. (Fehler)"  # the sheet's error marker, kept as written to the data
                        batch_error_count += 1
logging.info(f" Scraping für Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).")
logging.info(f" Scraping für Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).")
            # Prepare sheet updates (columns AR and AT)
            if scraping_results:
                current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                batch_sheet_updates = []
                for row_num, raw_text_res in scraping_results.items():
                    batch_sheet_updates.extend([
                        {'range': f'{rohtext_col_letter}{row_num}', 'values': [[raw_text_res]]},
                        {'range': f'{ts_col_letter}{row_num}', 'values': [[current_timestamp]]}  # set the AT timestamp
                    ])
                all_sheet_updates.extend(batch_sheet_updates)
            # Send sheet updates for this batch
            if all_sheet_updates:
                logging.info(f"  Sending sheet update for {len(all_sheet_updates)} cells for batch {batch_start_row}-{batch_end_row}...")
                success = self.sheet_handler.batch_update_cells(all_sheet_updates)
                if success:
                    logging.info("  Sheet update successful.")
                else:
                    logging.error("  ERROR during sheet update.")
                all_sheet_updates = []  # reset after sending
            # Pause after each batch
            logging.debug("  Waiting after batch...")
            time.sleep(Config.RETRY_DELAY)
            tasks_for_processing_batch = []  # clear the task list, otherwise the next batch would re-scrape these rows
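
The Config class referenced throughout is assumed to expose at least the three attributes used here; a minimal sketch with illustrative values:

class Config:
    PROCESSING_BATCH_SIZE = 50  # rows collected before a scraping batch runs (illustrative)
    MAX_SCRAPING_WORKERS = 10   # thread pool size for parallel scraping (illustrative)
    RETRY_DELAY = 2             # seconds to pause between batches (illustrative)
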
# Send the final sheet update batch (if anything is left over)
if all_sheet_updates:
    logging.info(f"Sending final sheet update ({len(all_sheet_updates)} cells)...")
    self.sheet_handler.batch_update_cells(all_sheet_updates)
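
The update payloads built above ({'range': ..., 'values': [[...]]}) match the shape gspread's Worksheet.batch_update expects, so sheet_handler.batch_update_cells is plausibly a thin wrapper around that call. A sketch under that assumption (the class name, chunking, and error handling are illustrative, not taken from this commit):

import logging
import gspread

class SheetHandler:
    def __init__(self, worksheet):
        self.worksheet = worksheet

    def batch_update_cells(self, updates, chunk_size=500):
        """Apply [{'range': 'AT5', 'values': [[...]]}] updates; return True on success."""
        try:
            # Chunk the payload to stay well under the Sheets API request limits.
            for start in range(0, len(updates), chunk_size):
                self.worksheet.batch_update(updates[start:start + chunk_size])
            return True
        except gspread.exceptions.APIError as e:
            logging.error(f"Batch update failed: {e}")
            return False
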
logging.info(f"Website-Scraping Batch abgeschlossen. {processed_count} Tasks erstellt, {skipped_url_count} Zeilen ohne URL übersprungen.")
logging.info(f"Website-Scraping Batch abgeschlossen. {processed_count} Tasks erstellt, {skipped_url_count} Zeilen ohne URL übersprungen.")
# process_summarization_batch method
@@ -3270,7 +3270,7 @@ class DataProcessor:
except KeyError as e: logging.critical(f"ERROR: Required column '{e}' is missing."); return
except Exception as e: logging.critical(f"ERROR while fetching the column letters: {e}"); return
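
The commit doesn't show how rohtext_col_letter and ts_col_letter are derived; one self-contained way to map a 1-based column index to its A1 letter (gspread's gspread.utils.rowcol_to_a1 offers the same conversion):

def col_idx_to_letter(idx):
    """Convert a 1-based column index to A1 letters: 1 -> 'A', 27 -> 'AA', 46 -> 'AT'."""
    letters = ""
    while idx > 0:
        idx, rem = divmod(idx - 1, 26)
        letters = chr(ord('A') + rem) + letters
    return letters
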
for i, row in enumerate(data_rows):  # <= the for loop begins here
    row_num_in_sheet = i + header_rows + 1
    if limit is not None and rows_processed_count >= limit:
@@ -3307,7 +3307,7 @@ for i, row in enumerate(data_rows): # <= the for loop begins here
# <= The for loop ends here. The blocks that follow must be indented at this level (the same level as the for).
if updates:  # <= THIS block must start at the same level as the for loop
logging.info(f"Sende Batch-Update für {len(updates)} Zellen ({rows_processed_count} Zeilen geprüft)...")
success = self.sheet_handler.batch_update_cells(updates)
if success:
@@ -3315,10 +3315,10 @@ if updates: # <= THIS block must start at the same level as the for loop
    else:  # <= this else belongs to the if success:
        logging.error("ERROR during batch update.")
else:  # <= THIS block belongs to the if updates:
logging.info("Keine fehlenden Websites gefunden oder keine Updates nötig.")
logging.info(f"Modus 'website_lookup' abgeschlossen. {rows_processed_count} Zeilen geprüft.") # <= Diese Zeile gehört zur Methode, auf derselben Ebene wie das if updates:
logging.info(f"Modus 'website_lookup' abgeschlossen. {rows_processed_count} Zeilen geprüft.") # <= Diese Zeile gehört zur Methode, auf derselben Ebene wie das if updates:
# process_find_wiki_serp method