- Fixed a critical bug in the company-explorer by forcing a database re-initialization with a new database file. This ensures the application code is in sync with the database schema. - Documented the schema mismatch incident and its resolution in MIGRATION_PLAN.md. - Restored and enhanced BUILDER_APPS_MIGRATION.md by recovering extensive, valuable content from the git history that was accidentally deleted. The guide now again includes detailed troubleshooting steps and code templates for common migration pitfalls.
270 lines
12 KiB
Python
270 lines
12 KiB
Python
import logging
|
|
import requests
|
|
import random
|
|
import re
|
|
import json
|
|
from urllib.parse import urljoin, urlparse
|
|
from bs4 import BeautifulSoup
|
|
from typing import Optional, Dict
|
|
from ..lib.core_utils import clean_text, retry_on_failure, call_gemini, clean_json_response
|
|
|
|
logger = logging.getLogger(__name__)

# Desktop browser User-Agent strings; one is picked at random per request
# (see ScraperService) to look less like an automated client.
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Safari/605.1.15',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
]
|
|
|
|
class ScraperService:
    """
    Scrapes company websites for cleaned page content and legal data.

    ``scrape_url`` fetches a page, extracts title / meta description / body
    text / e-mail addresses, then tries to locate the German 'Impressum'
    (legal notice) page and extract structured company details from it via
    an LLM call (``call_gemini``).
    """

    def __init__(self, timeout: int = 15):
        # Timeout (seconds) for primary page fetches; secondary fetches
        # (root fallback, 'Kontakt' page) use a fixed 10s timeout below.
        self.timeout = timeout

    @retry_on_failure(max_retries=2)
    def scrape_url(self, url: str) -> Dict[str, object]:
        """
        Fetches a URL and returns cleaned text content + meta info.

        Also attempts to find and scrape the Impressum (Imprint); the result
        is stored under the "impressum" key (dict or None). On failure a
        dict with an "error" key is returned instead of raising (except for
        SSL errors on already-http URLs, which propagate to the retry
        decorator).
        """
        # Be lenient about scheme-less input such as "example.com".
        if not url.startswith("http"):
            url = "https://" + url

        try:
            headers = {'User-Agent': random.choice(USER_AGENTS)}
            # verify=False is risky but often needed for poorly configured corporate sites
            response = requests.get(url, headers=headers, timeout=self.timeout, verify=False)
            response.raise_for_status()

            # Check Content Type
            logger.debug(f"Response status: {response.status_code}")
            if response.headers is None:
                # Defensive guard; requests normally always sets headers.
                logger.error("Response headers is None!")
                return {"error": "No headers"}

            content_type = response.headers.get('Content-Type', '').lower()
            if 'text/html' not in content_type:
                logger.warning(f"Skipping non-HTML content for {url}: {content_type}")
                return {"error": "Not HTML"}

            # Parse Main Page
            try:
                result = self._parse_html(response.content)
            except Exception as e:
                logger.error(f"Error in _parse_html: {e}", exc_info=True)
                return {"error": f"Parse error: {e}"}

            # --- IMPRESSUM LOGIC ---
            try:
                soup = BeautifulSoup(response.content, 'html.parser')
                impressum_url = self._find_impressum_link(soup, url)
            except Exception as e:
                logger.error(f"Error finding impressum: {e}", exc_info=True)
                impressum_url = None

            # FALLBACK: If deep URL (e.g. /ueber-uns/) yielded no Impressum, try Root URL
            if not impressum_url and url.count('/') > 3:
                try:
                    parsed = urlparse(url)
                    root_url = f"{parsed.scheme}://{parsed.netloc}/"
                    logger.info(f"No Impressum on deep URL. Checking Root: {root_url}")

                    root_resp = requests.get(root_url, headers=headers, timeout=10, verify=False)
                    if root_resp.status_code == 200:
                        root_soup = BeautifulSoup(root_resp.content, 'html.parser')
                        impressum_url = self._find_impressum_link(root_soup, root_url)
                except Exception as ex:
                    logger.warning(f"Root URL fallback failed: {ex}")

            if impressum_url:
                logger.info(f"Found Impressum URL: {impressum_url}")
                result["impressum"] = self._scrape_impressum_data(impressum_url)
            else:
                logger.info(f"No Impressum link found for {url}")
                result["impressum"] = None

            return result

        except requests.exceptions.SSLError:
            # Retry with HTTP if HTTPS fails
            if url.startswith("https://"):
                logger.info(f"SSL failed for {url}, retrying with http://...")
                return self.scrape_url(url.replace("https://", "http://"))
            raise
        except Exception as e:
            logger.error(f"Scraping failed for {url}: {e}")
            return {"error": str(e)}

    def _find_impressum_link(self, soup: BeautifulSoup, base_url: str) -> Optional[str]:
        """
        Scans links for Impressum. If not found, tries to find 'Kontakt' page
        and looks there (2-hop lookup). Returns an absolute URL or None.
        """
        # 1. Try Direct Impressum Link
        direct_url = self._find_link_by_keywords(soup, base_url, ["impressum", "imprint", "legal notice", "anbieterkennzeichnung", "rechtliches"])
        if direct_url:
            return direct_url

        # 2. Try 2-Hop via "Kontakt"
        logger.info(f"No direct Impressum found on {base_url}. Checking 'Kontakt' page...")
        kontakt_url = self._find_link_by_keywords(soup, base_url, ["kontakt", "contact"])

        if kontakt_url:
            try:
                headers = {'User-Agent': random.choice(USER_AGENTS)}
                resp = requests.get(kontakt_url, headers=headers, timeout=10, verify=False)
                if resp.status_code == 200:
                    sub_soup = BeautifulSoup(resp.content, 'html.parser')
                    # Look for Impressum on Kontakt page
                    sub_impressum = self._find_link_by_keywords(sub_soup, kontakt_url, ["impressum", "imprint", "legal notice", "anbieterkennzeichnung"])
                    if sub_impressum:
                        logger.info(f"Found Impressum via Kontakt page: {sub_impressum}")
                        return sub_impressum
            except Exception as e:
                logger.warning(f"Failed to scan Kontakt page {kontakt_url}: {e}")

        return None

    def _find_link_by_keywords(self, soup: BeautifulSoup, base_url: str, keywords: list) -> Optional[str]:
        """
        Helper to find a link matching specific keywords.

        Scores each candidate <a href>: a keyword hit in the visible text
        scores higher than a hit in the href alone, and an exact text match
        gets an extra boost. mailto:/tel:/javascript: pseudo-links are
        skipped. Returns the best candidate as an absolute URL, or None.
        """
        candidates = []
        for a in soup.find_all('a', href=True):
            text = clean_text(a.get_text()).lower()
            href = a['href'].lower()

            if any(kw in text for kw in keywords) or any(kw in href for kw in keywords):
                if "mailto:" in href or "tel:" in href or "javascript:" in href:
                    continue

                full_url = urljoin(base_url, a['href'])

                score = 0
                # Higher score if keyword is in visible text
                if any(kw in text for kw in keywords):
                    score += 10
                # Lower score if only in href
                if any(kw in href for kw in keywords):
                    score += 5
                # Boost specific exact matches
                if text in keywords:
                    score += 5

                candidates.append((score, full_url))

        if candidates:
            candidates.sort(key=lambda x: x[0], reverse=True)
            return candidates[0][1]
        return None

    def _scrape_impressum_data(self, url: str) -> Optional[Dict[str, str]]:
        """
        Fetches the Impressum page and uses LLM to extract structured data.

        Returns a dict with keys like 'legal_name', 'street', 'zip', ... or
        None when the fetch or extraction fails. (Annotation fixed: the
        original claimed Dict[str, str] but returns None on error.)
        """
        try:
            headers = {'User-Agent': random.choice(USER_AGENTS)}
            response = requests.get(url, headers=headers, timeout=self.timeout, verify=False)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')
            # Aggressive cleaning for Impressum too
            for element in soup(['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav']):
                element.decompose()

            raw_text = soup.get_text(separator=' ', strip=True)[:10000]  # Limit context

            logger.debug(f"Impressum raw text sent to LLM ({len(raw_text)} chars): {raw_text[:500]}...")

            # LLM Extraction
            prompt = f"""
            Extract the official company details from this German 'Impressum' text.
            Return JSON ONLY. Keys: 'legal_name', 'street', 'zip', 'city', 'country_code', 'email', 'phone', 'ceo_name', 'vat_id'.
            'country_code' should be the two-letter ISO code (e.g., "DE", "CH", "AT").
            If a field is missing, use null.

            Text:
            {raw_text}
            """

            response_text = call_gemini(prompt, json_mode=True, temperature=0.1)
            logger.debug(f"Impressum LLM raw response ({len(response_text)} chars): {response_text[:500]}...")

            result = json.loads(clean_json_response(response_text))

            # --- FIX: Handle List vs Dict ---
            # If LLM returns a list like [{...}], take the first element
            if isinstance(result, list) and len(result) > 0:
                result = result[0]

            return result

        except Exception as e:
            logger.error(f"Impressum scrape failed for {url}: {e}", exc_info=True)  # Log full traceback
            return None

    def _parse_html(self, html_content: bytes) -> Dict[str, object]:
        """
        Parses raw HTML into title, meta description, cleaned text and emails.

        Strips scripts/navigation/cookie banners before extracting. Never
        raises: on failure it returns empty fields plus an "error" key.
        """
        if not html_content:
            return {"title": "", "description": "", "text": "", "emails": []}

        try:
            soup = BeautifulSoup(html_content, 'html.parser')

            # 1. Cleanup Junk
            # Safe removal of tags
            for element in soup(['script', 'style', 'noscript', 'iframe', 'svg', 'header', 'footer', 'nav', 'aside', 'form', 'button']):
                if element:
                    element.decompose()

            # 1b. Remove common Cookie Banners (Defensive)
            try:
                for div in soup.find_all("div"):
                    if not div:
                        continue

                    # Convert list of classes to string if needed
                    cls_attr = div.get("class")
                    classes = " ".join(cls_attr).lower() if isinstance(cls_attr, list) else str(cls_attr or "").lower()

                    id_attr = div.get("id")
                    ids = str(id_attr or "").lower()

                    if any(x in classes or x in ids for x in ["cookie", "consent", "banner", "popup", "modal", "disclaimer"]):
                        div.decompose()
            except Exception as e:
                logger.warning(f"Error filtering divs: {e}")

            # 2. Extract Title & Meta Description
            title = ""
            try:
                if soup.title and soup.title.string:
                    title = soup.title.string
            except Exception:  # narrowed from bare except: don't mask SystemExit/KeyboardInterrupt
                pass

            meta_desc = ""
            try:
                meta_tag = soup.find('meta', attrs={'name': 'description'})
                if meta_tag:
                    meta_desc = meta_tag.get('content', '') or ""
            except Exception:
                pass

            # 3. Extract Main Text
            try:
                body = soup.find('body')
                raw_text = body.get_text(separator=' ', strip=True) if body else soup.get_text(separator=' ', strip=True)
                cleaned_text = clean_text(raw_text)
            except Exception as e:
                logger.warning(f"Text extraction failed: {e}")
                cleaned_text = ""

            # 4. Extract Emails
            emails = []
            try:
                # Up to 5 unique addresses found in the visible text.
                emails = list(set(re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', cleaned_text)))[:5]
            except Exception:
                pass

            return {
                "title": clean_text(title),
                "description": clean_text(meta_desc),
                "text": cleaned_text[:25000],
                "emails": emails
            }

        except Exception as e:
            logger.error(f"Critical error in _parse_html: {e}", exc_info=True)
            return {"title": "", "description": "", "text": "", "emails": [], "error": str(e)}