data_processor.py updated
@@ -27,6 +27,7 @@ from sklearn.ensemble import RandomForestClassifier
 from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
 from imblearn.over_sampling import SMOTE
 from imblearn.pipeline import Pipeline as ImbPipeline
+from concurrent.futures import ThreadPoolExecutor, as_completed

 # Import the dependent modules
 from config import Config, COLUMN_MAP, MODEL_FILE, IMPUTER_FILE, PATTERNS_FILE_JSON
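The only line added to the import block is the ThreadPoolExecutor/as_completed pair, which carries the parallel scraping introduced further down. As a rough orientation, not part of this commit and with placeholder names (scrape, urls), the pattern these imports enable looks like this:

# Minimal sketch of the ThreadPoolExecutor / as_completed fan-out pattern;
# `scrape` and `urls` are placeholder names, not identifiers from data_processor.py.
from concurrent.futures import ThreadPoolExecutor, as_completed

def scrape(url):
    return url.upper()  # stand-in for real scraping work

urls = ["https://example.com", "https://example.org"]
with ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(scrape, u): u for u in urls}
    for future in as_completed(future_to_url):   # yields futures as they finish
        url = future_to_url[future]
        try:
            print(url, "->", future.result())
        except Exception as exc:                 # worker errors surface on .result()
            print(url, "failed:", exc)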
@@ -458,7 +459,7 @@ class DataProcessor:

         updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Wikipedia Timestamp") + 1)}{row_num_in_sheet}', 'values': [[now_timestamp]]})

-        # --- 3. ChatGPT evaluations (Branch, FSM, etc.) & plausibility ---
+        # --- 3. ChatGPT evaluations (Branch, FSM, etc.) & plausibility ---
         run_chat_step = 'chat' in steps_to_run
         # FIX: initialize chat_steps_to_run to resolve the NameError
         chat_steps_to_run = set()
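The update entries built here and throughout the file are plain dicts with an A1-style 'range' (a column letter from _get_col_letter(get_col_idx(...) + 1) plus the sheet row number) and a nested 'values' list. The sheet_handler implementation is not part of this diff; a minimal stand-alone sketch of what such a 1-based index-to-letter helper typically looks like:

# Hypothetical sketch of a 1-based column-index -> letter helper, matching how
# `_get_col_letter(get_col_idx(...) + 1)` is called above; the real method in
# sheet_handler may differ.
def _get_col_letter(col_idx: int) -> str:
    letters = ""
    while col_idx > 0:
        col_idx, rem = divmod(col_idx - 1, 26)
        letters = chr(ord('A') + rem) + letters
    return letters

assert _get_col_letter(1) == "A"
assert _get_col_letter(27) == "AA"
# A single batch entry then looks like: {'range': 'AB459', 'values': [['2024-01-01 12:00:00']]}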
@@ -754,7 +755,7 @@ class DataProcessor:
         # === End of the _process_single_row processing =======================
         # ======================================================================

-        # --- 5. Final updates (Version, Tokens, ReEval flag) ---
+        # --- 5. Final updates (Version, Tokens, ReEval flag) ---
         if any_processing_done:
             version_col_idx = get_col_idx("Version")  # FIXED
             if version_col_idx is not None:
@@ -811,6 +812,59 @@ class DataProcessor:
     # === Process methods (sequential & re-evaluation) ========================
     # ==========================================================================

+    def _scrape_and_summarize_task(self, task_info):
+        """
+        Internal worker function for parallel scraping and summarizing.
+        ALWAYS returns a dictionary with all relevant website data.
+        """
+        row_num = task_info['row_num']
+        company_name = task_info['company_name']
+        website_url = task_info['url']
+        self.logger.debug(f" -> Scrape-Task gestartet für Zeile {row_num}: {website_url}")
+
+        result = {
+            'raw_text': 'k.A.',
+            'meta_text': 'k.A.',
+            'summary': 'k.A.',
+            'url_pruefstatus': 'URL_UNPROCESSED',
+            'final_url': website_url  # keep the original URL in case of a SERP lookup
+        }
+
+        try:
+            # 1. SERP lookup if no URL is available
+            if not website_url or website_url.lower() == 'k.a.':
+                found_url = serp_website_lookup(company_name)
+                if found_url and 'k.a.' not in found_url.lower():
+                    website_url = found_url
+                    result['final_url'] = found_url
+                    result['url_pruefstatus'] = "URL_OK_SERP"
+                else:
+                    result['url_pruefstatus'] = "URL_SERP_FAILED"
+                    return result  # stop here if no URL was found
+
+            # 2. Scrape the raw text
+            raw_text = get_website_raw(website_url)
+            result['raw_text'] = raw_text
+
+            # 3. Evaluate the scraping result
+            if raw_text == URL_CHECK_MARKER:
+                result['url_pruefstatus'] = URL_CHECK_MARKER
+            elif raw_text and 'k.a.' not in raw_text.lower():
+                result['url_pruefstatus'] = "URL_OK_SCRAPED"
+                # 4. Scrape meta data and create the summary only on success
+                result['meta_text'] = scrape_website_details(website_url) or 'k.A.'
+                result['summary'] = summarize_website_content(raw_text, company_name) or 'k.A.'
+            else:
+                result['url_pruefstatus'] = "URL_SCRAPE_EMPTY_OR_BANNER"
+
+            return result
+
+        except Exception as e:
+            self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num}: {e}")
+            result['raw_text'] = f"FEHLER: {type(e).__name__}"
+            result['url_pruefstatus'] = "URL_SCRAPE_ERROR"
+            return result
+
     def process_rows_sequentially(
         self,
         start_sheet_row,
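The worker's contract is that it never raises and always returns the same dictionary shape, so the caller can write every field back unconditionally. An illustrative call only, not from the commit: `processor` stands for a DataProcessor instance and the sample values are invented.

# Illustrative use of the worker's contract; the task dict mirrors what
# process_website_scraping_batch builds ('row_num', 'company_name', 'url').
task = {'row_num': 12, 'company_name': 'Example AG', 'url': 'https://example.com'}
result = processor._scrape_and_summarize_task(task)

# Regardless of success or failure, the same keys are always present:
assert set(result) == {'raw_text', 'meta_text', 'summary', 'url_pruefstatus', 'final_url'}
print(result['url_pruefstatus'])   # e.g. "URL_OK_SCRAPED" or "URL_SCRAPE_ERROR"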
@@ -2032,97 +2086,62 @@ class DataProcessor:

     def process_website_scraping_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
         """
-        Batch process ONLY for website scraping (Rohtext AR).
-        Based on the logic from v1.7.9, adapted to the new modular structure and with bugs fixed.
+        Efficient batch process that scrapes websites in parallel, extracts meta data
+        and creates summaries.
         """
-        self.logger.info(f"Starte Website-Scraping (Batch). Bereich: {start_sheet_row or 'Start'}-{end_sheet_row or 'Ende'}, Limit: {limit or 'Unbegrenzt'}")
+        self.logger.info(f"Starte Website-Scraping & Summarizing (Batch). Limit: {limit or 'Unbegrenzt'}")

         # --- Load data and determine the start row ---
         if start_sheet_row is None:
-            start_data_idx = self.sheet_handler.get_start_row_index(check_column_key="Website Scrape Timestamp")
-            if start_data_idx == -1:
-                self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.")
-                return
+            start_data_idx = self.sheet_handler.get_start_row_index("Website Scrape Timestamp")
             start_sheet_row = start_data_idx + self.sheet_handler._header_rows + 1

         if not self.sheet_handler.load_data(): return
         all_data = self.sheet_handler.get_all_data_with_headers()
-        header_rows = self.sheet_handler._header_rows
-        total_sheet_rows = len(all_data)
-        effective_end_row = end_sheet_row if end_sheet_row is not None else total_sheet_rows

-        self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {effective_end_row}.")
-        if start_sheet_row > effective_end_row:
-            self.logger.info("Start liegt nach dem Ende. Keine Zeilen zu verarbeiten.")
-            return

-        # --- Main logic: iterate and collect batches ---
-        processing_batch_size = getattr(Config, 'PROCESSING_BATCH_SIZE', 20)
-        max_scraping_workers = getattr(Config, 'MAX_SCRAPING_WORKERS', 10)
-        update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50)

-        tasks_for_processing_batch = []
-        all_sheet_updates = []
-        processed_count = 0
+        tasks = []
+        for i in range(start_sheet_row - 1, len(all_data)):
+            if limit is not None and len(tasks) >= limit: break
+            row_data = all_data[i]
+            if self._needs_website_processing(row_data, force_reeval=False):
+                tasks.append({
+                    'row_num': i + 1,
+                    'company_name': self._get_cell_value_safe(row_data, "CRM Name"),
+                    'url': self._get_cell_value_safe(row_data, "CRM Website")
+                })

-        for i in range(start_sheet_row, effective_end_row + 1):
-            row_index_in_list = i - 1
-            if row_index_in_list >= total_sheet_rows: break
+        if not tasks:
+            self.logger.info("Keine Zeilen gefunden, die Website-Verarbeitung erfordern.")
+            return

+        all_updates = []
+        now_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

+        with ThreadPoolExecutor(max_workers=getattr(Config, 'MAX_SCRAPING_WORKERS', 5)) as executor:
+            self.logger.info(f"Starte parallele Verarbeitung für {len(tasks)} Websites...")
+            future_to_task = {executor.submit(self._scrape_and_summarize_task, task): task for task in tasks}

-            row = all_data[row_index_in_list]
-            if not any(cell and str(cell).strip() for cell in row): continue

-            # --- Check whether processing is needed ---
-            if self._needs_website_processing(row, force_reeval=False):
-                website_url = self._get_cell_value_safe(row, "CRM Website").strip()
-                if website_url and website_url.lower() not in ["k.a.", "http:"]:
-                    if limit is not None and processed_count >= limit:
-                        self.logger.info(f"Verarbeitungslimit ({limit}) erreicht.")
-                        break
+            for future in as_completed(future_to_task):
+                row_num = future_to_task[future]['row_num']
+                try:
+                    result_dict = future.result()

-                    tasks_for_processing_batch.append({"row_num": i, "url": website_url})
-                    processed_count += 1
+                    # Write all results back
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("CRM Website") + 1)}{row_num}', 'values': [[result_dict.get('final_url')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)}{row_num}', 'values': [[result_dict.get('raw_text')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Meta-Details") + 1)}{row_num}', 'values': [[result_dict.get('meta_text')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Zusammenfassung") + 1)}{row_num}', 'values': [[result_dict.get('summary')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("URL Prüfstatus") + 1)}{row_num}', 'values': [[result_dict.get('url_pruefstatus')]]})
+                    all_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)}{row_num}', 'values': [[now_timestamp]]})

+                except Exception as e_future:
+                    self.logger.error(f"Fehler beim Abrufen des Ergebnisses für Zeile {row_num}: {e_future}")

-            # --- Process the batch when it is full ---
-            if len(tasks_for_processing_batch) >= processing_batch_size or (i == effective_end_row and tasks_for_processing_batch):
-                self.logger.debug(f"--- Starte Website-Scraping Batch ({len(tasks_for_processing_batch)} Tasks) ---")
-                scraping_results = {}

-                with ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
-                    future_to_task = {executor.submit(self._scrape_raw_text_task, task, get_website_raw): task for task in tasks_for_processing_batch}
-                    for future in as_completed(future_to_task):
-                        try:
-                            result = future.result()
-                            scraping_results[result['row_num']] = result['raw_text']
-                        except Exception as exc:
-                            task = future_to_task[future]
-                            self.logger.error(f"Unerwarteter Fehler bei Ergebnisabfrage für Zeile {task['row_num']}: {exc}")
-                            scraping_results[task['row_num']] = "k.A. (Unerwarteter Fehler Task)"
+        if all_updates:
+            self.logger.info(f"Sende Batch-Update für {len(tasks)} verarbeitete Websites...")
+            self.sheet_handler.batch_update_cells(all_updates)

-                if scraping_results:
-                    current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-                    current_version = getattr(Config, 'VERSION', 'unknown')
-                    batch_sheet_updates = []
-                    for row_num, raw_text_res in scraping_results.items():
-                        # FIXED: use the safe `get_col_idx` function
-                        batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)}{row_num}', 'values': [[raw_text_res]]})
-                        batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)}{row_num}', 'values': [[current_timestamp]]})
-                        batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(get_col_idx("Version") + 1)}{row_num}', 'values': [[current_version]]})
-                    all_sheet_updates.extend(batch_sheet_updates)

-                tasks_for_processing_batch = []

-                if len(all_sheet_updates) >= (update_batch_row_limit * 3):  # 3 updates per row
-                    self.logger.info(f"Sende gesammelte Sheet-Updates ({len(all_sheet_updates) // 3} Zeilen)...")
-                    self.sheet_handler.batch_update_cells(all_sheet_updates)
-                    all_sheet_updates = []

-        # --- Send the final sheet updates ---
-        if all_sheet_updates:
-            self.logger.info(f"Sende FINALE gesammelte Sheet-Updates ({len(all_sheet_updates) // 3} Zeilen)...")
-            self.sheet_handler.batch_update_cells(all_sheet_updates)

-        self.logger.info(f"Website-Scraping (Batch) abgeschlossen. {processed_count} Zeilen zur Verarbeitung ausgewählt.")
+        self.logger.info(f"Website-Verarbeitung (Batch) abgeschlossen. {len(tasks)} Zeilen verarbeitet.")

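The new code collects every cell update in one list and sends a single batch_update_cells call at the end, whereas the removed code flushed in chunks once UPDATE_BATCH_ROW_LIMIT rows (at three updates per row) were pending. A minimal sketch of that chunked-flush idea, with `send` as a placeholder for sheet_handler.batch_update_cells:

# Sketch only: chunked flushing as in the removed UPDATE_BATCH_ROW_LIMIT logic.
def flush_in_chunks(updates, send, rows_per_flush=50, updates_per_row=3):
    limit = rows_per_flush * updates_per_row
    pending = []
    for update in updates:
        pending.append(update)
        if len(pending) >= limit:
            send(pending)          # push a full chunk
            pending = []
    if pending:
        send(pending)              # final partial chunk

# e.g. flush_in_chunks(all_updates, sheet_handler.batch_update_cells)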
     def process_website_scraping(
         self,