data_processor.py aktualisiert
This commit is contained in:
@@ -2078,6 +2078,7 @@ class DataProcessor:
|
|||||||
"""
|
"""
|
||||||
self.logger.info(f"Starte Website-Scraping & Summarizing (Batch). Limit: {limit or 'Unbegrenzt'}")
|
self.logger.info(f"Starte Website-Scraping & Summarizing (Batch). Limit: {limit or 'Unbegrenzt'}")
|
||||||
|
|
||||||
|
# --- 1. Tasks sammeln ---
|
||||||
if start_sheet_row is None:
|
if start_sheet_row is None:
|
||||||
start_data_idx = self.sheet_handler.get_start_row_index("Website Scrape Timestamp")
|
start_data_idx = self.sheet_handler.get_start_row_index("Website Scrape Timestamp")
|
||||||
if start_data_idx == -1: return
|
if start_data_idx == -1: return
|
||||||
@@ -2090,6 +2091,7 @@ class DataProcessor:
|
|||||||
for i in range(start_sheet_row - 1, len(all_data)):
|
for i in range(start_sheet_row - 1, len(all_data)):
|
||||||
if limit is not None and len(tasks) >= limit: break
|
if limit is not None and len(tasks) >= limit: break
|
||||||
row_data = all_data[i]
|
row_data = all_data[i]
|
||||||
|
# VERWENDET IHRE EXAKTE LOGIK:
|
||||||
if self._needs_website_processing(row_data, force_reeval=False):
|
if self._needs_website_processing(row_data, force_reeval=False):
|
||||||
tasks.append({
|
tasks.append({
|
||||||
'row_num': i + 1,
|
'row_num': i + 1,
|
||||||
@@ -2101,10 +2103,12 @@ class DataProcessor:
|
|||||||
self.logger.info("Keine Zeilen gefunden, die Website-Verarbeitung erfordern.")
|
self.logger.info("Keine Zeilen gefunden, die Website-Verarbeitung erfordern.")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# --- 2. Worker-Funktion definieren ---
|
||||||
def _scrape_worker(task):
|
def _scrape_worker(task):
|
||||||
"""Interne Worker-Funktion. Gibt IMMER ein Dictionary zurück."""
|
"""Interne Worker-Funktion. Gibt IMMER ein Dictionary zurück."""
|
||||||
company_name = task['company_name']
|
company_name = task['company_name']
|
||||||
website_url = task['url']
|
website_url = task['url']
|
||||||
|
|
||||||
result = {'raw_text': 'k.A.', 'meta_text': 'k.A.', 'summary': 'k.A.', 'url_pruefstatus': 'URL_UNPROCESSED', 'final_url': website_url}
|
result = {'raw_text': 'k.A.', 'meta_text': 'k.A.', 'summary': 'k.A.', 'url_pruefstatus': 'URL_UNPROCESSED', 'final_url': website_url}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -2139,6 +2143,7 @@ class DataProcessor:
|
|||||||
result['url_pruefstatus'] = "URL_SCRAPE_ERROR"
|
result['url_pruefstatus'] = "URL_SCRAPE_ERROR"
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
# --- 3. Parallele Ausführung & Ergebnisse sammeln ---
|
||||||
all_updates = []
|
all_updates = []
|
||||||
now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
@@ -2154,7 +2159,7 @@ class DataProcessor:
|
|||||||
result_dict = future.result()
|
result_dict = future.result()
|
||||||
|
|
||||||
if not isinstance(result_dict, dict):
|
if not isinstance(result_dict, dict):
|
||||||
self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {row_num}: Worker gab keinen Dictionary zurück. Überspringe.")
|
self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {row_num}: Worker gab keinen Dictionary zurück. Bekam {type(result_dict)}. Überspringe.")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("CRM Website") + 1)}{row_num}', 'values': [[result_dict.get('final_url')]]})
|
all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("CRM Website") + 1)}{row_num}', 'values': [[result_dict.get('final_url')]]})
|
||||||
@@ -2167,6 +2172,7 @@ class DataProcessor:
|
|||||||
except Exception as e_future:
|
except Exception as e_future:
|
||||||
self.logger.error(f"Fehler beim Abrufen des Ergebnisses für Zeile {row_num}: {e_future}", exc_info=True)
|
self.logger.error(f"Fehler beim Abrufen des Ergebnisses für Zeile {row_num}: {e_future}", exc_info=True)
|
||||||
|
|
||||||
|
# --- 4. Finales Schreiben ---
|
||||||
if all_updates:
|
if all_updates:
|
||||||
self.logger.info(f"Sende Batch-Update für {len(tasks)} verarbeitete Websites...")
|
self.logger.info(f"Sende Batch-Update für {len(tasks)} verarbeitete Websites...")
|
||||||
self.sheet_handler.batch_update_cells(all_updates)
|
self.sheet_handler.batch_update_cells(all_updates)
|
||||||
@@ -2606,23 +2612,17 @@ class DataProcessor:
|
|||||||
|
|
||||||
def _scrape_raw_text_task(self, task_info, scraper_function):
|
def _scrape_raw_text_task(self, task_info, scraper_function):
|
||||||
"""
|
"""
|
||||||
Interne Worker-Funktion für paralleles Scraping.
|
Worker-Funktion für Threading. Gibt IMMER ein Dictionary zurück,
|
||||||
Gibt IMMER ein Dictionary zurück, um den `Erwartete dict, bekam <class 'str'>`-Fehler zu beheben.
|
um den `Erwartete dict, bekam <class 'str'>`-Fehler zu beheben.
|
||||||
"""
|
"""
|
||||||
row_num = task_info['row_num']
|
row_num = task_info['row_num']
|
||||||
url = task_info['url']
|
url = task_info['url']
|
||||||
self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num + 1}: {url}")
|
self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num + 1}: {url}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Ruft die übergebene scraper_function (get_website_raw) auf
|
|
||||||
raw_text = scraper_function(url)
|
raw_text = scraper_function(url)
|
||||||
|
|
||||||
# Das ist der entscheidende Fix: Das Ergebnis wird immer in ein Dictionary verpackt.
|
# Das ist der entscheidende Fix: Das Ergebnis wird immer in ein Dictionary verpackt.
|
||||||
# Wir prüfen auch, ob das Ergebnis auf einen Fehler hindeutet.
|
|
||||||
is_error = "k.A." in raw_text or "FEHLER" in raw_text
|
is_error = "k.A." in raw_text or "FEHLER" in raw_text
|
||||||
|
|
||||||
return {'row_num': row_num, 'raw_text': raw_text, 'error': is_error}
|
return {'row_num': row_num, 'raw_text': raw_text, 'error': is_error}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num + 1}: {e}")
|
self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num + 1}: {e}")
|
||||||
# Auch im absoluten Fehlerfall wird ein Dictionary zurückgegeben.
|
# Auch im absoluten Fehlerfall wird ein Dictionary zurückgegeben.
|
||||||
@@ -3197,14 +3197,33 @@ class DataProcessor:
|
|||||||
self.evaluate_branch_task,
|
self.evaluate_branch_task,
|
||||||
task,
|
task,
|
||||||
openai_sem): task for task in batch_tasks_to_run}
|
openai_sem): task for task in batch_tasks_to_run}
|
||||||
for future in concurrent.futures.as_completed(future_map):
|
for future in concurrent.futures.as_completed(future_to_task):
|
||||||
task_info = future_map[future]
|
task = future_to_task[future]
|
||||||
try:
|
try:
|
||||||
res_data = future.result()
|
# HIER KOMMT JETZT GARANTIERT EIN DICTIONARY AN
|
||||||
current_batch_results.append(res_data)
|
result_dict = future.result()
|
||||||
if res_data.get('error'):
|
|
||||||
current_batch_errors += 1
|
# Prüfe explizit, ob es wirklich ein Dictionary ist (doppelte Sicherheit)
|
||||||
except Exception as exc_future:
|
if not isinstance(result_dict, dict):
|
||||||
|
self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {task['row_num']}: Worker gab keinen Dictionary zurück. Bekam {type(result_dict)}. Überspringe.")
|
||||||
|
batch_error_count += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Lese die Werte aus dem Dictionary, anstatt das ganze Objekt zu speichern
|
||||||
|
row_num_from_result = result_dict['row_num']
|
||||||
|
raw_text_res = result_dict['raw_text']
|
||||||
|
|
||||||
|
scraping_results[row_num_from_result] = raw_text_res
|
||||||
|
|
||||||
|
if result_dict.get('error'):
|
||||||
|
batch_error_count += 1
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
row_num = task['row_num']
|
||||||
|
err_msg = f"Unerwarteter Fehler bei Ergebnisabfrage für Zeile {row_num} ({task['url'][:100]}): {exc}"
|
||||||
|
self.logger.error(err_msg)
|
||||||
|
scraping_results[row_num] = "k.A. (Unerwarteter Fehler Task)"
|
||||||
|
batch_error_count += 1
|
||||||
self.logger.error(
|
self.logger.error(
|
||||||
f"Exception im Future für Zeile {task_info['row_num']}: {exc_future}")
|
f"Exception im Future für Zeile {task_info['row_num']}: {exc_future}")
|
||||||
current_batch_results.append(
|
current_batch_results.append(
|
||||||
|
|||||||
Reference in New Issue
Block a user