# File: Brancheneinstufung2/company-explorer/backend/services/scraping.py
# (282 lines, 12 KiB, Python)
import logging
import requests
import random
import re
import json
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from typing import Optional, Dict
from ..lib.core_utils import clean_text, retry_on_failure, call_gemini_flash, clean_json_response
logger = logging.getLogger(__name__)
# Pool of common desktop browser User-Agent strings; one is chosen at random
# per outgoing request (see ScraperService) to reduce naive bot blocking.
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
]
class ScraperService:
    """Scrapes company websites.

    Produces cleaned main-page content (title, meta description, text,
    e-mail addresses) and, for German sites, attempts to locate the
    Impressum (imprint) page and extract structured company details from
    it via an LLM call.
    """

    def __init__(self, timeout: int = 15):
        # Timeout in seconds for the primary page request; secondary
        # requests (root fallback, Kontakt page) use a fixed 10s.
        self.timeout = timeout

    @retry_on_failure(max_retries=2)
    def scrape_url(self, url: str) -> Dict[str, object]:
        """
        Fetches a URL and returns cleaned text content + meta info.
        Also attempts to find and scrape the Impressum (Imprint).

        Returns a dict with keys 'title', 'description', 'text', 'emails'
        and 'impressum' (dict or None), or {'error': ...} on failure.
        """
        # Normalize bare domains ("example.de") to an https:// URL.
        if not url.startswith("http"):
            url = "https://" + url
        try:
            headers = {'User-Agent': random.choice(USER_AGENTS)}
            # verify=False is risky but often needed for poorly configured corporate sites
            response = requests.get(url, headers=headers, timeout=self.timeout, verify=False)
            response.raise_for_status()

            logger.debug(f"Response status: {response.status_code}")
            # Only HTML is worth parsing; PDFs, images etc. are skipped.
            content_type = response.headers.get('Content-Type', '').lower()
            if 'text/html' not in content_type:
                logger.warning(f"Skipping non-HTML content for {url}: {content_type}")
                return {"error": "Not HTML"}

            # Parse the main page into title/description/text/emails.
            try:
                result = self._parse_html(response.content)
            except Exception as e:
                logger.error(f"Error in _parse_html: {e}", exc_info=True)
                return {"error": f"Parse error: {e}"}

            # --- IMPRESSUM LOGIC ---
            try:
                soup = BeautifulSoup(response.content, 'html.parser')
                impressum_url = self._find_impressum_link(soup, url)
            except Exception as e:
                logger.error(f"Error finding impressum: {e}", exc_info=True)
                impressum_url = None

            # FALLBACK: If deep URL (e.g. /ueber-uns/) yielded no Impressum, try Root URL
            if not impressum_url and url.count('/') > 3:
                try:
                    parsed = urlparse(url)
                    root_url = f"{parsed.scheme}://{parsed.netloc}/"
                    logger.info(f"No Impressum on deep URL. Checking Root: {root_url}")
                    root_resp = requests.get(root_url, headers=headers, timeout=10, verify=False)
                    if root_resp.status_code == 200:
                        root_soup = BeautifulSoup(root_resp.content, 'html.parser')
                        impressum_url = self._find_impressum_link(root_soup, root_url)
                except Exception as ex:
                    logger.warning(f"Root URL fallback failed: {ex}")

            if impressum_url:
                logger.info(f"Found Impressum URL: {impressum_url}")
                result["impressum"] = self._scrape_impressum_data(impressum_url)
            else:
                logger.info(f"No Impressum link found for {url}")
                result["impressum"] = None
            return result
        except requests.exceptions.SSLError:
            # Retry with HTTP if HTTPS fails
            if url.startswith("https://"):
                logger.info(f"SSL failed for {url}, retrying with http://...")
                return self.scrape_url(url.replace("https://", "http://"))
            raise
        except Exception as e:
            logger.error(f"Scraping failed for {url}: {e}")
            return {"error": str(e)}

    def _find_impressum_link(self, soup: BeautifulSoup, base_url: str) -> Optional[str]:
        """
        Scans links for Impressum. If not found, tries to find 'Kontakt' page and looks there.
        """
        # 1. Try Direct Impressum Link
        direct_url = self._find_link_by_keywords(soup, base_url, ["impressum", "imprint", "legal notice", "anbieterkennzeichnung", "rechtliches"])
        if direct_url:
            return direct_url

        # 2. Try 2-Hop via "Kontakt": many sites only link the Impressum
        # from their contact page.
        logger.info(f"No direct Impressum found on {base_url}. Checking 'Kontakt' page...")
        kontakt_url = self._find_link_by_keywords(soup, base_url, ["kontakt", "contact"])
        if kontakt_url:
            try:
                headers = {'User-Agent': random.choice(USER_AGENTS)}
                resp = requests.get(kontakt_url, headers=headers, timeout=10, verify=False)
                if resp.status_code == 200:
                    sub_soup = BeautifulSoup(resp.content, 'html.parser')
                    # Look for Impressum on Kontakt page
                    sub_impressum = self._find_link_by_keywords(sub_soup, kontakt_url, ["impressum", "imprint", "legal notice", "anbieterkennzeichnung"])
                    if sub_impressum:
                        logger.info(f"Found Impressum via Kontakt page: {sub_impressum}")
                        return sub_impressum
            except Exception as e:
                logger.warning(f"Failed to scan Kontakt page {kontakt_url}: {e}")
        return None

    def _find_link_by_keywords(self, soup: BeautifulSoup, base_url: str, keywords: list) -> Optional[str]:
        """Helper to find a link matching specific keywords.

        Scores each <a>: +10 if a keyword appears in the visible text,
        +5 if in the href, +5 for an exact text match; returns the
        absolute URL of the highest-scoring candidate, or None.
        """
        candidates = []
        for a in soup.find_all('a', href=True):
            text = clean_text(a.get_text()).lower()
            href = a['href'].lower()
            if any(kw in text for kw in keywords) or any(kw in href for kw in keywords):
                # Skip pseudo-links that are not navigable pages.
                if "mailto:" in href or "tel:" in href or "javascript:" in href:
                    continue
                full_url = urljoin(base_url, a['href'])
                score = 0
                # Higher score if keyword is in visible text
                if any(kw in text for kw in keywords): score += 10
                # Lower score if only in href
                if any(kw in href for kw in keywords): score += 5
                # Boost specific exact matches
                if text in keywords: score += 5
                candidates.append((score, full_url))
        if candidates:
            candidates.sort(key=lambda x: x[0], reverse=True)
            return candidates[0][1]
        return None

    def _scrape_impressum_data(self, url: str) -> Optional[Dict[str, str]]:
        """
        Fetches the Impressum page and uses LLM to extract structured data.

        Returns a dict with keys like 'legal_name', 'street', 'zip', ...
        or None when fetching/extraction fails.
        """
        try:
            headers = {'User-Agent': random.choice(USER_AGENTS)}
            response = requests.get(url, headers=headers, timeout=self.timeout, verify=False)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            # Aggressive cleaning for Impressum too
            for element in soup(['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav']):
                element.decompose()
            raw_text = soup.get_text(separator=' ', strip=True)[:10000]  # Limit context
            logger.debug(f"Impressum raw text sent to LLM ({len(raw_text)} chars): {raw_text[:500]}...")

            # LLM Extraction
            prompt = f"""
Extract the official company details from this German 'Impressum' text.
Return JSON ONLY. Keys: 'legal_name', 'street', 'zip', 'city', 'country_code', 'email', 'phone', 'ceo_name', 'vat_id'.
'country_code' should be the two-letter ISO code (e.g., "DE", "CH", "AT").
If a field is missing, use null.
Text:
{raw_text}
"""
            # BUG FIX: was `call_gemini`, which is not imported anywhere in
            # this module — every call raised NameError and the broad except
            # below silently returned None. The imported helper is
            # `call_gemini_flash` (see module imports).
            response_text = call_gemini_flash(prompt, json_mode=True, temperature=0.1)
            logger.debug(f"Impressum LLM raw response ({len(response_text)} chars): {response_text[:500]}...")
            result = json.loads(clean_json_response(response_text))

            # If LLM returns a list like [{...}], take the first element
            if isinstance(result, list) and len(result) > 0:
                result = result[0]
            return result
        except Exception as e:
            logger.error(f"Impressum scrape failed for {url}: {e}", exc_info=True)  # Log full traceback
            return None

    def _parse_html(self, html_content: bytes) -> Dict[str, object]:
        """Parses raw HTML into title, meta description, cleaned body text
        and up to five unique e-mail addresses found in the text."""
        if not html_content:
            return {"title": "", "description": "", "text": "", "emails": []}
        try:
            soup = BeautifulSoup(html_content, 'html.parser')

            # 1. Cleanup junk tags that carry no prose content.
            for element in soup(['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav', 'aside', 'form', 'button']):
                if element: element.decompose()

            # 1b. Remove common Cookie Banners (Defensive)
            try:
                for div in soup.find_all("div"):
                    if not div: continue
                    # class attr may be a list of tokens; id is a plain string.
                    cls_attr = div.get("class")
                    classes = " ".join(cls_attr).lower() if isinstance(cls_attr, list) else str(cls_attr or "").lower()
                    ids = str(div.get("id") or "").lower()
                    if any(x in classes or x in ids for x in ["cookie", "consent", "banner", "popup", "modal", "disclaimer"]):
                        div.decompose()
            except Exception as e:
                logger.warning(f"Error filtering divs: {e}")

            # 2. Extract Title & Meta Description
            title = ""
            try:
                if soup.title and soup.title.string:
                    title = soup.title.string
            except Exception:  # narrowed from bare except
                pass
            meta_desc = ""
            try:
                meta_tag = soup.find('meta', attrs={'name': 'description'})
                if meta_tag:
                    meta_desc = meta_tag.get('content', '') or ""
            except Exception:
                pass

            # 3. Extract Main Text
            try:
                body = soup.find('body')
                raw_text = body.get_text(separator=' ', strip=True) if body else soup.get_text(separator=' ', strip=True)
                cleaned_text = clean_text(raw_text)
            except Exception as e:
                logger.warning(f"Text extraction failed: {e}")
                cleaned_text = ""

            # 4. Extract Emails (dedup via set, cap at 5)
            emails = []
            try:
                emails = list(set(re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', cleaned_text)))[:5]
            except Exception:
                pass

            return {
                "title": clean_text(title),
                "description": clean_text(meta_desc),
                "text": cleaned_text[:25000],
                "emails": emails
            }
        except Exception as e:
            logger.error(f"Critical error in _parse_html: {e}", exc_info=True)
            return {"title": "", "description": "", "text": "", "emails": [], "error": str(e)}
# --- HELPER FUNCTION FOR EXTERNAL USE ---
def scrape_website_content(url: str) -> Optional[str]:
    """Convenience wrapper returning only the page text for *url*.

    Used by ClassificationService. Returns None when scraping fails or
    yields no text content.
    """
    page = ScraperService().scrape_url(url)
    if not page:
        return None
    text = page.get("text")
    return text if text else None