data_processor.py aktualisiert
This commit is contained in:
@@ -2354,90 +2354,69 @@ class DataProcessor:
|
|||||||
# Die Pause kommt erst nach dem Batch-Update (oder am Ende des Modus).
|
# Die Pause kommt erst nach dem Batch-Update (oder am Ende des Modus).
|
||||||
# time.sleep(0.1) # Optionale kurze Pause
|
# time.sleep(0.1) # Optionale kurze Pause
|
||||||
|
|
||||||
# --- Verarbeitung des letzten unvollstaendigen Scraping-Batches nach der Schleife ---
|
# --- Verarbeitung des letzten unvollstaendigen Scraping-Batches nach der Schleife ---
|
||||||
# Fuehre den letzten Batch aus, wenn nach der Hauptschleife noch Tasks
|
|
||||||
# in der Batch-Liste sind.
|
|
||||||
if tasks_for_processing_batch:
|
if tasks_for_processing_batch:
|
||||||
# Logge den Start des finalen Batches
|
|
||||||
batch_start_row = tasks_for_processing_batch[0]['row_num']
|
batch_start_row = tasks_for_processing_batch[0]['row_num']
|
||||||
batch_end_row = tasks_for_processing_batch[-1]['row_num']
|
batch_end_row = tasks_for_processing_batch[-1]['row_num']
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
f"\n--- Starte FINALEN Website-Scraping Batch ({len(tasks_for_processing_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
|
f"\n--- Starte FINALEN Website-Scraping Batch ({len(tasks_for_processing_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---")
|
||||||
|
|
||||||
scraping_results = {} # Dictionary fuer die Ergebnisse
|
scraping_results = {}
|
||||||
batch_error_count = 0 # Fehlerzaehler
|
batch_error_count = 0
|
||||||
|
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
f" Scrape {len(tasks_for_processing_batch)} Websites parallel (max {max_scraping_workers} worker)...") # <<< GEÄNDERT
|
f" Scrape {len(tasks_for_processing_batch)} Websites parallel (max {max_scraping_workers} worker)...")
|
||||||
with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
|
with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
|
||||||
# Map tasks to futures. Ruft die INTERNE Worker-Funktion auf.
|
|
||||||
future_to_task = {
|
future_to_task = {
|
||||||
executor.submit(
|
executor.submit(
|
||||||
self._scrape_raw_text_task,
|
self._scrape_raw_text_task,
|
||||||
task,
|
task,
|
||||||
get_website_raw): task for task in tasks_for_processing_batch} # <<< Korrigiert: interne Methode
|
get_website_raw): task for task in tasks_for_processing_batch}
|
||||||
|
|
||||||
# Verarbeite die Ergebnisse
|
|
||||||
for future in concurrent.futures.as_completed(future_to_task):
|
for future in concurrent.futures.as_completed(future_to_task):
|
||||||
# Holen Sie die urspruenglichen Task-Daten
|
|
||||||
task = future_to_task[future]
|
task = future_to_task[future]
|
||||||
try:
|
try:
|
||||||
result = future.result() # Holen Sie das Ergebnis
|
result = future.result()
|
||||||
scraping_results[result['row_num']
|
# HINWEIS: Hier speichern wir das ganze dict, nicht nur den Text
|
||||||
] = result['raw_text']
|
scraping_results[result['row_num']] = result
|
||||||
# Pruefe, ob der Worker einen Fehler gemeldet hat
|
|
||||||
if result.get('error'):
|
if result.get('error'):
|
||||||
batch_error_count += 1
|
batch_error_count += 1
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
# Faengt unerwartete Fehler bei der Ergebnisabfrage ab
|
|
||||||
row_num = task['row_num']
|
row_num = task['row_num']
|
||||||
# Gekuerzt loggen
|
|
||||||
err_msg = f"Unerwarteter Fehler bei Ergebnisabfrage Scraping Task Zeile {row_num} ({task['url'][:100]}): {type(exc).__name__} - {exc}"
|
err_msg = f"Unerwarteter Fehler bei Ergebnisabfrage Scraping Task Zeile {row_num} ({task['url'][:100]}): {type(exc).__name__} - {exc}"
|
||||||
self.logger.error(err_msg) # <<< GEÄNDERT
|
self.logger.error(err_msg)
|
||||||
# Setze einen Standard-Fehlerwert
|
scraping_results[row_num] = {"raw_text": "k.A. (Unerwarteter Fehler Task)", "meta_details": "k.A.", "error": True}
|
||||||
scraping_results[row_num] = "k.A. (Unerwarteter Fehler Task)"
|
|
||||||
batch_error_count += 1
|
batch_error_count += 1
|
||||||
|
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
f" FINALER Scraping Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler).") # <<< GEÄNDERT
|
f" FINALER Scraping Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler).")
|
||||||
|
|
||||||
# Sammle Sheet Updates (AR, AT, AP) fuer diesen finalen Batch.
|
|
||||||
if scraping_results:
|
if scraping_results:
|
||||||
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
current_version = getattr(
|
current_version = getattr(Config, 'VERSION', 'unknown')
|
||||||
Config, 'VERSION', 'unknown') # Block 1 Config Attribut
|
batch_sheet_updates = []
|
||||||
batch_sheet_updates = [] # Updates fuer diesen spezifischen Batch
|
|
||||||
# Iteriere ueber die Zeilennummern im Batch, fuer die
|
# ANPASSUNG AN NEUE LOGIK
|
||||||
# Ergebnisse vorliegen.
|
for row_num, result_dict in scraping_results.items():
|
||||||
for row_num, result_dict in scraping_results.items():
|
batch_sheet_updates.extend([
|
||||||
# Füge Updates für Rohtext, Meta-Details, Timestamp und Version hinzu
|
{'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Rohtext"] + 1)}{row_num}', 'values': [[result_dict.get('raw_text', 'k.A.')]]},
|
||||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Rohtext"] + 1)}{row_num}', 'values': [[result_dict['raw_text']]]})
|
{'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Meta-Details"] + 1)}{row_num}', 'values': [[result_dict.get('meta_details', 'k.A.')]]},
|
||||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Meta-Details"] + 1)}{row_num}', 'values': [[result_dict['meta_details']]]})
|
{'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Scrape Timestamp"] + 1)}{row_num}', 'values': [[current_timestamp]]},
|
||||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Scrape Timestamp"] + 1)}{row_num}', 'values': [[current_timestamp]]})
|
{'range': f'{self.sheet_handler._get_col_letter(col_indices["Version"] + 1)}{row_num}', 'values': [[current_version]]}
|
||||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(col_indices["Version"] + 1)}{row_num}', 'values': [[current_version]]}) # Block 1 Column Map
|
])
|
||||||
# Fuege diese Updates zur globalen Liste hinzu (wird dann nur
|
|
||||||
# noch einmal gesendet)
|
|
||||||
all_sheet_updates.extend(batch_sheet_updates)
|
all_sheet_updates.extend(batch_sheet_updates)
|
||||||
|
|
||||||
# --- Finale Sheet Updates senden ---
|
# --- Finale Sheet Updates senden ---
|
||||||
# Sende alle verbleibenden gesammelten Updates in einem letzten
|
|
||||||
# Batch-Update.
|
|
||||||
if all_sheet_updates:
|
if all_sheet_updates:
|
||||||
rows_in_final_update_batch = len(
|
rows_in_final_update_batch = len(all_sheet_updates) // 4 # 4 Updates pro Zeile
|
||||||
all_sheet_updates) // 3 # Updates pro Zeile ist 3
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
f"Sende FINALE gesammelte Sheet-Updates ({rows_in_final_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT
|
f"Sende FINALE gesammelte Sheet-Updates ({rows_in_final_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...")
|
||||||
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block
|
|
||||||
# 14) mit Retry.
|
|
||||||
success = self.sheet_handler.batch_update_cells(all_sheet_updates)
|
success = self.sheet_handler.batch_update_cells(all_sheet_updates)
|
||||||
if success:
|
if success:
|
||||||
# <<< GEÄNDERT
|
self.logger.info("FINALES Sheet-Update erfolgreich.")
|
||||||
self.logger.info(f"FINALES Sheet-Update erfolgreich.")
|
|
||||||
# Der Fehlerfall wird von batch_update_cells geloggt
|
|
||||||
|
|
||||||
# Logge den Abschluss des Modus
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
f"Website-Scraping (Batch) abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen.") # <<< GEÄNDERT
|
f"Website-Scraping (Batch) abgeschlossen. {processed_count} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.")
|
||||||
|
|
||||||
def process_summarization_batch(
|
def process_summarization_batch(
|
||||||
self,
|
self,
|
||||||
|
|||||||
Reference in New Issue
Block a user