# Changelog:
# - Fixed year-prefix bug in MetricParser
# - Added metric_confidence and metric_proof_text to database
# - Added entity check and annual priority to LLM prompt
# - Improved UI: added confidence traffic light and mouse-over proof tooltip
# - Restored missing API endpoints (create, bulk, wiki-override)
#
# 294 lines | 13 KiB | Python
import logging
|
|
import requests
|
|
import random
|
|
import re
|
|
import json
|
|
from urllib.parse import urljoin, urlparse
|
|
from bs4 import BeautifulSoup
|
|
from typing import Optional, Dict
|
|
from ..lib.core_utils import clean_text, retry_on_failure, call_gemini_flash, clean_json_response
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Pool of desktop-browser User-Agent strings; one is picked at random per
# request to reduce the chance of naive bot-blocking on scraped sites.
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
]
class ScraperService:
    """Fetches company websites and extracts cleaned text, meta info and
    Impressum (imprint) details; LLM extraction is delegated to
    ``call_gemini_flash`` from ``core_utils``."""

    def __init__(self, timeout: int = 15):
        # Per-request timeout in seconds for main-page fetches.
        self.timeout = timeout
@retry_on_failure(max_retries=2)
|
|
def scrape_url(self, url: str) -> Dict[str, str]:
|
|
"""
|
|
Fetches a URL and returns cleaned text content + meta info.
|
|
Also attempts to find and scrape the Impressum (Imprint).
|
|
"""
|
|
if not url.startswith("http"):
|
|
url = "https://" + url
|
|
|
|
try:
|
|
headers = {'User-Agent': random.choice(USER_AGENTS)}
|
|
# verify=False is risky but often needed for poorly configured corporate sites
|
|
response = requests.get(url, headers=headers, timeout=self.timeout, verify=False)
|
|
response.raise_for_status()
|
|
|
|
# Check Content Type
|
|
logger.debug(f"Response status: {response.status_code}")
|
|
if response.headers is None:
|
|
logger.error("Response headers is None!")
|
|
return {"error": "No headers"}
|
|
|
|
content_type = response.headers.get('Content-Type', '').lower()
|
|
if 'text/html' not in content_type:
|
|
logger.warning(f"Skipping non-HTML content for {url}: {content_type}")
|
|
return {"error": "Not HTML"}
|
|
|
|
# Parse Main Page
|
|
try:
|
|
result = self._parse_html(response.content)
|
|
except Exception as e:
|
|
logger.error(f"Error in _parse_html: {e}", exc_info=True)
|
|
return {"error": f"Parse error: {e}"}
|
|
|
|
# --- IMPRESSUM LOGIC ---
|
|
try:
|
|
soup = BeautifulSoup(response.content, 'html.parser')
|
|
impressum_url = self._find_impressum_link(soup, url)
|
|
except Exception as e:
|
|
logger.error(f"Error finding impressum: {e}", exc_info=True)
|
|
impressum_url = None
|
|
|
|
# FALLBACK: If deep URL (e.g. /ueber-uns/) yielded no Impressum, try Root URL
|
|
if not impressum_url and url.count('/') > 3:
|
|
try:
|
|
parsed = urlparse(url)
|
|
root_url = f"{parsed.scheme}://{parsed.netloc}/"
|
|
logger.info(f"No Impressum on deep URL. Checking Root: {root_url}")
|
|
|
|
root_resp = requests.get(root_url, headers=headers, timeout=10, verify=False)
|
|
if root_resp.status_code == 200:
|
|
root_soup = BeautifulSoup(root_resp.content, 'html.parser')
|
|
impressum_url = self._find_impressum_link(root_soup, root_url)
|
|
except Exception as ex:
|
|
logger.warning(f"Root URL fallback failed: {ex}")
|
|
|
|
if impressum_url:
|
|
logger.info(f"Found Impressum URL: {impressum_url}")
|
|
impressum_data = self._scrape_impressum_data(impressum_url)
|
|
result["impressum"] = impressum_data
|
|
else:
|
|
logger.info(f"No Impressum link found for {url}")
|
|
result["impressum"] = None
|
|
|
|
return result
|
|
|
|
except requests.exceptions.SSLError:
|
|
# Retry with HTTP if HTTPS fails
|
|
if url.startswith("https://"):
|
|
logger.info(f"SSL failed for {url}, retrying with http://...")
|
|
return self.scrape_url(url.replace("https://", "http://"))
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Scraping failed for {url}: {e}")
|
|
return {"error": str(e)}
|
|
|
|
def _find_impressum_link(self, soup: BeautifulSoup, base_url: str) -> Optional[str]:
|
|
"""
|
|
Scans links for Impressum. If not found, tries to find 'Kontakt' page and looks there.
|
|
"""
|
|
# 1. Try Direct Impressum Link
|
|
direct_url = self._find_link_by_keywords(soup, base_url, ["impressum", "imprint", "legal notice", "anbieterkennzeichnung", "rechtliches"])
|
|
if direct_url:
|
|
return direct_url
|
|
|
|
# 2. Try 2-Hop via "Kontakt"
|
|
logger.info(f"No direct Impressum found on {base_url}. Checking 'Kontakt' page...")
|
|
kontakt_url = self._find_link_by_keywords(soup, base_url, ["kontakt", "contact"])
|
|
|
|
if kontakt_url:
|
|
try:
|
|
headers = {'User-Agent': random.choice(USER_AGENTS)}
|
|
resp = requests.get(kontakt_url, headers=headers, timeout=10, verify=False)
|
|
if resp.status_code == 200:
|
|
sub_soup = BeautifulSoup(resp.content, 'html.parser')
|
|
# Look for Impressum on Kontakt page
|
|
sub_impressum = self._find_link_by_keywords(sub_soup, kontakt_url, ["impressum", "imprint", "legal notice", "anbieterkennzeichnung"])
|
|
if sub_impressum:
|
|
logger.info(f"Found Impressum via Kontakt page: {sub_impressum}")
|
|
return sub_impressum
|
|
except Exception as e:
|
|
logger.warning(f"Failed to scan Kontakt page {kontakt_url}: {e}")
|
|
|
|
return None
|
|
|
|
def _find_link_by_keywords(self, soup: BeautifulSoup, base_url: str, keywords: list) -> Optional[str]:
|
|
"""Helper to find a link matching specific keywords."""
|
|
candidates = []
|
|
for a in soup.find_all('a', href=True):
|
|
text = clean_text(a.get_text()).lower()
|
|
href = a['href'].lower()
|
|
|
|
if any(kw in text for kw in keywords) or any(kw in href for kw in keywords):
|
|
if "mailto:" in href or "tel:" in href or "javascript:" in href:
|
|
continue
|
|
|
|
full_url = urljoin(base_url, a['href'])
|
|
|
|
score = 0
|
|
# Higher score if keyword is in visible text
|
|
if any(kw in text for kw in keywords): score += 10
|
|
# Lower score if only in href
|
|
if any(kw in href for kw in keywords): score += 5
|
|
# Boost specific exact matches
|
|
if text in keywords: score += 5
|
|
|
|
candidates.append((score, full_url))
|
|
|
|
if candidates:
|
|
candidates.sort(key=lambda x: x[0], reverse=True)
|
|
return candidates[0][1]
|
|
return None
|
|
|
|
    def _scrape_impressum_data(self, url: str) -> Optional[Dict[str, str]]:
        """
        Fetch the Impressum page and use an LLM to extract structured data.

        Returns a dict with keys such as 'legal_name', 'street', 'zip',
        'city', 'country_code', 'email', 'phone', 'ceo_name' and 'vat_id'
        (missing fields are null), or None when fetching or extraction fails.
        """
        try:
            headers = {'User-Agent': random.choice(USER_AGENTS)}
            # verify=False: tolerate misconfigured TLS on corporate sites.
            response = requests.get(url, headers=headers, timeout=self.timeout, verify=False)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')
            # Aggressive cleaning for Impressum too: strip boilerplate tags
            # so only the legal text reaches the LLM.
            for element in soup(['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav']):
                element.decompose()

            raw_text = soup.get_text(separator=' ', strip=True)[:10000]  # Limit context

            logger.debug(f"Impressum raw text sent to LLM ({len(raw_text)} chars): {raw_text[:500]}...")

            # LLM Extraction (Adhering to Rule 1: r"""...""".format())
            prompt = r"""
            Extract the official company details from this German 'Impressum' text.
            Return JSON ONLY. Keys: 'legal_name', 'street', 'zip', 'city', 'country_code', 'email', 'phone', 'ceo_name', 'vat_id'.
            'country_code' should be the two-letter ISO code (e.g., "DE", "CH", "AT").
            If a field is missing, use null. The street and city might be on different lines.

            Text:
            {text}
            """.format(text=raw_text)

            response_text = call_gemini_flash(prompt, json_mode=True, temperature=0.1)
            logger.debug(f"Impressum LLM raw response ({len(response_text)} chars): {response_text[:500]}...")

            result = json.loads(clean_json_response(response_text))

            # --- FIX: Handle List vs Dict ---
            # If LLM returns a list like [{...}], take the first element
            if isinstance(result, list) and len(result) > 0:
                result = result[0]

            return result

        except Exception as e:
            logger.error(f"Impressum scrape failed for {url}: {e}", exc_info=True)  # Log full traceback
            return None
