# --- START OF FILE wikipedia_scraper.py --- #!/usr/bin/env python3 """ wikipedia_scraper.py Klasse zur Kapselung der Interaktionen mit Wikipedia, inklusive Suche, Validierung und Extraktion von Unternehmensdaten. """ import logging import re import time import traceback from urllib.parse import unquote import requests import wikipedia from bs4 import BeautifulSoup # Import der abhängigen Module from config import Config from helpers import (retry_on_failure, simple_normalize_url, normalize_company_name, extract_numeric_value, clean_text, fuzzy_similarity) class WikipediaScraper: """ Handhabt das Suchen von Wikipedia-Artikeln und das Extrahieren relevanter Unternehmensdaten. Beinhaltet Validierungslogik fuer Artikel. Nutzt die wikipedia-Bibliothek und Requests fuer direktes HTML-Scraping. """ def __init__(self, user_agent=None): """ Initialisiert den Scraper mit einer Requests-Session und konfigurierter Wikipedia-Bibliothek. """ self.logger = logging.getLogger(__name__ + ".WikipediaScraper") self.logger.debug("WikipediaScraper initialisiert.") self.user_agent = user_agent or getattr(Config, 'USER_AGENT', 'Mozilla/5.0 (compatible; UnternehmenSkript/1.0; +http://www.example.com/bot)') self.session = requests.Session() self.session.headers.update({'User-Agent': self.user_agent}) self.logger.debug(f"Requests Session mit User-Agent '{self.user_agent}' initialisiert.") self.keywords_map = { 'branche': ['branche', 'wirtschaftszweig', 'industry', 'taetigkeit', 'sektor', 'produkte', 'leistungen'], 'umsatz': ['umsatz', 'erloes', 'revenue', 'jahresumsatz', 'konzernumsatz', 'ergebnis'], 'mitarbeiter': ['mitarbeiter', 'mitarbeiterzahl', 'beschaeftigte', 'employees', 'number of employees', 'personal', 'belegschaft'], 'sitz': ['sitz', 'hauptsitz', 'unternehmenssitz', 'firmensitz', 'headquarters', 'standort', 'sitz des unternehmens', 'anschrift', 'adresse'] } try: wiki_lang = getattr(Config, 'LANG', 'de') wikipedia.set_lang(wiki_lang) wikipedia.set_rate_limiting(False) self.logger.info(f"Wikipedia library language set to '{wiki_lang}'. Rate limiting DISABLED.") except Exception as e: self.logger.warning(f"Fehler beim Setzen der Wikipedia-Sprache oder Rate Limiting: {e}") def _get_full_domain(self, website): """Extrahiert die normalisierte Domain (ohne www, ohne Pfad) aus einer URL.""" return simple_normalize_url(website) def _generate_search_terms(self, company_name, website=None): """ Generiert eine Liste von potenziellen Wikipedia-Artikeltiteln. v2.0: Mit verbesserter Logik für Namen, die Zahlen enthalten. """ if not company_name: return [] normalized = normalize_company_name(company_name) # Verbesserte Logik für Namen wie "11 88 0 Solutions" condensed_normalized = None if re.search(r'\d[\s\d]+\d', normalized): condensed_normalized = re.sub(r'(\d)\s+(\d)', r'\1\2', normalized) condensed_normalized = normalize_company_name(condensed_normalized) search_terms = [] if condensed_normalized: search_terms.append(condensed_normalized) search_terms.append(company_name) search_terms.append(normalized) parts = normalized.split() if len(parts) > 1: search_terms.append(parts[0]) search_terms.append(" ".join(parts[:2])) if website: domain = simple_normalize_url(website) if domain != "k.A.": search_terms.append(domain) unique_terms = list(dict.fromkeys([term for term in search_terms if term])) # Entfernt Duplikate, behält Reihenfolge return unique_terms[:5] @retry_on_failure def _get_page_soup(self, url): """ Holt HTML von einer URL und gibt ein BeautifulSoup-Objekt zurueck. """ if not url or not isinstance(url, str) or not url.lower().startswith(("http://", "https://")): self.logger.warning(f"_get_page_soup: Ungueltige URL '{url[:100]}...'.") return None try: self.logger.debug(f"_get_page_soup: Rufe URL ab: {url[:100]}...") response = self.session.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15)) response.raise_for_status() response.encoding = response.apparent_encoding soup = BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser')) return soup except Exception as e: self.logger.error(f"_get_page_soup: Fehler beim Abrufen oder Parsen von HTML von {url[:100]}...: {e}") raise e def _validate_article(self, page, company_name, website, parent_name=None): """ Validiert, ob ein Wikipedia-Artikel zum Unternehmen passt. v2.0: Nutzt parent_name als primäres Kriterium. Ihre bestehenden Regeln bleiben als Fallback erhalten. """ if not page or not company_name: return False self.logger.debug(f"Validiere Artikel '{page.title[:100]}...' fuer Firma '{company_name[:100]}'") # --- Stufe 1: Parent-Validierung (höchste Priorität) --- normalized_parent = normalize_company_name(parent_name) if parent_name else None if normalized_parent: # Überprüfe Titel und den ersten Absatz (Summary) auf den Parent-Namen page_content_for_check = (page.title + " " + page.summary).lower() if normalized_parent in page_content_for_check: reason = f"Parent-Name '{parent_name}' im Artikel-Titel oder -Summary gefunden." self.logger.info(f" => Artikel '{page.title[:100]}...' VALIDIERT (Grund: {reason})") return True # --- Stufe 2: Ihre bestehende, detaillierte Validierungslogik als Fallback --- normalized_company = normalize_company_name(company_name) normalized_title = normalize_company_name(page.title) if not normalized_company or not normalized_title: self.logger.warning("Validierung nicht moeglich, da Normalisierung eines Namens fehlschlug.") return False standard_threshold = getattr(Config, 'SIMILARITY_THRESHOLD', 0.65) similarity = fuzzy_similarity(normalized_title, normalized_company) company_tokens = normalized_company.split() title_tokens = normalized_title.split() first_word_match = False first_two_words_match = False if company_tokens and title_tokens and company_tokens[0] == title_tokens[0]: first_word_match = True if len(company_tokens) > 1 and len(title_tokens) > 1 and company_tokens[1] == title_tokens[1]: first_two_words_match = True domain_found = False full_domain = self._get_full_domain(website) if full_domain != "k.A.": try: # page.html() kann fehleranfällig sein, wir prüfen den gerenderten Text (page.content) if page.content and full_domain in page.content.lower(): domain_found = True except Exception as e_link_check: self.logger.error(f"Allgemeiner Fehler waehrend der Domain-Pruefung fuer '{page.title[:100]}...': {e_link_check}") is_valid = False reason = "" self.logger.debug(f" Validierungs-Check (Fallback) für '{page.title[:50]}...':") self.logger.debug(f" - Aehnlichkeit: {similarity:.2f} (Schwelle: {standard_threshold:.2f})") self.logger.debug(f" - Domain '{full_domain}' im Artikel gefunden: {domain_found}") self.logger.debug(f" - Erstes Wort identisch: {first_word_match}") self.logger.debug(f" - Erste 2 Worte identisch: {first_two_words_match}") if similarity >= standard_threshold: is_valid, reason = True, f"Gesamt-Aehnlichkeit ({similarity:.2f}) >= Schwelle ({standard_threshold:.2f})" elif domain_found and first_two_words_match: is_valid, reason = True, "Domain gefunden UND erste 2 Worte stimmen ueberein" elif domain_found and first_word_match and similarity >= 0.40: is_valid, reason = True, "Domain gefunden UND erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.40" elif first_two_words_match and similarity >= 0.45: is_valid, reason = True, "Erste zwei Worte stimmen ueberein UND Aehnlichkeit >= 0.45" elif domain_found and similarity >= 0.50: is_valid, reason = True, "Domain gefunden UND Aehnlichkeit >= 0.50" elif first_word_match and similarity >= 0.55: is_valid, reason = True, "Erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.55" else: reason = "Keine der Fallback-Validierungsregeln traf zu" log_level = logging.INFO if is_valid else logging.DEBUG self.logger.log(log_level, f" => Artikel '{page.title[:100]}...' {'VALIDIERT' if is_valid else 'NICHT validiert'} (Grund: {reason})") return is_valid def search_company_article(self, company_name, website=None, parent_name=None, max_recursion_depth=1): """ Sucht einen passenden Wikipedia-Artikel. Behält die komplexe Logik bei und behebt den TypeError. """ if not company_name or str(company_name).strip() == "": return None search_terms = self._generate_search_terms(company_name, website) if not search_terms: return None self.logger.info(f"Starte Wikipedia-Suche fuer '{company_name[:100]}...' mit Begriffen: {search_terms}") processed_titles = set() original_search_name_norm = normalize_company_name(company_name) # Die innere Funktion "erbt" `parent_name` aus dem Scope der äußeren Funktion. def check_page_recursive(title_to_check, current_depth): effective_max_depth = max_recursion_depth if max_recursion_depth is not None else 2 if title_to_check in processed_titles or current_depth > effective_max_depth: return None processed_titles.add(title_to_check) self.logger.debug(f" -> Pruefe potenziellen Artikel: '{title_to_check[:100]}...' (Tiefe: {current_depth})") # Ihre bestehende Logik mit fuzzy_similarity normalized_option_title_local = normalize_company_name(title_to_check) title_similarity_to_original = fuzzy_similarity(normalized_option_title_local, original_search_name_norm) if current_depth > 0 and title_similarity_to_original < 0.3: self.logger.debug(f" -> Option '{title_to_check[:100]}' hat zu geringe Ähnlichkeit ({title_similarity_to_original:.2f}). Übersprungen.") return None page = None try: page = wikipedia.page(title_to_check, auto_suggest=False, preload=False, redirect=True) # KORRIGIERTER AUFRUF: Übergibt `parent_name` aus dem äußeren Scope if self._validate_article(page, company_name, website, parent_name): self.logger.info(f" -> Titel '{page.title[:100]}...' erfolgreich validiert!") return page else: return None except wikipedia.exceptions.PageError: self.logger.debug(f" -> Artikel '{title_to_check[:100]}' nicht gefunden (PageError).") return None except wikipedia.exceptions.DisambiguationError as e_disamb: self.logger.info(f" -> Begriffsklaerung '{e_disamb.title}' gefunden (Tiefe {current_depth}). Pruefe Optionen...") if current_depth >= effective_max_depth: return None # Ihre bestehende Logik zur Filterung von Optionen relevant_options = [] for option in e_disamb.options: option_lower = option.lower() if not any(ex in option_lower for ex in ["(person)", "(familienname)"]) and len(option) < 80: if fuzzy_similarity(normalize_company_name(option), original_search_name_norm) > 0.3: relevant_options.append(option) for option_to_check in relevant_options[:3]: validated_page = check_page_recursive(option_to_check, current_depth + 1) if validated_page: return validated_page return None except Exception as e_page: # Ihre bestehende Fehlerbehandlung title_for_log = page.title[:100] if page and hasattr(page, 'title') and page.title else title_to_check[:100] self.logger.error(f" -> Unerwarteter Fehler bei Verarbeitung von Seite '{title_for_log}': {e_page}") return None # Ihre bestehende Hauptlogik der Suche for term in search_terms: page_found = check_page_recursive(term, 0) if page_found: return page_found self.logger.warning(f"Kein passender & validierter Wikipedia-Artikel fuer '{company_name[:100]}...' gefunden.") return None def _extract_first_paragraph_from_soup(self, soup): """ Extrahiert den ersten aussagekraeftigen Absatz aus dem Soup-Objekt eines Wikipedia-Artikels. """ if not soup: return "k.A." paragraph_text = "k.A." try: content_div = soup.find('div', class_='mw-parser-output') search_area = content_div if content_div else soup paragraphs = search_area.find_all('p', recursive=False) if not paragraphs: paragraphs = search_area.find_all('p') for p in paragraphs: for sup in p.find_all('sup', class_='reference'): sup.decompose() for span in p.find_all('span', style=lambda v: v and 'display:none' in v): span.decompose() for span in p.find_all('span', id='coordinates'): span.decompose() text = clean_text(p.get_text(separator=' ', strip=True)) if text != "k.A." and len(text) > 50 and not re.match(r'^(Datei:|Abbildung:|Siehe auch:|Einzelnachweise|Siehe auch|Literatur)', text, re.IGNORECASE): paragraph_text = text[:1500] break except Exception as e: self.logger.error(f"Fehler beim Extrahieren des ersten Absatzes: {e}") return paragraph_text def extract_categories(self, soup): """ Extrahiert Wikipedia-Kategorien aus dem Soup-Objekt. """ if not soup: return "k.A." cats_filtered = [] try: cat_div = soup.find('div', id="mw-normal-catlinks") if cat_div: ul = cat_div.find('ul') if ul: cats = [clean_text(li.get_text()) for li in ul.find_all('li')] cats_filtered = [c for c in cats if c and isinstance(c, str) and c.strip() and "kategorien:" not in c.lower()] except Exception as e: self.logger.error(f"Fehler beim Extrahieren der Kategorien: {e}") return ", ".join(cats_filtered) if cats_filtered else "k.A." def _extract_infobox_value(self, soup, target): """ Extrahiert gezielt Werte (Branche, Umsatz, etc.) aus der Infobox. """ if not soup or target not in self.keywords_map: return "k.A." keywords = self.keywords_map[target] infobox = soup.select_one('table[class*="infobox"]') if not infobox: return "k.A." value_found = "k.A." try: rows = infobox.find_all('tr') for row in rows: cells = row.find_all(['th', 'td'], recursive=False) header_text, value_cell = None, None if len(cells) >= 2: if cells[0].name == 'th': header_text, value_cell = cells[0].get_text(strip=True), cells[1] elif cells[0].name == 'td' and cells[1].name == 'td': style = cells[0].get('style', '').lower() is_header_like = 'font-weight' in style and ('bold' in style or '700' in style) or cells[0].find(['b', 'strong'], recursive=False) if is_header_like: header_text, value_cell = cells[0].get_text(strip=True), cells[1] if header_text and value_cell: if any(kw in header_text.lower() for kw in keywords): for sup in value_cell.find_all(['sup', 'span']): sup.decompose() raw_value_text = value_cell.get_text(separator=' ', strip=True) if target == 'branche' or target == 'sitz': value_found = clean_text(raw_value_text).split('\n')[0].strip() elif target == 'umsatz': value_found = extract_numeric_value(raw_value_text, is_umsatz=True) elif target == 'mitarbeiter': value_found = extract_numeric_value(raw_value_text, is_umsatz=False) value_found = value_found if value_found else "k.A." self.logger.info(f" --> Infobox '{target}' gefunden: '{value_found}'") break except Exception as e: self.logger.exception(f"Fehler beim Durchlaufen der Infobox-Zeilen fuer '{target}': {e}") return "k.A. (Fehler Extraktion)" return value_found def _parse_sitz_string_detailed(self, raw_sitz_string_input): """ Versucht, aus einem rohen Sitz-String Stadt und Land detailliert zu extrahieren. """ sitz_stadt_val, sitz_land_val = "k.A.", "k.A." if not raw_sitz_string_input or not isinstance(raw_sitz_string_input, str): return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} temp_sitz = raw_sitz_string_input.strip() if not temp_sitz or temp_sitz.lower() == "k.a.": return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} # Diese Mappings könnten in die Config ausgelagert werden known_countries_detailed = { "deutschland": "Deutschland", "germany": "Deutschland", "de": "Deutschland", "österreich": "Österreich", "austria": "Österreich", "at": "Österreich", "schweiz": "Schweiz", "switzerland": "Schweiz", "ch": "Schweiz", "suisse": "Schweiz", "usa": "USA", "u.s.": "USA", "united states": "USA", "vereinigte staaten": "USA", "vereinigtes königreich": "Vereinigtes Königreich", "united kingdom": "Vereinigtes Königreich", "uk": "Vereinigtes Königreich", } region_to_country = { "nrw": "Deutschland", "nordrhein-westfalen": "Deutschland", "bayern": "Deutschland", "hessen": "Deutschland", "zg": "Schweiz", "zug": "Schweiz", "zh": "Schweiz", "zürich": "Schweiz", "ca": "USA", "california": "USA", "ny": "USA", "new york": "USA", } extracted_country = "" original_temp_sitz = temp_sitz klammer_match = re.search(r'\(([^)]+)\)$', temp_sitz) if klammer_match: suffix_in_klammer = klammer_match.group(1).strip().lower() if suffix_in_klammer in known_countries_detailed: extracted_country = known_countries_detailed[suffix_in_klammer] temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,") elif suffix_in_klammer in region_to_country: extracted_country = region_to_country[suffix_in_klammer] temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,") if not extracted_country and ',' in temp_sitz: parts = [p.strip() for p in temp_sitz.split(',')] if len(parts) > 1: last_part_lower = parts[-1].lower() if last_part_lower in known_countries_detailed: extracted_country = known_countries_detailed[last_part_lower] temp_sitz = ", ".join(parts[:-1]).strip(" ,") elif last_part_lower in region_to_country: extracted_country = region_to_country[last_part_lower] temp_sitz = ", ".join(parts[:-1]).strip(" ,") sitz_land_val = extracted_country if extracted_country else "k.A." sitz_stadt_val = re.sub(r'^\d{4,8}\s*', '', temp_sitz).strip(" ,") if not sitz_stadt_val: sitz_stadt_val = "k.A." if sitz_land_val != "k.A." else re.sub(r'^\d{4,8}\s*', '', original_temp_sitz).strip(" ,") or "k.A." return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} @retry_on_failure def extract_company_data(self, url_or_page): """ Extrahiert strukturierte Unternehmensdaten aus einem Wikipedia-Artikel (URL oder page-Objekt). Gibt nun auch den gesamten Rohtext des Artikels ('full_text') und den Titel zurück. """ default_result = { 'url': 'k.A.', 'title': 'k.A.', 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.', 'full_text': '' } page = None try: if isinstance(url_or_page, str) and "wikipedia.org" in url_or_page: page_title = unquote(url_or_page.split('/wiki/')[-1].replace('_', ' ')) page = wikipedia.page(title=page_title, auto_suggest=False, redirect=True) elif not isinstance(url_or_page, str): # Annahme: es ist ein page-Objekt page = url_or_page else: self.logger.warning(f"extract_company_data: Ungültiger Input '{str(url_or_page)[:100]}...'.") return default_result self.logger.info(f"Extrahiere Daten für Wiki-Artikel: {page.title[:100]}...") # Grundlegende Daten direkt aus dem page-Objekt extrahieren first_paragraph = page.summary.split('\n')[0] if page.summary else 'k.A.' categories = ", ".join(page.categories) full_text = page.content # Für Infobox-Daten benötigen wir weiterhin BeautifulSoup, da die 'wikipedia'-Bibliothek # keinen strukturierten Zugriff darauf bietet. soup = self._get_page_soup(page.url) if not soup: self.logger.warning(f" -> Konnte Seite für Soup-Parsing nicht laden. Extrahiere nur Basis-Daten.") # Fallback, wenn Soup fehlschlägt return { 'url': page.url, 'title': page.title, 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.', 'first_paragraph': first_paragraph, 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': categories, 'full_text': full_text } # Extraktion der Infobox-Daten mit den bestehenden Helper-Funktionen branche_val = self._extract_infobox_value(soup, 'branche') umsatz_val = self._extract_infobox_value(soup, 'umsatz') mitarbeiter_val = self._extract_infobox_value(soup, 'mitarbeiter') raw_sitz_string = self._extract_infobox_value(soup, 'sitz') parsed_sitz = self._parse_sitz_string_detailed(raw_sitz_string) sitz_stadt_val = parsed_sitz['sitz_stadt'] sitz_land_val = parsed_sitz['sitz_land'] # Sammle die finalen Daten result = { 'url': page.url, 'title': page.title, 'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val, 'first_paragraph': first_paragraph, 'branche': branche_val, 'umsatz': umsatz_val, 'mitarbeiter': mitarbeiter_val, 'categories': categories, 'full_text': full_text } self.logger.info(f" -> Extrahierte Daten: Stadt='{sitz_stadt_val}', Land='{sitz_land_val}', U='{umsatz_val}', M='{mitarbeiter_val}'") return result except wikipedia.exceptions.PageError: self.logger.error(f" -> Fehler: Wikipedia-Artikel für '{str(url_or_page)[:100]}' konnte nicht gefunden werden (PageError).") return {**default_result, 'url': str(url_or_page) if isinstance(url_or_page, str) else 'k.A.'} except Exception as e: self.logger.error(f" -> Unerwarteter Fehler bei der Extraktion von '{str(url_or_page)[:100]}': {e}") return {**default_result, 'url': str(url_or_page) if isinstance(url_or_page, str) else 'k.A.'}