syntax fix

This commit is contained in:
2025-03-31 14:32:29 +00:00
parent 11e987dec4
commit 1ba0359cec

View File

@@ -1,16 +1,15 @@
# Standard library
import csv
import os
import re
import time
from datetime import datetime
from difflib import SequenceMatcher

# Third party
import gspread
import requests
import wikipedia
from bs4 import BeautifulSoup
from oauth2client.service_account import ServiceAccountCredentials
# ==================== KONFIGURATION ====================
# ==================== KONFIGURATION ====================
class Config: class Config:
VERSION = "1.1.1" VERSION = "1.1.1"
LANG = "de" LANG = "de"
@@ -23,9 +22,8 @@ class Config:
DEBUG = True DEBUG = True
WIKIPEDIA_SEARCH_RESULTS = 8 WIKIPEDIA_SEARCH_RESULTS = 8
HTML_PARSER = "html.parser" HTML_PARSER = "html.parser"
# ==================== HELPER FUNCTIONS ====================
# ==================== HELPER FUNCTIONS ==================== def retry_on_failure(func):
def retry_on_failure(func):
"""Decorator für Wiederholungsversuche bei Fehlern""" """Decorator für Wiederholungsversuche bei Fehlern"""
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
for attempt in range(Config.MAX_RETRIES): for attempt in range(Config.MAX_RETRIES):
@@ -36,17 +34,14 @@ def retry_on_failure(func):
time.sleep(Config.RETRY_DELAY) time.sleep(Config.RETRY_DELAY)
return None return None
return wrapper return wrapper
def debug_print(message):
    """Print *message* to stdout with a [DEBUG] prefix when Config.DEBUG is enabled."""
    if Config.DEBUG:
        print(f"[DEBUG] {message}")
def clean_text(text):
def clean_text(text):
"""Bereinigt Text von HTML-Entitäten und überflüssigen Whitespaces""" """Bereinigt Text von HTML-Entitäten und überflüssigen Whitespaces"""
if not text: if not text:
return "k.A." return "k.A."
# Konvertierung und Säuberung # Konvertierung und Säuberung
text = str(text) text = str(text)
text = re.sub(r'\[.*?\]', '', text) # Entferne eckige Klammern mit Inhalt text = re.sub(r'\[.*?\]', '', text) # Entferne eckige Klammern mit Inhalt
@@ -54,16 +49,13 @@ def clean_text(text):
text = re.sub(r'<.*?>', '', text) # Entferne HTML-Tags text = re.sub(r'<.*?>', '', text) # Entferne HTML-Tags
text = re.sub(r'\s+', ' ', text).strip() text = re.sub(r'\s+', ' ', text).strip()
return text if text else "k.A." return text if text else "k.A."
# ==================== GOOGLE SHEET HANDLER ====================
class GoogleSheetHandler:
    """Wraps all Google Sheets interaction (connect, read, row updates)."""

    def __init__(self):
        self.sheet = None
        self.sheet_values = []
        # Connect eagerly so sheet and sheet_values are usable right after construction.
        self._connect()
def _connect(self): def _connect(self):
"""Stellt Verbindung zum Google Sheet her""" """Stellt Verbindung zum Google Sheet her"""
scope = ["https://www.googleapis.com/auth/spreadsheets"] scope = ["https://www.googleapis.com/auth/spreadsheets"]
@@ -72,7 +64,6 @@ class GoogleSheetHandler:
) )
self.sheet = gspread.authorize(creds).open_by_url(Config.SHEET_URL).sheet1 self.sheet = gspread.authorize(creds).open_by_url(Config.SHEET_URL).sheet1
self.sheet_values = self.sheet.get_all_values() self.sheet_values = self.sheet.get_all_values()
def get_start_index(self):
    """Return the 1-based index of the first data row whose column N (index 13) is empty.

    Skips the header row; falls back to one past the last data row when
    every row already has a value in column N.
    """
    # Rows shorter than 14 cells are treated as having an empty column N.
    filled_n = [row[13] if len(row) > 13 else '' for row in self.sheet_values[1:]]
    # NOTE(review): the `return next(` line was elided by the diff hunk in the
    # scraped source; it is uniquely implied by the generator and fallback
    # below — confirm against the full file.
    return next(
        (i + 1 for i, v in enumerate(filled_n, start=1) if not str(v).strip()),
        len(filled_n) + 1
    )
def update_row(self, row_num, values):
    """Write *values* (a flat list) into columns G..Q of sheet row *row_num*."""
    self.sheet.update(
        range_name=f"G{row_num}:Q{row_num}",
        values=[values]
    )
