data_processor.py aktualisiert
This commit is contained in:
@@ -813,6 +813,143 @@ class DataProcessor:
|
|||||||
# ==========================================================================
|
# ==========================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def process_website_scraping(self, start_sheet_row=None, end_sheet_row=None, limit=None):
    """
    Batch process for website scraping ONLY (raw text and meta details).

    Rows of the selected sheet range that need website processing are
    collected into task batches, scraped in parallel through
    ``_scrape_website_task_batch`` and written back to the sheet as batched
    cell updates (raw text, meta details, timestamp and version per row).

    Args:
        start_sheet_row: 1-based sheet row to start with. If None, the start
            row is derived from ``sheet_handler.get_start_row_index`` for the
            "Website Scrape Timestamp" column.
        end_sheet_row: 1-based last sheet row (inclusive). If None, the last
            loaded row is used.
        limit: Maximum number of rows selected for scraping. None means no
            limit.

    Returns:
        None. Problems are logged and end the run early.
    """
    self.logger.info(f"Starte Website-Scraping (Batch, v2.0.1). Bereich: {start_sheet_row or 'Start'}-{end_sheet_row or 'Ende'}, Limit: {limit or 'Unbegrenzt'}")

    # --- 1. Load data and determine the start row ---
    if start_sheet_row is None:
        self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren 'Website Scrape Timestamp'...")
        start_data_idx = self.sheet_handler.get_start_row_index(check_column_key="Website Scrape Timestamp")
        if start_data_idx == -1:
            self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.")
            return
        # Convert the data index into a 1-based sheet row (skip the header rows).
        start_sheet_row = start_data_idx + self.sheet_handler._header_rows + 1
        self.logger.info(f"Automatisch ermittelte Startzeile: {start_sheet_row}")

    if not self.sheet_handler.load_data():
        self.logger.error("FEHLER beim Laden der Daten für Batch-Verarbeitung.")
        return

    all_data = self.sheet_handler.get_all_data_with_headers()
    total_sheet_rows = len(all_data)
    effective_end_row = end_sheet_row if end_sheet_row is not None else total_sheet_rows

    self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {effective_end_row}.")
    if start_sheet_row > effective_end_row:
        self.logger.info("Start liegt nach dem Ende. Keine Zeilen zu verarbeiten.")
        return

    # --- 2. Prepare the column letters of the four cells written per row ---
    # Order matters: raw text, meta details, timestamp, version.
    column_letters = tuple(
        self.sheet_handler._get_col_letter(get_col_idx(column_key) + 1)
        for column_key in (
            "Website Rohtext",
            "Website Meta-Details",
            "Website Scrape Timestamp",
            "Version",
        )
    )

    # --- 3. Collect tasks ---
    processing_batch_size = getattr(Config, 'PROCESSING_BATCH_SIZE', 20)
    max_scraping_workers = getattr(Config, 'MAX_SCRAPING_WORKERS', 10)
    update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50)

    pending_tasks = []
    all_sheet_updates = []
    processed_count = 0
    skipped_count = 0

    for sheet_row in range(start_sheet_row, effective_end_row + 1):
        list_index = sheet_row - 1  # sheet rows are 1-based, the data list is 0-based
        if list_index >= total_sheet_rows:
            break

        row = all_data[list_index]
        if not any(cell and str(cell).strip() for cell in row):
            skipped_count += 1
            continue

        if not self._needs_website_processing(row, force_reeval=False):
            skipped_count += 1
            continue

        website_url = self._get_cell_value_safe(row, "CRM Website").strip()
        company_name = self._get_cell_value_safe(row, "CRM Name").strip()
        if not website_url or website_url.lower() in ["k.a.", "http:"]:
            skipped_count += 1
            continue

        if limit is not None and processed_count >= limit:
            self.logger.info(f"Verarbeitungslimit ({limit}) erreicht.")
            break

        pending_tasks.append({"row_num": sheet_row, "url": website_url, "company_name": company_name})
        processed_count += 1

        # --- 4. Run a scraping batch as soon as enough tasks are collected ---
        if len(pending_tasks) >= processing_batch_size:
            all_sheet_updates.extend(
                self._scrape_batch_to_sheet_updates(pending_tasks, max_scraping_workers, column_letters)
            )
            pending_tasks = []

            # --- 6. Send the sheet update once the update batch is full ---
            # Each row contributes 4 updates (raw text, meta, timestamp, version).
            if len(all_sheet_updates) >= (update_batch_row_limit * 4):
                self.logger.info(f"Sende gesammelte Sheet-Updates ({len(all_sheet_updates) // 4} Zeilen)...")
                self.sheet_handler.batch_update_cells(all_sheet_updates)
                all_sheet_updates = []
                time.sleep(1)  # short pause after a large update

    # Tasks can still be pending here: the loop may have ended through the
    # limit, through the end of the loaded data or on a skipped last row.
    # Previously those tasks were dropped without ever being scraped.
    if pending_tasks:
        all_sheet_updates.extend(
            self._scrape_batch_to_sheet_updates(pending_tasks, max_scraping_workers, column_letters)
        )

    # --- 7. Send the final updates ---
    if all_sheet_updates:
        self.logger.info(f"Sende finale gesammelte Sheet-Updates ({len(all_sheet_updates) // 4} Zeilen)...")
        self.sheet_handler.batch_update_cells(all_sheet_updates)

    self.logger.info(f"Website-Scraping (Batch) abgeschlossen. {processed_count} Zeilen zur Verarbeitung ausgewählt, {skipped_count} Zeilen übersprungen.")


