großes rework, vieles gelöscht
- Refactors the website scraping batch process to fix critical stability issues. - Replaces multiple redundant and conflicting scraping functions (`_scrape_website_task`, `_scrape_raw_text_task`, `_scrape_and_summarize_task`) with a single, robust worker function: `_scrape_website_task_batch`. - The new worker function now consistently returns a structured dictionary, resolving the `TypeError` that prevented results from being written to the sheet. - The main batch function `process_website_scraping_batch` is updated to correctly handle this new dictionary structure, including error states. - Functionality is now aligned with the single-row processing mode by also fetching meta-details in the batch process, not just raw text. - The two large, duplicated, and now obsolete `process_website_scraping` functions have been removed to improve code clarity and maintainability.
This commit is contained in:
@@ -812,59 +812,67 @@ class DataProcessor:
|
||||
# === Prozess Methoden (Sequentiell & Re-Evaluation) =====================
|
||||
# ==========================================================================
|
||||
|
||||
def _scrape_and_summarize_task(self, task_info):
|
||||
|
||||
def _scrape_website_task_batch(self, task_info):
|
||||
"""
|
||||
Interne Worker-Funktion für paralleles Scraping und Summarizing.
|
||||
Gibt IMMER ein Dictionary mit allen relevanten Website-Daten zurück.
|
||||
Robuste Worker-Funktion für das parallele Scrapen von Websites im Batch-Modus.
|
||||
Diese Funktion holt Rohtext sowie Meta-Details und gibt IMMER ein strukturiertes
|
||||
Dictionary zurück, um eine konsistente Verarbeitung im Hauptthread zu gewährleisten.
|
||||
Sie kapselt die Fehlerlogik, die ursprünglich in `get_website_raw` lag.
|
||||
"""
|
||||
row_num = task_info['row_num']
|
||||
company_name = task_info['company_name']
|
||||
website_url = task_info['url']
|
||||
self.logger.debug(f" -> Scrape-Task gestartet für Zeile {row_num}: {website_url}")
|
||||
url = task_info['url']
|
||||
company_name = task_info.get('company_name', 'einem Unternehmen')
|
||||
self.logger.debug(f" -> Batch-Scrape-Task gestartet für Zeile {row_num}: {url}")
|
||||
|
||||
result = {
|
||||
'raw_text': 'k.A.',
|
||||
'meta_text': 'k.A.',
|
||||
'summary': 'k.A.',
|
||||
'url_pruefstatus': 'URL_UNPROCESSED',
|
||||
'final_url': website_url # Behalte die ursprüngliche URL für den Fall eines SERP-Lookups
|
||||
'row_num': row_num,
|
||||
'raw_text': 'k.A. (Fehler im Task)',
|
||||
'meta_details': 'k.A. (Fehler im Task)',
|
||||
'error': True,
|
||||
'status_message': 'Unbekannter Task-Fehler'
|
||||
}
|
||||
|
||||
try:
|
||||
# 1. SERP-Lookup, falls keine URL vorhanden ist
|
||||
if not website_url or website_url.lower() == 'k.a.':
|
||||
found_url = serp_website_lookup(company_name)
|
||||
if found_url and 'k.a.' not in found_url.lower():
|
||||
website_url = found_url
|
||||
result['final_url'] = found_url
|
||||
result['url_pruefstatus'] = "URL_OK_SERP"
|
||||
else:
|
||||
result['url_pruefstatus'] = "URL_SERP_FAILED"
|
||||
return result # Beende hier, wenn keine URL gefunden wurde
|
||||
# 1. Rohtext abrufen (get_website_raw aus helpers.py)
|
||||
raw_text_result = get_website_raw(url)
|
||||
|
||||
# 2. Scrape Rohtext
|
||||
raw_text = get_website_raw(website_url)
|
||||
result['raw_text'] = raw_text
|
||||
|
||||
# 3. Bewerte das Ergebnis des Scrapings
|
||||
if raw_text == URL_CHECK_MARKER:
|
||||
result['url_pruefstatus'] = URL_CHECK_MARKER
|
||||
elif raw_text and 'k.a.' not in raw_text.lower():
|
||||
result['url_pruefstatus'] = "URL_OK_SCRAPED"
|
||||
# 4. Scrape Meta-Daten und erstelle Zusammenfassung nur bei Erfolg
|
||||
result['meta_text'] = scrape_website_details(website_url) or 'k.A.'
|
||||
result['summary'] = summarize_website_content(raw_text, company_name) or 'k.A.'
|
||||
else:
|
||||
result['url_pruefstatus'] = "URL_SCRAPE_EMPTY_OR_BANNER"
|
||||
# 2. Ergebnis des Rohtext-Abrufs auswerten
|
||||
if raw_text_result and not str(raw_text_result).strip().lower().startswith('k.a.'):
|
||||
result['raw_text'] = raw_text_result
|
||||
result['error'] = False
|
||||
result['status_message'] = 'Erfolgreich gescraped'
|
||||
|
||||
# 3. Bei Erfolg auch Meta-Details abrufen
|
||||
meta_details_result = scrape_website_details(url)
|
||||
result['meta_details'] = meta_details_result if meta_details_result else "k.A. (Keine Meta-Details)"
|
||||
|
||||
# 4. Spezifische Fehler-Strings von get_website_raw behandeln
|
||||
elif str(raw_text_result).strip().lower().startswith('k.a.'):
|
||||
result['raw_text'] = raw_text_result # Fehlerstring übernehmen
|
||||
result['meta_details'] = "k.A."
|
||||
result['error'] = True
|
||||
# Extrahiere den Grund aus dem String, z.B. "Timeout"
|
||||
match = re.search(r'\((.*?)\)', raw_text_result)
|
||||
result['status_message'] = match.group(1) if match else "Scraping fehlgeschlagen"
|
||||
|
||||
# 5. Fallback für unerwartete leere Ergebnisse
|
||||
else:
|
||||
result['raw_text'] = 'k.A. (Extraktion leer)'
|
||||
result['meta_details'] = 'k.A.'
|
||||
result['error'] = True
|
||||
result['status_message'] = 'Extraktion lieferte leeren Text'
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num}: {e}")
|
||||
result['raw_text'] = f"FEHLER: {type(e).__name__}"
|
||||
result['url_pruefstatus'] = "URL_SCRAPE_ERROR"
|
||||
self.logger.error(f" -> Kritischer Fehler im Worker-Task `_scrape_website_task_batch` für Zeile {row_num}: {e}")
|
||||
result['status_message'] = f"Kritischer Task-Fehler: {type(e).__name__}"
|
||||
# Das `result` Dictionary wird mit den initialen Fehlerwerten zurückgegeben.
|
||||
return result
|
||||
|
||||
|
||||
|
||||
def process_rows_sequentially(
|
||||
self,
|
||||
start_sheet_row,
|
||||
@@ -2049,591 +2057,6 @@ class DataProcessor:
|
||||
self.logger.info(
|
||||
f"Wikipedia-Verifizierungs-Batch abgeschlossen. {processed_count} Zeilen verarbeitet (in Batch aufgenommen), {skipped_count} Zeilen uebersprungen ({skipped_no_wiki_url} wegen fehlender M-URL).") # <<< GEÄNDERT
|
||||
|
||||
def _scrape_website_task(self, task_info):
|
||||
"""
|
||||
Worker-Funktion für das parallele Scrapen von Websites.
|
||||
Ruft die "gehärteten" Helper-Funktionen auf und gibt IMMER ein Dictionary zurück.
|
||||
"""
|
||||
url = task_info.get('url')
|
||||
row_num = task_info.get('row_num')
|
||||
self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num}: {url}")
|
||||
|
||||
# Rufe die gehärteten Helper-Funktionen auf.
|
||||
# Diese geben garantiert immer einen String zurück.
|
||||
raw_text_result = get_website_raw(url)
|
||||
meta_details_result = scrape_website_details(url)
|
||||
|
||||
# Gib immer ein Dictionary zurück, um den AttributeError im Hauptthread zu vermeiden.
|
||||
return {
|
||||
'raw_text': raw_text_result,
|
||||
'meta_details': meta_details_result
|
||||
}
|
||||
|
||||
|
||||
def process_website_scraping_batch(self, start_sheet_row=None, end_sheet_row=None, limit=None):
|
||||
"""
|
||||
Batch-Prozess NUR für Website-Scraping (Rohtext). Basiert auf der
|
||||
bewährten Logik aus v1.7.9, angepasst an die neue modulare Struktur und fehlerbereinigt.
|
||||
"""
|
||||
self.logger.info(f"Starte Website-Scraping (Batch). Bereich: {start_sheet_row or 'Start'}-{end_sheet_row or 'Ende'}, Limit: {limit or 'Unbegrenzt'}")
|
||||
|
||||
# --- Daten laden und Startzeile ermitteln ---
|
||||
if start_sheet_row is None:
|
||||
self.logger.info("Automatische Ermittlung der Startzeile basierend auf leeren Timestamp...")
|
||||
start_data_idx = self.sheet_handler.get_start_row_index(check_column_key="Website Scrape Timestamp")
|
||||
if start_data_idx == -1:
|
||||
self.logger.error("FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.")
|
||||
return
|
||||
start_sheet_row = start_data_idx + self.sheet_handler._header_rows + 1
|
||||
self.logger.info(f"Automatisch ermittelte Startzeile: {start_sheet_row}")
|
||||
|
||||
if not self.sheet_handler.load_data():
|
||||
self.logger.error("FEHLER beim Laden der Daten für Batch-Verarbeitung.")
|
||||
return
|
||||
|
||||
all_data = self.sheet_handler.get_all_data_with_headers()
|
||||
header_rows = self.sheet_handler._header_rows
|
||||
total_sheet_rows = len(all_data)
|
||||
effective_end_row = end_sheet_row if end_sheet_row is not None else total_sheet_rows
|
||||
|
||||
self.logger.info(f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {effective_end_row}.")
|
||||
if start_sheet_row > effective_end_row:
|
||||
self.logger.info("Start liegt nach dem Ende. Keine Zeilen zu verarbeiten.")
|
||||
return
|
||||
|
||||
# --- Indizes und Buchstaben ---
|
||||
rohtext_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Website Rohtext") + 1)
|
||||
version_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Version") + 1)
|
||||
timestamp_col_letter = self.sheet_handler._get_col_letter(get_col_idx("Website Scrape Timestamp") + 1)
|
||||
|
||||
# --- Hauptlogik: Iteriere und sammle Batches ---
|
||||
processing_batch_size = getattr(Config, 'PROCESSING_BATCH_SIZE', 20)
|
||||
max_scraping_workers = getattr(Config, 'MAX_SCRAPING_WORKERS', 10)
|
||||
update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50)
|
||||
|
||||
tasks_for_processing_batch = []
|
||||
all_sheet_updates = []
|
||||
processed_count = 0
|
||||
skipped_count = 0
|
||||
|
||||
for i in range(start_sheet_row, effective_end_row + 1):
|
||||
row_index_in_list = i - 1
|
||||
if row_index_in_list >= total_sheet_rows: break
|
||||
|
||||
row = all_data[row_index_in_list]
|
||||
if not any(cell and str(cell).strip() for cell in row):
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
if self._needs_website_processing(row, force_reeval=False):
|
||||
website_url = self._get_cell_value_safe(row, "CRM Website").strip()
|
||||
if website_url and website_url.lower() not in ["k.a.", "http:"]:
|
||||
if limit is not None and processed_count >= limit:
|
||||
self.logger.info(f"Verarbeitungslimit ({limit}) erreicht.")
|
||||
break
|
||||
|
||||
# WICHTIG: row_num muss 1-basiert sein für die Ausgabe
|
||||
tasks_for_processing_batch.append({"row_num": i, "url": website_url})
|
||||
processed_count += 1
|
||||
else:
|
||||
skipped_count += 1
|
||||
else:
|
||||
skipped_count += 1
|
||||
|
||||
if len(tasks_for_processing_batch) >= processing_batch_size or (i == effective_end_row and tasks_for_processing_batch):
|
||||
self.logger.debug(f"--- Starte Website-Scraping Batch ({len(tasks_for_processing_batch)} Tasks) ---")
|
||||
scraping_results = {}
|
||||
batch_error_count = 0
|
||||
|
||||
with ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
|
||||
future_to_task = {executor.submit(self._scrape_raw_text_task, task, get_website_raw): task for task in tasks_for_processing_batch}
|
||||
for future in as_completed(future_to_task):
|
||||
try:
|
||||
result_dict = future.result()
|
||||
if isinstance(result_dict, dict):
|
||||
scraping_results[result_dict['row_num']] = result_dict['raw_text']
|
||||
if result_dict.get('error'):
|
||||
batch_error_count += 1
|
||||
else:
|
||||
task = future_to_task[future]
|
||||
self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {task['row_num']}: Erwartete dict, bekam {type(result_dict)}. Überspringe.")
|
||||
scraping_results[task['row_num']] = "FEHLER (Inkonsistenter Rückgabetyp)"
|
||||
batch_error_count += 1
|
||||
except Exception as exc:
|
||||
task = future_to_task[future]
|
||||
self.logger.error(f"Unerwarteter Fehler bei Ergebnisabfrage für Zeile {task['row_num']}: {exc}")
|
||||
scraping_results[task['row_num']] = "FEHLER (Task Exception)"
|
||||
batch_error_count += 1
|
||||
|
||||
self.logger.debug(f" Scraping für Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler).")
|
||||
|
||||
if scraping_results:
|
||||
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
current_version = getattr(Config, 'VERSION', 'unknown')
|
||||
for row_num, raw_text_res in scraping_results.items():
|
||||
all_sheet_updates.append({'range': f'{rohtext_col_letter}{row_num}', 'values': [[raw_text_res]]})
|
||||
all_sheet_updates.append({'range': f'{timestamp_col_letter}{row_num}', 'values': [[current_timestamp]]})
|
||||
all_sheet_updates.append({'range': f'{version_col_letter}{row_num}', 'values': [[current_version]]})
|
||||
|
||||
tasks_for_processing_batch = []
|
||||
|
||||
if len(all_sheet_updates) >= (update_batch_row_limit * 3):
|
||||
self.logger.info(f"Sende gesammelte Sheet-Updates ({len(all_sheet_updates) // 3} Zeilen)...")
|
||||
self.sheet_handler.batch_update_cells(all_sheet_updates)
|
||||
all_sheet_updates = []
|
||||
|
||||
if all_sheet_updates:
|
||||
self.logger.info(f"Sende FINALE gesammelte Sheet-Updates ({len(all_sheet_updates) // 3} Zeilen)...")
|
||||
self.sheet_handler.batch_update_cells(all_sheet_updates)
|
||||
|
||||
self.logger.info(f"Website-Scraping (Batch) abgeschlossen. {processed_count} Zeilen zur Verarbeitung ausgewählt, {skipped_count} Zeilen übersprungen.")
|
||||
|
||||
def process_website_scraping(
|
||||
self,
|
||||
start_sheet_row=None,
|
||||
end_sheet_row=None,
|
||||
limit=None):
|
||||
"""
|
||||
Batch-Prozess NUR fuer Website-Scraping.
|
||||
"""
|
||||
self.logger.info(
|
||||
f"Starte Website-Scraping (Batch). Bereich: {start_sheet_row if start_sheet_row else 'Start'}-{end_sheet_row if end_sheet_row else 'Ende'}, Limit: {limit if limit else 'Unbegrenzt'}")
|
||||
if start_sheet_row is None:
|
||||
start_data_index = self.sheet_handler.get_start_row_index(
|
||||
check_column_key="Website Scrape Timestamp")
|
||||
if start_data_index == -1:
|
||||
return
|
||||
start_sheet_row = start_data_index + self.sheet_handler._header_rows + 1
|
||||
else:
|
||||
if not self.sheet_handler.load_data():
|
||||
return
|
||||
"""
|
||||
Batch-Prozess NUR fuer Website-Scraping (Rohtext AR).
|
||||
Laedt Daten neu, prueft Spalte AR auf Inhalt ('', 'k.A.', etc.) und ueberspringt Zeilen mit Inhalt.
|
||||
Setzt AR + AT + AP fuer bearbeitete Zeilen. Sendet Updates gebuendelt.
|
||||
|
||||
Args:
|
||||
start_sheet_row (int, optional): Die 1-basierte Startzeile im Sheet. Defaults to None (automatische Ermittlung basierend auf leeren AT).
|
||||
end_sheet_row (int, optional): Die 1-basierte Endzeile im Sheet. Defaults to None (bis Ende Sheet).
|
||||
limit (int, optional): Maximale Anzahl ZU VERARBEITENDER (nicht uebersprungener) Zeilen. Defaults to None (Unbegrenzt).
|
||||
"""
|
||||
# Verwenden Sie logger, da das Logging jetzt konfiguriert ist
|
||||
# Logge die Konfiguration des Batch-Laufs
|
||||
self.logger.info(
|
||||
f"Starte Website-Scraping (Batch AR, AT, AP). Bereich: {start_sheet_row if start_sheet_row is not None else 'Start'}-{end_sheet_row if end_sheet_row is not None else 'Ende'}, Limit: {limit if limit is not None else 'Unbegrenzt'}...") # <<< GEÄNDERT
|
||||
|
||||
# --- Daten laden und Startzeile ermitteln ---
|
||||
# Automatische Ermittlung der Startzeile, wenn nicht manuell gesetzt
|
||||
if start_sheet_row is None:
|
||||
self.logger.info(
|
||||
"Automatische Ermittlung der Startzeile basierend auf leeren AT...") # <<< GEÄNDERT
|
||||
# Nutzt get_start_row_index des Sheet Handlers (Block 14). Prueft auf leeren AT (Block 1 Column Map).
|
||||
# Standardmaessig ab Zeile 7
|
||||
start_data_index_no_header = self.sheet_handler.get_start_row_index(
|
||||
check_column_key="Website Scrape Timestamp", min_sheet_row=7)
|
||||
|
||||
# Wenn get_start_row_index -1 zurueckgibt (Fehler)
|
||||
if start_data_index_no_header == -1:
|
||||
self.logger.error(
|
||||
"FEHLER bei automatischer Ermittlung der Startzeile. Breche Batch ab.") # <<< GEÄNDERT
|
||||
return # Beende die Methode
|
||||
|
||||
# Berechne die 1-basierte Sheet-Startzeile aus dem 0-basierten
|
||||
# Daten-Index
|
||||
start_sheet_row = start_data_index_no_header + \
|
||||
self.sheet_handler._header_rows + 1 # Block 14 SheetHandler Attribut
|
||||
self.logger.info(
|
||||
f"Automatisch ermittelte Startzeile (erste leere AT Zelle): {start_sheet_row}") # <<< GEÄNDERT
|
||||
else:
|
||||
# Wenn start_sheet_row manuell gesetzt wurde, laden Sie die Daten trotzdem neu, um aktuell zu sein.
|
||||
# Der load_data Aufruf ist mit retry_on_failure dekoriert (Block
|
||||
# 2).
|
||||
if not self.sheet_handler.load_data():
|
||||
self.logger.error(
|
||||
"FEHLER beim Laden der Daten fuer process_website_scraping_batch.") # <<< GEÄNDERT
|
||||
return # Beende die Methode, wenn das Laden fehlschlaegt
|
||||
|
||||
# Holen Sie die gesamte Datenliste (inklusive Header) aus dem
|
||||
# SheetHandler.
|
||||
all_data = self.sheet_handler.get_all_data_with_headers()
|
||||
# Annahme: header_rows ist als Attribut im SheetHandler verfuegbar
|
||||
# (Block 14).
|
||||
header_rows = self.sheet_handler._header_rows
|
||||
total_sheet_rows = len(all_data) # Gesamtzahl der Zeilen im Sheet
|
||||
|
||||
# Berechne Endzeile, wenn nicht manuell gesetzt
|
||||
if end_sheet_row is None:
|
||||
end_sheet_row = total_sheet_rows # Bis zur letzten Zeile
|
||||
|
||||
# Logge den verarbeitungsbereich
|
||||
self.logger.info(
|
||||
f"Verarbeitungsbereich: Sheet-Zeilen {start_sheet_row} bis {end_sheet_row}. Gesamtzeilen im Sheet: {total_sheet_rows}") # <<< GEÄNDERT
|
||||
|
||||
# Pruefe, ob der Bereich gueltig ist (Start <= Ende und Start nicht
|
||||
# ueber Gesamtzeilen)
|
||||
if start_sheet_row > end_sheet_row or start_sheet_row > total_sheet_rows:
|
||||
self.logger.info(
|
||||
"Berechneter Start liegt nach dem Ende des Bereichs oder Sheets. Keine Zeilen zu verarbeiten.") # <<< GEÄNDERT
|
||||
return # Beende die Methode, wenn der Bereich leer ist
|
||||
|
||||
# --- Indizes und Buchstaben ---
|
||||
# Stellen Sie sicher, dass alle benoetigten Spalten in COLUMN_MAP
|
||||
# (Block 1) vorhanden sind
|
||||
required_keys = [
|
||||
"Website Rohtext", "CRM Website", "Version", "Website Scrape Timestamp", "CRM Name", "Website Meta-Details"
|
||||
]
|
||||
# Erstellen Sie ein Dictionary mit Schluesseln und den korrekten Indizes
|
||||
col_indices = {key: COLUMN_MAP.get(key, {}).get('index') for key in required_keys}
|
||||
|
||||
# Pruefen Sie, ob alle benoetigten Schluessel in COLUMN_MAP gefunden
|
||||
# wurden
|
||||
if None in col_indices.values():
|
||||
missing = [k for k, v in col_indices.items() if v is None]
|
||||
self.logger.critical(
|
||||
f"FEHLER: Benoetigte Spaltenschluessel fehlen in COLUMN_MAP fuer process_website_scraping_batch: {missing}. Breche ab.") # <<< GEÄNDERT
|
||||
return # Beende die Methode bei kritischem Fehler
|
||||
|
||||
# Ermitteln Sie die Indizes und Buchstaben fuer Updates (AR, AT, AP)
|
||||
rohtext_col_idx = col_indices.get("Website Rohtext")
|
||||
website_col_idx = col_indices.get("CRM Website")
|
||||
version_col_idx = col_indices.get("Version")
|
||||
timestamp_col_idx = col_indices.get("Website Scrape Timestamp")
|
||||
name_col_idx = col_indices.get("CRM Name")
|
||||
|
||||
rohtext_col_letter = self.sheet_handler._get_col_letter(
|
||||
rohtext_col_idx + 1) # Block 14 _get_col_letter
|
||||
version_col_letter = self.sheet_handler._get_col_letter(
|
||||
version_col_idx + 1)
|
||||
timestamp_col_letter = self.sheet_handler._get_col_letter(
|
||||
timestamp_col_idx + 1)
|
||||
|
||||
# --- Hauptlogik: Iteriere und sammle Batches ---
|
||||
# Holen Sie die Batch-Groesse fuer Verarbeitung (Threading) aus Config
|
||||
# (Block 1)
|
||||
processing_batch_size = getattr(Config, 'PROCESSING_BATCH_SIZE', 20)
|
||||
# Holen Sie die maximale Anzahl Worker aus Config (Block 1)
|
||||
max_scraping_workers = getattr(Config, 'MAX_SCRAPING_WORKERS', 10)
|
||||
# Holen Sie die Batch-Groesse fuer Sheet-Updates aus Config (Block 1)
|
||||
update_batch_row_limit = getattr(Config, 'UPDATE_BATCH_ROW_LIMIT', 50)
|
||||
|
||||
# Tasks fuer den aktuellen Scraping-Batch (Liste von Dicts)
|
||||
tasks_for_processing_batch = []
|
||||
# 1-basierte Zeilennummern im aktuellen Batch
|
||||
rows_in_current_scraping_batch = []
|
||||
# Gesammelte Updates fuer Batch-Schreiben ins Sheet (Liste von Dicts)
|
||||
all_sheet_updates = []
|
||||
|
||||
# Zaehlt Zeilen, die fuer die Verarbeitung in Frage kommen und in den
|
||||
# Batch aufgenommen werden (im Rahmen des Limits).
|
||||
processed_count = 0
|
||||
# Zaehlt Zeilen, die uebersprungen wurden (wegen Inhalt oder fehlender
|
||||
# URL).
|
||||
skipped_count = 0
|
||||
# Zaehlt Zeilen, die speziell wegen fehlender URL uebersprungen wurden.
|
||||
skipped_no_url = 0
|
||||
|
||||
# Iteriere ueber die Sheet-Zeilen im definierten Bereich (1-basierte
|
||||
# Sheet-Zeilennummer)
|
||||
for i in range(start_sheet_row, end_sheet_row + 1):
|
||||
row_index_in_list = i - 1 # 0-basierter Index in der all_data Liste
|
||||
# Pruefen Sie, ob das Ende des Sheets erreicht wurde
|
||||
if row_index_in_list >= total_sheet_rows:
|
||||
break # Ende des Sheets erreicht
|
||||
|
||||
row = all_data[row_index_in_list] # Die Rohdaten fuer diese Zeile
|
||||
|
||||
# Stellen Sie sicher, dass die Zeile nicht leer ist
|
||||
if not any(cell and isinstance(cell, str) and cell.strip()
|
||||
for cell in row):
|
||||
# self.logger.debug(f"Zeile {i}: Uebersprungen (Leere Zeile).")
|
||||
# # Zu viel Laerm im Debug
|
||||
skipped_count += 1 # Zaehlen als uebersprungen
|
||||
continue # Springe zur naechsten Zeile
|
||||
|
||||
# --- Pruefung, ob Verarbeitung fuer diese Zeile noetig ist ---
|
||||
# Kriterium: Website Rohtext (AR) ist leer oder ein Standard-Fehlerwert.
|
||||
# UND Website URL (D) ist vorhanden und gueltig aussehend.
|
||||
|
||||
# Holen Sie den Wert aus Spalte AR (Website Rohtext) (nutzt interne
|
||||
# Helfer _get_cell_value_safe)
|
||||
cell_value_ar = self._get_cell_value_safe(
|
||||
row, "Website Rohtext") # Block 1 Column Map
|
||||
# Pruefen Sie, ob AR leer ist oder einen Standard-Fehlerwert
|
||||
# enthaelt.
|
||||
ar_is_empty_or_default = not cell_value_ar or (
|
||||
isinstance(
|
||||
cell_value_ar,
|
||||
str) and str(cell_value_ar).strip().lower() in [
|
||||
"k.a.",
|
||||
"k.a. (nur cookie-banner erkannt)",
|
||||
"k.a. (fehler)"])
|
||||
|
||||
# Holen Sie den Wert aus Spalte D (CRM Website) (nutzt interne
|
||||
# Helfer _get_cell_value_safe)
|
||||
website_url = self._get_cell_value_safe(
|
||||
row, "CRM Website").strip() # Block 1 Column Map
|
||||
# Pruefen Sie, ob die Website URL (D) vorhanden und gueltig
|
||||
# aussehend ist.
|
||||
website_url_is_valid_looking = website_url and isinstance(
|
||||
website_url,
|
||||
str) and website_url.lower() not in [
|
||||
"k.a.",
|
||||
"kein artikel gefunden",
|
||||
"fehler bei suche",
|
||||
"http:"] # Fuege "http:" hinzu basierend auf Log
|
||||
|
||||
# Verarbeitung ist noetig, wenn AR leer/default ist UND D
|
||||
# gefuellt/gueltig aussieht.
|
||||
processing_needed_for_row = ar_is_empty_or_default and website_url_is_valid_looking
|
||||
|
||||
# Loggen der Pruefergebnisse fuer diese Zeile auf Debug-Level
|
||||
log_check = (
|
||||
i < start_sheet_row +
|
||||
5) or (
|
||||
i %
|
||||
100 == 0) or (processing_needed_for_row)
|
||||
if log_check:
|
||||
company_name = self._get_cell_value_safe(
|
||||
row, "CRM Name").strip() # Block 1 Column Map
|
||||
self.logger.debug(
|
||||
f"Zeile {i} ({company_name[:50]}... Website Scraping Check): AR leer/default? {ar_is_empty_or_default}, D gueltig? {website_url_is_valid_looking}. Benötigt Verarbeitung? {processing_needed_for_row}") # <<< GEÄNDERT
|
||||
|
||||
# Wenn die Verarbeitung fuer diese Zeile nicht noetig ist
|
||||
if not processing_needed_for_row:
|
||||
skipped_count += 1 # Zaehlen als uebersprungene Zeile
|
||||
# Zaehlen Sie speziell, wenn die Zeile wegen fehlender
|
||||
# gueltiger URL uebersprungen wurde.
|
||||
if not website_url_is_valid_looking:
|
||||
skipped_no_url += 1
|
||||
continue # Springe zur naechsten Zeile
|
||||
|
||||
# --- Wenn Verarbeitung noetig: Fuege zur Batch-Liste hinzu ---
|
||||
# Zaehle die Zeile, die fuer die Verarbeitung in Frage kommt (im
|
||||
# Rahmen des Limits zaehlen)
|
||||
processed_count += 1
|
||||
|
||||
# Pruefe das Limit fuer verarbeitete Zeilen
|
||||
if limit is not None and isinstance(
|
||||
limit, int) and limit > 0 and processed_count > limit:
|
||||
# Wenn das Limit erreicht ist und es ein positives Limit gibt
|
||||
self.logger.info(
|
||||
f"Verarbeitungslimit ({limit}) fuer process_website_scraping_batch erreicht. Breche weitere Zeilenpruefung ab.") # <<< GEÄNDERT
|
||||
break # Brich die Schleife ab
|
||||
|
||||
# Fuege die benoetigten Daten fuer den Task hinzu (Zeilennummer und
|
||||
# URL)
|
||||
tasks_for_processing_batch.append(
|
||||
{"row_num": i, "url": website_url})
|
||||
# Fuege die Zeilennummer zur Liste der Zeilennummern im Batch hinzu
|
||||
rows_in_current_scraping_batch.append(i)
|
||||
|
||||
# --- Verarbeite den Batch, wenn voll ---
|
||||
# Pruefe, ob die aktuelle Batch-Liste die definierte Groesse erreicht hat.
|
||||
# scraping_batch_size wird aus Config geholt (Block 1).
|
||||
if len(tasks_for_processing_batch) >= processing_batch_size:
|
||||
# Logge den Start der Batch-Verarbeitung
|
||||
batch_start_row = tasks_for_processing_batch[0]['row_num']
|
||||
batch_end_row = tasks_for_processing_batch[-1]['row_num']
|
||||
self.logger.debug(
|
||||
f"\n--- Starte Website-Scraping Batch ({len(tasks_for_processing_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---") # <<< GEÄNDERT
|
||||
|
||||
scraping_results = {} # Dictionary zum Speichern der Ergebnisse {row_num: raw_text}
|
||||
batch_error_count = 0 # Fehlerzaehler fuer diesen spezifischen Batch
|
||||
|
||||
self.logger.debug(
|
||||
f" Scrape {len(tasks_for_processing_batch)} Websites parallel (max {max_scraping_workers} worker)...") # <<< GEÄNDERT
|
||||
# Nutzt concurrent.futures.ThreadPoolExecutor fuer paralleles Scraping.
|
||||
# max_workers wird aus Config geholt (Block 1).
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
|
||||
# Map tasks to futures. Ruft die INTERNE Worker-Funktion auf.
|
||||
# Uebergibt das task_info Dictionary und die globale
|
||||
# Funktion get_website_raw (Block 11) als Argument.
|
||||
future_to_task = {
|
||||
executor.submit(
|
||||
self._scrape_raw_text_task,
|
||||
task,
|
||||
get_website_raw): task for task in tasks_for_processing_batch} # <<< Korrigiert: interne Methode
|
||||
|
||||
# Verarbeite die Ergebnisse, sobald sie fertig sind.
|
||||
for future in concurrent.futures.as_completed(
|
||||
future_to_task):
|
||||
# Holen Sie die urspruenglichen Task-Daten (Dict)
|
||||
task = future_to_task[future]
|
||||
try:
|
||||
# Holen Sie das Ergebnis vom Future. Wenn die
|
||||
# Worker-Funktion eine Exception wirft, wird diese
|
||||
# hier gefangen.
|
||||
# Ergebnis ist ein Dictionary {'row_num': ...,
|
||||
# 'raw_text': ..., 'error': ...}
|
||||
result = future.result()
|
||||
# Speichere das Ergebnis im scraping_results
|
||||
# Dictionary
|
||||
scraping_results[result['row_num']
|
||||
] = result['raw_text']
|
||||
# Wenn der Worker einen Fehler gemeldet hat (z.B.
|
||||
# durch Fehlerstring im raw_text oder error-Feld)
|
||||
if result.get('error'):
|
||||
batch_error_count += 1 # Erhoehe den Fehlerzaehler fuer diesen Batch
|
||||
|
||||
except Exception as exc:
|
||||
# Dieser Block faengt unerwartete Fehler ab, die waehrend der Future-Ergebnis-Abfrage auftreten.
|
||||
# Die meisten Fehler sollten von get_website_raws
|
||||
# retry/logging behandelt werden.
|
||||
# Zeilennummer aus den Task-Daten
|
||||
row_num = task['row_num']
|
||||
# Gekuerzt loggen
|
||||
err_msg = f"Unerwarteter Fehler bei Ergebnisabfrage Scraping Task Zeile {row_num} ({task['url'][:100]}): {type(exc).__name__} - {exc}"
|
||||
self.logger.error(err_msg) # <<< GEÄNDERT
|
||||
# Setze einen Standard-Fehlerwert fuer diese Zeile
|
||||
# im Ergebnis
|
||||
scraping_results[row_num] = "k.A. (Unerwarteter Fehler Task)"
|
||||
batch_error_count += 1 # Erhoehe den Fehlerzaehler
|
||||
|
||||
self.logger.debug(
|
||||
f" Scraping fuer Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler in diesem Batch).") # <<< GEÄNDERT
|
||||
|
||||
# Sammle Sheet Updates (AR, AT, AP) fuer diesen Batch.
|
||||
# Dies geschieht jetzt nach der parallelen Verarbeitung.
|
||||
if scraping_results:
|
||||
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
current_version = getattr(Config, 'VERSION', 'unknown')
|
||||
batch_sheet_updates = []
|
||||
# Iteriere über die Ergebnisse des finalen Batches
|
||||
for row_num, result_dict in scraping_results.items():
|
||||
# Sicherheitsprüfung: Stelle sicher, dass result_dict ein Dictionary ist.
|
||||
if not isinstance(result_dict, dict):
|
||||
self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {row_num}: Erwartete dict, bekam {type(result_dict)}. Überspringe Update für diese Zeile.")
|
||||
# Setze nur den Timestamp, um eine Endlosschleife zu verhindern
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Scrape Timestamp"] + 1)}{row_num}', 'values': [[current_timestamp]]})
|
||||
continue
|
||||
|
||||
# result_dict ist jetzt garantiert ein Dictionary
|
||||
batch_sheet_updates.extend([
|
||||
{'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Rohtext"] + 1)}{row_num}', 'values': [[result_dict.get('raw_text', 'k.A.')]]},
|
||||
{'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Meta-Details"] + 1)}{row_num}', 'values': [[result_dict.get('meta_details', 'k.A.')]]},
|
||||
{'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Scrape Timestamp"] + 1)}{row_num}', 'values': [[current_timestamp]]},
|
||||
{'range': f'{self.sheet_handler._get_col_letter(col_indices["Version"] + 1)}{row_num}', 'values': [[current_version]]}
|
||||
])
|
||||
|
||||
all_sheet_updates.extend(batch_sheet_updates)
|
||||
|
||||
# Leere den Scraping-Batch fuer die naechste Iteration
|
||||
tasks_for_processing_batch = []
|
||||
rows_in_current_scraping_batch = []
|
||||
|
||||
# Sende gesammelte Sheet Updates, wenn das Update-Batch-Limit erreicht ist.
|
||||
# Updates pro Zeile sind 3 (AR, AT, AP). Anzahl der Zeilen =
|
||||
# len(all_sheet_updates) / 3.
|
||||
rows_in_update_batch = len(
|
||||
all_sheet_updates) // 3 # Ganzzahl-Division
|
||||
|
||||
if rows_in_update_batch >= update_batch_row_limit:
|
||||
self.logger.debug(
|
||||
f" Sende gesammelte Sheet-Updates ({rows_in_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...") # <<< GEÄNDERT
|
||||
# Nutzt die batch_update_cells Methode des Sheet Handlers (Block 14) mit Retry.
|
||||
# Wenn es fehlschlaegt, wird es intern geloggt.
|
||||
success = self.sheet_handler.batch_update_cells(
|
||||
all_sheet_updates)
|
||||
if success:
|
||||
self.logger.info(
|
||||
f" Sheet-Update fuer {rows_in_update_batch} Zeilen erfolgreich.") # <<< GEÄNDERT
|
||||
# Der Fehlerfall wird von batch_update_cells geloggt
|
||||
|
||||
# Leere die gesammelten Updates nach dem Senden.
|
||||
all_sheet_updates = []
|
||||
# rows_in_update_batch muss nicht explizit zurueckgesetzt
|
||||
# werden, da es aus len(all_sheet_updates) berechnet wird.
|
||||
|
||||
# Keine Pause hier nach jedem kleinen Scraping-Batch, da wir auf batch_update warten.
|
||||
# Die Pause kommt erst nach dem Batch-Update (oder am Ende des Modus).
|
||||
# time.sleep(0.1) # Optionale kurze Pause
|
||||
|
||||
# --- Verarbeitung des letzten unvollstaendigen Scraping-Batches nach der Schleife ---
|
||||
if tasks_for_processing_batch:
|
||||
batch_start_row = tasks_for_processing_batch[0]['row_num']
|
||||
batch_end_row = tasks_for_processing_batch[-1]['row_num']
|
||||
self.logger.debug(
|
||||
f"\n--- Starte FINALEN Website-Scraping Batch ({len(tasks_for_processing_batch)} Tasks, Zeilen {batch_start_row}-{batch_end_row}) ---")
|
||||
|
||||
scraping_results = {}
|
||||
batch_error_count = 0
|
||||
|
||||
self.logger.debug(
|
||||
f" Scrape {len(tasks_for_processing_batch)} Websites parallel (max {max_scraping_workers} worker)...")
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=max_scraping_workers) as executor:
|
||||
future_to_task = {
|
||||
executor.submit(
|
||||
self._scrape_raw_text_task,
|
||||
task,
|
||||
get_website_raw): task for task in tasks_for_processing_batch}
|
||||
|
||||
for future in concurrent.futures.as_completed(future_to_task):
|
||||
task = future_to_task[future]
|
||||
try:
|
||||
result = future.result()
|
||||
# HINWEIS: Hier speichern wir das ganze dict, nicht nur den Text
|
||||
scraping_results[result['row_num']] = result
|
||||
if result.get('error'):
|
||||
batch_error_count += 1
|
||||
except Exception as exc:
|
||||
row_num = task['row_num']
|
||||
err_msg = f"Unerwarteter Fehler bei Ergebnisabfrage Scraping Task Zeile {row_num} ({task['url'][:100]}): {type(exc).__name__} - {exc}"
|
||||
self.logger.error(err_msg)
|
||||
scraping_results[row_num] = {"raw_text": "k.A. (Unerwarteter Fehler Task)", "meta_details": "k.A.", "error": True}
|
||||
batch_error_count += 1
|
||||
|
||||
self.logger.debug(
|
||||
f" FINALER Scraping Batch beendet. {len(scraping_results)} Ergebnisse erhalten ({batch_error_count} Fehler).")
|
||||
|
||||
if scraping_results:
|
||||
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
current_version = getattr(Config, 'VERSION', 'unknown')
|
||||
batch_sheet_updates = []
|
||||
|
||||
# ANPASSUNG AN NEUE LOGIK
|
||||
for row_num, result_dict in scraping_results.items():
|
||||
# Sicherheitsprüfung: Stelle sicher, dass result_dict ein Dictionary ist.
|
||||
if not isinstance(result_dict, dict):
|
||||
self.logger.error(f"Fehlerhaftes Ergebnis für Zeile {row_num}: Erwartete dict, bekam {type(result_dict)}. Überspringe Update für diese Zeile.")
|
||||
# Setze nur den Timestamp, um eine Endlosschleife zu verhindern
|
||||
batch_sheet_updates.append({'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Scrape Timestamp"] + 1)}{row_num}', 'values': [[current_timestamp]]})
|
||||
continue
|
||||
|
||||
# result_dict ist jetzt garantiert ein Dictionary
|
||||
batch_sheet_updates.extend([
|
||||
{'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Rohtext"] + 1)}{row_num}', 'values': [[result_dict.get('raw_text', 'k.A.')]]},
|
||||
{'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Meta-Details"] + 1)}{row_num}', 'values': [[result_dict.get('meta_details', 'k.A.')]]},
|
||||
{'range': f'{self.sheet_handler._get_col_letter(col_indices["Website Scrape Timestamp"] + 1)}{row_num}', 'values': [[current_timestamp]]},
|
||||
{'range': f'{self.sheet_handler._get_col_letter(col_indices["Version"] + 1)}{row_num}', 'values': [[current_version]]}
|
||||
])
|
||||
|
||||
# --- Finale Sheet Updates senden ---
|
||||
if all_sheet_updates:
|
||||
rows_in_final_update_batch = len(all_sheet_updates) // 4 # 4 Updates pro Zeile
|
||||
self.logger.info(
|
||||
f"Sende FINALE gesammelte Sheet-Updates ({rows_in_final_update_batch} Zeilen, {len(all_sheet_updates)} Zellen)...")
|
||||
success = self.sheet_handler.batch_update_cells(all_sheet_updates)
|
||||
if success:
|
||||
self.logger.info("FINALES Sheet-Update erfolgreich.")
|
||||
|
||||
self.logger.info(
|
||||
f"Website-Scraping (Batch) abgeschlossen. {processed_count} Zeilen verarbeitet, {skipped_count} Zeilen uebersprungen.")
|
||||
|
||||
def _scrape_raw_text_task(self, task_info, scraper_function):
|
||||
"""
|
||||
Worker-Funktion für Threading. Gibt IMMER ein Dictionary zurück.
|
||||
"""
|
||||
row_num = task_info['row_num']
|
||||
url = task_info['url']
|
||||
self.logger.debug(f" -> Scrape Task gestartet für Zeile {row_num}: {url}")
|
||||
try:
|
||||
raw_text = scraper_function(url)
|
||||
is_error = "k.A." in raw_text or "FEHLER" in raw_text
|
||||
return {'row_num': row_num, 'raw_text': raw_text, 'error': is_error}
|
||||
except Exception as e:
|
||||
self.logger.error(f" -> Kritischer Fehler im Scrape-Task für Zeile {row_num}: {e}")
|
||||
return {'row_num': row_num, 'raw_text': f"FEHLER im Task: {e}", 'error': True}
|
||||
|
||||
|
||||
def process_summarization_batch(
|
||||
self,
|
||||
|
||||
Reference in New Issue
Block a user