#!/usr/bin/env python3
"""
wikipedia_service.py

Service class for interacting with Wikipedia, including search, validation,
and extraction of company data.
"""

import logging
import re
from urllib.parse import unquote

import requests
import wikipedia
from bs4 import BeautifulSoup

# Import settings and helpers
from ..config import settings
from ..lib.core_utils import (
    retry_on_failure,
    simple_normalize_url,
    normalize_company_name,
    extract_numeric_value,
    clean_text,
    fuzzy_similarity,
)

logger = logging.getLogger(__name__)


class WikipediaService:
    """
    Handles searching for Wikipedia articles and extracting relevant company data.
    Includes validation logic for articles.

    Strategy is "Google-First": the best Wikipedia URL is located via SerpAPI,
    then the article is validated against hard CRM facts (domain, city, parent)
    before any data extraction takes place.
    """

    def __init__(self, user_agent=None):
        """
        Initialize the scraper with a requests session.

        Args:
            user_agent (str, optional): Custom User-Agent header; a bot-style
                default is used when omitted.
        """
        self.user_agent = user_agent or 'Mozilla/5.0 (compatible; CompanyExplorer/1.0; +http://www.example.com/bot)'
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': self.user_agent})

        # Header keywords (German + English) used to locate infobox rows for
        # each extraction target. Keys are the canonical target names used by
        # _extract_infobox_value().
        self.keywords_map = {
            'branche': ['branche', 'wirtschaftszweig', 'industry', 'taetigkeit', 'sektor', 'produkte', 'leistungen'],
            'umsatz': ['umsatz', 'erloes', 'revenue', 'jahresumsatz', 'konzernumsatz', 'ergebnis'],
            'mitarbeiter': ['mitarbeiter', 'mitarbeiterzahl', 'beschaeftigte', 'employees', 'number of employees', 'personal', 'belegschaft'],
            'sitz': ['sitz', 'hauptsitz', 'unternehmenssitz', 'firmensitz', 'headquarters', 'standort', 'sitz des unternehmens', 'anschrift', 'adresse']
        }

        try:
            # Default to German for now, could be configurable
            wiki_lang = 'de'
            wikipedia.set_lang(wiki_lang)
            wikipedia.set_rate_limiting(False)
            logger.info(f"Wikipedia library language set to '{wiki_lang}'. Rate limiting DISABLED.")
        except Exception as e:
            # Non-fatal: the library falls back to its own defaults.
            logger.warning(f"Error setting Wikipedia language or rate limiting: {e}")

    @retry_on_failure(max_retries=3)
    def serp_wikipedia_lookup(self, company_name: str, lang: str = 'de') -> str:
        """
        Searches for the best Wikipedia URL for a company using Google Search (via SerpAPI).
        Prioritizes Knowledge Graph hits and then organic results.

        Args:
            company_name (str): The name of the company to search for.
            lang (str): The language code for Wikipedia search (e.g., 'de').

        Returns:
            str: The URL of the best hit or None if nothing suitable was found.
        """
        logger.info(f"Starting SerpAPI Wikipedia search for '{company_name}'...")
        serp_key = settings.SERP_API_KEY
        if not serp_key:
            logger.warning("SerpAPI Key not configured. Skipping search.")
            return None

        # Restrict the Google query to the target-language Wikipedia.
        query = f'site:{lang}.wikipedia.org "{company_name}"'
        params = {"engine": "google", "q": query, "api_key": serp_key, "hl": lang}

        try:
            # FIX: use the configured session (consistent User-Agent) instead
            # of a bare requests.get, matching _get_page_soup().
            response = self.session.get("https://serpapi.com/search", params=params, timeout=15)
            response.raise_for_status()
            data = response.json()

            # 1. Check Knowledge Graph (highest priority)
            if "knowledge_graph" in data and "source" in data["knowledge_graph"]:
                source = data["knowledge_graph"]["source"]
                if "link" in source and f"{lang}.wikipedia.org" in source["link"]:
                    url = source["link"]
                    logger.info(f" -> Hit found in Knowledge Graph: {url}")
                    return url

            # 2. Check organic results
            if "organic_results" in data:
                for result in data.get("organic_results", []):
                    link = result.get("link")
                    if link and f"{lang}.wikipedia.org/wiki/" in link:
                        logger.info(f" -> Best organic hit found: {link}")
                        return link

            logger.warning(f" -> No suitable Wikipedia URL found for '{company_name}' in SerpAPI results.")
            return None
        except Exception as e:
            # NOTE(review): catching everything here means the retry decorator
            # never fires for transport errors — kept for backward
            # compatibility (callers expect None, not an exception).
            logger.error(f"Error during SerpAPI request for '{company_name}': {e}")
            return None

    @retry_on_failure(max_retries=3)
    def _get_page_soup(self, url: str) -> BeautifulSoup:
        """
        Fetches HTML from a URL and returns a BeautifulSoup object.

        Returns None for an invalid URL; re-raises request/parse errors so the
        retry decorator can re-attempt the fetch.
        """
        if not url or not isinstance(url, str) or not url.lower().startswith(("http://", "https://")):
            logger.warning(f"_get_page_soup: Invalid URL '{str(url)[:100]}...'")
            return None
        try:
            response = self.session.get(url, timeout=15)
            response.raise_for_status()
            # Handle encoding: trust the detected encoding over the declared one.
            response.encoding = response.apparent_encoding
            soup = BeautifulSoup(response.text, 'html.parser')
            return soup
        except Exception as e:
            logger.error(f"_get_page_soup: Error fetching or parsing HTML from {str(url)[:100]}...: {e}")
            raise e

    def _extract_first_paragraph_from_soup(self, soup: BeautifulSoup) -> str:
        """
        Extracts the first meaningful paragraph from the Wikipedia article soup.
        Mimics the sophisticated cleaning from the legacy system.

        Returns:
            str: Cleaned paragraph text (max 2000 chars) or "k.A." if none found.
        """
        if not soup:
            return "k.A."
        paragraph_text = "k.A."
        try:
            content_div = soup.find('div', class_='mw-parser-output')
            search_area = content_div if content_div else soup
            # Prefer direct children; fall back to any <p> in the area.
            paragraphs = search_area.find_all('p', recursive=False)
            if not paragraphs:
                paragraphs = search_area.find_all('p')
            for p in paragraphs:
                # Remove references [1], [2], etc.
                for sup in p.find_all('sup', class_='reference'):
                    sup.decompose()
                # Remove hidden spans
                for span in p.find_all('span', style=lambda v: v and 'display:none' in v):
                    span.decompose()
                # Remove coordinates
                for span in p.find_all('span', id='coordinates'):
                    span.decompose()
                text = clean_text(p.get_text(separator=' ', strip=True))
                # Filter out meta-paragraphs or too short ones
                if text != "k.A." and len(text) > 50 and not re.match(r'^(Datei:|Abbildung:|Siehe auch:|Einzelnachweise|Siehe auch|Literatur)', text, re.IGNORECASE):
                    paragraph_text = text[:2000]  # Limit length
                    break
        except Exception as e:
            logger.error(f"Error extracting first paragraph: {e}")
        return paragraph_text

    def extract_categories(self, soup: BeautifulSoup) -> str:
        """
        Extracts Wikipedia categories from the soup object, filtering out meta-categories.

        Returns:
            str: Comma-joined category names or "k.A." if none found.
        """
        if not soup:
            return "k.A."
        cats_filtered = []
        try:
            cat_div = soup.find('div', id="mw-normal-catlinks")
            if cat_div:
                ul = cat_div.find('ul')
                if ul:
                    cats = [clean_text(li.get_text()) for li in ul.find_all('li')]
                    # Drop the "Kategorien:" pseudo-entry and empty strings.
                    cats_filtered = [c for c in cats if c and isinstance(c, str) and c.strip() and "kategorien:" not in c.lower()]
        except Exception as e:
            logger.error(f"Error extracting categories: {e}")
        return ", ".join(cats_filtered) if cats_filtered else "k.A."

    def _validate_article(self, page, company_name: str, website: str, crm_city: str, parent_name: str = None) -> bool:
        """
        Validates fact-based whether a Wikipedia article matches the company.
        Prioritizes hard facts (Domain, City) over pure name similarity.

        Validation stages (first match wins):
          1. CRM website domain appears in the article's external/infobox links.
          2. CRM city appears in the infobox 'Sitz' value.
          3. Parent company name appears in title or summary.
          4. Fuzzy name similarity > 0.85 (fallback).

        Returns:
            bool: True if any validation stage succeeds.
        """
        if not page or not hasattr(page, 'html'):
            return False
        logger.debug(f"Validating article '{page.title}' for company '{company_name}'...")
        try:
            page_html = page.html()
            soup = BeautifulSoup(page_html, 'html.parser')
        except Exception as e:
            logger.error(f"Could not parse HTML for article '{page.title}': {e}")
            return False

        # --- Stage 1: Website Domain Validation (very strong signal) ---
        normalized_domain = simple_normalize_url(website)
        if normalized_domain != "k.A.":
            # Search for domain in "External links" section or infobox
            external_links = soup.select('.external, .infobox a[href*="."]')
            for link in external_links:
                href = link.get('href', '')
                if normalized_domain in href:
                    logger.info(f" => VALIDATION SUCCESS (Domain Match): Domain '{normalized_domain}' found in links.")
                    return True

        # --- Stage 2: City Validation (strong signal) ---
        if crm_city and crm_city.lower() != 'k.a.':
            infobox_sitz_raw = self._extract_infobox_value(soup, 'sitz')
            if infobox_sitz_raw and infobox_sitz_raw.lower() != 'k.a.':
                if crm_city.lower() in infobox_sitz_raw.lower():
                    logger.info(f" => VALIDATION SUCCESS (City Match): CRM City '{crm_city}' found in Infobox City '{infobox_sitz_raw}'.")
                    return True

        # --- Stage 3: Parent Validation ---
        normalized_parent = normalize_company_name(parent_name) if parent_name else None
        if normalized_parent:
            page_content_for_check = (page.title + " " + page.summary).lower()
            if normalized_parent in page_content_for_check:
                logger.info(f" => VALIDATION SUCCESS (Parent Match): Parent Name '{parent_name}' found in article.")
                return True

        # --- Stage 4: Name Similarity (Fallback with stricter rules) ---
        normalized_company = normalize_company_name(company_name)
        normalized_title = normalize_company_name(page.title)
        similarity = fuzzy_similarity(normalized_title, normalized_company)
        if similarity > 0.85:  # Stricter threshold
            logger.info(f" => VALIDATION SUCCESS (High Similarity): High name similarity ({similarity:.2f}).")
            return True

        logger.debug(f" => VALIDATION FAILED: No hard fact (Domain, City, Parent) and similarity ({similarity:.2f}) too low.")
        return False

    def search_company_article(self, company_name: str, website: str = None, crm_city: str = None, parent_name: str = None):
        """
        Searches and validates a matching Wikipedia article using the 'Google-First' strategy.

        1. Finds the best URL via SerpAPI.
        2. Validates the found article with hard facts.

        Returns:
            wikipedia.WikipediaPage or None: The validated page, or None.
        """
        if not company_name:
            return None
        logger.info(f"Starting 'Google-First' Wikipedia search for '{company_name}'...")

        # 1. Find the best URL candidate via Google Search
        url_candidate = self.serp_wikipedia_lookup(company_name)
        if not url_candidate:
            logger.warning(f" -> No URL found via SerpAPI. Search aborted.")
            return None

        # 2. Load and validate the found article
        try:
            # Derive the page title from the URL slug (percent-decoded,
            # underscores back to spaces).
            page_title = unquote(url_candidate.split('/wiki/')[-1].replace('_', ' '))
            page = wikipedia.page(title=page_title, auto_suggest=False, redirect=True)

            # Use the new fact-based validation
            if self._validate_article(page, company_name, website, crm_city, parent_name):
                logger.info(f" -> Article '{page.title}' successfully validated.")
                return page
            else:
                logger.warning(f" -> Article '{page.title}' could not be validated.")
                return None
        except wikipedia.exceptions.PageError:
            logger.error(f" -> Error: Found URL '{url_candidate}' did not lead to a valid Wikipedia page.")
            return None
        except Exception as e:
            logger.error(f" -> Unexpected error processing page '{url_candidate}': {e}")
            return None

    def _extract_infobox_value(self, soup: BeautifulSoup, target: str) -> str:
        """
        Targetedly extracts values (Industry, Revenue, etc.) from the infobox.

        Args:
            soup: Parsed article HTML.
            target: One of the keys in self.keywords_map
                    ('branche', 'umsatz', 'mitarbeiter', 'sitz').

        Returns:
            str: The extracted (cleaned/numeric) value or "k.A.".
        """
        if not soup or target not in self.keywords_map:
            return "k.A."
        keywords = self.keywords_map[target]
        infobox = soup.select_one('table[class*="infobox"]')
        if not infobox:
            return "k.A."
        value_found = "k.A."
        try:
            rows = infobox.find_all('tr')
            for row in rows:
                cells = row.find_all(['th', 'td'], recursive=False)
                header_text, value_cell = None, None
                if len(cells) >= 2:
                    if cells[0].name == 'th':
                        header_text, value_cell = cells[0].get_text(strip=True), cells[1]
                    elif cells[0].name == 'td' and cells[1].name == 'td':
                        # Some infoboxes use a bold <td> instead of <th> as the
                        # row header; detect via inline style or <b>/<strong>.
                        style = cells[0].get('style', '').lower()
                        is_header_like = 'font-weight' in style and ('bold' in style or '700' in style) or cells[0].find(['b', 'strong'], recursive=False)
                        if is_header_like:
                            header_text, value_cell = cells[0].get_text(strip=True), cells[1]
                if header_text and value_cell:
                    if any(kw in header_text.lower() for kw in keywords):
                        # Strip footnote markers and auxiliary spans before
                        # reading the cell text.
                        for sup in value_cell.find_all(['sup', 'span']):
                            sup.decompose()
                        raw_value_text = value_cell.get_text(separator=' ', strip=True)
                        if target == 'branche' or target == 'sitz':
                            value_found = clean_text(raw_value_text).split('\n')[0].strip()
                        elif target == 'umsatz':
                            value_found = extract_numeric_value(raw_value_text, is_umsatz=True)
                        elif target == 'mitarbeiter':
                            value_found = extract_numeric_value(raw_value_text, is_umsatz=False)
                        value_found = value_found if value_found else "k.A."
                        logger.info(f" --> Infobox '{target}' found: '{value_found}'")
                        break
        except Exception as e:
            logger.error(f"Error iterating infobox rows for '{target}': {e}")
            # FIX: previously returned the literal "k.A." here, discarding a
            # value already extracted before the error occurred mid-iteration.
            return value_found
        return value_found

    def _parse_sitz_string_detailed(self, raw_sitz_string_input: str) -> dict:
        """
        Attempts to extract City and Country in detail from a raw Sitz string.

        Handles trailing "(Country)" / "(Region)" suffixes and comma-separated
        "City, Country" forms; strips leading postal codes from the city.

        Returns:
            dict: {'sitz_stadt': str, 'sitz_land': str}, each "k.A." if unknown.
        """
        sitz_stadt_val, sitz_land_val = "k.A.", "k.A."
        if not raw_sitz_string_input or not isinstance(raw_sitz_string_input, str):
            return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val}
        temp_sitz = raw_sitz_string_input.strip()
        if not temp_sitz or temp_sitz.lower() == "k.a.":
            return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val}

        # Known country spellings/abbreviations mapped to canonical names.
        known_countries_detailed = {
            "deutschland": "Deutschland", "germany": "Deutschland", "de": "Deutschland",
            "österreich": "Österreich", "austria": "Österreich", "at": "Österreich",
            "schweiz": "Schweiz", "switzerland": "Schweiz", "ch": "Schweiz", "suisse": "Schweiz",
            "usa": "USA", "u.s.": "USA", "united states": "USA", "vereinigte staaten": "USA",
            "vereinigtes königreich": "Vereinigtes Königreich", "united kingdom": "Vereinigtes Königreich", "uk": "Vereinigtes Königreich",
        }
        # Sub-national regions mapped to their country.
        region_to_country = {
            "nrw": "Deutschland", "nordrhein-westfalen": "Deutschland", "bayern": "Deutschland", "hessen": "Deutschland",
            "zg": "Schweiz", "zug": "Schweiz", "zh": "Schweiz", "zürich": "Schweiz",
            "ca": "USA", "california": "USA", "ny": "USA", "new york": "USA",
        }

        extracted_country = ""
        original_temp_sitz = temp_sitz

        # Case 1: trailing parenthesized suffix, e.g. "Berlin (Deutschland)".
        klammer_match = re.search(r'\(([^)]+)\)$', temp_sitz)
        if klammer_match:
            suffix_in_klammer = klammer_match.group(1).strip().lower()
            if suffix_in_klammer in known_countries_detailed:
                extracted_country = known_countries_detailed[suffix_in_klammer]
                temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,")
            elif suffix_in_klammer in region_to_country:
                extracted_country = region_to_country[suffix_in_klammer]
                temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,")

        # Case 2: comma-separated suffix, e.g. "Zug, Schweiz".
        if not extracted_country and "," in temp_sitz:
            parts = [p.strip() for p in temp_sitz.split(',')]
            if len(parts) > 1:
                last_part_lower = parts[-1].lower()
                if last_part_lower in known_countries_detailed:
                    extracted_country = known_countries_detailed[last_part_lower]
                    temp_sitz = ", ".join(parts[:-1]).strip(" ,")
                elif last_part_lower in region_to_country:
                    extracted_country = region_to_country[last_part_lower]
                    temp_sitz = ", ".join(parts[:-1]).strip(" ,")

        sitz_land_val = extracted_country if extracted_country else "k.A."
        # Strip a leading postal code (4-8 digits) from the city part.
        sitz_stadt_val = re.sub(r'^\d{4,8}\s*', '', temp_sitz).strip(" ,")
        if not sitz_stadt_val:
            # If stripping emptied the city, fall back to the original string
            # (only when no country was found either).
            sitz_stadt_val = "k.A." if sitz_land_val != "k.A." else re.sub(r'^\d{4,8}\s*', '', original_temp_sitz).strip(" ,") or "k.A."
        return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val}

    @retry_on_failure(max_retries=3)
    def extract_company_data(self, url_or_page) -> dict:
        """
        Extracts structured company data from a Wikipedia article (URL or page object).

        Args:
            url_or_page: A wikipedia.org URL string or a wikipedia page object.

        Returns:
            dict: Keys url, title, sitz_stadt, sitz_land, first_paragraph,
                  branche, umsatz, mitarbeiter, categories, full_text;
                  unavailable values are "k.A." (full_text: empty string).
        """
        default_result = {
            'url': 'k.A.', 'title': 'k.A.', 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.',
            'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.',
            'mitarbeiter': 'k.A.', 'categories': 'k.A.', 'full_text': ''
        }
        page = None
        try:
            if isinstance(url_or_page, str) and "wikipedia.org" in url_or_page:
                page_title = unquote(url_or_page.split('/wiki/')[-1].replace('_', ' '))
                page = wikipedia.page(title=page_title, auto_suggest=False, redirect=True)
            elif not isinstance(url_or_page, str):
                # Assumption: it is a page object
                page = url_or_page
            else:
                logger.warning(f"extract_company_data: Invalid Input '{str(url_or_page)[:100]}...")
                return default_result

            logger.info(f"Extracting data for Wiki Article: {page.title[:100]}...")

            # Extract basic data directly from page object
            first_paragraph = page.summary.split('\n')[0] if page.summary else 'k.A.'
            categories = ", ".join(page.categories)
            full_text = page.content

            # BeautifulSoup needed for infobox and refined extraction
            soup = self._get_page_soup(page.url)
            if not soup:
                logger.warning(f" -> Could not load page for Soup parsing. Extracting basic data only.")
                # FIX: reuse the already-computed basic values instead of
                # recomputing summary/categories a second time.
                return {
                    'url': page.url, 'title': page.title,
                    'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.',
                    'first_paragraph': first_paragraph,
                    'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.',
                    'categories': categories, 'full_text': full_text
                }

            # Refined Extraction from Soup
            first_paragraph = self._extract_first_paragraph_from_soup(soup)
            categories = self.extract_categories(soup)

            # Extract infobox data
            branche_val = self._extract_infobox_value(soup, 'branche')
            umsatz_val = self._extract_infobox_value(soup, 'umsatz')
            mitarbeiter_val = self._extract_infobox_value(soup, 'mitarbeiter')
            raw_sitz_string = self._extract_infobox_value(soup, 'sitz')
            parsed_sitz = self._parse_sitz_string_detailed(raw_sitz_string)
            sitz_stadt_val = parsed_sitz['sitz_stadt']
            sitz_land_val = parsed_sitz['sitz_land']

            result = {
                'url': page.url, 'title': page.title,
                'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val,
                'first_paragraph': first_paragraph,
                'branche': branche_val, 'umsatz': umsatz_val, 'mitarbeiter': mitarbeiter_val,
                'categories': categories, 'full_text': full_text
            }
            logger.info(f" -> Extracted Data: City='{sitz_stadt_val}', Country='{sitz_land_val}', Rev='{umsatz_val}', Emp='{mitarbeiter_val}'")
            return result
        except wikipedia.exceptions.PageError:
            logger.error(f" -> Error: Wikipedia article for '{str(url_or_page)[:100]}' could not be found (PageError).")
            return {**default_result, 'url': str(url_or_page) if isinstance(url_or_page, str) else 'k.A.'}
        except Exception as e:
            logger.error(f" -> Unexpected error extracting from '{str(url_or_page)[:100]}': {e}")
            return {**default_result, 'url': str(url_or_page) if isinstance(url_or_page, str) else 'k.A.'}