From 4b92fa27a006ac92a2d4edfbdecba479bab77b2a Mon Sep 17 00:00:00 2001 From: Floke Date: Thu, 26 Jun 2025 14:40:13 +0000 Subject: [PATCH] =?UTF-8?q?wikipedia=5Fscraper.py=20hinzugef=C3=BCgt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wikipedia_scraper.py | 438 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 438 insertions(+) create mode 100644 wikipedia_scraper.py diff --git a/wikipedia_scraper.py b/wikipedia_scraper.py new file mode 100644 index 00000000..fd331e86 --- /dev/null +++ b/wikipedia_scraper.py @@ -0,0 +1,438 @@ +# --- START OF FILE wikipedia_scraper.py --- + +#!/usr/bin/env python3 +""" +wikipedia_scraper.py + +Klasse zur Kapselung der Interaktionen mit Wikipedia, inklusive Suche, +Validierung und Extraktion von Unternehmensdaten. +""" + +import logging +import re +import time +import traceback +from urllib.parse import unquote + +import requests +import wikipedia +from bs4 import BeautifulSoup + +# Import der abhängigen Module +from config import Config +from helpers import (retry_on_failure, simple_normalize_url, + normalize_company_name, extract_numeric_value, + clean_text, fuzzy_similarity) + +class WikipediaScraper: + """ + Handhabt das Suchen von Wikipedia-Artikeln und das Extrahieren relevanter + Unternehmensdaten. Beinhaltet Validierungslogik fuer Artikel. + Nutzt die wikipedia-Bibliothek und Requests fuer direktes HTML-Scraping. + """ + def __init__(self, user_agent=None): + """ + Initialisiert den Scraper mit einer Requests-Session und konfigurierter + Wikipedia-Bibliothek. + """ + self.logger = logging.getLogger(__name__ + ".WikipediaScraper") + self.logger.debug("WikipediaScraper initialisiert.") + + self.user_agent = user_agent or getattr(Config, 'USER_AGENT', 'Mozilla/5.0 (compatible; UnternehmenSkript/1.0; +http://www.example.com/bot)') + self.session = requests.Session() + self.session.headers.update({'User-Agent': self.user_agent}) + self.logger.debug(f"Requests Session mit User-Agent '{self.user_agent}' initialisiert.") + + self.keywords_map = { + 'branche': ['branche', 'wirtschaftszweig', 'industry', 'taetigkeit', 'sektor', 'produkte', 'leistungen'], + 'umsatz': ['umsatz', 'erloes', 'revenue', 'jahresumsatz', 'konzernumsatz', 'ergebnis'], + 'mitarbeiter': ['mitarbeiter', 'mitarbeiterzahl', 'beschaeftigte', 'employees', 'number of employees', 'personal', 'belegschaft'], + 'sitz': ['sitz', 'hauptsitz', 'unternehmenssitz', 'firmensitz', 'headquarters', 'standort', 'sitz des unternehmens', 'anschrift', 'adresse'] + } + + try: + wiki_lang = getattr(Config, 'LANG', 'de') + wikipedia.set_lang(wiki_lang) + wikipedia.set_rate_limiting(False) + self.logger.info(f"Wikipedia library language set to '{wiki_lang}'. Rate limiting DISABLED.") + except Exception as e: + self.logger.warning(f"Fehler beim Setzen der Wikipedia-Sprache oder Rate Limiting: {e}") + + def _get_full_domain(self, website): + """Extrahiert die normalisierte Domain (ohne www, ohne Pfad) aus einer URL.""" + return simple_normalize_url(website) + + def _generate_search_terms(self, company_name, website): + """ + Generiert eine Liste von Suchbegriffen fuer die Wikipedia-Suche. + """ + if not company_name: return [] + terms = set() + original_name_cleaned = str(company_name).strip() + if original_name_cleaned: terms.add(original_name_cleaned) + + normalized_name = normalize_company_name(company_name) + if normalized_name: + terms.add(normalized_name) + name_parts = normalized_name.split() + if len(name_parts) > 0: terms.add(name_parts[0]) + if len(name_parts) > 1: terms.add(" ".join(name_parts[:2])) + + full_domain = self._get_full_domain(website) + if full_domain != "k.A.": terms.add(full_domain) + + final_terms = [term for term in list(terms) if term][:getattr(Config, 'WIKIPEDIA_SEARCH_RESULTS', 5)] + self.logger.debug(f"Generierte Suchbegriffe fuer '{company_name[:100]}...': {final_terms}") + return final_terms + + @retry_on_failure + def _get_page_soup(self, url): + """ + Holt HTML von einer URL und gibt ein BeautifulSoup-Objekt zurueck. + """ + if not url or not isinstance(url, str) or not url.lower().startswith(("http://", "https://")): + self.logger.warning(f"_get_page_soup: Ungueltige URL '{url[:100]}...'.") + return None + try: + self.logger.debug(f"_get_page_soup: Rufe URL ab: {url[:100]}...") + response = self.session.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15)) + response.raise_for_status() + response.encoding = response.apparent_encoding + soup = BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser')) + return soup + except Exception as e: + self.logger.error(f"_get_page_soup: Fehler beim Abrufen oder Parsen von HTML von {url[:100]}...: {e}") + raise e + + def _validate_article(self, page, company_name, website): + """ + Validiert, ob ein Wikipedia-Artikel zum Unternehmen passt. + """ + if not page or not company_name: return False + self.logger.debug(f"Validiere Artikel '{page.title[:100]}...' fuer Firma '{company_name[:100]}'") + + normalized_company = normalize_company_name(company_name) + normalized_title = normalize_company_name(page.title) + if not normalized_company or not normalized_title: + self.logger.warning("Validierung nicht moeglich, da Normalisierung eines Namens fehlschlug.") + return False + + standard_threshold = getattr(Config, 'SIMILARITY_THRESHOLD', 0.65) + similarity = fuzzy_similarity(normalized_title, normalized_company) + + company_tokens = normalized_company.split() + title_tokens = normalized_title.split() + first_word_match = False + first_two_words_match = False + if company_tokens and title_tokens and company_tokens[0] == title_tokens[0]: + first_word_match = True + if len(company_tokens) > 1 and len(title_tokens) > 1 and company_tokens[1] == title_tokens[1]: + first_two_words_match = True + + domain_found = False + full_domain = self._get_full_domain(website) + if full_domain != "k.A.": + try: + article_html = page.html() + if article_html: + soup = BeautifulSoup(article_html, getattr(Config, 'HTML_PARSER', 'html.parser')) + external_links = soup.select('a[href^="http"]') + for link_tag in external_links: + href = link_tag.get('href', '') + if href and isinstance(href, str) and full_domain in simple_normalize_url(href): + if not any(ex in href.lower() for ex in ['wikipedia.org', 'wikimedia.org', 'wikidata.org', 'archive.org', 'webcitation.org']): + domain_found = True + break + except KeyError as e_key: + if 'extlinks' in str(e_key).lower(): + self.logger.warning(f"KeyError ('{e_key}') bei Domain-Check für Artikel '{page.title[:100]}...'. Domain-Validierung übersprungen.") + else: + self.logger.error(f"Unerwarteter KeyError bei Domain-Prüfung für '{page.title[:100]}...': {e_key}") + except Exception as e_link_check: + self.logger.error(f"Allgemeiner Fehler waehrend der Domain-Link-Pruefung fuer '{page.title[:100]}...': {e_link_check}") + + is_valid = False + reason = "Keine Validierungsregel traf zu" + if similarity >= standard_threshold: + is_valid = True; reason = f"Gesamt-Aehnlichkeit ({similarity:.2f}) >= Schwelle ({standard_threshold:.2f})" + elif domain_found and first_two_words_match: + is_valid = True; reason = f"Domain gefunden UND erste 2 Worte stimmen ueberein" + elif domain_found and first_word_match and similarity >= 0.40: + is_valid = True; reason = f"Domain gefunden UND erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.40" + elif first_two_words_match and similarity >= 0.45: + is_valid = True; reason = f"Erste zwei Worte stimmen ueberein UND Aehnlichkeit >= 0.45" + elif domain_found and similarity >= 0.50: + is_valid = True; reason = f"Domain gefunden UND Aehnlichkeit >= 0.50" + elif first_word_match and similarity >= 0.55: + is_valid = True; reason = f"Erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.55" + + log_level = logging.INFO if is_valid else logging.DEBUG + self.logger.log(log_level, f" => Artikel '{page.title[:100]}...' {'VALIDIERT' if is_valid else 'NICHT validiert'} (Grund: {reason})") + return is_valid + + def search_company_article(self, company_name, website=None, max_recursion_depth=1): + """ + Sucht einen passenden Wikipedia-Artikel fuer das Unternehmen und gibt das + wikipedia.WikipediaPage Objekt zurueck, wenn ein relevanter und validierter + Artikel gefunden wird. Behandelt explizit Begriffsklaerungsseiten. + """ + if not company_name or str(company_name).strip() == "": + self.logger.warning("Wikipedia search skipped: No company name provided.") + raise ValueError("Kein Firmenname fuer Wikipedia Suche angegeben.") + + search_terms = self._generate_search_terms(company_name, website) + if not search_terms: + self.logger.warning(f"Keine Suchbegriffe fuer '{company_name[:100]}...' generiert.") + return None + + self.logger.info(f"Starte Wikipedia-Suche fuer '{company_name[:100]}...' mit Begriffen: {search_terms}") + + processed_titles = set() + original_search_name_norm = normalize_company_name(company_name) + + def check_page_recursive(title_to_check, current_depth): + if title_to_check in processed_titles or current_depth > max_recursion_depth: + return None + + processed_titles.add(title_to_check) + self.logger.debug(f" -> Pruefe potenziellen Artikel: '{title_to_check[:100]}...' (Tiefe: {current_depth})") + + normalized_option_title_local = normalize_company_name(title_to_check) + title_similarity_to_original = fuzzy_similarity(normalized_option_title_local, original_search_name_norm) + + if current_depth > 0 and title_similarity_to_original < 0.3: + self.logger.debug(f" -> Option '{title_to_check[:100]}' hat zu geringe Ähnlichkeit ({title_similarity_to_original:.2f}). Übersprungen.") + return None + + page = None + try: + page = wikipedia.page(title_to_check, auto_suggest=False, preload=False, redirect=True) + if self._validate_article(page, company_name, website): + self.logger.info(f" -> Titel '{page.title[:100]}...' erfolgreich validiert!") + return page + else: + return None + except wikipedia.exceptions.DisambiguationError as e_disamb: + self.logger.info(f" -> Begriffsklaerung '{e_disamb.title}' gefunden (Tiefe {current_depth}). Pruefe Optionen...") + if current_depth >= max_recursion_depth: return None + + relevant_options = [] + for option in e_disamb.options: + option_lower = option.lower() + if not any(ex in option_lower for ex in ["(person)", "(familienname)"]) and len(option) < 80: + if fuzzy_similarity(normalize_company_name(option), original_search_name_norm) > 0.3: + relevant_options.append(option) + + for option_to_check in relevant_options[:3]: + validated_page = check_page_recursive(option_to_check, current_depth + 1) + if validated_page: return validated_page + return None + except Exception as e_page: + title_for_log = page.title[:100] if page and hasattr(page, 'title') and page.title else title_to_check[:100] + if "extlinks" in str(e_page).lower(): + self.logger.warning(f" -> KeyError ('extlinks') beim Verarbeiten von Titel '{title_for_log}...'. Übersprungen.") + else: + self.logger.error(f" -> Unerwarteter Fehler bei Verarbeitung von Seite '{title_for_log}': {e_page}") + return None + + # Hauptlogik der Suche + self.logger.debug(f" -> Versuche direkten Match fuer '{company_name[:100]}...'") + page_found = check_page_recursive(company_name, 0) + if page_found: return page_found + + self.logger.debug(f" -> Kein direkter Treffer. Starte Suche mit generierten Begriffen...") + for term in search_terms: + if term == company_name: continue + self.logger.debug(f" -> Versuche Suchbegriff: '{term[:100]}...'") + page_found = check_page_recursive(term, 0) + if page_found: return page_found + + self.logger.warning(f"Kein passender & validierter Wikipedia-Artikel fuer '{company_name[:100]}...' gefunden.") + return None + + def _extract_first_paragraph_from_soup(self, soup): + """ + Extrahiert den ersten aussagekraeftigen Absatz aus dem Soup-Objekt eines Wikipedia-Artikels. + """ + if not soup: return "k.A." + paragraph_text = "k.A." + try: + content_div = soup.find('div', class_='mw-parser-output') + search_area = content_div if content_div else soup + paragraphs = search_area.find_all('p', recursive=False) + if not paragraphs: paragraphs = search_area.find_all('p') + + for p in paragraphs: + for sup in p.find_all('sup', class_='reference'): sup.decompose() + for span in p.find_all('span', style=lambda v: v and 'display:none' in v): span.decompose() + for span in p.find_all('span', id='coordinates'): span.decompose() + text = clean_text(p.get_text(separator=' ', strip=True)) + if text != "k.A." and len(text) > 50 and not re.match(r'^(Datei:|Abbildung:|Siehe auch:|Einzelnachweise|Siehe auch|Literatur)', text, re.IGNORECASE): + paragraph_text = text[:1500] + break + except Exception as e: + self.logger.error(f"Fehler beim Extrahieren des ersten Absatzes: {e}") + return paragraph_text + + def extract_categories(self, soup): + """ + Extrahiert Wikipedia-Kategorien aus dem Soup-Objekt. + """ + if not soup: return "k.A." + cats_filtered = [] + try: + cat_div = soup.find('div', id="mw-normal-catlinks") + if cat_div: + ul = cat_div.find('ul') + if ul: + cats = [clean_text(li.get_text()) for li in ul.find_all('li')] + cats_filtered = [c for c in cats if c and isinstance(c, str) and c.strip() and "kategorien:" not in c.lower()] + except Exception as e: + self.logger.error(f"Fehler beim Extrahieren der Kategorien: {e}") + return ", ".join(cats_filtered) if cats_filtered else "k.A." + + def _extract_infobox_value(self, soup, target): + """ + Extrahiert gezielt Werte (Branche, Umsatz, etc.) aus der Infobox. + """ + if not soup or target not in self.keywords_map: + return "k.A." + keywords = self.keywords_map[target] + infobox = soup.select_one('table[class*="infobox"]') + if not infobox: return "k.A." + + value_found = "k.A." + try: + rows = infobox.find_all('tr') + for row in rows: + cells = row.find_all(['th', 'td'], recursive=False) + header_text, value_cell = None, None + + if len(cells) >= 2: + if cells[0].name == 'th': + header_text, value_cell = cells[0].get_text(strip=True), cells[1] + elif cells[0].name == 'td' and cells[1].name == 'td': + style = cells[0].get('style', '').lower() + is_header_like = 'font-weight' in style and ('bold' in style or '700' in style) or cells[0].find(['b', 'strong'], recursive=False) + if is_header_like: + header_text, value_cell = cells[0].get_text(strip=True), cells[1] + + if header_text and value_cell: + if any(kw in header_text.lower() for kw in keywords): + for sup in value_cell.find_all(['sup', 'span']): + sup.decompose() + + raw_value_text = value_cell.get_text(separator=' ', strip=True) + + if target == 'branche' or target == 'sitz': + value_found = clean_text(raw_value_text).split('\n')[0].strip() + elif target == 'umsatz': + value_found = extract_numeric_value(raw_value_text, is_umsatz=True) + elif target == 'mitarbeiter': + value_found = extract_numeric_value(raw_value_text, is_umsatz=False) + + value_found = value_found if value_found else "k.A." + self.logger.info(f" --> Infobox '{target}' gefunden: '{value_found}'") + break + except Exception as e: + self.logger.exception(f"Fehler beim Durchlaufen der Infobox-Zeilen fuer '{target}': {e}") + return "k.A. (Fehler Extraktion)" + + return value_found + + def _parse_sitz_string_detailed(self, raw_sitz_string_input): + """ + Versucht, aus einem rohen Sitz-String Stadt und Land detailliert zu extrahieren. + """ + sitz_stadt_val, sitz_land_val = "k.A.", "k.A." + if not raw_sitz_string_input or not isinstance(raw_sitz_string_input, str): + return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} + + temp_sitz = raw_sitz_string_input.strip() + if not temp_sitz or temp_sitz.lower() == "k.a.": + return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} + + # Diese Mappings könnten in die Config ausgelagert werden + known_countries_detailed = { + "deutschland": "Deutschland", "germany": "Deutschland", "de": "Deutschland", + "österreich": "Österreich", "austria": "Österreich", "at": "Österreich", + "schweiz": "Schweiz", "switzerland": "Schweiz", "ch": "Schweiz", "suisse": "Schweiz", + "usa": "USA", "u.s.": "USA", "united states": "USA", "vereinigte staaten": "USA", + "vereinigtes königreich": "Vereinigtes Königreich", "united kingdom": "Vereinigtes Königreich", "uk": "Vereinigtes Königreich", + } + region_to_country = { + "nrw": "Deutschland", "nordrhein-westfalen": "Deutschland", "bayern": "Deutschland", "hessen": "Deutschland", + "zg": "Schweiz", "zug": "Schweiz", "zh": "Schweiz", "zürich": "Schweiz", + "ca": "USA", "california": "USA", "ny": "USA", "new york": "USA", + } + + extracted_country = "" + original_temp_sitz = temp_sitz + + klammer_match = re.search(r'\(([^)]+)\)$', temp_sitz) + if klammer_match: + suffix_in_klammer = klammer_match.group(1).strip().lower() + if suffix_in_klammer in known_countries_detailed: + extracted_country = known_countries_detailed[suffix_in_klammer] + temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,") + elif suffix_in_klammer in region_to_country: + extracted_country = region_to_country[suffix_in_klammer] + temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,") + + if not extracted_country and ',' in temp_sitz: + parts = [p.strip() for p in temp_sitz.split(',')] + if len(parts) > 1: + last_part_lower = parts[-1].lower() + if last_part_lower in known_countries_detailed: + extracted_country = known_countries_detailed[last_part_lower] + temp_sitz = ", ".join(parts[:-1]).strip(" ,") + elif last_part_lower in region_to_country: + extracted_country = region_to_country[last_part_lower] + temp_sitz = ", ".join(parts[:-1]).strip(" ,") + + sitz_land_val = extracted_country if extracted_country else "k.A." + sitz_stadt_val = re.sub(r'^\d{4,8}\s*', '', temp_sitz).strip(" ,") + + if not sitz_stadt_val: + sitz_stadt_val = "k.A." if sitz_land_val != "k.A." else re.sub(r'^\d{4,8}\s*', '', original_temp_sitz).strip(" ,") or "k.A." + + return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val} + + @retry_on_failure + def extract_company_data(self, page_url): + """ + Extrahiert Firmendaten von einer gegebenen Wikipedia-Artikel-URL. + """ + default_result = {'url': page_url or 'k.A.', 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.'} + if not page_url or not isinstance(page_url, str) or "wikipedia.org/wiki/" not in page_url.lower(): + self.logger.warning(f"extract_company_data: Ungueltige URL '{page_url[:100]}...'.") + return default_result + + self.logger.info(f"Extrahiere Daten fuer Wiki-URL: {page_url[:100]}...") + soup = self._get_page_soup(page_url) + if not soup: + self.logger.error(f" -> Fehler: Konnte Seite {page_url[:100]}... nicht laden oder parsen.") + return default_result + + first_paragraph = self._extract_first_paragraph_from_soup(soup) + categories_val = self.extract_categories(soup) + branche_val = self._extract_infobox_value(soup, 'branche') + umsatz_val = self._extract_infobox_value(soup, 'umsatz') + mitarbeiter_val = self._extract_infobox_value(soup, 'mitarbeiter') + raw_sitz_string = self._extract_infobox_value(soup, 'sitz') + parsed_sitz = self._parse_sitz_string_detailed(raw_sitz_string) + sitz_stadt_val = parsed_sitz['sitz_stadt'] + sitz_land_val = parsed_sitz['sitz_land'] + + result = { + 'url': page_url, + 'sitz_stadt': sitz_stadt_val, + 'sitz_land': sitz_land_val, + 'first_paragraph': first_paragraph, + 'branche': branche_val, + 'umsatz': umsatz_val, + 'mitarbeiter': mitarbeiter_val, + 'categories': categories_val + } + self.logger.info(f" -> Extrahierte Daten: Stadt='{sitz_stadt_val}', Land='{sitz_land_val}', U='{umsatz_val}', M='{mitarbeiter_val}'") + return result