Files
Brancheneinstufung2/wikipedia_scraper.py

521 lines
26 KiB
Python

# --- START OF FILE wikipedia_scraper.py ---
#!/usr/bin/env python3
"""
wikipedia_scraper.py
Klasse zur Kapselung der Interaktionen mit Wikipedia, inklusive Suche,
Validierung und Extraktion von Unternehmensdaten.
"""
import logging
import re
import time
import traceback
from urllib.parse import unquote
import requests
import wikipedia
from bs4 import BeautifulSoup
# Import der abhängigen Module
from config import Config
from helpers import (retry_on_failure, simple_normalize_url,
normalize_company_name, extract_numeric_value,
clean_text, fuzzy_similarity)
class WikipediaScraper:
"""
Handhabt das Suchen von Wikipedia-Artikeln und das Extrahieren relevanter
Unternehmensdaten. Beinhaltet Validierungslogik fuer Artikel.
Nutzt die wikipedia-Bibliothek und Requests fuer direktes HTML-Scraping.
"""
def __init__(self, user_agent=None):
"""
Initialisiert den Scraper mit einer Requests-Session und konfigurierter
Wikipedia-Bibliothek.
"""
self.logger = logging.getLogger(__name__ + ".WikipediaScraper")
self.logger.debug("WikipediaScraper initialisiert.")
self.user_agent = user_agent or getattr(Config, 'USER_AGENT', 'Mozilla/5.0 (compatible; UnternehmenSkript/1.0; +http://www.example.com/bot)')
self.session = requests.Session()
self.session.headers.update({'User-Agent': self.user_agent})
self.logger.debug(f"Requests Session mit User-Agent '{self.user_agent}' initialisiert.")
self.keywords_map = {
'branche': ['branche', 'wirtschaftszweig', 'industry', 'taetigkeit', 'sektor', 'produkte', 'leistungen'],
'umsatz': ['umsatz', 'erloes', 'revenue', 'jahresumsatz', 'konzernumsatz', 'ergebnis'],
'mitarbeiter': ['mitarbeiter', 'mitarbeiterzahl', 'beschaeftigte', 'employees', 'number of employees', 'personal', 'belegschaft'],
'sitz': ['sitz', 'hauptsitz', 'unternehmenssitz', 'firmensitz', 'headquarters', 'standort', 'sitz des unternehmens', 'anschrift', 'adresse']
}
try:
wiki_lang = getattr(Config, 'LANG', 'de')
wikipedia.set_lang(wiki_lang)
wikipedia.set_rate_limiting(False)
self.logger.info(f"Wikipedia library language set to '{wiki_lang}'. Rate limiting DISABLED.")
except Exception as e:
self.logger.warning(f"Fehler beim Setzen der Wikipedia-Sprache oder Rate Limiting: {e}")
def _get_full_domain(self, website):
"""Extrahiert die normalisierte Domain (ohne www, ohne Pfad) aus einer URL."""
return simple_normalize_url(website)
def _generate_search_terms(self, company_name, website=None):
"""
Generiert eine Liste von potenziellen Wikipedia-Artikeltiteln.
v2.0: Mit verbesserter Logik für Namen, die Zahlen enthalten.
"""
if not company_name:
return []
# Basis-Normalisierung
normalized = normalize_company_name(company_name)
# NEUE LOGIK: Speziell für Namen wie "11 88 0 Solutions"
# Fügt eine Version hinzu, bei der Leerzeichen zwischen Zahlen entfernt werden.
if re.search(r'\d[\s\d]+\d', normalized):
condensed_normalized = re.sub(r'(\d)\s+(\d)', r'\1\2', normalized)
# Führe eine erneute, aggressivere Normalisierung durch, um Reste zu entfernen
condensed_normalized = normalize_company_name(condensed_normalized)
else:
condensed_normalized = None
search_terms = []
# Füge die kondensierte Version mit höchster Priorität hinzu, falls sie existiert
if condensed_normalized and condensed_normalized not in search_terms:
search_terms.append(condensed_normalized)
# Füge den Originalnamen und die normalisierte Version hinzu
if company_name not in search_terms:
search_terms.append(company_name)
if normalized not in search_terms:
search_terms.append(normalized)
# Füge Teile des Namens hinzu
parts = normalized.split()
if len(parts) > 1:
if parts[0] not in search_terms: search_terms.append(parts[0])
first_two = " ".join(parts[:2])
if first_two not in search_terms: search_terms.append(first_two)
# Füge die Website-Domain als Suchbegriff hinzu
if website:
domain = simple_normalize_url(website)
if domain != "k.A." and domain not in search_terms:
search_terms.append(domain)
# Entferne Duplikate und behalte die Reihenfolge bei
unique_terms = []
for term in search_terms:
if term and term not in unique_terms:
unique_terms.append(term)
# Limitiere auf maximal 5 Suchbegriffe, um API-Calls zu sparen
return unique_terms[:5]
@retry_on_failure
def _get_page_soup(self, url):
"""
Holt HTML von einer URL und gibt ein BeautifulSoup-Objekt zurueck.
"""
if not url or not isinstance(url, str) or not url.lower().startswith(("http://", "https://")):
self.logger.warning(f"_get_page_soup: Ungueltige URL '{url[:100]}...'.")
return None
try:
self.logger.debug(f"_get_page_soup: Rufe URL ab: {url[:100]}...")
response = self.session.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15))
response.raise_for_status()
response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser'))
return soup
except Exception as e:
self.logger.error(f"_get_page_soup: Fehler beim Abrufen oder Parsen von HTML von {url[:100]}...: {e}")
raise e
def _validate_article(self, page, company_name, website):
"""
Validiert, ob ein Wikipedia-Artikel zum Unternehmen passt.
"""
if not page or not company_name: return False
self.logger.debug(f"Validiere Artikel '{page.title[:100]}...' fuer Firma '{company_name[:100]}'")
normalized_company = normalize_company_name(company_name)
normalized_title = normalize_company_name(page.title)
if not normalized_company or not normalized_title:
self.logger.warning("Validierung nicht moeglich, da Normalisierung eines Namens fehlschlug.")
return False
standard_threshold = getattr(Config, 'SIMILARITY_THRESHOLD', 0.65)
similarity = fuzzy_similarity(normalized_title, normalized_company)
company_tokens = normalized_company.split()
title_tokens = normalized_title.split()
first_word_match = False
first_two_words_match = False
if company_tokens and title_tokens and company_tokens[0] == title_tokens[0]:
first_word_match = True
if len(company_tokens) > 1 and len(title_tokens) > 1 and company_tokens[1] == title_tokens[1]:
first_two_words_match = True
domain_found = False
full_domain = self._get_full_domain(website)
if full_domain != "k.A.":
try:
article_html = page.html()
if article_html:
soup = BeautifulSoup(article_html, getattr(Config, 'HTML_PARSER', 'html.parser'))
external_links = soup.select('a[href^="http"]')
for link_tag in external_links:
href = link_tag.get('href', '')
if href and isinstance(href, str) and full_domain in simple_normalize_url(href):
if not any(ex in href.lower() for ex in ['wikipedia.org', 'wikimedia.org', 'wikidata.org', 'archive.org', 'webcitation.org']):
domain_found = True
break
except KeyError as e_key:
if 'extlinks' in str(e_key).lower():
self.logger.warning(f"KeyError ('{e_key}') bei Domain-Check für Artikel '{page.title[:100]}...'. Domain-Validierung übersprungen.")
else:
self.logger.error(f"Unerwarteter KeyError bei Domain-Prüfung für '{page.title[:100]}...': {e_key}")
except Exception as e_link_check:
self.logger.error(f"Allgemeiner Fehler waehrend der Domain-Link-Pruefung fuer '{page.title[:100]}...': {e_link_check}")
is_valid = False
reason = ""
# NEU: Detaillierte Debug-Ausgaben für jeden Schritt
self.logger.debug(f" Validierungs-Check für '{page.title[:50]}...':")
self.logger.debug(f" - Aehnlichkeit: {similarity:.2f} (Schwelle: {standard_threshold:.2f})")
self.logger.debug(f" - Domain '{full_domain}' im Artikel gefunden: {domain_found}")
self.logger.debug(f" - Erstes Wort identisch: {first_word_match}")
self.logger.debug(f" - Erste 2 Worte identisch: {first_two_words_match}")
if similarity >= standard_threshold:
is_valid, reason = True, f"Gesamt-Aehnlichkeit ({similarity:.2f}) >= Schwelle ({standard_threshold:.2f})"
elif domain_found and first_two_words_match:
is_valid, reason = True, f"Domain gefunden UND erste 2 Worte stimmen ueberein"
elif domain_found and first_word_match and similarity >= 0.40:
is_valid, reason = True, f"Domain gefunden UND erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.40"
elif first_two_words_match and similarity >= 0.45:
is_valid, reason = True, f"Erste zwei Worte stimmen ueberein UND Aehnlichkeit >= 0.45"
elif domain_found and similarity >= 0.50:
is_valid, reason = True, f"Domain gefunden UND Aehnlichkeit >= 0.50"
elif first_word_match and similarity >= 0.55:
is_valid, reason = True, f"Erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.55"
else:
reason = "Keine Validierungsregel traf zu"
log_level = logging.INFO if is_valid else logging.DEBUG
self.logger.log(log_level, f" => Artikel '{page.title[:100]}...' {'VALIDIERT' if is_valid else 'NICHT validiert'} (Grund: {reason})")
return is_valid
def search_company_article(self, company_name, website=None, parent_name=None, max_recursion_depth=1):
"""
Sucht einen passenden Wikipedia-Artikel fuer das Unternehmen und gibt das
wikipedia.WikipediaPage Objekt zurueck. Berücksichtigt nun auch den parent_name.
"""
if not company_name or str(company_name).strip() == "":
self.logger.warning("Wikipedia search skipped: No company name provided.")
raise ValueError("Kein Firmenname fuer Wikipedia Suche angegeben.")
search_terms = self._generate_search_terms(company_name, website)
if not search_terms:
self.logger.warning(f"Keine Suchbegriffe fuer '{company_name[:100]}...' generiert.")
return None
self.logger.info(f"Starte Wikipedia-Suche fuer '{company_name[:100]}...' mit Begriffen: {search_terms}")
processed_titles = set()
original_search_name_norm = normalize_company_name(company_name)
# KORRIGIERT: parent_name wird nun an die innere Funktion übergeben
def check_page_recursive(title_to_check, current_depth):
# KORRIGIERT: Sicherer Vergleich, falls max_recursion_depth None sein sollte
effective_max_depth = max_recursion_depth if max_recursion_depth is not None else 2
if title_to_check in processed_titles or current_depth > effective_max_depth:
return None
processed_titles.add(title_to_check)
self.logger.debug(f" -> Pruefe potenziellen Artikel: '{title_to_check[:100]}...' (Tiefe: {current_depth})")
normalized_option_title_local = normalize_company_name(title_to_check)
title_similarity_to_original = fuzzy_similarity(normalized_option_title_local, original_search_name_norm)
if current_depth > 0 and title_similarity_to_original < 0.3:
self.logger.debug(f" -> Option '{title_to_check[:100]}' hat zu geringe Ähnlichkeit ({title_similarity_to_original:.2f}). Übersprungen.")
return None
page = None
try:
page = wikipedia.page(title_to_check, auto_suggest=False, preload=False, redirect=True)
# KORRIGIERT: parent_name wird an die Validierung übergeben
if self._validate_article(page, company_name, website, parent_name):
self.logger.info(f" -> Titel '{page.title[:100]}...' erfolgreich validiert!")
return page
else:
return None
except wikipedia.exceptions.PageError:
self.logger.debug(f" -> Artikel '{title_to_check[:100]}' nicht gefunden (PageError).")
return None
except wikipedia.exceptions.DisambiguationError as e_disamb:
self.logger.info(f" -> Begriffsklaerung '{e_disamb.title}' gefunden (Tiefe {current_depth}). Pruefe Optionen...")
if current_depth >= effective_max_depth: return None
relevant_options = []
for option in e_disamb.options:
option_lower = option.lower()
if not any(ex in option_lower for ex in ["(person)", "(familienname)"]) and len(option) < 80:
if fuzzy_similarity(normalize_company_name(option), original_search_name_norm) > 0.3:
relevant_options.append(option)
for option_to_check in relevant_options[:3]:
# KORRIGIERT: parent_name wird im rekursiven Aufruf weitergereicht
validated_page = check_page_recursive(option_to_check, current_depth + 1)
if validated_page: return validated_page
return None
except Exception as e_page:
title_for_log = page.title[:100] if page and hasattr(page, 'title') and page.title else title_to_check[:100]
if "extlinks" in str(e_page).lower():
self.logger.warning(f" -> KeyError ('extlinks') beim Verarbeiten von Titel '{title_for_log}...'. Übersprungen.")
else:
self.logger.error(f" -> Unerwarteter Fehler bei Verarbeitung von Seite '{title_for_log}': {e_page}")
return None
# Hauptlogik der Suche
# Die innere Funktion `check_page_recursive` erbt `parent_name` jetzt aus dem äußeren Scope.
# Daher muss der Parameter hier nicht mehr übergeben werden.
# Iteriere durch alle generierten Suchbegriffe, inklusive des Originalnamens
for term in search_terms:
self.logger.debug(f" -> Versuche Suchbegriff: '{term[:100]}...'")
page_found = check_page_recursive(term, 0)
if page_found:
return page_found
self.logger.warning(f"Kein passender & validierter Wikipedia-Artikel fuer '{company_name[:100]}...' gefunden.")
return None
def _extract_first_paragraph_from_soup(self, soup):
"""
Extrahiert den ersten aussagekraeftigen Absatz aus dem Soup-Objekt eines Wikipedia-Artikels.
"""
if not soup: return "k.A."
paragraph_text = "k.A."
try:
content_div = soup.find('div', class_='mw-parser-output')
search_area = content_div if content_div else soup
paragraphs = search_area.find_all('p', recursive=False)
if not paragraphs: paragraphs = search_area.find_all('p')
for p in paragraphs:
for sup in p.find_all('sup', class_='reference'): sup.decompose()
for span in p.find_all('span', style=lambda v: v and 'display:none' in v): span.decompose()
for span in p.find_all('span', id='coordinates'): span.decompose()
text = clean_text(p.get_text(separator=' ', strip=True))
if text != "k.A." and len(text) > 50 and not re.match(r'^(Datei:|Abbildung:|Siehe auch:|Einzelnachweise|Siehe auch|Literatur)', text, re.IGNORECASE):
paragraph_text = text[:1500]
break
except Exception as e:
self.logger.error(f"Fehler beim Extrahieren des ersten Absatzes: {e}")
return paragraph_text
def extract_categories(self, soup):
"""
Extrahiert Wikipedia-Kategorien aus dem Soup-Objekt.
"""
if not soup: return "k.A."
cats_filtered = []
try:
cat_div = soup.find('div', id="mw-normal-catlinks")
if cat_div:
ul = cat_div.find('ul')
if ul:
cats = [clean_text(li.get_text()) for li in ul.find_all('li')]
cats_filtered = [c for c in cats if c and isinstance(c, str) and c.strip() and "kategorien:" not in c.lower()]
except Exception as e:
self.logger.error(f"Fehler beim Extrahieren der Kategorien: {e}")
return ", ".join(cats_filtered) if cats_filtered else "k.A."
def _extract_infobox_value(self, soup, target):
"""
Extrahiert gezielt Werte (Branche, Umsatz, etc.) aus der Infobox.
"""
if not soup or target not in self.keywords_map:
return "k.A."
keywords = self.keywords_map[target]
infobox = soup.select_one('table[class*="infobox"]')
if not infobox: return "k.A."
value_found = "k.A."
try:
rows = infobox.find_all('tr')
for row in rows:
cells = row.find_all(['th', 'td'], recursive=False)
header_text, value_cell = None, None
if len(cells) >= 2:
if cells[0].name == 'th':
header_text, value_cell = cells[0].get_text(strip=True), cells[1]
elif cells[0].name == 'td' and cells[1].name == 'td':
style = cells[0].get('style', '').lower()
is_header_like = 'font-weight' in style and ('bold' in style or '700' in style) or cells[0].find(['b', 'strong'], recursive=False)
if is_header_like:
header_text, value_cell = cells[0].get_text(strip=True), cells[1]
if header_text and value_cell:
if any(kw in header_text.lower() for kw in keywords):
for sup in value_cell.find_all(['sup', 'span']):
sup.decompose()
raw_value_text = value_cell.get_text(separator=' ', strip=True)
if target == 'branche' or target == 'sitz':
value_found = clean_text(raw_value_text).split('\n')[0].strip()
elif target == 'umsatz':
value_found = extract_numeric_value(raw_value_text, is_umsatz=True)
elif target == 'mitarbeiter':
value_found = extract_numeric_value(raw_value_text, is_umsatz=False)
value_found = value_found if value_found else "k.A."
self.logger.info(f" --> Infobox '{target}' gefunden: '{value_found}'")
break
except Exception as e:
self.logger.exception(f"Fehler beim Durchlaufen der Infobox-Zeilen fuer '{target}': {e}")
return "k.A. (Fehler Extraktion)"
return value_found
def _parse_sitz_string_detailed(self, raw_sitz_string_input):
"""
Versucht, aus einem rohen Sitz-String Stadt und Land detailliert zu extrahieren.
"""
sitz_stadt_val, sitz_land_val = "k.A.", "k.A."
if not raw_sitz_string_input or not isinstance(raw_sitz_string_input, str):
return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val}
temp_sitz = raw_sitz_string_input.strip()
if not temp_sitz or temp_sitz.lower() == "k.a.":
return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val}
# Diese Mappings könnten in die Config ausgelagert werden
known_countries_detailed = {
"deutschland": "Deutschland", "germany": "Deutschland", "de": "Deutschland",
"österreich": "Österreich", "austria": "Österreich", "at": "Österreich",
"schweiz": "Schweiz", "switzerland": "Schweiz", "ch": "Schweiz", "suisse": "Schweiz",
"usa": "USA", "u.s.": "USA", "united states": "USA", "vereinigte staaten": "USA",
"vereinigtes königreich": "Vereinigtes Königreich", "united kingdom": "Vereinigtes Königreich", "uk": "Vereinigtes Königreich",
}
region_to_country = {
"nrw": "Deutschland", "nordrhein-westfalen": "Deutschland", "bayern": "Deutschland", "hessen": "Deutschland",
"zg": "Schweiz", "zug": "Schweiz", "zh": "Schweiz", "zürich": "Schweiz",
"ca": "USA", "california": "USA", "ny": "USA", "new york": "USA",
}
extracted_country = ""
original_temp_sitz = temp_sitz
klammer_match = re.search(r'\(([^)]+)\)$', temp_sitz)
if klammer_match:
suffix_in_klammer = klammer_match.group(1).strip().lower()
if suffix_in_klammer in known_countries_detailed:
extracted_country = known_countries_detailed[suffix_in_klammer]
temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,")
elif suffix_in_klammer in region_to_country:
extracted_country = region_to_country[suffix_in_klammer]
temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,")
if not extracted_country and ',' in temp_sitz:
parts = [p.strip() for p in temp_sitz.split(',')]
if len(parts) > 1:
last_part_lower = parts[-1].lower()
if last_part_lower in known_countries_detailed:
extracted_country = known_countries_detailed[last_part_lower]
temp_sitz = ", ".join(parts[:-1]).strip(" ,")
elif last_part_lower in region_to_country:
extracted_country = region_to_country[last_part_lower]
temp_sitz = ", ".join(parts[:-1]).strip(" ,")
sitz_land_val = extracted_country if extracted_country else "k.A."
sitz_stadt_val = re.sub(r'^\d{4,8}\s*', '', temp_sitz).strip(" ,")
if not sitz_stadt_val:
sitz_stadt_val = "k.A." if sitz_land_val != "k.A." else re.sub(r'^\d{4,8}\s*', '', original_temp_sitz).strip(" ,") or "k.A."
return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val}
@retry_on_failure
def extract_company_data(self, url_or_page):
"""
Extrahiert strukturierte Unternehmensdaten aus einem Wikipedia-Artikel (URL oder page-Objekt).
Gibt nun auch den gesamten Rohtext des Artikels ('full_text') und den Titel zurück.
"""
default_result = {
'url': 'k.A.', 'title': 'k.A.', 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.',
'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.',
'mitarbeiter': 'k.A.', 'categories': 'k.A.', 'full_text': ''
}
page = None
try:
if isinstance(url_or_page, str) and "wikipedia.org" in url_or_page:
page_title = unquote(url_or_page.split('/wiki/')[-1].replace('_', ' '))
page = wikipedia.page(title=page_title, auto_suggest=False, redirect=True)
elif not isinstance(url_or_page, str): # Annahme: es ist ein page-Objekt
page = url_or_page
else:
self.logger.warning(f"extract_company_data: Ungültiger Input '{str(url_or_page)[:100]}...'.")
return default_result
self.logger.info(f"Extrahiere Daten für Wiki-Artikel: {page.title[:100]}...")
# Grundlegende Daten direkt aus dem page-Objekt extrahieren
first_paragraph = page.summary.split('\n')[0] if page.summary else 'k.A.'
categories = ", ".join(page.categories)
full_text = page.content
# Für Infobox-Daten benötigen wir weiterhin BeautifulSoup, da die 'wikipedia'-Bibliothek
# keinen strukturierten Zugriff darauf bietet.
soup = self._get_page_soup(page.url)
if not soup:
self.logger.warning(f" -> Konnte Seite für Soup-Parsing nicht laden. Extrahiere nur Basis-Daten.")
# Fallback, wenn Soup fehlschlägt
return {
'url': page.url, 'title': page.title, 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.',
'first_paragraph': first_paragraph, 'branche': 'k.A.', 'umsatz': 'k.A.',
'mitarbeiter': 'k.A.', 'categories': categories, 'full_text': full_text
}
# Extraktion der Infobox-Daten mit den bestehenden Helper-Funktionen
branche_val = self._extract_infobox_value(soup, 'branche')
umsatz_val = self._extract_infobox_value(soup, 'umsatz')
mitarbeiter_val = self._extract_infobox_value(soup, 'mitarbeiter')
raw_sitz_string = self._extract_infobox_value(soup, 'sitz')
parsed_sitz = self._parse_sitz_string_detailed(raw_sitz_string)
sitz_stadt_val = parsed_sitz['sitz_stadt']
sitz_land_val = parsed_sitz['sitz_land']
# Sammle die finalen Daten
result = {
'url': page.url,
'title': page.title,
'sitz_stadt': sitz_stadt_val,
'sitz_land': sitz_land_val,
'first_paragraph': first_paragraph,
'branche': branche_val,
'umsatz': umsatz_val,
'mitarbeiter': mitarbeiter_val,
'categories': categories,
'full_text': full_text
}
self.logger.info(f" -> Extrahierte Daten: Stadt='{sitz_stadt_val}', Land='{sitz_land_val}', U='{umsatz_val}', M='{mitarbeiter_val}'")
return result
except wikipedia.exceptions.PageError:
self.logger.error(f" -> Fehler: Wikipedia-Artikel für '{str(url_or_page)[:100]}' konnte nicht gefunden werden (PageError).")
return {**default_result, 'url': str(url_or_page) if isinstance(url_or_page, str) else 'k.A.'}
except Exception as e:
self.logger.error(f" -> Unerwarteter Fehler bei der Extraktion von '{str(url_or_page)[:100]}': {e}")
return {**default_result, 'url': str(url_or_page) if isinstance(url_or_page, str) else 'k.A.'}