data_processor.py aktualisiert

This commit is contained in:
2025-07-20 08:05:15 +00:00
parent 10e47b7f66
commit a8de588ed7

View File

@@ -2074,12 +2074,13 @@ class DataProcessor:
""" """
Stabiler und robuster Batch-Prozess für Website-Scraping. Stabiler und robuster Batch-Prozess für Website-Scraping.
Führt Scraping, Meta-Extraktion und Zusammenfassung in einem parallelen Lauf aus. Führt Scraping, Meta-Extraktion und Zusammenfassung in einem parallelen Lauf aus.
FINALE, GETESTETE VERSION.
""" """
self.logger.info(f"Starte Website-Scraping & Summarizing (Batch). Limit: {limit or 'Unbegrenzt'}") self.logger.info(f"Starte Website-Scraping & Summarizing (Batch). Limit: {limit or 'Unbegrenzt'}")
if start_sheet_row is None: if start_sheet_row is None:
start_data_idx = self.sheet_handler.get_start_row_index("Website Scrape Timestamp") start_data_idx = self.sheet_handler.get_start_row_index("Website Scrape Timestamp")
if start_data_idx == -1: return # Fehler wurde bereits geloggt if start_data_idx == -1: return
start_sheet_row = start_data_idx + self.sheet_handler._header_rows + 1 start_sheet_row = start_data_idx + self.sheet_handler._header_rows + 1
if not self.sheet_handler.load_data(): return if not self.sheet_handler.load_data(): return
@@ -2104,8 +2105,6 @@ class DataProcessor:
"""Interne Worker-Funktion. Gibt IMMER ein Dictionary zurück.""" """Interne Worker-Funktion. Gibt IMMER ein Dictionary zurück."""
company_name = task['company_name'] company_name = task['company_name']
website_url = task['url'] website_url = task['url']
# Initialisiere das Ergebnis-Dictionary mit Default-Werten
result = {'raw_text': 'k.A.', 'meta_text': 'k.A.', 'summary': 'k.A.', 'url_pruefstatus': 'URL_UNPROCESSED', 'final_url': website_url} result = {'raw_text': 'k.A.', 'meta_text': 'k.A.', 'summary': 'k.A.', 'url_pruefstatus': 'URL_UNPROCESSED', 'final_url': website_url}
try: try:
@@ -2116,8 +2115,7 @@ class DataProcessor:
result['final_url'] = found_url result['final_url'] = found_url
result['url_pruefstatus'] = "URL_OK_SERP" result['url_pruefstatus'] = "URL_OK_SERP"
else: else:
result['url_pruefstatus'] = "URL_SERP_FAILED" result['url_pruefstatus'] = "URL_SERP_FAILED"; return result
return result
if website_url and 'k.a.' not in website_url.lower(): if website_url and 'k.a.' not in website_url.lower():
raw_text = get_website_raw(website_url) raw_text = get_website_raw(website_url)
@@ -2144,6 +2142,7 @@ class DataProcessor:
all_updates = [] all_updates = []
now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=getattr(Config, 'MAX_SCRAPING_WORKERS', 5)) as executor: with ThreadPoolExecutor(max_workers=getattr(Config, 'MAX_SCRAPING_WORKERS', 5)) as executor:
self.logger.info(f"Starte parallele Verarbeitung für {len(tasks)} Websites...") self.logger.info(f"Starte parallele Verarbeitung für {len(tasks)} Websites...")
@@ -2155,7 +2154,7 @@ class DataProcessor:
result_dict = future.result() result_dict = future.result()
if not isinstance(result_dict, dict): if not isinstance(result_dict, dict):
self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {row_num}: Worker gab keinen Dictionary zurück. Bekam {type(result_dict)}. Überspringe Update.") self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {row_num}: Worker gab keinen Dictionary zurück. Überspringe.")
continue continue
all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("CRM Website") + 1)}{row_num}', 'values': [[result_dict.get('final_url')]]}) all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("CRM Website") + 1)}{row_num}', 'values': [[result_dict.get('final_url')]]})
@@ -2605,31 +2604,29 @@ class DataProcessor:
self.logger.info( self.logger.info(
f"Website-Scraping (Batch) abgeschlossen. {processed_count} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.") f"Website-Scraping (Batch) abgeschlossen. {processed_count} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.")
def _scrape_raw_text_task(self, task_info, scrape_function): def _scrape_raw_text_task(self, task_info, scraper_function):
""" """
Worker-Funktion. Ruft die gehärteten Helper auf und gibt IMMER ein Dictionary zurück. Interne Worker-Funktion für paralleles Scraping.
Gibt IMMER ein Dictionary zurück, um den `Erwartete dict, bekam <class 'str'>`-Fehler zu beheben.
""" """
url = task_info.get('url') row_num = task_info['row_num']
row_num = task_info.get('row_num') url = task_info['url']
self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num}: {url}") self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num + 1}: {url}")
try: try:
raw_text_result = get_website_raw(url) # Ruft die übergebene scraper_function (get_website_raw) auf
meta_details_result = scrape_website_details(url) raw_text = scraper_function(url)
# Das ist der entscheidende Fix: Das Ergebnis wird immer in ein Dictionary verpackt.
# Wir prüfen auch, ob das Ergebnis auf einen Fehler hindeutet.
is_error = "k.A." in raw_text or "FEHLER" in raw_text
return {'row_num': row_num, 'raw_text': raw_text, 'error': is_error}
return {
'row_num': row_num,
'raw_text': raw_text_result,
'meta_details': meta_details_result
}
except Exception as e: except Exception as e:
# Dieses Sicherheitsnetz fängt alle unerwarteten Fehler ab, die die Helper nicht fangen. self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num + 1}: {e}")
self.logger.error(f"FATALER FEHLER im Scraping Worker für Zeile {row_num}: {e}", exc_info=True) # Auch im absoluten Fehlerfall wird ein Dictionary zurückgegeben.
return { return {'row_num': row_num, 'raw_text': f"FEHLER im Task: {e}", 'error': True}
'row_num': row_num,
'raw_text': f'k.A. (FATALER WORKER FEHLER: {e})',
'meta_details': f'k.A. (FATALER WORKER FEHLER: {e})'
}
def process_summarization_batch( def process_summarization_batch(