def _parse_html(self, html_content: bytes) -> Dict[str, str]:
|
|
if not html_content:
|
|
return {"title": "", "description": "", "text": "", "emails": []}
|
|
|
|
try:
|
|
soup = BeautifulSoup(html_content, 'html.parser')
|
|
|
|
# 1. Cleanup Junk
|
|
# Safe removal of tags
|
|
for element in soup(['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav', 'aside', 'form', 'button']):
|
|
if element: element.decompose()
|
|
|
|
# 1b. Remove common Cookie Banners (Defensive)
|
|
try:
|
|
for div in soup.find_all("div"):
|
|
if not div: continue
|
|
# .get can return None for attributes if not found? No, returns None if key not found.
|
|
# But if div is somehow None (unlikely in loop), check first.
|
|
|
|
# Convert list of classes to string if needed
|
|
cls_attr = div.get("class")
|
|
classes = " ".join(cls_attr).lower() if isinstance(cls_attr, list) else str(cls_attr or "").lower()
|
|
|
|
id_attr = div.get("id")
|
|
ids = str(id_attr or "").lower()
|
|
|
|
if any(x in classes or x in ids for x in ["cookie", "consent", "banner", "popup", "modal", "disclaimer"]):
|
|
div.decompose()
|
|
except Exception as e:
|
|
logger.warning(f"Error filtering divs: {e}")
|
|
|
|
# 2. Extract Title & Meta Description
|
|
title = ""
|
|
try:
|
|
if soup.title and soup.title.string:
|
|
title = soup.title.string
|
|
except: pass
|
|
|
|
meta_desc = ""
|
|
try:
|
|
meta_tag = soup.find('meta', attrs={'name': 'description'})
|
|
if meta_tag:
|
|
meta_desc = meta_tag.get('content', '') or ""
|
|
except: pass
|
|
|
|
# 3. Extract Main Text
|
|
try:
|
|
body = soup.find('body')
|
|
raw_text = body.get_text(separator=' ', strip=True) if body else soup.get_text(separator=' ', strip=True)
|
|
cleaned_text = clean_text(raw_text)
|
|
except Exception as e:
|
|
logger.warning(f"Text extraction failed: {e}")
|
|
cleaned_text = ""
|
|
|
|
# 4. Extract Emails
|
|
emails = []
|
|
try:
|
|
emails = list(set(re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', cleaned_text)))[:5]
|
|
except: pass
|
|
|
|
return {
|
|
"title": clean_text(title),
|
|
"description": clean_text(meta_desc),
|
|
"text": cleaned_text[:25000],
|
|
"emails": emails
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Critical error in _parse_html: {e}", exc_info=True)
|
|
return {"title": "", "description": "", "text": "", "emails": [], "error": str(e)}
|
|
|
|
# --- HELPER FUNCTION FOR EXTERNAL USE (RESTORED TO USE REQUESTS, NO TRAFILATURA) ---
def scrape_website_content(url: str) -> Optional[str]:
    """
    Fetch the visible text content of a URL using requests + BeautifulSoup
    (fallback since Trafilatura is missing).

    Returns the extracted text, or None for empty/placeholder URLs ("k.a.")
    and on any fetch/parse failure.
    """
    if not url or url.lower() == "k.a.":
        return None
    # FIX: consistency with ScraperService.scrape_url — tolerate scheme-less
    # URLs instead of failing with requests.exceptions.MissingSchema.
    if not url.startswith("http"):
        url = "https://" + url
    try:
        headers = {'User-Agent': random.choice(USER_AGENTS)}
        # verify=False tolerates misconfigured TLS; acceptable for read-only scraping.
        response = requests.get(url, headers=headers, timeout=15, verify=False)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')

        # Basic cleanup of non-visible tags.
        for element in soup(['script', 'style', 'noscript']):
            element.decompose()

        text = soup.get_text(separator=' ', strip=True)
        if text:
            logger.debug(f"Scraped content length for {url}: {len(text)} chars")
            return text
        return None  # page fetched but yielded no visible text
    except Exception as e:
        logger.error(f"Scraping error for {url}: {e}")
        return None