#!/usr/bin/env python3 """ wikipedia_scraper.py Klasse zur Kapselung der Interaktionen mit Wikipedia, inklusive Suche, Validierung und Extraktion von Unternehmensdaten. """ __version__ = "v2.0.1" import logging import re import time import traceback from urllib.parse import unquote import requests import wikipedia from bs4 import BeautifulSoup # Import der abhängigen Module from config import Config from helpers import (retry_on_failure, simple_normalize_url, normalize_company_name, extract_numeric_value, clean_text, fuzzy_similarity) class WikipediaScraper: """ Handhabt das Suchen von Wikipedia-Artikeln und das Extrahieren relevanter Unternehmensdaten. Beinhaltet Validierungslogik fuer Artikel. Nutzt die wikipedia-Bibliothek und Requests fuer direktes HTML-Scraping. """ def __init__(self, user_agent=None): """ Initialisiert den Scraper mit einer Requests-Session und konfigurierter Wikipedia-Bibliothek. """ self.logger = logging.getLogger(__name__ + ".WikipediaScraper") self.logger.debug("WikipediaScraper initialisiert.") self.user_agent = user_agent or getattr(Config, 'USER_AGENT', 'Mozilla/5.0 (compatible; UnternehmenSkript/1.0; +http://www.example.com/bot)') self.session = requests.Session() self.session.headers.update({'User-Agent': self.user_agent}) self.logger.debug(f"Requests Session mit User-Agent '{self.user_agent}' initialisiert.") self.keywords_map = { 'branche': ['branche', 'wirtschaftszweig', 'industry', 'taetigkeit', 'sektor', 'produkte', 'leistungen'], 'umsatz': ['umsatz', 'erloes', 'revenue', 'jahresumsatz', 'konzernumsatz', 'ergebnis'], 'mitarbeiter': ['mitarbeiter', 'mitarbeiterzahl', 'beschaeftigte', 'employees', 'number of employees', 'personal', 'belegschaft'], 'sitz': ['sitz', 'hauptsitz', 'unternehmenssitz', 'firmensitz', 'headquarters', 'standort', 'sitz des unternehmens', 'anschrift', 'adresse'] } try: wiki_lang = getattr(Config, 'LANG', 'de') wikipedia.set_lang(wiki_lang) wikipedia.set_rate_limiting(False) self.logger.info(f"Wikipedia library language set to '{wiki_lang}'. Rate limiting DISABLED.") except Exception as e: self.logger.warning(f"Fehler beim Setzen der Wikipedia-Sprache oder Rate Limiting: {e}") def _get_full_domain(self, website): """Extrahiert die normalisierte Domain (ohne www, ohne Pfad) aus einer URL.""" return simple_normalize_url(website) def _generate_search_terms(self, company_name, website=None): """ Generiert eine Liste von potenziellen Wikipedia-Artikeltiteln. v2.0: Mit verbesserter Logik für Namen, die Zahlen enthalten. """ if not company_name: return [] normalized = normalize_company_name(company_name) # Verbesserte Logik für Namen wie "11 88 0 Solutions" condensed_normalized = None if re.search(r'\d[\s\d]+\d', normalized): condensed_normalized = re.sub(r'(\d)\s+(\d)', r'\1\2', normalized) condensed_normalized = normalize_company_name(condensed_normalized) search_terms = [] if condensed_normalized: search_terms.append(condensed_normalized) search_terms.append(company_name) search_terms.append(normalized) parts = normalized.split() if len(parts) > 1: search_terms.append(parts[0]) search_terms.append(" ".join(parts[:2])) if website: domain = simple_normalize_url(website) if domain != "k.A.": search_terms.append(domain) unique_terms = list(dict.fromkeys([term for term in search_terms if term])) # Entfernt Duplikate, behält Reihenfolge return unique_terms[:5] @retry_on_failure def _get_page_soup(self, url): """ Holt HTML von einer URL und gibt ein BeautifulSoup-Objekt zurueck. """ if not url or not isinstance(url, str) or not url.lower().startswith(("http://", "https://")): self.logger.warning(f"_get_page_soup: Ungueltige URL '{url[:100]}...'.") return None try: self.logger.debug(f"_get_page_soup: Rufe URL ab: {url[:100]}...") response = self.session.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15)) response.raise_for_status() response.encoding = response.apparent_encoding soup = BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser')) return soup except Exception as e: self.logger.error(f"_get_page_soup: Fehler beim Abrufen oder Parsen von HTML von {url[:100]}...: {e}") raise e def _validate_article(self, page, company_name, website, parent_name=None): """ Validiert, ob ein Wikipedia-Artikel zum Unternehmen passt. v2.0: Nutzt parent_name als primäres Kriterium. Ihre bestehenden Regeln bleiben als Fallback erhalten. """ if not page or not company_name: return False self.logger.debug(f"Validiere Artikel '{page.title[:100]}...' fuer Firma '{company_name[:100]}'") # --- Stufe 1: Parent-Validierung (höchste Priorität) --- normalized_parent = normalize_company_name(parent_name) if parent_name else None if normalized_parent: # Überprüfe Titel und den ersten Absatz (Summary) auf den Parent-Namen page_content_for_check = (page.title + " " + page.summary).lower() if normalized_parent in page_content_for_check: reason = f"Parent-Name '{parent_name}' im Artikel-Titel oder -Summary gefunden." self.logger.info(f" => Artikel '{page.title[:100]}...' VALIDIERT (Grund: {reason})") return True # --- Stufe 2: Ihre bestehende, detaillierte Validierungslogik als Fallback --- normalized_company = normalize_company_name(company_name) normalized_title = normalize_company_name(page.title) if not normalized_company or not normalized_title: self.logger.warning("Validierung nicht moeglich, da Normalisierung eines Namens fehlschlug.") return False standard_threshold = getattr(Config, 'SIMILARITY_THRESHOLD', 0.65) similarity = fuzzy_similarity(normalized_title, normalized_company) company_tokens = normalized_company.split() title_tokens = normalized_title.split() first_word_match = False first_two_words_match = False if company_tokens and title_tokens and company_tokens[0] == title_tokens[0]: first_word_match = True if len(company_tokens) > 1 and len(title_tokens) > 1 and company_tokens[1] == title_tokens[1]: first_two_words_match = True domain_found = False full_domain = self._get_full_domain(website) if full_domain != "k.A.": try: # page.html() kann fehleranfällig sein, wir prüfen den gerenderten Text (page.content) if page.content and full_domain in page.content.lower(): domain_found = True except Exception as e_link_check: self.logger.error(f"Allgemeiner Fehler waehrend der Domain-Pruefung fuer '{page.title[:100]}...': {e_link_check}") is_valid = False reason = "" self.logger.debug(f" Validierungs-Check (Fallback) für '{page.title[:50]}...':") self.logger.debug(f" - Aehnlichkeit: {similarity:.2f} (Schwelle: {standard_threshold:.2f})") self.logger.debug(f" - Domain '{full_domain}' im Artikel gefunden: {domain_found}") self.logger.debug(f" - Erstes Wort identisch: {first_word_match}") self.logger.debug(f" - Erste 2 Worte identisch: {first_two_words_match}") if similarity >= standard_threshold: is_valid, reason = True, f"Gesamt-Aehnlichkeit ({similarity:.2f}) >= Schwelle ({standard_threshold:.2f})" elif domain_found and first_two_words_match: is_valid, reason = True, "Domain gefunden UND erste 2 Worte stimmen ueberein" elif domain_found and first_word_match and similarity >= 0.40: is_valid, reason = True, "Domain gefunden UND erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.40" elif first_two_words_match and similarity >= 0.45: is_valid, reason = True, "Erste zwei Worte stimmen ueberein UND Aehnlichkeit >= 0.45" elif domain_found and similarity >= 0.50: is_valid, reason = True, "Domain gefunden UND Aehnlichkeit >= 0.50" elif first_word_match and similarity >= 0.55: is_valid, reason = True, "Erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.55" else: reason = "Keine der Fallback-Validierungsregeln traf zu" log_level = logging.INFO if is_valid else logging.DEBUG self.logger.log(log_level, f" => Artikel '{page.title[:100]}...' {'VALIDIERT' if is_valid else 'NICHT validiert'} (Grund: {reason})") return is_valid def search_company_article(self, company_name, website=None, parent_name=None, max_recursion_depth=1): """ Sucht einen passenden Wikipedia-Artikel. Behält die komplexe Logik bei und behebt den TypeError. """ if not company_name or str(company_name).strip() == "": return None search_terms = self._generate_search_terms(company_name, website) if not search_terms: return None self.logger.info(f"Starte Wikipedia-Suche fuer '{company_name[:100]}...' mit Begriffen: {search_terms}") processed_titles = set() original_search_name_norm = normalize_company_name(company_name) # Die innere Funktion "erbt" `parent_name` aus dem Scope der äußeren Funktion. def check_page_recursive(title_to_check, current_depth): effective_max_depth = max_recursion_depth if max_recursion_depth is not None else 2 if title_to_check in processed_titles or current_depth > effective_max_depth: return None processed_titles.add(title_to_check) self.logger.debug(f" -> Pruefe potenziellen Artikel: '{title_to_check[:100]}...' (Tiefe: {current_depth})") # Ihre bestehende Logik mit fuzzy_similarity normalized_option_title_local = normalize_company_name(title_to_check) title_similarity_to_original = fuzzy_similarity(normalized_option_title_local, original_search_name_norm) if current_depth > 0 and title_similarity_to_original < 0.3: self.logger.debug(f" -> Option '{title_to_check[:100]}' hat zu geringe Ähnlichkeit ({title_similarity_to_original:.2f}). Übersprungen.") return None page = None try: page = wikipedia.page(title_to_check, auto_suggest=False, preload=False, redirect=True) # KORRIGIERTER AUFRUF: Übergibt `parent_name` aus dem äußeren Scope if self._validate_article(page, company_name, website, parent_name): self.logger.info(f" -> Titel '{page.title[:100]}...' erfolgreich validiert!") return page else: return None except wikipedia.exceptions.PageError: self.logger.debug(f" -> Artikel '{title_to_check[:100]}' nicht gefunden (PageError).") return None except wikipedia.exceptions.DisambiguationError as e_disamb: self.logger.info(f" -> Begriffsklaerung '{e_disamb.title}' gefunden (Tiefe {current_depth}). Pruefe Optionen...") if current_depth >= effective_max_depth: return None # Ihre bestehende Logik zur Filterung von Optionen relevant_options = [] for option in e_disamb.options: option_lower = option.lower() if not any(ex in option_lower for ex in ["(person)", "(familienname)"]) and len(option) < 80: if fuzzy_similarity(normalize_company_name(option), original_search_name_norm) > 0.3: relevant_options.append(option) for option_to_check in relevant_options[:3]: validated_page = check_page_recursive(option_to_check, current_depth + 1) if validated_page: return validated_page return None except Exception as e_page: # Ihre bestehende Fehlerbehandlung title_for_log = page.title[:100] if page and hasattr(page, 'title') and page.title else title_to_check[:100] self.logger.error(f" -> Unerwarteter Fehler bei Verarbeitung von Seite '{title_for_log}': {e_page}") return None # Ihre bestehende Hauptlogik der Suche for term in search_terms: page_found = check_page_recursive(term, 0) if page_found: return page_found self.logger.warning(f"Kein passender & validierter Wikipedia-Artikel fuer '{company_name[:100]}...' gefunden.") return None def _extract_first_paragraph_from_soup(self, soup): """ Extrahiert den ersten aussagekraeftigen Absatz aus dem Soup-Objekt eines Wikipedia-Artikels. """ if not soup: return "k.A." paragraph_text = "k.A." try: content_div = soup.find('div', class_='mw-parser-output') search_area = content_div if content_div else soup paragraphs = search_area.find_all('p', recursive=False) if not paragraphs: paragraphs = search_area.find_all('p') for p in paragraphs: for sup in p.find_all('sup', class_='reference'): sup.decompose() for span in p.find_all('span', style=lambda v: v and 'display:none' in v): span.decompose() for span in p.find_all('span', id='coordinates'): span.decompose() text = clean_text(p.get_text(separator=' ', strip=True)) if text != "k.A." and len(text) > 50 and not re.match(r'^(Datei:|Abbildung:|Siehe auch:|Einzelnachweise|Siehe auch|Literatur)', text, re.IGNORECASE): paragraph_text = text[:1500] break except Exception as e: self.logger.error(f"Fehler beim Extrahieren des ersten Absatzes: {e}") return paragraph_text def extract_categories(self, soup): """ Extrahiert Wikipedia-Kategorien aus dem Soup-Objekt. """ if not soup: return "k.A." cats_filtered = [] try: cat_div = soup.find('div', id="mw-normal-catlinks") if cat_div: ul = cat_div.find('ul') if ul: cats = [clean_text(li.get_text()) for li in ul.find_all('li')] cats_filtered = [c for c in cats if c and isinstance(c, str) and c.strip() and "kategorien:" not in c.lower()] except Exception as e: self.logger.error(f"Fehler beim Extrahieren der Kategorien: {e}") return ", ".join(cats_filtered) if cats_filtered else "k.A." def _extract_infobox_value(self, soup, target): """ Extrahiert gezielt Werte (Branche, Umsatz, etc.) aus der Infobox. """ if not soup or target not in self.keywords_map: return "k.A." keywords = self.keywords_map[target] infobox = soup.select_one('table[class*="infobox"]') if not infobox: return "k.A." value_found = "k.A." try: rows = infobox.find_all('tr') for row in rows: cells = row.find_all(['th', 'td'], recursive=False) header_text, value_cell = None, None if len(cells) >= 2: if cells[0].name == 'th': header_text, value_cell = cells[0].get_text(strip=True), cells[1] elif cells[0].name == 'td' and cells[1].name == 'td': style = cells[0].get('style', '').lower() is_header_like = 'font-weight' in style and ('bold' in style or '700' in style) or cells[0].find(['b', 'strong'], recursive=False) if is_header_like: header_text, value_cell = cells[0].get_text(strip=True), cells[1] if header_text and value_cell: if any(kw in header_text.lower() for kw in keywords): for sup in value_cell.find_all(['sup', 'span']): sup.decompose() raw_value_text = value_cell.get_text(separator=' ', strip=True) if target == 'branche' or target == 'sitz': value_found = clean_text(raw_value_text).split('\n')[0].strip() elif target == 'umsatz': value_found = extract_numeric_value(raw_value_text, is_umsatz=True) elif target == 'mitarbeiter': value_found = extract_numeric_value(raw_value_text, is_umsatz=False) value_found = value_found if value_found else "k.A." self.logger.info(f" --> Infobox '{target}' gefunden: '{value_found}'") break except Exception as e: self.logger.exception(f"Fehler beim Durchlaufen der Infobox-Zeilen fuer '{target}': {e}") return "k.A. (Fehler Extraktion)" return value_found def _parse_sitz_string_detailed(self, raw_sitz_string_input): """ Versucht, aus einem rohen Sitz-String Stadt und Land detailliert zu extrahieren. """ sitz_stadt_val, sitz_land_val = "k.A.", "k.A." if not raw_sitz_string_input or not isinstance(raw_sitz_string_input, str): return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} temp_sitz = raw_sitz_string_input.strip() if not temp_sitz or temp_sitz.lower() == "k.a.": return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} # Diese Mappings könnten in die Config ausgelagert werden known_countries_detailed = { "deutschland": "Deutschland", "germany": "Deutschland", "de": "Deutschland", "österreich": "Österreich", "austria": "Österreich", "at": "Österreich", "schweiz": "Schweiz", "switzerland": "Schweiz", "ch": "Schweiz", "suisse": "Schweiz", "usa": "USA", "u.s.": "USA", "united states": "USA", "vereinigte staaten": "USA", "vereinigtes königreich": "Vereinigtes Königreich", "united kingdom": "Vereinigtes Königreich", "uk": "Vereinigtes Königreich", } region_to_country = { "nrw": "Deutschland", "nordrhein-westfalen": "Deutschland", "bayern": "Deutschland", "hessen": "Deutschland", "zg": "Schweiz", "zug": "Schweiz", "zh": "Schweiz", "zürich": "Schweiz", "ca": "USA", "california": "USA", "ny": "USA", "new york": "USA", } extracted_country = "" original_temp_sitz = temp_sitz klammer_match = re.search(r'\(([^)]+)\)$', temp_sitz) if klammer_match: suffix_in_klammer = klammer_match.group(1).strip().lower() if suffix_in_klammer in known_countries_detailed: extracted_country = known_countries_detailed[suffix_in_klammer] temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,") elif suffix_in_klammer in region_to_country: extracted_country = region_to_country[suffix_in_klammer] temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,") if not extracted_country and ',' in temp_sitz: parts = [p.strip() for p in temp_sitz.split(',')] if len(parts) > 1: last_part_lower = parts[-1].lower() if last_part_lower in known_countries_detailed: extracted_country = known_countries_detailed[last_part_lower] temp_sitz = ", ".join(parts[:-1]).strip(" ,") elif last_part_lower in region_to_country: extracted_country = region_to_country[last_part_lower] temp_sitz = ", ".join(parts[:-1]).strip(" ,") sitz_land_val = extracted_country if extracted_country else "k.A." sitz_stadt_val = re.sub(r'^\d{4,8}\s*', '', temp_sitz).strip(" ,") if not sitz_stadt_val: sitz_stadt_val = "k.A." if sitz_land_val != "k.A." else re.sub(r'^\d{4,8}\s*', '', original_temp_sitz).strip(" ,") or "k.A." return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} @retry_on_failure def extract_company_data(self, url_or_page): """ Extrahiert strukturierte Unternehmensdaten aus einem Wikipedia-Artikel (URL oder page-Objekt). Gibt nun auch den gesamten Rohtext des Artikels ('full_text') und den Titel zurück. """ default_result = { 'url': 'k.A.', 'title': 'k.A.', 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.', 'full_text': '' } page = None try: if isinstance(url_or_page, str) and "wikipedia.org" in url_or_page: page_title = unquote(url_or_page.split('/wiki/')[-1].replace('_', ' ')) page = wikipedia.page(title=page_title, auto_suggest=False, redirect=True) elif not isinstance(url_or_page, str): # Annahme: es ist ein page-Objekt page = url_or_page else: self.logger.warning(f"extract_company_data: Ungültiger Input '{str(url_or_page)[:100]}...'.") return default_result self.logger.info(f"Extrahiere Daten für Wiki-Artikel: {page.title[:100]}...") # Grundlegende Daten direkt aus dem page-Objekt extrahieren first_paragraph = page.summary.split('\n')[0] if page.summary else 'k.A.' categories = ", ".join(page.categories) full_text = page.content # Für Infobox-Daten benötigen wir weiterhin BeautifulSoup, da die 'wikipedia'-Bibliothek # keinen strukturierten Zugriff darauf bietet. soup = self._get_page_soup(page.url) if not soup: self.logger.warning(f" -> Konnte Seite für Soup-Parsing nicht laden. Extrahiere nur Basis-Daten.") # Fallback, wenn Soup fehlschlägt return { 'url': page.url, 'title': page.title, 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.', 'first_paragraph': first_paragraph, 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': categories, 'full_text': full_text } # Extraktion der Infobox-Daten mit den bestehenden Helper-Funktionen branche_val = self._extract_infobox_value(soup, 'branche') umsatz_val = self._extract_infobox_value(soup, 'umsatz') mitarbeiter_val = self._extract_infobox_value(soup, 'mitarbeiter') raw_sitz_string = self._extract_infobox_value(soup, 'sitz') parsed_sitz = self._parse_sitz_string_detailed(raw_sitz_string) sitz_stadt_val = parsed_sitz['sitz_stadt'] sitz_land_val = parsed_sitz['sitz_land'] # Sammle die finalen Daten result = { 'url': page.url, 'title': page.title, 'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val, 'first_paragraph': first_paragraph, 'branche': branche_val, 'umsatz': umsatz_val, 'mitarbeiter': mitarbeiter_val, 'categories': categories, 'full_text': full_text } self.logger.info(f" -> Extrahierte Daten: Stadt='{sitz_stadt_val}', Land='{sitz_land_val}', U='{umsatz_val}', M='{mitarbeiter_val}'") return result except wikipedia.exceptions.PageError: self.logger.error(f" -> Fehler: Wikipedia-Artikel für '{str(url_or_page)[:100]}' konnte nicht gefunden werden (PageError).") return {**default_result, 'url': str(url_or_page) if isinstance(url_or_page, str) else 'k.A.'} except Exception as e: self.logger.error(f" -> Unerwarteter Fehler bei der Extraktion von '{str(url_or_page)[:100]}': {e}") return {**default_result, 'url': str(url_or_page) if isinstance(url_or_page, str) else 'k.A.'}