# ==================== WIKIPEDIA SCRAPER ====================
# ==================== WIKIPEDIA SCRAPER ==================== class_=lambda c: c and any(
class_=lambda c: c and any(
kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen'] kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']
) ))
if not infobox: if not infobox:
return "k.A." return "k.A."
keywords = { keywords = {
'branche': [ 'branche': [
'branche', 'industrie', 'tätigkeitsfeld', 'geschäftsfeld', 'branche', 'industrie', 'tätigkeitsfeld', 'geschäftsfeld',
@@ -106,7 +93,6 @@ class_=lambda c: c and any(
'konzernumsatz', 'umsatzentwicklung', 'ergebnis' 'konzernumsatz', 'umsatzentwicklung', 'ergebnis'
] ]
}.get(target, []) }.get(target, [])
# Durchsuche alle Zeilen und Zellen # Durchsuche alle Zeilen und Zellen
value = "k.A." value = "k.A."
for row in infobox.find_all('tr'): for row in infobox.find_all('tr'):
@@ -114,7 +100,6 @@ class_=lambda c: c and any(
header_cells = row.find_all(['th', 'td'], attrs={'colspan': False}) header_cells = row.find_all(['th', 'td'], attrs={'colspan': False})
for header in header_cells: for header in header_cells:
header_text = clean_text(header.get_text()).lower() header_text = clean_text(header.get_text()).lower()
if any(kw in header_text for kw in keywords): if any(kw in header_text for kw in keywords):
# Hole nächste Zelle, ignoriere verschachtelte Tabellen # Hole nächste Zelle, ignoriere verschachtelte Tabellen
value_cell = header.find_next_sibling(['td', 'th']) value_cell = header.find_next_sibling(['td', 'th'])
@@ -125,7 +110,6 @@ class_=lambda c: c and any(
value = ', '.join(clean_text(li.get_text()) for li in list_items) value = ', '.join(clean_text(li.get_text()) for li in list_items)
else: else:
value = clean_text(value_cell.get_text()) value = clean_text(value_cell.get_text())
# Extrahiere numerische Umsatzwerte mit Regex # Extrahiere numerische Umsatzwerte mit Regex
if target == 'umsatz': if target == 'umsatz':
match = re.search( match = re.search(
@@ -135,13 +119,10 @@ class_=lambda c: c and any(
if match: if match:
value = match.group(1).replace('.', '').replace(',', '.') value = match.group(1).replace('.', '').replace(',', '.')
return value return value
return "k.A." return "k.A."
class WikipediaScraper:
    """Finds company articles on Wikipedia and scrapes infobox data from them."""

    def __init__(self):
        # All lookups use the project-wide configured language.
        wikipedia.set_lang(Config.LANG)
def _extract_domain_hint(self, website): def _extract_domain_hint(self, website):
"""Extrahiert den Domain-Schlüssel aus der Website-URL""" """Extrahiert den Domain-Schlüssel aus der Website-URL"""
if not website: if not website:
@@ -150,66 +131,52 @@ class WikipediaScraper:
clean_url = website.lower().replace("https://", "").replace("http://", "").replace("www.", "") clean_url = website.lower().replace("https://", "").replace("http://", "").replace("www.", "")
domain_parts = clean_url.split(".") domain_parts = clean_url.split(".")
return domain_parts[0] if domain_parts else "" return domain_parts[0] if domain_parts else ""
def _generate_search_terms(self, company_name, website_hint=""):
    """Build a de-duplicated list of Wikipedia search terms for a company."""
    search_terms = [company_name.strip()]
    # Strip German legal forms (GmbH, AG, ...) and everything after them.
    clean_name = re.sub(
        r'\s+(?:GmbH|AG|KG|OHG|e\.V\.|mbH|& Co\. KG| GmbH & Co\. KG).*$',
        '',
        company_name
    ).strip()
    # Add the cleaned name only when it actually differs.
    if clean_name and clean_name != company_name:
        search_terms.append(clean_name)
    # The first two significant words often match the article title.
    name_words = [w for w in re.split(r'\W+', clean_name) if w]
    if len(name_words) >= 2:
        search_terms.append(" ".join(name_words[:2]))
    # The website's domain label is a strong extra hint (skip generic TLD-like labels).
    domain_hint = self._extract_domain_hint(website_hint)
    if domain_hint and domain_hint not in ["de", "com", "org", "net"]:
        search_terms.append(domain_hint)
    debug_print(f"Generierte Suchbegriffe: {search_terms}")
    # Deduplicate; note that set() does not preserve insertion order.
    return list(set(search_terms))
def _validate_article(self, page, company_name, domain_hint=""):
    """Check whether a Wikipedia *page* plausibly describes *company_name*.

    Compares normalised titles via SequenceMatcher and, when a domain hint
    is given, additionally requires the domain to appear in the article HTML.
    """
    # Normalise both names before comparing (drop parentheticals / punctuation).
    page_title = re.sub(r'\(.*?\)', '', page.title).strip().lower()
    search_name = re.sub(r'[^a-zA-Z0-9äöüß ]', '', company_name).strip().lower()
    similarity = SequenceMatcher(None, page_title, search_name).ratio()
    debug_print(f"Ähnlichkeit '{page_title}' vs '{search_name}': {similarity:.2f}")
    # Extra check: the company's own domain should occur somewhere in the article.
    if domain_hint:
        html_content = requests.get(page.url).text.lower()
        if domain_hint not in html_content:
            debug_print(f"Domain-Hint '{domain_hint}' nicht im Artikel gefunden")
            return False
    return similarity >= Config.SIMILARITY_THRESHOLD
@retry_on_failure @retry_on_failure
def search_company_article(self, company_name, website_hint=""): def search_company_article(self, company_name, website_hint=""):
"""Hauptfunktion zur Artikelsuche""" """Hauptfunktion zur Artikelsuche"""
search_terms = self._generate_search_terms(company_name, website_hint) search_terms = self._generate_search_terms(company_name, website_hint)
domain_hint = self._extract_domain_hint(website_hint) domain_hint = self._extract_domain_hint(website_hint)
for term in search_terms: for term in search_terms:
try: try:
results = wikipedia.search(term, results=Config.WIKIPEDIA_SEARCH_RESULTS) results = wikipedia.search(term, results=Config.WIKIPEDIA_SEARCH_RESULTS)
debug_print(f"Suchergebnisse für '{term}': {results}") debug_print(f"Suchergebnisse für '{term}': {results}")
for title in results: for title in results:
try: try:
page = wikipedia.page(title, auto_suggest=False) page = wikipedia.page(title, auto_suggest=False)
@@ -221,63 +188,49 @@ class WikipediaScraper:
debug_print(f"Fehler bei Suche nach {term}: {str(e)}") debug_print(f"Fehler bei Suche nach {term}: {str(e)}")
continue continue
return None return None
def extract_company_data(self, page_url):
    """Fetch the article at *page_url* and extract industry ('branche') and revenue ('umsatz').

    Returns a dict with keys 'branche', 'umsatz' and 'url'.
    """
    response = requests.get(page_url)
    soup = BeautifulSoup(response.text, Config.HTML_PARSER)
    return {
        'branche': self._extract_infobox_value(soup, 'branche'),
        'umsatz': self._extract_infobox_value(soup, 'umsatz'),
        'url': page_url
    }
# ==================== WIKIPEDIA SCRAPER ==================== # ==================== WIKIPEDIA SCRAPER ====================
class WikipediaScraper: class WikipediaScraper:
def _extract_infobox_value(self, soup, target): def _extract_infobox_value(self, soup, target):
"""Extrahiert spezifischen Wert aus der Infobox mit erweiterten Suchmustern""" """Extrahiert spezifischen Wert aus der Infobox mit erweiterten Suchmustern"""
# Erweiterte Infobox-Erkennung # Erweiterte Infobox-Erkennung
infobox = soup.find('table', infobox = soup.find('table',
# ==================== DATA PROCESSOR ====================
class DataProcessor:
    """Drives the overall process: read sheet rows, scrape Wikipedia, write results back."""

    def __init__(self):
        self.sheet_handler = GoogleSheetHandler()
        self.wiki_scraper = WikipediaScraper()
def process_rows(self, num_rows):
    """Process up to *num_rows* sheet rows, starting at the first unprocessed one."""
    start_index = self.sheet_handler.get_start_index()
    print(f"Starte bei Zeile {start_index+1}")
    # Never run past the rows actually present in the sheet.
    last = min(start_index + num_rows, len(self.sheet_handler.sheet_values))
    for i in range(start_index, last):
        row = self.sheet_handler.sheet_values[i]
        # Sheet rows are 1-based, the cached list is 0-based.
        self._process_single_row(i+1, row)
def _process_single_row(self, row_num, row_data):
    """Look up one company's Wikipedia data and write it back to sheet row *row_num*."""
    # Column A = company name, column B = website; tolerate short rows.
    company_name = row_data[0] if len(row_data) > 0 else ""
    website = row_data[1] if len(row_data) > 1 else ""
    print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Verarbeite Zeile {row_num}: {company_name}")
    # Step 1: find the matching Wikipedia article.
    article = self.wiki_scraper.search_company_article(company_name, website)
    # Step 2: extract the data; fall back to 'k.A.' placeholders when no match.
    if article:
        company_data = self.wiki_scraper.extract_company_data(article.url)
    else:
        company_data = {'branche': 'k.A.', 'umsatz': 'k.A.', 'url': ''}
    self._update_sheet(row_num, company_data)
    # Throttle between rows to be gentle on the APIs.
    time.sleep(Config.RETRY_DELAY)
def _update_sheet(self, row_num, data): def _update_sheet(self, row_num, data):
"""Aktualisiert die Zeile mit den neuen Daten""" """Aktualisiert die Zeile mit den neuen Daten"""
current_values = self.sheet_handler.sheet.row_values(row_num) current_values = self.sheet_handler.sheet.row_values(row_num)
@@ -293,9 +246,8 @@ class DataProcessor:
] ]
self.sheet_handler.update_row(row_num, new_values) self.sheet_handler.update_row(row_num, new_values)
print(f"✅ Aktualisiert: Branche: {new_values[0]}, Umsatz: {new_values[2]}, URL: {new_values[6]}") print(f"✅ Aktualisiert: Branche: {new_values[0]}, Umsatz: {new_values[2]}, URL: {new_values[6]}")
# ==================== MAIN EXECUTION ====================
if __name__ == "__main__":
    # Ask interactively how many rows to check, then run the processor.
    num_rows = int(input("Wieviele Zeilen sollen überprüft werden? "))
    processor = DataProcessor()
    processor.process_rows(num_rows)