Files
Brancheneinstufung2/brancheneinstufung.py
Floke 59d6aed51a Optimierung der Wikipedia-Auswertung: Neue Spaltenreihenfolge, Datum/Uhrzeit und Version
Spaltenreihenfolge angepasst:

G: Wikipedia URL

H: Erster Absatz des Wikipedia-Artikels

I: Branche (aus Infobox)

J: Umsatz (als Zahl in Mio €, z. B. "159")

K: Anzahl Mitarbeiter (aus Infobox)

Neue Felder:

Spalte N: Aktuelles Datum und Zeit

Spalte Q: Version

Infobox-Ausgabe entfernt:
Die komplette Infobox wird nicht mehr ausgegeben.

Normalisierung der Firmennamen:
Erweiterte Regex entfernt nun eine umfangreiche Liste gängiger Firmierungsformen (u.a. GmbH, G.m.b.H., UG, AG, OHG, KG, GmbH & Co. KG, AG & Co. KG, e.K., Ltd., S.a r.l., Stiftung, Genossenschaft, gGmbH, gUG, PartG, KGaA, SE, OG, e.U., etc.).

Erster Absatz:
Eine neue Methode extrahiert den ersten sinnvollen Absatz des Wikipedia-Artikels.

Umsatzformatierung:
Umsatz wird als reine Zahl (in Mio €) ausgegeben.

Google Sheet Update:
Aktualisierung erfolgt nun in separaten Ranges für die gewünschten Spalten (G:K, N und Q).
2025-03-31 19:52:12 +00:00

