# --- START OF FILE wikipedia_scraper.py --- #!/usr/bin/env python3 """ wikipedia_scraper.py Klasse zur Kapselung der Interaktionen mit Wikipedia, inklusive Suche, Validierung und Extraktion von Unternehmensdaten. """ import logging import re import time import traceback from urllib.parse import unquote import requests import wikipedia from bs4 import BeautifulSoup # Import der abhängigen Module from config import Config from helpers import (retry_on_failure, simple_normalize_url, normalize_company_name, extract_numeric_value, clean_text, fuzzy_similarity) class WikipediaScraper: """ Handhabt das Suchen von Wikipedia-Artikeln und das Extrahieren relevanter Unternehmensdaten. Beinhaltet Validierungslogik fuer Artikel. Nutzt die wikipedia-Bibliothek und Requests fuer direktes HTML-Scraping. """ def __init__(self, user_agent=None): """ Initialisiert den Scraper mit einer Requests-Session und konfigurierter Wikipedia-Bibliothek. """ self.logger = logging.getLogger(__name__ + ".WikipediaScraper") self.logger.debug("WikipediaScraper initialisiert.") self.user_agent = user_agent or getattr(Config, 'USER_AGENT', 'Mozilla/5.0 (compatible; UnternehmenSkript/1.0; +http://www.example.com/bot)') self.session = requests.Session() self.session.headers.update({'User-Agent': self.user_agent}) self.logger.debug(f"Requests Session mit User-Agent '{self.user_agent}' initialisiert.") self.keywords_map = { 'branche': ['branche', 'wirtschaftszweig', 'industry', 'taetigkeit', 'sektor', 'produkte', 'leistungen'], 'umsatz': ['umsatz', 'erloes', 'revenue', 'jahresumsatz', 'konzernumsatz', 'ergebnis'], 'mitarbeiter': ['mitarbeiter', 'mitarbeiterzahl', 'beschaeftigte', 'employees', 'number of employees', 'personal', 'belegschaft'], 'sitz': ['sitz', 'hauptsitz', 'unternehmenssitz', 'firmensitz', 'headquarters', 'standort', 'sitz des unternehmens', 'anschrift', 'adresse'] } try: wiki_lang = getattr(Config, 'LANG', 'de') wikipedia.set_lang(wiki_lang) wikipedia.set_rate_limiting(False) self.logger.info(f"Wikipedia library language set to '{wiki_lang}'. Rate limiting DISABLED.") except Exception as e: self.logger.warning(f"Fehler beim Setzen der Wikipedia-Sprache oder Rate Limiting: {e}") def _get_full_domain(self, website): """Extrahiert die normalisierte Domain (ohne www, ohne Pfad) aus einer URL.""" return simple_normalize_url(website) def _generate_search_terms(self, company_name, website): """ Generiert eine Liste von Suchbegriffen fuer die Wikipedia-Suche. """ if not company_name: return [] terms = set() original_name_cleaned = str(company_name).strip() if original_name_cleaned: terms.add(original_name_cleaned) normalized_name = normalize_company_name(company_name) if normalized_name: terms.add(normalized_name) name_parts = normalized_name.split() if len(name_parts) > 0: terms.add(name_parts[0]) if len(name_parts) > 1: terms.add(" ".join(name_parts[:2])) full_domain = self._get_full_domain(website) if full_domain != "k.A.": terms.add(full_domain) final_terms = [term for term in list(terms) if term][:getattr(Config, 'WIKIPEDIA_SEARCH_RESULTS', 5)] self.logger.debug(f"Generierte Suchbegriffe fuer '{company_name[:100]}...': {final_terms}") return final_terms @retry_on_failure def _get_page_soup(self, url): """ Holt HTML von einer URL und gibt ein BeautifulSoup-Objekt zurueck. """ if not url or not isinstance(url, str) or not url.lower().startswith(("http://", "https://")): self.logger.warning(f"_get_page_soup: Ungueltige URL '{url[:100]}...'.") return None try: self.logger.debug(f"_get_page_soup: Rufe URL ab: {url[:100]}...") response = self.session.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15)) response.raise_for_status() response.encoding = response.apparent_encoding soup = BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser')) return soup except Exception as e: self.logger.error(f"_get_page_soup: Fehler beim Abrufen oder Parsen von HTML von {url[:100]}...: {e}") raise e def _validate_article(self, page, company_name, website): """ Validiert, ob ein Wikipedia-Artikel zum Unternehmen passt. """ if not page or not company_name: return False self.logger.debug(f"Validiere Artikel '{page.title[:100]}...' fuer Firma '{company_name[:100]}'") normalized_company = normalize_company_name(company_name) normalized_title = normalize_company_name(page.title) if not normalized_company or not normalized_title: self.logger.warning("Validierung nicht moeglich, da Normalisierung eines Namens fehlschlug.") return False standard_threshold = getattr(Config, 'SIMILARITY_THRESHOLD', 0.65) similarity = fuzzy_similarity(normalized_title, normalized_company) company_tokens = normalized_company.split() title_tokens = normalized_title.split() first_word_match = False first_two_words_match = False if company_tokens and title_tokens and company_tokens[0] == title_tokens[0]: first_word_match = True if len(company_tokens) > 1 and len(title_tokens) > 1 and company_tokens[1] == title_tokens[1]: first_two_words_match = True domain_found = False full_domain = self._get_full_domain(website) if full_domain != "k.A.": try: article_html = page.html() if article_html: soup = BeautifulSoup(article_html, getattr(Config, 'HTML_PARSER', 'html.parser')) external_links = soup.select('a[href^="http"]') for link_tag in external_links: href = link_tag.get('href', '') if href and isinstance(href, str) and full_domain in simple_normalize_url(href): if not any(ex in href.lower() for ex in ['wikipedia.org', 'wikimedia.org', 'wikidata.org', 'archive.org', 'webcitation.org']): domain_found = True break except KeyError as e_key: if 'extlinks' in str(e_key).lower(): self.logger.warning(f"KeyError ('{e_key}') bei Domain-Check für Artikel '{page.title[:100]}...'. Domain-Validierung übersprungen.") else: self.logger.error(f"Unerwarteter KeyError bei Domain-Prüfung für '{page.title[:100]}...': {e_key}") except Exception as e_link_check: self.logger.error(f"Allgemeiner Fehler waehrend der Domain-Link-Pruefung fuer '{page.title[:100]}...': {e_link_check}") is_valid = False reason = "Keine Validierungsregel traf zu" if similarity >= standard_threshold: is_valid = True; reason = f"Gesamt-Aehnlichkeit ({similarity:.2f}) >= Schwelle ({standard_threshold:.2f})" elif domain_found and first_two_words_match: is_valid = True; reason = f"Domain gefunden UND erste 2 Worte stimmen ueberein" elif domain_found and first_word_match and similarity >= 0.40: is_valid = True; reason = f"Domain gefunden UND erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.40" elif first_two_words_match and similarity >= 0.45: is_valid = True; reason = f"Erste zwei Worte stimmen ueberein UND Aehnlichkeit >= 0.45" elif domain_found and similarity >= 0.50: is_valid = True; reason = f"Domain gefunden UND Aehnlichkeit >= 0.50" elif first_word_match and similarity >= 0.55: is_valid = True; reason = f"Erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.55" log_level = logging.INFO if is_valid else logging.DEBUG self.logger.log(log_level, f" => Artikel '{page.title[:100]}...' {'VALIDIERT' if is_valid else 'NICHT validiert'} (Grund: {reason})") return is_valid def search_company_article(self, company_name, website=None, max_recursion_depth=1): """ Sucht einen passenden Wikipedia-Artikel fuer das Unternehmen und gibt das wikipedia.WikipediaPage Objekt zurueck, wenn ein relevanter und validierter Artikel gefunden wird. Behandelt explizit Begriffsklaerungsseiten. """ if not company_name or str(company_name).strip() == "": self.logger.warning("Wikipedia search skipped: No company name provided.") raise ValueError("Kein Firmenname fuer Wikipedia Suche angegeben.") search_terms = self._generate_search_terms(company_name, website) if not search_terms: self.logger.warning(f"Keine Suchbegriffe fuer '{company_name[:100]}...' generiert.") return None self.logger.info(f"Starte Wikipedia-Suche fuer '{company_name[:100]}...' mit Begriffen: {search_terms}") processed_titles = set() original_search_name_norm = normalize_company_name(company_name) def check_page_recursive(title_to_check, current_depth): if title_to_check in processed_titles or current_depth > max_recursion_depth: return None processed_titles.add(title_to_check) self.logger.debug(f" -> Pruefe potenziellen Artikel: '{title_to_check[:100]}...' (Tiefe: {current_depth})") normalized_option_title_local = normalize_company_name(title_to_check) title_similarity_to_original = fuzzy_similarity(normalized_option_title_local, original_search_name_norm) if current_depth > 0 and title_similarity_to_original < 0.3: self.logger.debug(f" -> Option '{title_to_check[:100]}' hat zu geringe Ähnlichkeit ({title_similarity_to_original:.2f}). Übersprungen.") return None page = None try: page = wikipedia.page(title_to_check, auto_suggest=False, preload=False, redirect=True) if self._validate_article(page, company_name, website): self.logger.info(f" -> Titel '{page.title[:100]}...' erfolgreich validiert!") return page else: return None except wikipedia.exceptions.DisambiguationError as e_disamb: self.logger.info(f" -> Begriffsklaerung '{e_disamb.title}' gefunden (Tiefe {current_depth}). Pruefe Optionen...") if current_depth >= max_recursion_depth: return None relevant_options = [] for option in e_disamb.options: option_lower = option.lower() if not any(ex in option_lower for ex in ["(person)", "(familienname)"]) and len(option) < 80: if fuzzy_similarity(normalize_company_name(option), original_search_name_norm) > 0.3: relevant_options.append(option) for option_to_check in relevant_options[:3]: validated_page = check_page_recursive(option_to_check, current_depth + 1) if validated_page: return validated_page return None except Exception as e_page: title_for_log = page.title[:100] if page and hasattr(page, 'title') and page.title else title_to_check[:100] if "extlinks" in str(e_page).lower(): self.logger.warning(f" -> KeyError ('extlinks') beim Verarbeiten von Titel '{title_for_log}...'. Übersprungen.") else: self.logger.error(f" -> Unerwarteter Fehler bei Verarbeitung von Seite '{title_for_log}': {e_page}") return None # Hauptlogik der Suche self.logger.debug(f" -> Versuche direkten Match fuer '{company_name[:100]}...'") page_found = check_page_recursive(company_name, 0) if page_found: return page_found self.logger.debug(f" -> Kein direkter Treffer. Starte Suche mit generierten Begriffen...") for term in search_terms: if term == company_name: continue self.logger.debug(f" -> Versuche Suchbegriff: '{term[:100]}...'") page_found = check_page_recursive(term, 0) if page_found: return page_found self.logger.warning(f"Kein passender & validierter Wikipedia-Artikel fuer '{company_name[:100]}...' gefunden.") return None def _extract_first_paragraph_from_soup(self, soup): """ Extrahiert den ersten aussagekraeftigen Absatz aus dem Soup-Objekt eines Wikipedia-Artikels. """ if not soup: return "k.A." paragraph_text = "k.A." try: content_div = soup.find('div', class_='mw-parser-output') search_area = content_div if content_div else soup paragraphs = search_area.find_all('p', recursive=False) if not paragraphs: paragraphs = search_area.find_all('p') for p in paragraphs: for sup in p.find_all('sup', class_='reference'): sup.decompose() for span in p.find_all('span', style=lambda v: v and 'display:none' in v): span.decompose() for span in p.find_all('span', id='coordinates'): span.decompose() text = clean_text(p.get_text(separator=' ', strip=True)) if text != "k.A." and len(text) > 50 and not re.match(r'^(Datei:|Abbildung:|Siehe auch:|Einzelnachweise|Siehe auch|Literatur)', text, re.IGNORECASE): paragraph_text = text[:1500] break except Exception as e: self.logger.error(f"Fehler beim Extrahieren des ersten Absatzes: {e}") return paragraph_text def extract_categories(self, soup): """ Extrahiert Wikipedia-Kategorien aus dem Soup-Objekt. """ if not soup: return "k.A." cats_filtered = [] try: cat_div = soup.find('div', id="mw-normal-catlinks") if cat_div: ul = cat_div.find('ul') if ul: cats = [clean_text(li.get_text()) for li in ul.find_all('li')] cats_filtered = [c for c in cats if c and isinstance(c, str) and c.strip() and "kategorien:" not in c.lower()] except Exception as e: self.logger.error(f"Fehler beim Extrahieren der Kategorien: {e}") return ", ".join(cats_filtered) if cats_filtered else "k.A." def _extract_infobox_value(self, soup, target): """ Extrahiert gezielt Werte (Branche, Umsatz, etc.) aus der Infobox. """ if not soup or target not in self.keywords_map: return "k.A." keywords = self.keywords_map[target] infobox = soup.select_one('table[class*="infobox"]') if not infobox: return "k.A." value_found = "k.A." try: rows = infobox.find_all('tr') for row in rows: cells = row.find_all(['th', 'td'], recursive=False) header_text, value_cell = None, None if len(cells) >= 2: if cells[0].name == 'th': header_text, value_cell = cells[0].get_text(strip=True), cells[1] elif cells[0].name == 'td' and cells[1].name == 'td': style = cells[0].get('style', '').lower() is_header_like = 'font-weight' in style and ('bold' in style or '700' in style) or cells[0].find(['b', 'strong'], recursive=False) if is_header_like: header_text, value_cell = cells[0].get_text(strip=True), cells[1] if header_text and value_cell: if any(kw in header_text.lower() for kw in keywords): for sup in value_cell.find_all(['sup', 'span']): sup.decompose() raw_value_text = value_cell.get_text(separator=' ', strip=True) if target == 'branche' or target == 'sitz': value_found = clean_text(raw_value_text).split('\n')[0].strip() elif target == 'umsatz': value_found = extract_numeric_value(raw_value_text, is_umsatz=True) elif target == 'mitarbeiter': value_found = extract_numeric_value(raw_value_text, is_umsatz=False) value_found = value_found if value_found else "k.A." self.logger.info(f" --> Infobox '{target}' gefunden: '{value_found}'") break except Exception as e: self.logger.exception(f"Fehler beim Durchlaufen der Infobox-Zeilen fuer '{target}': {e}") return "k.A. (Fehler Extraktion)" return value_found def _parse_sitz_string_detailed(self, raw_sitz_string_input): """ Versucht, aus einem rohen Sitz-String Stadt und Land detailliert zu extrahieren. """ sitz_stadt_val, sitz_land_val = "k.A.", "k.A." if not raw_sitz_string_input or not isinstance(raw_sitz_string_input, str): return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} temp_sitz = raw_sitz_string_input.strip() if not temp_sitz or temp_sitz.lower() == "k.a.": return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} # Diese Mappings könnten in die Config ausgelagert werden known_countries_detailed = { "deutschland": "Deutschland", "germany": "Deutschland", "de": "Deutschland", "österreich": "Österreich", "austria": "Österreich", "at": "Österreich", "schweiz": "Schweiz", "switzerland": "Schweiz", "ch": "Schweiz", "suisse": "Schweiz", "usa": "USA", "u.s.": "USA", "united states": "USA", "vereinigte staaten": "USA", "vereinigtes königreich": "Vereinigtes Königreich", "united kingdom": "Vereinigtes Königreich", "uk": "Vereinigtes Königreich", } region_to_country = { "nrw": "Deutschland", "nordrhein-westfalen": "Deutschland", "bayern": "Deutschland", "hessen": "Deutschland", "zg": "Schweiz", "zug": "Schweiz", "zh": "Schweiz", "zürich": "Schweiz", "ca": "USA", "california": "USA", "ny": "USA", "new york": "USA", } extracted_country = "" original_temp_sitz = temp_sitz klammer_match = re.search(r'\(([^)]+)\)$', temp_sitz) if klammer_match: suffix_in_klammer = klammer_match.group(1).strip().lower() if suffix_in_klammer in known_countries_detailed: extracted_country = known_countries_detailed[suffix_in_klammer] temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,") elif suffix_in_klammer in region_to_country: extracted_country = region_to_country[suffix_in_klammer] temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,") if not extracted_country and ',' in temp_sitz: parts = [p.strip() for p in temp_sitz.split(',')] if len(parts) > 1: last_part_lower = parts[-1].lower() if last_part_lower in known_countries_detailed: extracted_country = known_countries_detailed[last_part_lower] temp_sitz = ", ".join(parts[:-1]).strip(" ,") elif last_part_lower in region_to_country: extracted_country = region_to_country[last_part_lower] temp_sitz = ", ".join(parts[:-1]).strip(" ,") sitz_land_val = extracted_country if extracted_country else "k.A." sitz_stadt_val = re.sub(r'^\d{4,8}\s*', '', temp_sitz).strip(" ,") if not sitz_stadt_val: sitz_stadt_val = "k.A." if sitz_land_val != "k.A." else re.sub(r'^\d{4,8}\s*', '', original_temp_sitz).strip(" ,") or "k.A." return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} @retry_on_failure def extract_company_data(self, page_url): """ Extrahiert Firmendaten von einer gegebenen Wikipedia-Artikel-URL. """ default_result = {'url': page_url or 'k.A.', 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.'} if not page_url or not isinstance(page_url, str) or "wikipedia.org/wiki/" not in page_url.lower(): self.logger.warning(f"extract_company_data: Ungueltige URL '{page_url[:100]}...'.") return default_result self.logger.info(f"Extrahiere Daten fuer Wiki-URL: {page_url[:100]}...") soup = self._get_page_soup(page_url) if not soup: self.logger.error(f" -> Fehler: Konnte Seite {page_url[:100]}... nicht laden oder parsen.") return default_result first_paragraph = self._extract_first_paragraph_from_soup(soup) categories_val = self.extract_categories(soup) branche_val = self._extract_infobox_value(soup, 'branche') umsatz_val = self._extract_infobox_value(soup, 'umsatz') mitarbeiter_val = self._extract_infobox_value(soup, 'mitarbeiter') raw_sitz_string = self._extract_infobox_value(soup, 'sitz') parsed_sitz = self._parse_sitz_string_detailed(raw_sitz_string) sitz_stadt_val = parsed_sitz['sitz_stadt'] sitz_land_val = parsed_sitz['sitz_land'] result = { 'url': page_url, 'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val, 'first_paragraph': first_paragraph, 'branche': branche_val, 'umsatz': umsatz_val, 'mitarbeiter': mitarbeiter_val, 'categories': categories_val } self.logger.info(f" -> Extrahierte Daten: Stadt='{sitz_stadt_val}', Land='{sitz_land_val}', U='{umsatz_val}', M='{mitarbeiter_val}'") return result