# --- START OF FILE wikipedia_scraper.py --- #!/usr/bin/env python3 """ wikipedia_scraper.py Klasse zur Kapselung der Interaktionen mit Wikipedia, inklusive Suche, Validierung und Extraktion von Unternehmensdaten. """ import logging import re import time import traceback from urllib.parse import unquote import requests import wikipedia from bs4 import BeautifulSoup # Import der abhängigen Module from config import Config from helpers import (retry_on_failure, simple_normalize_url, normalize_company_name, extract_numeric_value, clean_text, fuzzy_similarity) class WikipediaScraper: """ Handhabt das Suchen von Wikipedia-Artikeln und das Extrahieren relevanter Unternehmensdaten. Beinhaltet Validierungslogik fuer Artikel. Nutzt die wikipedia-Bibliothek und Requests fuer direktes HTML-Scraping. """ def __init__(self, user_agent=None): """ Initialisiert den Scraper mit einer Requests-Session und konfigurierter Wikipedia-Bibliothek. """ self.logger = logging.getLogger(__name__ + ".WikipediaScraper") self.logger.debug("WikipediaScraper initialisiert.") self.user_agent = user_agent or getattr(Config, 'USER_AGENT', 'Mozilla/5.0 (compatible; UnternehmenSkript/1.0; +http://www.example.com/bot)') self.session = requests.Session() self.session.headers.update({'User-Agent': self.user_agent}) self.logger.debug(f"Requests Session mit User-Agent '{self.user_agent}' initialisiert.") self.keywords_map = { 'branche': ['branche', 'wirtschaftszweig', 'industry', 'taetigkeit', 'sektor', 'produkte', 'leistungen'], 'umsatz': ['umsatz', 'erloes', 'revenue', 'jahresumsatz', 'konzernumsatz', 'ergebnis'], 'mitarbeiter': ['mitarbeiter', 'mitarbeiterzahl', 'beschaeftigte', 'employees', 'number of employees', 'personal', 'belegschaft'], 'sitz': ['sitz', 'hauptsitz', 'unternehmenssitz', 'firmensitz', 'headquarters', 'standort', 'sitz des unternehmens', 'anschrift', 'adresse'] } try: wiki_lang = getattr(Config, 'LANG', 'de') wikipedia.set_lang(wiki_lang) wikipedia.set_rate_limiting(False) self.logger.info(f"Wikipedia library language set to '{wiki_lang}'. Rate limiting DISABLED.") except Exception as e: self.logger.warning(f"Fehler beim Setzen der Wikipedia-Sprache oder Rate Limiting: {e}") def _get_full_domain(self, website): """Extrahiert die normalisierte Domain (ohne www, ohne Pfad) aus einer URL.""" return simple_normalize_url(website) def _generate_search_terms(self, company_name, website=None): """ Generiert eine Liste von potenziellen Wikipedia-Artikeltiteln. v2.0: Mit verbesserter Logik für Namen, die Zahlen enthalten. """ if not company_name: return [] # Basis-Normalisierung normalized = normalize_company_name(company_name) # NEUE LOGIK: Speziell für Namen wie "11 88 0 Solutions" # Fügt eine Version hinzu, bei der Leerzeichen zwischen Zahlen entfernt werden. if re.search(r'\d[\s\d]+\d', normalized): condensed_normalized = re.sub(r'(\d)\s+(\d)', r'\1\2', normalized) # Führe eine erneute, aggressivere Normalisierung durch, um Reste zu entfernen condensed_normalized = normalize_company_name(condensed_normalized) else: condensed_normalized = None search_terms = [] # Füge die kondensierte Version mit höchster Priorität hinzu, falls sie existiert if condensed_normalized and condensed_normalized not in search_terms: search_terms.append(condensed_normalized) # Füge den Originalnamen und die normalisierte Version hinzu if company_name not in search_terms: search_terms.append(company_name) if normalized not in search_terms: search_terms.append(normalized) # Füge Teile des Namens hinzu parts = normalized.split() if len(parts) > 1: if parts[0] not in search_terms: search_terms.append(parts[0]) first_two = " ".join(parts[:2]) if first_two not in search_terms: search_terms.append(first_two) # Füge die Website-Domain als Suchbegriff hinzu if website: domain = simple_normalize_url(website) if domain != "k.A." and domain not in search_terms: search_terms.append(domain) # Entferne Duplikate und behalte die Reihenfolge bei unique_terms = [] for term in search_terms: if term and term not in unique_terms: unique_terms.append(term) # Limitiere auf maximal 5 Suchbegriffe, um API-Calls zu sparen return unique_terms[:5] @retry_on_failure def _get_page_soup(self, url): """ Holt HTML von einer URL und gibt ein BeautifulSoup-Objekt zurueck. """ if not url or not isinstance(url, str) or not url.lower().startswith(("http://", "https://")): self.logger.warning(f"_get_page_soup: Ungueltige URL '{url[:100]}...'.") return None try: self.logger.debug(f"_get_page_soup: Rufe URL ab: {url[:100]}...") response = self.session.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15)) response.raise_for_status() response.encoding = response.apparent_encoding soup = BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser')) return soup except Exception as e: self.logger.error(f"_get_page_soup: Fehler beim Abrufen oder Parsen von HTML von {url[:100]}...: {e}") raise e def _validate_article(self, page, company_name, website): """ Validiert, ob ein Wikipedia-Artikel zum Unternehmen passt. """ if not page or not company_name: return False self.logger.debug(f"Validiere Artikel '{page.title[:100]}...' fuer Firma '{company_name[:100]}'") normalized_company = normalize_company_name(company_name) normalized_title = normalize_company_name(page.title) if not normalized_company or not normalized_title: self.logger.warning("Validierung nicht moeglich, da Normalisierung eines Namens fehlschlug.") return False standard_threshold = getattr(Config, 'SIMILARITY_THRESHOLD', 0.65) similarity = fuzzy_similarity(normalized_title, normalized_company) company_tokens = normalized_company.split() title_tokens = normalized_title.split() first_word_match = False first_two_words_match = False if company_tokens and title_tokens and company_tokens[0] == title_tokens[0]: first_word_match = True if len(company_tokens) > 1 and len(title_tokens) > 1 and company_tokens[1] == title_tokens[1]: first_two_words_match = True domain_found = False full_domain = self._get_full_domain(website) if full_domain != "k.A.": try: article_html = page.html() if article_html: soup = BeautifulSoup(article_html, getattr(Config, 'HTML_PARSER', 'html.parser')) external_links = soup.select('a[href^="http"]') for link_tag in external_links: href = link_tag.get('href', '') if href and isinstance(href, str) and full_domain in simple_normalize_url(href): if not any(ex in href.lower() for ex in ['wikipedia.org', 'wikimedia.org', 'wikidata.org', 'archive.org', 'webcitation.org']): domain_found = True break except KeyError as e_key: if 'extlinks' in str(e_key).lower(): self.logger.warning(f"KeyError ('{e_key}') bei Domain-Check für Artikel '{page.title[:100]}...'. Domain-Validierung übersprungen.") else: self.logger.error(f"Unerwarteter KeyError bei Domain-Prüfung für '{page.title[:100]}...': {e_key}") except Exception as e_link_check: self.logger.error(f"Allgemeiner Fehler waehrend der Domain-Link-Pruefung fuer '{page.title[:100]}...': {e_link_check}") is_valid = False reason = "" # NEU: Detaillierte Debug-Ausgaben für jeden Schritt self.logger.debug(f" Validierungs-Check für '{page.title[:50]}...':") self.logger.debug(f" - Aehnlichkeit: {similarity:.2f} (Schwelle: {standard_threshold:.2f})") self.logger.debug(f" - Domain '{full_domain}' im Artikel gefunden: {domain_found}") self.logger.debug(f" - Erstes Wort identisch: {first_word_match}") self.logger.debug(f" - Erste 2 Worte identisch: {first_two_words_match}") if similarity >= standard_threshold: is_valid, reason = True, f"Gesamt-Aehnlichkeit ({similarity:.2f}) >= Schwelle ({standard_threshold:.2f})" elif domain_found and first_two_words_match: is_valid, reason = True, f"Domain gefunden UND erste 2 Worte stimmen ueberein" elif domain_found and first_word_match and similarity >= 0.40: is_valid, reason = True, f"Domain gefunden UND erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.40" elif first_two_words_match and similarity >= 0.45: is_valid, reason = True, f"Erste zwei Worte stimmen ueberein UND Aehnlichkeit >= 0.45" elif domain_found and similarity >= 0.50: is_valid, reason = True, f"Domain gefunden UND Aehnlichkeit >= 0.50" elif first_word_match and similarity >= 0.55: is_valid, reason = True, f"Erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.55" else: reason = "Keine Validierungsregel traf zu" log_level = logging.INFO if is_valid else logging.DEBUG self.logger.log(log_level, f" => Artikel '{page.title[:100]}...' {'VALIDIERT' if is_valid else 'NICHT validiert'} (Grund: {reason})") return is_valid def search_company_article(self, company_name, website=None, parent_name=None, max_recursion_depth=1): """ Sucht einen passenden Wikipedia-Artikel für ein Unternehmen durch eine lineare Überprüfung generierter Suchbegriffe. Diese Methode ist einfacher und robuster als die alte, rekursive Implementierung. """ if not company_name: self.logger.warning("Wikipedia-Suche übersprungen: Kein Firmenname angegeben.") return None search_terms = self._generate_search_terms(company_name, website) self.logger.info(f"Starte Wikipedia-Suche für '{company_name[:50]}...' mit Begriffen: {search_terms}") for term in search_terms: self.logger.debug(f" -> Prüfe Suchbegriff: '{term}'") try: # Nutze die Suchfunktion von Wikipedia, die besser mit uneindeutigen Begriffen umgeht # und oft direkt die richtige Seite vorschlägt. suggested_titles = wikipedia.search(term, results=3) if not suggested_titles: continue for title in suggested_titles: self.logger.debug(f" -> Potenzieller Artikel gefunden: '{title}'") try: page = wikipedia.page(title, auto_suggest=False, redirect=True) # Validiere den gefundenen Artikel mit dem vollen Kontext if self._validate_article(page, company_name, website, parent_name): self.logger.info(f" -> ERFOLG: Artikel '{page.title}' wurde für '{company_name}' validiert.") return page else: # Artikel ist nicht relevant, fahre mit dem nächsten Vorschlag fort continue except wikipedia.exceptions.DisambiguationError as e: self.logger.debug(f" -> Begriffsklärungsseite '{title}' gefunden. Prüfe Optionen...") # Prüfe die ersten 3 Optionen der Begriffsklärungsseite for option in e.options[:3]: try: page = wikipedia.page(option, auto_suggest=False, redirect=True) if self._validate_article(page, company_name, website, parent_name): self.logger.info(f" -> ERFOLG: Artikel '{page.title}' aus Begriffsklärung validiert.") return page except Exception: continue # Ignoriere Fehler bei einzelnen Optionen except Exception: # Ignoriere Fehler beim Laden einer einzelnen Seite und mache weiter continue except Exception as e_search: self.logger.error(f" -> Unerwarteter Fehler bei der Suche mit Begriff '{term}': {e}") continue # Mache mit dem nächsten Suchbegriff weiter self.logger.warning(f"Kein passender & validierter Wikipedia-Artikel für '{company_name[:50]}...' gefunden.") return None def _extract_first_paragraph_from_soup(self, soup): """ Extrahiert den ersten aussagekraeftigen Absatz aus dem Soup-Objekt eines Wikipedia-Artikels. """ if not soup: return "k.A." paragraph_text = "k.A." try: content_div = soup.find('div', class_='mw-parser-output') search_area = content_div if content_div else soup paragraphs = search_area.find_all('p', recursive=False) if not paragraphs: paragraphs = search_area.find_all('p') for p in paragraphs: for sup in p.find_all('sup', class_='reference'): sup.decompose() for span in p.find_all('span', style=lambda v: v and 'display:none' in v): span.decompose() for span in p.find_all('span', id='coordinates'): span.decompose() text = clean_text(p.get_text(separator=' ', strip=True)) if text != "k.A." and len(text) > 50 and not re.match(r'^(Datei:|Abbildung:|Siehe auch:|Einzelnachweise|Siehe auch|Literatur)', text, re.IGNORECASE): paragraph_text = text[:1500] break except Exception as e: self.logger.error(f"Fehler beim Extrahieren des ersten Absatzes: {e}") return paragraph_text def extract_categories(self, soup): """ Extrahiert Wikipedia-Kategorien aus dem Soup-Objekt. """ if not soup: return "k.A." cats_filtered = [] try: cat_div = soup.find('div', id="mw-normal-catlinks") if cat_div: ul = cat_div.find('ul') if ul: cats = [clean_text(li.get_text()) for li in ul.find_all('li')] cats_filtered = [c for c in cats if c and isinstance(c, str) and c.strip() and "kategorien:" not in c.lower()] except Exception as e: self.logger.error(f"Fehler beim Extrahieren der Kategorien: {e}") return ", ".join(cats_filtered) if cats_filtered else "k.A." def _extract_infobox_value(self, soup, target): """ Extrahiert gezielt Werte (Branche, Umsatz, etc.) aus der Infobox. """ if not soup or target not in self.keywords_map: return "k.A." keywords = self.keywords_map[target] infobox = soup.select_one('table[class*="infobox"]') if not infobox: return "k.A." value_found = "k.A." try: rows = infobox.find_all('tr') for row in rows: cells = row.find_all(['th', 'td'], recursive=False) header_text, value_cell = None, None if len(cells) >= 2: if cells[0].name == 'th': header_text, value_cell = cells[0].get_text(strip=True), cells[1] elif cells[0].name == 'td' and cells[1].name == 'td': style = cells[0].get('style', '').lower() is_header_like = 'font-weight' in style and ('bold' in style or '700' in style) or cells[0].find(['b', 'strong'], recursive=False) if is_header_like: header_text, value_cell = cells[0].get_text(strip=True), cells[1] if header_text and value_cell: if any(kw in header_text.lower() for kw in keywords): for sup in value_cell.find_all(['sup', 'span']): sup.decompose() raw_value_text = value_cell.get_text(separator=' ', strip=True) if target == 'branche' or target == 'sitz': value_found = clean_text(raw_value_text).split('\n')[0].strip() elif target == 'umsatz': value_found = extract_numeric_value(raw_value_text, is_umsatz=True) elif target == 'mitarbeiter': value_found = extract_numeric_value(raw_value_text, is_umsatz=False) value_found = value_found if value_found else "k.A." self.logger.info(f" --> Infobox '{target}' gefunden: '{value_found}'") break except Exception as e: self.logger.exception(f"Fehler beim Durchlaufen der Infobox-Zeilen fuer '{target}': {e}") return "k.A. (Fehler Extraktion)" return value_found def _parse_sitz_string_detailed(self, raw_sitz_string_input): """ Versucht, aus einem rohen Sitz-String Stadt und Land detailliert zu extrahieren. """ sitz_stadt_val, sitz_land_val = "k.A.", "k.A." if not raw_sitz_string_input or not isinstance(raw_sitz_string_input, str): return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} temp_sitz = raw_sitz_string_input.strip() if not temp_sitz or temp_sitz.lower() == "k.a.": return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} # Diese Mappings könnten in die Config ausgelagert werden known_countries_detailed = { "deutschland": "Deutschland", "germany": "Deutschland", "de": "Deutschland", "österreich": "Österreich", "austria": "Österreich", "at": "Österreich", "schweiz": "Schweiz", "switzerland": "Schweiz", "ch": "Schweiz", "suisse": "Schweiz", "usa": "USA", "u.s.": "USA", "united states": "USA", "vereinigte staaten": "USA", "vereinigtes königreich": "Vereinigtes Königreich", "united kingdom": "Vereinigtes Königreich", "uk": "Vereinigtes Königreich", } region_to_country = { "nrw": "Deutschland", "nordrhein-westfalen": "Deutschland", "bayern": "Deutschland", "hessen": "Deutschland", "zg": "Schweiz", "zug": "Schweiz", "zh": "Schweiz", "zürich": "Schweiz", "ca": "USA", "california": "USA", "ny": "USA", "new york": "USA", } extracted_country = "" original_temp_sitz = temp_sitz klammer_match = re.search(r'\(([^)]+)\)$', temp_sitz) if klammer_match: suffix_in_klammer = klammer_match.group(1).strip().lower() if suffix_in_klammer in known_countries_detailed: extracted_country = known_countries_detailed[suffix_in_klammer] temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,") elif suffix_in_klammer in region_to_country: extracted_country = region_to_country[suffix_in_klammer] temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,") if not extracted_country and ',' in temp_sitz: parts = [p.strip() for p in temp_sitz.split(',')] if len(parts) > 1: last_part_lower = parts[-1].lower() if last_part_lower in known_countries_detailed: extracted_country = known_countries_detailed[last_part_lower] temp_sitz = ", ".join(parts[:-1]).strip(" ,") elif last_part_lower in region_to_country: extracted_country = region_to_country[last_part_lower] temp_sitz = ", ".join(parts[:-1]).strip(" ,") sitz_land_val = extracted_country if extracted_country else "k.A." sitz_stadt_val = re.sub(r'^\d{4,8}\s*', '', temp_sitz).strip(" ,") if not sitz_stadt_val: sitz_stadt_val = "k.A." if sitz_land_val != "k.A." else re.sub(r'^\d{4,8}\s*', '', original_temp_sitz).strip(" ,") or "k.A." return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} @retry_on_failure def extract_company_data(self, url_or_page): """ Extrahiert strukturierte Unternehmensdaten aus einem Wikipedia-Artikel (URL oder page-Objekt). Gibt nun auch den gesamten Rohtext des Artikels ('full_text') und den Titel zurück. """ default_result = { 'url': 'k.A.', 'title': 'k.A.', 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.', 'full_text': '' } page = None try: if isinstance(url_or_page, str) and "wikipedia.org" in url_or_page: page_title = unquote(url_or_page.split('/wiki/')[-1].replace('_', ' ')) page = wikipedia.page(title=page_title, auto_suggest=False, redirect=True) elif not isinstance(url_or_page, str): # Annahme: es ist ein page-Objekt page = url_or_page else: self.logger.warning(f"extract_company_data: Ungültiger Input '{str(url_or_page)[:100]}...'.") return default_result self.logger.info(f"Extrahiere Daten für Wiki-Artikel: {page.title[:100]}...") # Grundlegende Daten direkt aus dem page-Objekt extrahieren first_paragraph = page.summary.split('\n')[0] if page.summary else 'k.A.' categories = ", ".join(page.categories) full_text = page.content # Für Infobox-Daten benötigen wir weiterhin BeautifulSoup, da die 'wikipedia'-Bibliothek # keinen strukturierten Zugriff darauf bietet. soup = self._get_page_soup(page.url) if not soup: self.logger.warning(f" -> Konnte Seite für Soup-Parsing nicht laden. Extrahiere nur Basis-Daten.") # Fallback, wenn Soup fehlschlägt return { 'url': page.url, 'title': page.title, 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.', 'first_paragraph': first_paragraph, 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': categories, 'full_text': full_text } # Extraktion der Infobox-Daten mit den bestehenden Helper-Funktionen branche_val = self._extract_infobox_value(soup, 'branche') umsatz_val = self._extract_infobox_value(soup, 'umsatz') mitarbeiter_val = self._extract_infobox_value(soup, 'mitarbeiter') raw_sitz_string = self._extract_infobox_value(soup, 'sitz') parsed_sitz = self._parse_sitz_string_detailed(raw_sitz_string) sitz_stadt_val = parsed_sitz['sitz_stadt'] sitz_land_val = parsed_sitz['sitz_land'] # Sammle die finalen Daten result = { 'url': page.url, 'title': page.title, 'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val, 'first_paragraph': first_paragraph, 'branche': branche_val, 'umsatz': umsatz_val, 'mitarbeiter': mitarbeiter_val, 'categories': categories, 'full_text': full_text } self.logger.info(f" -> Extrahierte Daten: Stadt='{sitz_stadt_val}', Land='{sitz_land_val}', U='{umsatz_val}', M='{mitarbeiter_val}'") return result except wikipedia.exceptions.PageError: self.logger.error(f" -> Fehler: Wikipedia-Artikel für '{str(url_or_page)[:100]}' konnte nicht gefunden werden (PageError).") return {**default_result, 'url': str(url_or_page) if isinstance(url_or_page, str) else 'k.A.'} except Exception as e: self.logger.error(f" -> Unerwarteter Fehler bei der Extraktion von '{str(url_or_page)[:100]}': {e}") return {**default_result, 'url': str(url_or_page) if isinstance(url_or_page, str) else 'k.A.'}