data_processor.py updated
@@ -2069,27 +2069,28 @@ class DataProcessor:
             'meta_details': meta_details_result
         }
 
-    def _scrape_raw_text_task(self, task_info, scraper_function):
+    def _scrape_raw_text_task(self, task_info):
         """
-        Worker-Funktion für Threading. Gibt IMMER ein Dictionary zurück.
-        Basiert auf der Logik aus v1.7.9.
+        Interne Worker-Funktion für paralleles Scraping, die IMMER ein Dictionary zurückgibt.
         """
         row_num = task_info['row_num']
         url = task_info['url']
-        self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num + 1}: {url}")
+        self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num}: {url}")
         try:
-            raw_text = scraper_function(url)
-            return {'row_num': row_num, 'raw_text': raw_text, 'error': "k.A." in raw_text}
+            # Ruft die robuste get_website_raw Funktion auf
+            raw_text = get_website_raw(url)
+            # Gibt das Ergebnis immer als Dictionary zurück
+            return {'row_num': row_num, 'raw_text': raw_text}
         except Exception as e:
-            self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num + 1}: {e}")
-            return {'row_num': row_num, 'raw_text': f"FEHLER: {e}", 'error': True}
+            self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num}: {e}")
+            return {'row_num': row_num, 'raw_text': f"FEHLER im Task: {e}"}
 
     def process_website_scraping_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
         """
-        Effizienter Batch-Prozess, der Websites parallel scraped, Meta-Daten extrahiert
-        und Zusammenfassungen erstellt.
+        Batch-Prozess NUR für Website-Scraping (Rohtext). Basiert auf der
+        bewährten Logik aus v1.7.9, angepasst an die neue modulare Struktur und fehlerbereinigt.
         """
-        self.logger.info(f"Starte Website-Scraping & Summarizing (Batch). Limit: {limit or 'Unbegrenzt'}")
+        self.logger.info(f"Starte Website-Scraping (Batch). Limit: {limit or 'Unbegrenzt'}")
 
         if start_sheet_row is None:
             start_data_idx = self.sheet_handler.get_start_row_index("Website Scrape Timestamp")
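
Note on the hunk above: the new worker contract (always return a dictionary, never raise) is what lets the consumer loop call future.result() without per-task error handling. Below is a minimal, self-contained sketch of that contract; get_website_raw is stubbed here as an assumption, since the real function lives elsewhere in the repository.

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_website_raw(url):
    # Stand-in for the real scraper referenced in the diff; here it only echoes the URL.
    return f"raw text for {url}"

def scrape_raw_text_task(task_info):
    # Mirrors the worker contract from the diff: always return a dict,
    # even on failure, so the consumer never has to special-case exceptions.
    row_num = task_info["row_num"]
    url = task_info["url"]
    try:
        return {"row_num": row_num, "raw_text": get_website_raw(url)}
    except Exception as e:
        return {"row_num": row_num, "raw_text": f"FEHLER im Task: {e}"}

if __name__ == "__main__":
    tasks = [{"row_num": 2, "url": "https://example.com"},
             {"row_num": 3, "url": "https://example.org"}]
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = {executor.submit(scrape_raw_text_task, t): t for t in tasks}
        for future in as_completed(futures):
            result = future.result()  # always a dict, by the worker's contract
            print(result["row_num"], result["raw_text"][:40])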
@@ -2099,50 +2100,53 @@ class DataProcessor:
         all_data = self.sheet_handler.get_all_data_with_headers()
 
         tasks = []
+        processed_count = 0
         for i in range(start_sheet_row - 1, len(all_data)):
-            if limit is not None and len(tasks) >= limit: break
+            if limit is not None and processed_count >= limit:
+                break
 
             row_data = all_data[i]
             if self._needs_website_processing(row_data, force_reeval=False):
-                tasks.append({
-                    'row_num': i + 1,
-                    'company_name': self._get_cell_value_safe(row_data, "CRM Name"),
-                    'url': self._get_cell_value_safe(row_data, "CRM Website")
-                })
+                website_url = self._get_cell_value_safe(row_data, "CRM Website").strip()
+                if website_url and website_url.lower() not in ["k.a.", "http:"]:
+                    tasks.append({"row_num": i + 1, "url": website_url})
+                    processed_count += 1
 
         if not tasks:
-            self.logger.info("Keine Zeilen gefunden, die Website-Verarbeitung erfordern.")
+            self.logger.info("Keine Zeilen für Website-Scraping gefunden.")
             return
 
         all_updates = []
         now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        current_version = getattr(Config, 'VERSION', 'unknown')
 
+        from concurrent.futures import ThreadPoolExecutor, as_completed
 
-        with ThreadPoolExecutor(max_workers=getattr(Config, 'MAX_SCRAPING_WORKERS', 5)) as executor:
-            self.logger.info(f"Starte parallele Verarbeitung für {len(tasks)} Websites...")
-            future_to_task = {executor.submit(self._scrape_and_summarize_task, task): task for task in tasks}
+        with ThreadPoolExecutor(max_workers=getattr(Config, 'MAX_SCRAPING_WORKERS', 10)) as executor:
+            self.logger.info(f"Starte paralleles Scraping für {len(tasks)} Websites...")
+            future_to_task = {executor.submit(self._scrape_raw_text_task, task): task for task in tasks}
 
             for future in as_completed(future_to_task):
-                row_num = future_to_task[future]['row_num']
+                task = future_to_task[future]
+                row_num = task['row_num']
                 try:
+                    # Hier erhalten wir jetzt garantiert ein Dictionary
                     result_dict = future.result()
+                    raw_text_res = result_dict['raw_text']
 
-                    # Schreibe alle Ergebnisse zurück
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("CRM Website") + 1)}{row_num}', 'values': [[result_dict.get('final_url')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)}{row_num}', 'values': [[result_dict.get('raw_text')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Meta-Details") + 1)}{row_num}', 'values': [[result_dict.get('meta_text')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Zusammenfassung") + 1)}{row_num}', 'values': [[result_dict.get('summary')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("URL Prüfstatus") + 1)}{row_num}', 'values': [[result_dict.get('url_pruefstatus')]]})
+                    # Updates vorbereiten
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)}{row_num}', 'values': [[raw_text_res]]})
                     all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)}{row_num}', 'values': [[now_timestamp]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Version") + 1)}{row_num}', 'values': [[current_version]]})
                 except Exception as e_future:
-                    self.logger.error(f"Fehler beim Abrufen des Ergebnisses für Zeile {row_num}: {e_future}")
+                    self.logger.error(f" -> Fehler beim Verarbeiten des Ergebnisses für Zeile {row_num}: {e_future}")
 
         if all_updates:
             self.logger.info(f"Sende Batch-Update für {len(tasks)} verarbeitete Websites...")
             self.sheet_handler.batch_update_cells(all_updates)
 
-        self.logger.info(f"Website-Verarbeitung (Batch) abgeschlossen. {len(tasks)} Zeilen verarbeitet.")
+        self.logger.info(f"Website-Scraping (Batch) abgeschlossen. {len(tasks)} Zeilen verarbeitet.")
 
     def process_website_scraping(
         self,
         start_sheet_row=None,
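
Taken together, the new flow in this hunk is: collect tasks (bounded by limit), scrape them in parallel, accumulate one A1-style range per cell, then write everything back in a single batch call. The following sketch illustrates that pattern under stated assumptions: col_letter, HEADERS, scrape and worker are illustrative stand-ins, not the repository's actual helpers (the diff uses sheet_handler._get_col_letter, get_col_idx, get_website_raw and batch_update_cells).

from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

# Hypothetical header layout; the real sheet columns are resolved via get_col_idx in the diff.
HEADERS = ["CRM Website", "Website Rohtext", "Website Scrape Timestamp", "Version"]

def col_letter(idx_1based):
    # Convert a 1-based column index to its spreadsheet letter (A, B, ..., Z, AA, ...).
    letters = ""
    while idx_1based:
        idx_1based, rem = divmod(idx_1based - 1, 26)
        letters = chr(ord("A") + rem) + letters
    return letters

def scrape(url):
    return f"raw text of {url}"  # placeholder scraper, not the real get_website_raw

def worker(task):
    # Same contract as _scrape_raw_text_task: always return a dict.
    try:
        return {"row_num": task["row_num"], "raw_text": scrape(task["url"])}
    except Exception as e:
        return {"row_num": task["row_num"], "raw_text": f"FEHLER im Task: {e}"}

def run_batch(tasks, max_workers=10):
    all_updates = []
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_task = {executor.submit(worker, t): t for t in tasks}
        for future in as_completed(future_to_task):
            row = future_to_task[future]["row_num"]
            result = future.result()
            # One range entry per cell; all entries are sent in a single batch call afterwards.
            all_updates.append({"range": f"{col_letter(HEADERS.index('Website Rohtext') + 1)}{row}",
                                "values": [[result["raw_text"]]]})
            all_updates.append({"range": f"{col_letter(HEADERS.index('Website Scrape Timestamp') + 1)}{row}",
                                "values": [[now]]})
    return all_updates  # hand this list to one batch_update_cells-style call

print(run_batch([{"row_num": 2, "url": "https://example.com"}]))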