350 lines
16 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import time
import re
import gspread
import wikipedia
import requests
from bs4 import BeautifulSoup
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime
from difflib import SequenceMatcher
import csv
# ==================== KONFIGURATION ====================
class Config:
VERSION = "1.1.4"
LANG = "de"
CREDENTIALS_FILE = "service_account.json"
SHEET_URL = "https://docs.google.com/spreadsheets/d/1u_gHr9JUfmV1-iviRzbSe3575QEp7KLhK5jFV_gJcgo"
MAX_RETRIES = 3
RETRY_DELAY = 5
LOG_CSV = "gpt_antworten_log.csv"
SIMILARITY_THRESHOLD = 0.65
DEBUG = True
WIKIPEDIA_SEARCH_RESULTS = 5
HTML_PARSER = "html.parser"
# ==================== HELPER FUNCTIONS ====================
def retry_on_failure(func):
"""Decorator für Wiederholungsversuche bei Fehlern"""
def wrapper(*args, **kwargs):
for attempt in range(Config.MAX_RETRIES):
try:
return func(*args, **kwargs)
except Exception as e:
print(f"⚠️ Fehler bei {func.__name__} (Versuch {attempt+1}): {str(e)[:100]}")
time.sleep(Config.RETRY_DELAY)
return None
return wrapper
def debug_print(message):
"""Debug-Ausgabe, wenn Config.DEBUG=True"""
if Config.DEBUG:
print(f"[DEBUG] {message}")
def clean_text(text):
"""Bereinigt Text von unerwünschten Zeichen"""
if not text:
return "k.A."
text = str(text)
text = re.sub(r'\[\d+\]', '', text) # Entferne Referenznummern
text = re.sub(r'\s+', ' ', text).strip()
return text if text else "k.A."
def normalize_company_name(name):
"""Entfernt gängige Firmierungsformen und normalisiert den Namen."""
if not name:
return ""
# Liste gängiger Firmierungsformen
forms = [
r'gmbh', r'g\.m\.b\.h\.', r'ug', r'u\.g\.', r'ug \(haftungsbeschränkt\)',
r'u\.g\. \(haftungsbeschränkt\)', r'ag', r'a\.g\.', r'ohg', r'o\.h\.g\.',
r'kg', r'k\.g\.', r'gmbh & co\. kg', r'g\.m\.b\.h\. & co\. k\.g\.',
r'ag & co\. kg', r'a\.g\. & co\. k\.g\.', r'e\.k\.', r'e\.kfm\.', r'e\.kfr\.',
r'ltd\.', r'ltd & co\. kg', r's\.a r\.l\.', r'stiftung', r'genossenschaft',
r'ggmbh', r'gug', r'partg', r'partgmbb', r'kgaa', r'se', r'og', r'o\.g\.',
r'e\.u\.', r'ges\.n\.b\.r\.', r'genmbh', r'verein', r'kollektivgesellschaft',
r'kommanditgesellschaft', r'einzelfirma', r'sàrl', r'sa', r'sagl',
r'gmbh & co\. ohg', r'ag & co\. ohg', r'gmbh & co\. kgaa', r'ag & co\. kgaa',
r's\.a\.', r's\.p\.a\.', r'b\.v\.', r'n\.v\.'
]
pattern = r'\b(' + '|'.join(forms) + r')\b'
normalized = re.sub(pattern, '', name, flags=re.IGNORECASE)
normalized = re.sub(r'[\-]', ' ', normalized) # Ersetze Bindestriche durch Leerzeichen
normalized = re.sub(r'\s+', ' ', normalized).strip()
return normalized.lower()
# ==================== GOOGLE SHEET HANDLER ====================
class GoogleSheetHandler:
"""Handhabung der Google Sheets Interaktion"""
def __init__(self):
self.sheet = None
self.sheet_values = []
self._connect()
def _connect(self):
"""Stellt Verbindung zum Google Sheet her"""
scope = ["https://www.googleapis.com/auth/spreadsheets"]
creds = ServiceAccountCredentials.from_json_keyfile_name(Config.CREDENTIALS_FILE, scope)
self.sheet = gspread.authorize(creds).open_by_url(Config.SHEET_URL).sheet1
self.sheet_values = self.sheet.get_all_values()
def get_start_index(self):
"""Ermittelt die erste leere Zeile in Spalte N (Index 14)"""
filled_n = [row[13] if len(row) > 13 else '' for row in self.sheet_values[1:]]
return next((i + 1 for i, v in enumerate(filled_n, start=1) if not str(v).strip()), len(filled_n) + 1)
# Für die neuen Updates nutzen wir separate Update-Aufrufe
# ==================== WIKIPEDIA SCRAPER ====================
class WikipediaScraper:
"""Klasse zur Handhabung der Wikipedia-Suche und Datenextraktion"""
def __init__(self):
wikipedia.set_lang(Config.LANG)
def _get_full_domain(self, website):
"""Extrahiert den vollständigen Domainnamen (inklusive Topleveldomain) aus der URL."""
if not website:
return ""
website = website.lower().strip()
website = re.sub(r'^https?:\/\/', '', website)
website = re.sub(r'^www\.', '', website)
website = website.split('/')[0]
return website
def _generate_search_terms(self, company_name, website):
"""
Generiert Suchbegriffe in folgender Reihenfolge:
1. Vollständiger Domainname (z.B. "heimbach.com")
2. Die ersten zwei Wörter des normalisierten Firmennamens
3. Der vollständige, normalisierte Firmenname
"""
terms = []
full_domain = self._get_full_domain(website)
if full_domain:
terms.append(full_domain)
normalized_name = normalize_company_name(company_name)
candidate = " ".join(normalized_name.split()[:2]).strip()
if candidate and candidate not in terms:
terms.append(candidate)
if normalized_name and normalized_name not in terms:
terms.append(normalized_name)
debug_print(f"Generierte Suchbegriffe: {terms}")
return terms
def _validate_article(self, page, company_name, website):
"""
Validiert den Artikel:
- Sucht in der Infobox und in den externen Links nach Links, die den vollständigen Domainnamen enthalten.
Wird ein solcher Link gefunden, wird ein niedrigerer Schwellenwert (0.60) angewendet.
- Andernfalls werden der normalisierte Wikipedia-Titel und der normalisierte Firmenname verglichen.
"""
full_domain = self._get_full_domain(website)
domain_found = False
if full_domain:
try:
html_raw = requests.get(page.url).text
soup = BeautifulSoup(html_raw, Config.HTML_PARSER)
# Suche in der Infobox
infobox = soup.find('table', class_=lambda c: c and 'infobox' in c.lower())
if infobox:
links = infobox.find_all('a', href=True)
for link in links:
href = link.get('href').lower()
if href.startswith('/wiki/datei:'):
continue
if full_domain in href:
debug_print(f"Definitiver Link-Match in Infobox gefunden: {href}")
domain_found = True
break
# Suche in externen Links, falls vorhanden
if not domain_found and hasattr(page, 'externallinks'):
for ext_link in page.externallinks:
if full_domain in ext_link.lower():
debug_print(f"Definitiver Link-Match in externen Links gefunden: {ext_link}")
domain_found = True
break
except Exception as e:
debug_print(f"Fehler beim Extrahieren von Links: {str(e)}")
normalized_title = normalize_company_name(page.title)
normalized_company = normalize_company_name(company_name)
similarity = SequenceMatcher(None, normalized_title, normalized_company).ratio()
debug_print(f"Ähnlichkeit (normalisiert): {similarity:.2f} ({normalized_title} vs {normalized_company})")
threshold = 0.60 if domain_found else Config.SIMILARITY_THRESHOLD
return similarity >= threshold
def extract_first_paragraph(self, page_url):
"""Extrahiert den ersten sinnvollen Absatz aus dem Wikipedia-Artikel."""
try:
response = requests.get(page_url)
soup = BeautifulSoup(response.text, Config.HTML_PARSER)
paragraphs = soup.find_all('p')
for p in paragraphs:
text = clean_text(p.get_text())
if len(text) > 50:
return text
return "k.A."
except Exception as e:
debug_print(f"Fehler beim Extrahieren des ersten Absatzes: {e}")
return "k.A."
def _extract_infobox_value(self, soup, target):
"""Extrahiert Werte aus der Infobox (Fallback-Methode) für 'branche', 'umsatz' und 'mitarbeiter'."""
infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
if not infobox:
return "k.A."
keywords_map = {
'branche': ['branche', 'industrie', 'tätigkeit', 'geschäftsfeld', 'sektor', 'produkte', 'leistungen', 'aktivitäten', 'wirtschaftszweig'],
'umsatz': ['umsatz', 'jahresumsatz', 'konzernumsatz', 'gesamtumsatz', 'erlöse', 'umsatzerlöse', 'einnahmen', 'ergebnis', 'jahresergebnis'],
'mitarbeiter': ['mitarbeiter', 'beschäftigte', 'personal', 'anzahl mitarbeiter']
}
keywords = keywords_map.get(target, [])
for row in infobox.find_all('tr'):
header = row.find('th')
if header:
header_text = clean_text(header.get_text()).lower()
if any(kw in header_text for kw in keywords):
value = row.find('td')
if value:
raw_value = clean_text(value.get_text())
if target == 'branche':
clean_val = re.sub(r'\[.*?\]|\(.*?\)', '', raw_value)
return ' '.join(clean_val.split()).strip()
if target == 'umsatz':
match = re.search(r'(\d{1,3}(?:[.,]\d{3})*|\d+)', raw_value.replace('.', '').replace(',', '.'))
if match:
num = float(match.group(1))
return str(int(round(num)))
return raw_value.strip()
if target == 'mitarbeiter':
match = re.search(r'(\d{1,3}(?:[.,]\d{3})*|\d+)', raw_value.replace('.', '').replace(',', '.'))
if match:
return match.group(1)
return raw_value.strip()
return "k.A."
def extract_full_infobox(self, soup):
"""Extrahiert die komplette Infobox als Text (nicht mehr ausgegeben)"""
infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
if not infobox:
return "k.A."
return clean_text(infobox.get_text(separator=' | '))
def extract_fields_from_infobox_text(self, infobox_text, field_names):
"""Extrahiert die gewünschten Felder aus dem Infobox-Text (getrennt durch ' | ')"""
result = {}
tokens = [token.strip() for token in infobox_text.split("|") if token.strip()]
for i, token in enumerate(tokens):
for field in field_names:
if token.lower() == field.lower():
j = i + 1
while j < len(tokens) and not tokens[j]:
j += 1
result[field] = tokens[j] if j < len(tokens) else "k.A."
return result
def extract_company_data(self, page_url):
"""Extrahiert Daten aus dem Wikipedia-Artikel (Erster Absatz, Branche, Umsatz, Mitarbeiter)"""
if not page_url:
return {'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.'}
try:
response = requests.get(page_url)
soup = BeautifulSoup(response.text, Config.HTML_PARSER)
full_infobox = self.extract_full_infobox(soup)
extracted_fields = self.extract_fields_from_infobox_text(full_infobox, ['Branche', 'Umsatz', 'Mitarbeiter'])
branche_val = extracted_fields.get('Branche', self._extract_infobox_value(soup, 'branche'))
umsatz_val = extracted_fields.get('Umsatz', self._extract_infobox_value(soup, 'umsatz'))
mitarbeiter_val = extracted_fields.get('Mitarbeiter', self._extract_infobox_value(soup, 'mitarbeiter'))
first_paragraph = self.extract_first_paragraph(page_url)
return {
'url': page_url,
'first_paragraph': first_paragraph,
'branche': branche_val,
'umsatz': umsatz_val,
'mitarbeiter': mitarbeiter_val
}
except Exception as e:
debug_print(f"Extraktionsfehler: {str(e)}")
return {'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.'}
@retry_on_failure
def search_company_article(self, company_name, website):
"""Sucht mit optimierten Suchbegriffen (vollständiger Domainname, Candidate, normalisierter Name) nach dem Wikipedia-Artikel."""
search_terms = self._generate_search_terms(company_name, website)
for term in search_terms:
try:
results = wikipedia.search(term, results=Config.WIKIPEDIA_SEARCH_RESULTS)
debug_print(f"Suchergebnisse für '{term}': {results}")
for title in results:
try:
page = wikipedia.page(title, auto_suggest=False)
if self._validate_article(page, company_name, website):
return page
except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e:
debug_print(f"Seitenfehler: {str(e)}")
continue
except Exception as e:
debug_print(f"Suchfehler: {str(e)}")
continue
return None
# ==================== DATA PROCESSOR ====================
class DataProcessor:
"""Steuerung des Gesamtprozesses"""
def __init__(self):
self.sheet_handler = GoogleSheetHandler()
self.wiki_scraper = WikipediaScraper()
def process_rows(self, num_rows):
"""Verarbeitet die angegebene Anzahl an Zeilen"""
start_index = self.sheet_handler.get_start_index()
print(f"Starte bei Zeile {start_index+1}")
for i in range(start_index, min(start_index + num_rows, len(self.sheet_handler.sheet_values))):
row = self.sheet_handler.sheet_values[i]
self._process_single_row(i+1, row)
def _process_single_row(self, row_num, row_data):
"""Verarbeitung einer einzelnen Zeile"""
company_name = row_data[0] if len(row_data) > 0 else ""
website = row_data[1] if len(row_data) > 1 else ""
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Verarbeite Zeile {row_num}: {company_name}")
article = self.wiki_scraper.search_company_article(company_name, website)
if article:
company_data = self.wiki_scraper.extract_company_data(article.url)
else:
company_data = {'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.'}
# Update der Spalten:
# G: URL, H: erster Absatz, I: Branche, J: Umsatz (als Zahl), K: Mitarbeiter
self.sheet_handler.sheet.update(f"G{row_num}:K{row_num}", [[
company_data.get('url', 'k.A.'),
company_data.get('first_paragraph', 'k.A.'),
company_data.get('branche', 'k.A.'),
company_data.get('umsatz', 'k.A.'),
company_data.get('mitarbeiter', 'k.A.')
]])
# Spalte N: Datum und aktuelle Zeit
current_dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.sheet_handler.sheet.update(f"N{row_num}", [[current_dt]])
# Spalte Q: Version
self.sheet_handler.sheet.update(f"Q{row_num}", [[Config.VERSION]])
print(f"✅ Aktualisiert: URL: {company_data.get('url', 'k.A.')}, Erster Absatz: {company_data.get('first_paragraph', 'k.A.')[:30]}..., Branche: {company_data.get('branche', 'k.A.')}, Umsatz: {company_data.get('umsatz', 'k.A.')}, Mitarbeiter: {company_data.get('mitarbeiter', 'k.A.')}")
time.sleep(Config.RETRY_DELAY)
# ==================== MAIN ====================
if __name__ == "__main__":
try:
num_rows = int(input("Wieviele Zeilen sollen überprüft werden? "))
except Exception as e:
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
exit(1)
processor = DataProcessor()
processor.process_rows(num_rows)
print("\n✅ Wikipedia-Auswertung abgeschlossen")