Files
Brancheneinstufung2/wikipedia_scraper.py

491 lines
25 KiB
Python

# --- START OF FILE wikipedia_scraper.py ---
#!/usr/bin/env python3
"""
wikipedia_scraper.py
Klasse zur Kapselung der Interaktionen mit Wikipedia, inklusive Suche,
Validierung und Extraktion von Unternehmensdaten.
"""
import logging
import re
import time
import traceback
from urllib.parse import unquote
import requests
import wikipedia
from bs4 import BeautifulSoup
# Import der abhängigen Module
from config import Config
from helpers import (retry_on_failure, simple_normalize_url,
normalize_company_name, extract_numeric_value,
clean_text, fuzzy_similarity)
class WikipediaScraper:
"""
Handhabt das Suchen von Wikipedia-Artikeln und das Extrahieren relevanter
Unternehmensdaten. Beinhaltet Validierungslogik fuer Artikel.
Nutzt die wikipedia-Bibliothek und Requests fuer direktes HTML-Scraping.
"""
def __init__(self, user_agent=None):
"""
Initialisiert den Scraper mit einer Requests-Session und konfigurierter
Wikipedia-Bibliothek.
"""
self.logger = logging.getLogger(__name__ + ".WikipediaScraper")
self.logger.debug("WikipediaScraper initialisiert.")
self.user_agent = user_agent or getattr(Config, 'USER_AGENT', 'Mozilla/5.0 (compatible; UnternehmenSkript/1.0; +http://www.example.com/bot)')
self.session = requests.Session()
self.session.headers.update({'User-Agent': self.user_agent})
self.logger.debug(f"Requests Session mit User-Agent '{self.user_agent}' initialisiert.")
self.keywords_map = {
'branche': ['branche', 'wirtschaftszweig', 'industry', 'taetigkeit', 'sektor', 'produkte', 'leistungen'],
'umsatz': ['umsatz', 'erloes', 'revenue', 'jahresumsatz', 'konzernumsatz', 'ergebnis'],
'mitarbeiter': ['mitarbeiter', 'mitarbeiterzahl', 'beschaeftigte', 'employees', 'number of employees', 'personal', 'belegschaft'],
'sitz': ['sitz', 'hauptsitz', 'unternehmenssitz', 'firmensitz', 'headquarters', 'standort', 'sitz des unternehmens', 'anschrift', 'adresse']
}
try:
wiki_lang = getattr(Config, 'LANG', 'de')
wikipedia.set_lang(wiki_lang)
wikipedia.set_rate_limiting(False)
self.logger.info(f"Wikipedia library language set to '{wiki_lang}'. Rate limiting DISABLED.")
except Exception as e:
self.logger.warning(f"Fehler beim Setzen der Wikipedia-Sprache oder Rate Limiting: {e}")
def _get_full_domain(self, website):
"""Extrahiert die normalisierte Domain (ohne www, ohne Pfad) aus einer URL."""
return simple_normalize_url(website)
def _generate_search_terms(self, company_name, website=None):
"""
Generiert eine Liste von potenziellen Wikipedia-Artikeltiteln.
v2.0: Mit verbesserter Logik für Namen, die Zahlen enthalten.
"""
if not company_name:
return []
normalized = normalize_company_name(company_name)
# Verbesserte Logik für Namen wie "11 88 0 Solutions"
condensed_normalized = None
if re.search(r'\d[\s\d]+\d', normalized):
condensed_normalized = re.sub(r'(\d)\s+(\d)', r'\1\2', normalized)
condensed_normalized = normalize_company_name(condensed_normalized)
search_terms = []
if condensed_normalized: search_terms.append(condensed_normalized)
search_terms.append(company_name)
search_terms.append(normalized)
parts = normalized.split()
if len(parts) > 1:
search_terms.append(parts[0])
search_terms.append(" ".join(parts[:2]))
if website:
domain = simple_normalize_url(website)
if domain != "k.A.":
search_terms.append(domain)
unique_terms = list(dict.fromkeys([term for term in search_terms if term])) # Entfernt Duplikate, behält Reihenfolge
return unique_terms[:5]
@retry_on_failure
def _get_page_soup(self, url):
"""
Holt HTML von einer URL und gibt ein BeautifulSoup-Objekt zurueck.
"""
if not url or not isinstance(url, str) or not url.lower().startswith(("http://", "https://")):
self.logger.warning(f"_get_page_soup: Ungueltige URL '{url[:100]}...'.")
return None
try:
self.logger.debug(f"_get_page_soup: Rufe URL ab: {url[:100]}...")
response = self.session.get(url, timeout=getattr(Config, 'REQUEST_TIMEOUT', 15))
response.raise_for_status()
response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.text, getattr(Config, 'HTML_PARSER', 'html.parser'))
return soup
except Exception as e:
self.logger.error(f"_get_page_soup: Fehler beim Abrufen oder Parsen von HTML von {url[:100]}...: {e}")
raise e
def _validate_article(self, page, company_name, website, parent_name=None):
"""
Validiert, ob ein Wikipedia-Artikel zum Unternehmen passt.
v2.0: Nutzt parent_name als primäres Kriterium. Ihre bestehenden
Regeln bleiben als Fallback erhalten.
"""
if not page or not company_name:
return False
self.logger.debug(f"Validiere Artikel '{page.title[:100]}...' fuer Firma '{company_name[:100]}'")
# --- Stufe 1: Parent-Validierung (höchste Priorität) ---
normalized_parent = normalize_company_name(parent_name) if parent_name else None
if normalized_parent:
# Überprüfe Titel und den ersten Absatz (Summary) auf den Parent-Namen
page_content_for_check = (page.title + " " + page.summary).lower()
if normalized_parent in page_content_for_check:
reason = f"Parent-Name '{parent_name}' im Artikel-Titel oder -Summary gefunden."
self.logger.info(f" => Artikel '{page.title[:100]}...' VALIDIERT (Grund: {reason})")
return True
# --- Stufe 2: Ihre bestehende, detaillierte Validierungslogik als Fallback ---
normalized_company = normalize_company_name(company_name)
normalized_title = normalize_company_name(page.title)
if not normalized_company or not normalized_title:
self.logger.warning("Validierung nicht moeglich, da Normalisierung eines Namens fehlschlug.")
return False
standard_threshold = getattr(Config, 'SIMILARITY_THRESHOLD', 0.65)
similarity = fuzzy_similarity(normalized_title, normalized_company)
company_tokens = normalized_company.split()
title_tokens = normalized_title.split()
first_word_match = False
first_two_words_match = False
if company_tokens and title_tokens and company_tokens[0] == title_tokens[0]:
first_word_match = True
if len(company_tokens) > 1 and len(title_tokens) > 1 and company_tokens[1] == title_tokens[1]:
first_two_words_match = True
domain_found = False
full_domain = self._get_full_domain(website)
if full_domain != "k.A.":
try:
# page.html() kann fehleranfällig sein, wir prüfen den gerenderten Text (page.content)
if page.content and full_domain in page.content.lower():
domain_found = True
except Exception as e_link_check:
self.logger.error(f"Allgemeiner Fehler waehrend der Domain-Pruefung fuer '{page.title[:100]}...': {e_link_check}")
is_valid = False
reason = ""
self.logger.debug(f" Validierungs-Check (Fallback) für '{page.title[:50]}...':")
self.logger.debug(f" - Aehnlichkeit: {similarity:.2f} (Schwelle: {standard_threshold:.2f})")
self.logger.debug(f" - Domain '{full_domain}' im Artikel gefunden: {domain_found}")
self.logger.debug(f" - Erstes Wort identisch: {first_word_match}")
self.logger.debug(f" - Erste 2 Worte identisch: {first_two_words_match}")
if similarity >= standard_threshold:
is_valid, reason = True, f"Gesamt-Aehnlichkeit ({similarity:.2f}) >= Schwelle ({standard_threshold:.2f})"
elif domain_found and first_two_words_match:
is_valid, reason = True, "Domain gefunden UND erste 2 Worte stimmen ueberein"
elif domain_found and first_word_match and similarity >= 0.40:
is_valid, reason = True, "Domain gefunden UND erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.40"
elif first_two_words_match and similarity >= 0.45:
is_valid, reason = True, "Erste zwei Worte stimmen ueberein UND Aehnlichkeit >= 0.45"
elif domain_found and similarity >= 0.50:
is_valid, reason = True, "Domain gefunden UND Aehnlichkeit >= 0.50"
elif first_word_match and similarity >= 0.55:
is_valid, reason = True, "Erstes Wort stimmt ueberein UND Aehnlichkeit >= 0.55"
else:
reason = "Keine der Fallback-Validierungsregeln traf zu"
log_level = logging.INFO if is_valid else logging.DEBUG
self.logger.log(log_level, f" => Artikel '{page.title[:100]}...' {'VALIDIERT' if is_valid else 'NICHT validiert'} (Grund: {reason})")
return is_valid
def search_company_article(self, company_name, website=None, parent_name=None, max_recursion_depth=1):
"""
Sucht einen passenden Wikipedia-Artikel. Behält die komplexe Logik bei und behebt den TypeError.
"""
if not company_name or str(company_name).strip() == "":
return None
search_terms = self._generate_search_terms(company_name, website)
if not search_terms:
return None
self.logger.info(f"Starte Wikipedia-Suche fuer '{company_name[:100]}...' mit Begriffen: {search_terms}")
processed_titles = set()
original_search_name_norm = normalize_company_name(company_name)
# Die innere Funktion "erbt" `parent_name` aus dem Scope der äußeren Funktion.
def check_page_recursive(title_to_check, current_depth):
effective_max_depth = max_recursion_depth if max_recursion_depth is not None else 2
if title_to_check in processed_titles or current_depth > effective_max_depth:
return None
processed_titles.add(title_to_check)
self.logger.debug(f" -> Pruefe potenziellen Artikel: '{title_to_check[:100]}...' (Tiefe: {current_depth})")
# Ihre bestehende Logik mit fuzzy_similarity
normalized_option_title_local = normalize_company_name(title_to_check)
title_similarity_to_original = fuzzy_similarity(normalized_option_title_local, original_search_name_norm)
if current_depth > 0 and title_similarity_to_original < 0.3:
self.logger.debug(f" -> Option '{title_to_check[:100]}' hat zu geringe Ähnlichkeit ({title_similarity_to_original:.2f}). Übersprungen.")
return None
page = None
try:
page = wikipedia.page(title_to_check, auto_suggest=False, preload=False, redirect=True)
# KORRIGIERTER AUFRUF: Übergibt `parent_name` aus dem äußeren Scope
if self._validate_article(page, company_name, website, parent_name):
self.logger.info(f" -> Titel '{page.title[:100]}...' erfolgreich validiert!")
return page
else:
return None
except wikipedia.exceptions.PageError:
self.logger.debug(f" -> Artikel '{title_to_check[:100]}' nicht gefunden (PageError).")
return None
except wikipedia.exceptions.DisambiguationError as e_disamb:
self.logger.info(f" -> Begriffsklaerung '{e_disamb.title}' gefunden (Tiefe {current_depth}). Pruefe Optionen...")
if current_depth >= effective_max_depth: return None
# Ihre bestehende Logik zur Filterung von Optionen
relevant_options = []
for option in e_disamb.options:
option_lower = option.lower()
if not any(ex in option_lower for ex in ["(person)", "(familienname)"]) and len(option) < 80:
if fuzzy_similarity(normalize_company_name(option), original_search_name_norm) > 0.3:
relevant_options.append(option)
for option_to_check in relevant_options[:3]:
validated_page = check_page_recursive(option_to_check, current_depth + 1)
if validated_page: return validated_page
return None
except Exception as e_page:
# Ihre bestehende Fehlerbehandlung
title_for_log = page.title[:100] if page and hasattr(page, 'title') and page.title else title_to_check[:100]
self.logger.error(f" -> Unerwarteter Fehler bei Verarbeitung von Seite '{title_for_log}': {e_page}")
return None
# Ihre bestehende Hauptlogik der Suche
for term in search_terms:
page_found = check_page_recursive(term, 0)
if page_found: return page_found
self.logger.warning(f"Kein passender & validierter Wikipedia-Artikel fuer '{company_name[:100]}...' gefunden.")
return None
def _extract_first_paragraph_from_soup(self, soup):
"""
Extrahiert den ersten aussagekraeftigen Absatz aus dem Soup-Objekt eines Wikipedia-Artikels.
"""
if not soup: return "k.A."
paragraph_text = "k.A."
try:
content_div = soup.find('div', class_='mw-parser-output')
search_area = content_div if content_div else soup
paragraphs = search_area.find_all('p', recursive=False)
if not paragraphs: paragraphs = search_area.find_all('p')
for p in paragraphs:
for sup in p.find_all('sup', class_='reference'): sup.decompose()
for span in p.find_all('span', style=lambda v: v and 'display:none' in v): span.decompose()
for span in p.find_all('span', id='coordinates'): span.decompose()
text = clean_text(p.get_text(separator=' ', strip=True))
if text != "k.A." and len(text) > 50 and not re.match(r'^(Datei:|Abbildung:|Siehe auch:|Einzelnachweise|Siehe auch|Literatur)', text, re.IGNORECASE):
paragraph_text = text[:1500]
break
except Exception as e:
self.logger.error(f"Fehler beim Extrahieren des ersten Absatzes: {e}")
return paragraph_text
def extract_categories(self, soup):
"""
Extrahiert Wikipedia-Kategorien aus dem Soup-Objekt.
"""
if not soup: return "k.A."
cats_filtered = []
try:
cat_div = soup.find('div', id="mw-normal-catlinks")
if cat_div:
ul = cat_div.find('ul')
if ul:
cats = [clean_text(li.get_text()) for li in ul.find_all('li')]
cats_filtered = [c for c in cats if c and isinstance(c, str) and c.strip() and "kategorien:" not in c.lower()]
except Exception as e:
self.logger.error(f"Fehler beim Extrahieren der Kategorien: {e}")
return ", ".join(cats_filtered) if cats_filtered else "k.A."
def _extract_infobox_value(self, soup, target):
"""
Extrahiert gezielt Werte (Branche, Umsatz, etc.) aus der Infobox.
"""
if not soup or target not in self.keywords_map:
return "k.A."
keywords = self.keywords_map[target]
infobox = soup.select_one('table[class*="infobox"]')
if not infobox: return "k.A."
value_found = "k.A."
try:
rows = infobox.find_all('tr')
for row in rows:
cells = row.find_all(['th', 'td'], recursive=False)
header_text, value_cell = None, None
if len(cells) >= 2:
if cells[0].name == 'th':
header_text, value_cell = cells[0].get_text(strip=True), cells[1]
elif cells[0].name == 'td' and cells[1].name == 'td':
style = cells[0].get('style', '').lower()
is_header_like = 'font-weight' in style and ('bold' in style or '700' in style) or cells[0].find(['b', 'strong'], recursive=False)
if is_header_like:
header_text, value_cell = cells[0].get_text(strip=True), cells[1]
if header_text and value_cell:
if any(kw in header_text.lower() for kw in keywords):
for sup in value_cell.find_all(['sup', 'span']):
sup.decompose()
raw_value_text = value_cell.get_text(separator=' ', strip=True)
if target == 'branche' or target == 'sitz':
value_found = clean_text(raw_value_text).split('\n')[0].strip()
elif target == 'umsatz':
value_found = extract_numeric_value(raw_value_text, is_umsatz=True)
elif target == 'mitarbeiter':
value_found = extract_numeric_value(raw_value_text, is_umsatz=False)
value_found = value_found if value_found else "k.A."
self.logger.info(f" --> Infobox '{target}' gefunden: '{value_found}'")
break
except Exception as e:
self.logger.exception(f"Fehler beim Durchlaufen der Infobox-Zeilen fuer '{target}': {e}")
return "k.A. (Fehler Extraktion)"
return value_found
def _parse_sitz_string_detailed(self, raw_sitz_string_input):
"""
Versucht, aus einem rohen Sitz-String Stadt und Land detailliert zu extrahieren.
"""
sitz_stadt_val, sitz_land_val = "k.A.", "k.A."
if not raw_sitz_string_input or not isinstance(raw_sitz_string_input, str):
return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val}
temp_sitz = raw_sitz_string_input.strip()
if not temp_sitz or temp_sitz.lower() == "k.a.":
return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val}
# Diese Mappings könnten in die Config ausgelagert werden
known_countries_detailed = {
"deutschland": "Deutschland", "germany": "Deutschland", "de": "Deutschland",
"österreich": "Österreich", "austria": "Österreich", "at": "Österreich",
"schweiz": "Schweiz", "switzerland": "Schweiz", "ch": "Schweiz", "suisse": "Schweiz",
"usa": "USA", "u.s.": "USA", "united states": "USA", "vereinigte staaten": "USA",
"vereinigtes königreich": "Vereinigtes Königreich", "united kingdom": "Vereinigtes Königreich", "uk": "Vereinigtes Königreich",
}
region_to_country = {
"nrw": "Deutschland", "nordrhein-westfalen": "Deutschland", "bayern": "Deutschland", "hessen": "Deutschland",
"zg": "Schweiz", "zug": "Schweiz", "zh": "Schweiz", "zürich": "Schweiz",
"ca": "USA", "california": "USA", "ny": "USA", "new york": "USA",
}
extracted_country = ""
original_temp_sitz = temp_sitz
klammer_match = re.search(r'\(([^)]+)\)$', temp_sitz)
if klammer_match:
suffix_in_klammer = klammer_match.group(1).strip().lower()
if suffix_in_klammer in known_countries_detailed:
extracted_country = known_countries_detailed[suffix_in_klammer]
temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,")
elif suffix_in_klammer in region_to_country:
extracted_country = region_to_country[suffix_in_klammer]
temp_sitz = temp_sitz[:klammer_match.start()].strip(" ,")
if not extracted_country and ',' in temp_sitz:
parts = [p.strip() for p in temp_sitz.split(',')]
if len(parts) > 1:
last_part_lower = parts[-1].lower()
if last_part_lower in known_countries_detailed:
extracted_country = known_countries_detailed[last_part_lower]
temp_sitz = ", ".join(parts[:-1]).strip(" ,")
elif last_part_lower in region_to_country:
extracted_country = region_to_country[last_part_lower]
temp_sitz = ", ".join(parts[:-1]).strip(" ,")
sitz_land_val = extracted_country if extracted_country else "k.A."
sitz_stadt_val = re.sub(r'^\d{4,8}\s*', '', temp_sitz).strip(" ,")
if not sitz_stadt_val:
sitz_stadt_val = "k.A." if sitz_land_val != "k.A." else re.sub(r'^\d{4,8}\s*', '', original_temp_sitz).strip(" ,") or "k.A."
return {'sitz_stadt': sitz_stadt_val, 'sitz_land': sitz_land_val}
@retry_on_failure
def extract_company_data(self, url_or_page):
"""
Extrahiert strukturierte Unternehmensdaten aus einem Wikipedia-Artikel (URL oder page-Objekt).
Gibt nun auch den gesamten Rohtext des Artikels ('full_text') und den Titel zurück.
"""
default_result = {
'url': 'k.A.', 'title': 'k.A.', 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.',
'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.',
'mitarbeiter': 'k.A.', 'categories': 'k.A.', 'full_text': ''
}
page = None
try:
if isinstance(url_or_page, str) and "wikipedia.org" in url_or_page:
page_title = unquote(url_or_page.split('/wiki/')[-1].replace('_', ' '))
page = wikipedia.page(title=page_title, auto_suggest=False, redirect=True)
elif not isinstance(url_or_page, str): # Annahme: es ist ein page-Objekt
page = url_or_page
else:
self.logger.warning(f"extract_company_data: Ungültiger Input '{str(url_or_page)[:100]}...'.")
return default_result
self.logger.info(f"Extrahiere Daten für Wiki-Artikel: {page.title[:100]}...")
# Grundlegende Daten direkt aus dem page-Objekt extrahieren
first_paragraph = page.summary.split('\n')[0] if page.summary else 'k.A.'
categories = ", ".join(page.categories)
full_text = page.content
# Für Infobox-Daten benötigen wir weiterhin BeautifulSoup, da die 'wikipedia'-Bibliothek
# keinen strukturierten Zugriff darauf bietet.
soup = self._get_page_soup(page.url)
if not soup:
self.logger.warning(f" -> Konnte Seite für Soup-Parsing nicht laden. Extrahiere nur Basis-Daten.")
# Fallback, wenn Soup fehlschlägt
return {
'url': page.url, 'title': page.title, 'sitz_stadt': 'k.A.', 'sitz_land': 'k.A.',
'first_paragraph': first_paragraph, 'branche': 'k.A.', 'umsatz': 'k.A.',
'mitarbeiter': 'k.A.', 'categories': categories, 'full_text': full_text
}
# Extraktion der Infobox-Daten mit den bestehenden Helper-Funktionen
branche_val = self._extract_infobox_value(soup, 'branche')
umsatz_val = self._extract_infobox_value(soup, 'umsatz')
mitarbeiter_val = self._extract_infobox_value(soup, 'mitarbeiter')
raw_sitz_string = self._extract_infobox_value(soup, 'sitz')
parsed_sitz = self._parse_sitz_string_detailed(raw_sitz_string)
sitz_stadt_val = parsed_sitz['sitz_stadt']
sitz_land_val = parsed_sitz['sitz_land']
# Sammle die finalen Daten
result = {
'url': page.url,
'title': page.title,
'sitz_stadt': sitz_stadt_val,
'sitz_land': sitz_land_val,
'first_paragraph': first_paragraph,
'branche': branche_val,
'umsatz': umsatz_val,
'mitarbeiter': mitarbeiter_val,
'categories': categories,
'full_text': full_text
}
self.logger.info(f" -> Extrahierte Daten: Stadt='{sitz_stadt_val}', Land='{sitz_land_val}', U='{umsatz_val}', M='{mitarbeiter_val}'")
return result
except wikipedia.exceptions.PageError:
self.logger.error(f" -> Fehler: Wikipedia-Artikel für '{str(url_or_page)[:100]}' konnte nicht gefunden werden (PageError).")
return {**default_result, 'url': str(url_or_page) if isinstance(url_or_page, str) else 'k.A.'}
except Exception as e:
self.logger.error(f" -> Unerwarteter Fehler bei der Extraktion von '{str(url_or_page)[:100]}': {e}")
return {**default_result, 'url': str(url_or_page) if isinstance(url_or_page, str) else 'k.A.'}