def _scrape_batch_to_sheet_updates(self, tasks, max_workers, column_letters):
    """Scrape one task batch and return the sheet cell updates for its results.

    Args:
        tasks: Task dicts with the keys 'row_num', 'url' and 'company_name'.
        max_workers: Maximum number of parallel scraping threads.
        column_letters: Column letters in the order raw text, meta details,
            timestamp, version.

    Returns:
        A list of ``{'range': ..., 'values': [[...]]}`` dicts, four per
        scraped row; empty if no results were collected.
    """
    self.logger.info(f"--- Starte Website-Scraping Batch für {len(tasks)} Tasks (max. {max_workers} Worker) ---")
    scraping_results = self._collect_website_scrape_results(tasks, max_workers)
    if not scraping_results:
        return []

    # --- 5. Prepare the updates for the sheet ---
    rohtext_col, metadetails_col, timestamp_col, version_col = column_letters
    current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    current_version = getattr(Config, 'VERSION', 'unknown')

    updates = []
    for row_num, res_dict in scraping_results.items():
        cell_values = (
            (rohtext_col, res_dict.get('raw_text', 'k.A.')),
            (metadetails_col, res_dict.get('meta_details', 'k.A.')),
            (timestamp_col, current_timestamp),
            (version_col, current_version),
        )
        updates.extend(
            {'range': f'{col_letter}{row_num}', 'values': [[value]]}
            for col_letter, value in cell_values
        )
    return updates


def _collect_website_scrape_results(self, tasks, max_workers):
    """Run ``_scrape_website_task_batch`` for all tasks in parallel.

    Args:
        tasks: Task dicts, each containing at least 'row_num'.
        max_workers: Maximum number of parallel scraping threads.

    Returns:
        A dict mapping sheet row number to the worker's result dict. Rows
        whose worker raised or returned an unexpected value get a
        placeholder result with 'error' set to True.
    """
    scraping_results = {}
    batch_error_count = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_task = {
            executor.submit(self._scrape_website_task_batch, task): task
            for task in tasks
        }

        for future in as_completed(future_to_task):
            task = future_to_task[future]
            try:
                result_dict = future.result()
            except Exception as exc:
                # Worker boundary: a single failing task must not abort the batch.
                self.logger.error(f"Unerwarteter Fehler bei Ergebnisabfrage für Zeile {task['row_num']}: {exc}", exc_info=True)
                scraping_results[task['row_num']] = {'raw_text': "FEHLER (Task Exception)", 'meta_details': 'k.A.', 'error': True}
                batch_error_count += 1
                continue

            if isinstance(result_dict, dict) and 'row_num' in result_dict:
                scraping_results[result_dict['row_num']] = result_dict
                if result_dict.get('error'):
                    batch_error_count += 1
                    self.logger.warning(f"Worker meldete Fehler für Zeile {result_dict['row_num']}: {result_dict.get('status_message')}")
            else:
                # Fallback for an unexpected return value: store an error marker for the row.
                self.logger.error(f"Inkonsistentes Ergebnis für Zeile {task['row_num']}: Erwartete dict mit 'row_num', bekam {type(result_dict)}. Überspringe.")
                scraping_results[task['row_num']] = {'raw_text': "FEHLER (Inkonsistenter Rückgabetyp)", 'meta_details': 'k.A.', 'error': True}
                batch_error_count += 1

    self.logger.info(f" -> Scraping für Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} davon mit Fehlern).")
    return scraping_results
|
||||||
|
|
||||||
def _scrape_website_task_batch(self, task_info):
|
def _scrape_website_task_batch(self, task_info):
|
||||||
"""
|
"""
|
||||||
Robuste Worker-Funktion für das parallele Scrapen von Websites im Batch-Modus.
|
Robuste Worker-Funktion für das parallele Scrapen von Websites im Batch-Modus.
|
||||||
|
|||||||
Reference in New Issue
Block a user