Files
Brancheneinstufung2/brancheneinstufung.py
Floke ed0cea7cc7 domain priorisierung
Vollständige Domain-Extraktion:
Implementiert über die neue Methode _get_full_domain, die nun den kompletten Domainnamen (inklusive TLD) liefert (z. B. "heimbach.com").

Normalisierung der Firmennamen:
Einführung der Funktion normalize_company_name, welche gängige Firmierungsformen (z. B. GmbH, AG, Aktiengesellschaft, Co. KG, mbH, & Co. KG, e.V., Limited, Ltd, Inc, Corp, Corporation, Gruppe) entfernt. Dies führt zu einem konsistenten Vergleich zwischen den Unternehmensdaten und Wikipedia-Titeln.

Verbesserte Artikelvalidierung:
In _validate_article werden nun:

Infobox-Links sowie externe Links geprüft, ob sie den vollständigen Domainnamen enthalten (ohne Dateilinks).

Der Vergleich der Wikipedia-Titel und des Firmennamens erfolgt auf Basis der normalisierten Namen.

Ein dynamischer Schwellenwert wird verwendet (0.60 statt 0.65), wenn ein definitiver Link-Match gefunden wurde.
2025-03-31 19:18:07 +00:00

313 lines
14 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import time
import re
import gspread
import wikipedia
import requests
from bs4 import BeautifulSoup
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime
from difflib import SequenceMatcher
import csv
# ==================== KONFIGURATION ====================
class Config:
VERSION = "1.1.3"
LANG = "de"
CREDENTIALS_FILE = "service_account.json"
SHEET_URL = "https://docs.google.com/spreadsheets/d/1u_gHr9JUfmV1-iviRzbSe3575QEp7KLhK5jFV_gJcgo"
MAX_RETRIES = 3
RETRY_DELAY = 5
LOG_CSV = "gpt_antworten_log.csv"
SIMILARITY_THRESHOLD = 0.65
DEBUG = True
WIKIPEDIA_SEARCH_RESULTS = 5
HTML_PARSER = "html.parser"
# ==================== HELPER FUNCTIONS ====================
def retry_on_failure(func):
"""Decorator für Wiederholungsversuche bei Fehlern"""
def wrapper(*args, **kwargs):
for attempt in range(Config.MAX_RETRIES):
try:
return func(*args, **kwargs)
except Exception as e:
print(f"⚠️ Fehler bei {func.__name__} (Versuch {attempt+1}): {str(e)[:100]}")
time.sleep(Config.RETRY_DELAY)
return None
return wrapper
def debug_print(message):
"""Debug-Ausgabe, wenn Config.DEBUG=True"""
if Config.DEBUG:
print(f"[DEBUG] {message}")
def clean_text(text):
"""Bereinigt Text von unerwünschten Zeichen"""
if not text:
return "k.A."
text = str(text)
text = re.sub(r'\[\d+\]', '', text) # Entferne Referenznummern
text = re.sub(r'\s+', ' ', text).strip()
return text if text else "k.A."
def normalize_company_name(name):
"""Entfernt gängige Firmierungsformen und normalisiert den Namen."""
if not name:
return ""
# Liste gängiger Firmierungsformen, inklusive "Gruppe"
pattern = r'\b(gmbh|ag|aktiengesellschaft|co\.?\s*kg|mbh|&\s*co\.?\s*kg|e\.v\.|limited|ltd|inc|corp|corporation|gruppe)\b'
normalized = re.sub(pattern, '', name, flags=re.IGNORECASE)
normalized = re.sub(r'[\-]', ' ', normalized) # Ersetze Bindestriche durch Leerzeichen
normalized = re.sub(r'\s+', ' ', normalized).strip()
return normalized.lower()
# ==================== GOOGLE SHEET HANDLER ====================
class GoogleSheetHandler:
"""Handhabung der Google Sheets Interaktion"""
def __init__(self):
self.sheet = None
self.sheet_values = []
self._connect()
def _connect(self):
"""Stellt Verbindung zum Google Sheet her"""
scope = ["https://www.googleapis.com/auth/spreadsheets"]
creds = ServiceAccountCredentials.from_json_keyfile_name(Config.CREDENTIALS_FILE, scope)
self.sheet = gspread.authorize(creds).open_by_url(Config.SHEET_URL).sheet1
self.sheet_values = self.sheet.get_all_values()
def get_start_index(self):
"""Ermittelt die erste leere Zeile in Spalte N"""
filled_n = [row[13] if len(row) > 13 else '' for row in self.sheet_values[1:]]
return next((i + 1 for i, v in enumerate(filled_n, start=1) if not str(v).strip()), len(filled_n) + 1)
def update_row(self, row_num, values):
"""Aktualisiert eine Zeile im Sheet (Spalten G bis R, also 12 Spalten)"""
self.sheet.update(range_name=f"G{row_num}:R{row_num}", values=[values])
# ==================== WIKIPEDIA SCRAPER ====================
class WikipediaScraper:
"""Klasse zur Handhabung der Wikipedia-Suche und Datenextraktion"""
def __init__(self):
wikipedia.set_lang(Config.LANG)
def _get_full_domain(self, website):
"""Extrahiert den vollständigen Domainnamen (inklusive Topleveldomain) aus der URL."""
if not website:
return ""
website = website.lower().strip()
website = re.sub(r'^https?:\/\/', '', website)
website = re.sub(r'^www\.', '', website)
website = website.split('/')[0]
return website
def _generate_search_terms(self, company_name, website):
"""
Generiert Suchbegriffe in folgender Reihenfolge:
1. Vollständiger Domainname (z.B. "heimbach.com")
2. Die ersten zwei Wörter des normalisierten Firmennamens
3. Der vollständige, normalisierte Firmenname
"""
terms = []
full_domain = self._get_full_domain(website)
if full_domain:
terms.append(full_domain)
normalized_name = normalize_company_name(company_name)
candidate = " ".join(normalized_name.split()[:2]).strip()
if candidate and candidate not in terms:
terms.append(candidate)
if normalized_name and normalized_name not in terms:
terms.append(normalized_name)
debug_print(f"Generierte Suchbegriffe: {terms}")
return terms
def _validate_article(self, page, company_name, website):
"""
Validiert den Artikel:
- Sucht in der Infobox und in den externen Links nach Links, die den vollständigen Domainnamen enthalten.
Wird ein solcher Link gefunden, wird ein niedrigerer Schwellenwert (0.60) angewendet.
- Andernfalls werden der normalisierte Wikipedia-Titel und der normalisierte Firmenname verglichen.
"""
full_domain = self._get_full_domain(website)
domain_found = False
if full_domain:
try:
html_raw = requests.get(page.url).text
soup = BeautifulSoup(html_raw, Config.HTML_PARSER)
# Suche in der Infobox
infobox = soup.find('table', class_=lambda c: c and 'infobox' in c.lower())
if infobox:
links = infobox.find_all('a', href=True)
for link in links:
href = link.get('href').lower()
# Überspringe Dateilinks
if href.startswith('/wiki/datei:'):
continue
if full_domain in href:
debug_print(f"Definitiver Link-Match in Infobox gefunden: {href}")
domain_found = True
break
# Suche in den externen Links, falls vorhanden
if not domain_found and hasattr(page, 'externallinks'):
for ext_link in page.externallinks:
ext_link = ext_link.lower()
if full_domain in ext_link:
debug_print(f"Definitiver Link-Match in externen Links gefunden: {ext_link}")
domain_found = True
break
except Exception as e:
debug_print(f"Fehler beim Extrahieren von Links: {str(e)}")
# Normalisierte Namen für Vergleich
normalized_title = normalize_company_name(page.title)
normalized_company = normalize_company_name(company_name)
similarity = SequenceMatcher(None, normalized_title, normalized_company).ratio()
debug_print(f"Ähnlichkeit (normalisiert): {similarity:.2f} ({normalized_title} vs {normalized_company})")
threshold = 0.60 if domain_found else Config.SIMILARITY_THRESHOLD
return similarity >= threshold
@retry_on_failure
def search_company_article(self, company_name, website):
"""Sucht mit optimierten Suchbegriffen (vollständiger Domainname, Candidate, normalisierter Name) nach dem Wikipedia-Artikel."""
search_terms = self._generate_search_terms(company_name, website)
for term in search_terms:
try:
results = wikipedia.search(term, results=Config.WIKIPEDIA_SEARCH_RESULTS)
debug_print(f"Suchergebnisse für '{term}': {results}")
for title in results:
try:
page = wikipedia.page(title, auto_suggest=False)
if self._validate_article(page, company_name, website):
return page
except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e:
debug_print(f"Seitenfehler: {str(e)}")
continue
except Exception as e:
debug_print(f"Suchfehler: {str(e)}")
continue
return None
def _extract_infobox_value(self, soup, target):
"""Extrahiert Werte aus der Infobox (Fallback-Methode)"""
infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
if not infobox:
return "k.A."
keywords = {
'branche': ['branche', 'industrie', 'tätigkeit', 'geschäftsfeld', 'sektor', 'produkte', 'leistungen', 'aktivitäten', 'wirtschaftszweig'],
'umsatz': ['umsatz', 'jahresumsatz', 'konzernumsatz', 'gesamtumsatz', 'erlöse', 'umsatzerlöse', 'einnahmen', 'ergebnis', 'jahresergebnis']
}[target]
for row in infobox.find_all('tr'):
header = row.find('th')
if header:
header_text = clean_text(header.get_text()).lower()
if any(kw in header_text for kw in keywords):
value = row.find('td')
if value:
raw_value = clean_text(value.get_text())
if target == 'branche':
clean_val = re.sub(r'\[.*?\]|\(.*?\)', '', raw_value)
return ' '.join(clean_val.split()).strip()
if target == 'umsatz':
match = re.search(r'(\d{1,3}(?:[.,]\d{3})*)\s*(?:Mio\.?|Millionen|Mrd\.?|Milliarden)?\s*€?', raw_value.replace('.', '').replace(',', '.'), re.IGNORECASE)
if match:
num = float(match.group(1))
if 'mrd' in raw_value.lower():
num *= 1000
return f"{num:.1f} Mio €"
return raw_value.strip()
return "k.A."
def extract_full_infobox(self, soup):
"""Extrahiert die komplette Infobox als Text"""
infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
if not infobox:
return "k.A."
return clean_text(infobox.get_text(separator=' | '))
def extract_fields_from_infobox_text(self, infobox_text, field_names):
"""Extrahiert die gewünschten Felder aus dem Infobox-Text (getrennt durch ' | ')"""
result = {}
tokens = [token.strip() for token in infobox_text.split("|") if token.strip()]
for i, token in enumerate(tokens):
for field in field_names:
if token.lower() == field.lower():
j = i + 1
while j < len(tokens) and not tokens[j]:
j += 1
result[field] = tokens[j] if j < len(tokens) else "k.A."
return result
def extract_company_data(self, page_url):
"""Extrahiert Daten aus dem Wikipedia-Artikel (Infobox, Branche, Umsatz)"""
if not page_url:
return {'branche': 'k.A.', 'umsatz': 'k.A.', 'url': '', 'full_infobox': 'k.A.'}
try:
response = requests.get(page_url)
soup = BeautifulSoup(response.text, Config.HTML_PARSER)
full_infobox = self.extract_full_infobox(soup)
extracted_fields = self.extract_fields_from_infobox_text(full_infobox, ['Branche', 'Umsatz'])
branche_val = extracted_fields.get('Branche', self._extract_infobox_value(soup, 'branche'))
umsatz_val = extracted_fields.get('Umsatz', self._extract_infobox_value(soup, 'umsatz'))
return {'full_infobox': full_infobox, 'branche': branche_val, 'umsatz': umsatz_val, 'url': page_url}
except Exception as e:
debug_print(f"Extraktionsfehler: {str(e)}")
return {'branche': 'k.A.', 'umsatz': 'k.A.', 'url': page_url, 'full_infobox': 'k.A.'}
# ==================== DATA PROCESSOR ====================
class DataProcessor:
"""Steuerung des Gesamtprozesses"""
def __init__(self):
self.sheet_handler = GoogleSheetHandler()
self.wiki_scraper = WikipediaScraper()
def process_rows(self, num_rows):
"""Verarbeitet die angegebene Anzahl an Zeilen"""
start_index = self.sheet_handler.get_start_index()
print(f"Starte bei Zeile {start_index+1}")
for i in range(start_index, min(start_index + num_rows, len(self.sheet_handler.sheet_values))):
row = self.sheet_handler.sheet_values[i]
self._process_single_row(i+1, row)
def _process_single_row(self, row_num, row_data):
"""Verarbeitung einer einzelnen Zeile"""
company_name = row_data[0] if len(row_data) > 0 else ""
website = row_data[1] if len(row_data) > 1 else ""
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Verarbeite Zeile {row_num}: {company_name}")
article = self.wiki_scraper.search_company_article(company_name, website)
if article:
company_data = self.wiki_scraper.extract_company_data(article.url)
else:
company_data = {'branche': 'k.A.', 'umsatz': 'k.A.', 'url': '', 'full_infobox': 'k.A.'}
current_values = self.sheet_handler.sheet.row_values(row_num)
new_values = [
company_data.get('full_infobox', 'k.A.'), # Spalte G: kompletter Infobox-Text
company_data['branche'] if company_data['branche'] != "k.A." else current_values[6] if len(current_values) > 6 else "k.A.",
"k.A.",
company_data['umsatz'] if company_data['umsatz'] != "k.A." else current_values[8] if len(current_values) > 8 else "k.A.",
"k.A.", "k.A.", "k.A.",
company_data['url'] if company_data['url'] else current_values[12] if len(current_values) > 12 else "",
datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"k.A.", "k.A.",
Config.VERSION
]
self.sheet_handler.update_row(row_num, new_values)
print(f"✅ Aktualisiert: Branche: {new_values[1]}, Umsatz: {new_values[3]}, URL: {new_values[7]}")
time.sleep(Config.RETRY_DELAY)
# ==================== MAIN ====================
if __name__ == "__main__":
try:
num_rows = int(input("Wieviele Zeilen sollen überprüft werden? "))
except Exception as e:
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
exit(1)
processor = DataProcessor()
processor.process_rows(num_rows)
print("\n✅ Wikipedia-Auswertung abgeschlossen")