diff --git a/data_processor.py b/data_processor.py
index 826c0365..935f9f64 100644
--- a/data_processor.py
+++ b/data_processor.py
@@ -2069,27 +2069,28 @@ class DataProcessor:
             'meta_details': meta_details_result
         }
 
-    def _scrape_raw_text_task(self, task_info, scraper_function):
+    def _scrape_raw_text_task(self, task_info):
         """
-        Worker-Funktion für Threading. Gibt IMMER ein Dictionary zurück.
-        Basiert auf der Logik aus v1.7.9.
+        Interne Worker-Funktion für paralleles Scraping, die IMMER ein Dictionary zurückgibt.
         """
         row_num = task_info['row_num']
         url = task_info['url']
-        self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num + 1}: {url}")
+        self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num}: {url}")
         try:
-            raw_text = scraper_function(url)
-            return {'row_num': row_num, 'raw_text': raw_text, 'error': "k.A." in raw_text}
+            # Ruft die robuste get_website_raw Funktion auf
+            raw_text = get_website_raw(url)
+            # Gibt das Ergebnis immer als Dictionary zurück
+            return {'row_num': row_num, 'raw_text': raw_text}
         except Exception as e:
-            self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num + 1}: {e}")
-            return {'row_num': row_num, 'raw_text': f"FEHLER: {e}", 'error': True}
+            self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num}: {e}")
+            return {'row_num': row_num, 'raw_text': f"FEHLER im Task: {e}"}
 
-    def process_website_scraping_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
+    def process_website_scraping_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
         """
-        Effizienter Batch-Prozess, der Websites parallel scraped, Meta-Daten extrahiert
-        und Zusammenfassungen erstellt.
+        Batch-Prozess NUR für Website-Scraping (Rohtext). Basiert auf der
+        bewährten Logik aus v1.7.9, angepasst an die neue modulare Struktur und fehlerbereinigt.
         """
-        self.logger.info(f"Starte Website-Scraping & Summarizing (Batch). Limit: {limit or 'Unbegrenzt'}")
+        self.logger.info(f"Starte Website-Scraping (Batch). Limit: {limit or 'Unbegrenzt'}")
 
         if start_sheet_row is None:
             start_data_idx = self.sheet_handler.get_start_row_index("Website Scrape Timestamp")
@@ -2099,50 +2100,53 @@ class DataProcessor:
         all_data = self.sheet_handler.get_all_data_with_headers()
 
         tasks = []
+        processed_count = 0
         for i in range(start_sheet_row - 1, len(all_data)):
-            if limit is not None and len(tasks) >= limit: break
+            if limit is not None and processed_count >= limit:
+                break
+
             row_data = all_data[i]
             if self._needs_website_processing(row_data, force_reeval=False):
-                tasks.append({
-                    'row_num': i + 1,
-                    'company_name': self._get_cell_value_safe(row_data, "CRM Name"),
-                    'url': self._get_cell_value_safe(row_data, "CRM Website")
-                })
+                website_url = self._get_cell_value_safe(row_data, "CRM Website").strip()
+                if website_url and website_url.lower() not in ["k.a.", "http:"]:
+                    tasks.append({"row_num": i + 1, "url": website_url})
+                    processed_count += 1
 
         if not tasks:
-            self.logger.info("Keine Zeilen gefunden, die Website-Verarbeitung erfordern.")
+            self.logger.info("Keine Zeilen für Website-Scraping gefunden.")
             return
 
         all_updates = []
         now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-
+        current_version = getattr(Config, 'VERSION', 'unknown')
+
+        from concurrent.futures import ThreadPoolExecutor, as_completed
+
+        with ThreadPoolExecutor(max_workers=getattr(Config, 'MAX_SCRAPING_WORKERS', 10)) as executor:
+            self.logger.info(f"Starte paralleles Scraping für {len(tasks)} Websites...")
+            future_to_task = {executor.submit(self._scrape_raw_text_task, task): task for task in tasks}
 
-        with ThreadPoolExecutor(max_workers=getattr(Config, 'MAX_SCRAPING_WORKERS', 5)) as executor:
-            self.logger.info(f"Starte parallele Verarbeitung für {len(tasks)} Websites...")
-            future_to_task = {executor.submit(self._scrape_and_summarize_task, task): task for task in tasks}
-
             for future in as_completed(future_to_task):
-                row_num = future_to_task[future]['row_num']
+                task = future_to_task[future]
+                row_num = task['row_num']
                 try:
+                    # Hier erhalten wir jetzt garantiert ein Dictionary
                     result_dict = future.result()
+                    raw_text_res = result_dict['raw_text']
 
-                    # Schreibe alle Ergebnisse zurück
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("CRM Website") + 1)}{row_num}', 'values': [[result_dict.get('final_url')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)}{row_num}', 'values': [[result_dict.get('raw_text')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Meta-Details") + 1)}{row_num}', 'values': [[result_dict.get('meta_text')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Zusammenfassung") + 1)}{row_num}', 'values': [[result_dict.get('summary')]]})
-                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("URL Prüfstatus") + 1)}{row_num}', 'values': [[result_dict.get('url_pruefstatus')]]})
+                    # Updates vorbereiten
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)}{row_num}', 'values': [[raw_text_res]]})
                     all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)}{row_num}', 'values': [[now_timestamp]]})
-
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Version") + 1)}{row_num}', 'values': [[current_version]]})
                 except Exception as e_future:
-                    self.logger.error(f"Fehler beim Abrufen des Ergebnisses für Zeile {row_num}: {e_future}")
+                    self.logger.error(f" -> Fehler beim Verarbeiten des Ergebnisses für Zeile {row_num}: {e_future}")
 
         if all_updates:
             self.logger.info(f"Sende Batch-Update für {len(tasks)} verarbeitete Websites...")
             self.sheet_handler.batch_update_cells(all_updates)
 
-        self.logger.info(f"Website-Verarbeitung (Batch) abgeschlossen. {len(tasks)} Zeilen verarbeitet.")
-
+        self.logger.info(f"Website-Scraping (Batch) abgeschlossen. {len(tasks)} Zeilen verarbeitet.")
+
     def process_website_scraping(
         self,
         start_sheet_row=None,
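
Note (illustration, not part of the patch): the core change above is that the worker now always returns a dictionary and fetches the raw text directly via get_website_raw, so the as_completed loop never has to guess the shape of a result. The minimal, self-contained Python sketch below shows that pattern in isolation; the names scrape_one and fake_fetch are hypothetical stand-ins for _scrape_raw_text_task and get_website_raw, and max_workers=10 mirrors the MAX_SCRAPING_WORKERS default used in the new code.

from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_fetch(url):
    # Hypothetical stand-in for get_website_raw(url); only used for this sketch.
    return f"<raw text of {url}>"

def scrape_one(task):
    # Mirrors _scrape_raw_text_task: ALWAYS returns a dict, even on failure,
    # so the consumer of future.result() needs no special-case handling.
    try:
        return {"row_num": task["row_num"], "raw_text": fake_fetch(task["url"])}
    except Exception as exc:
        return {"row_num": task["row_num"], "raw_text": f"FEHLER im Task: {exc}"}

tasks = [
    {"row_num": 2, "url": "https://example.com"},
    {"row_num": 3, "url": "https://example.org"},
]

# Same submit/as_completed pattern as process_website_scraping_batch.
with ThreadPoolExecutor(max_workers=10) as executor:
    future_to_task = {executor.submit(scrape_one, t): t for t in tasks}
    for future in as_completed(future_to_task):
        result = future.result()  # guaranteed to be a dict
        print(result["row_num"], result["raw_text"][:40])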