Files
Brancheneinstufung2/brancheneinstufung.py
Floke be0eaf2562 Extraktion von Umsatz und Mitarbeiterzahl optimiert – Umsatz in Mio € und Mitarbeiterzahl als ganze
Umsatz-Extraktion:

Die Regex erfasst nun den Zahlenstring (z. B. „10,0“) und entfernt Tausendertrennzeichen (Punkte, Leerzeichen).

Wird im Text "mrd" gefunden, wird der Wert mit 1000 multipliziert, ansonsten bleibt er (bei "mio") unverändert.

Fehlt eine Einheit, wird der Wert als in Euro angegeben angenommen und durch 1.000.000 geteilt, sodass der Umsatz in Mio € resultiert.

Mitarbeiter-Extraktion:

Spezifische Suche mittels Regex nach dem Schlüsselwort "mitarbeiterzahl" (oder "mitarbeiter") gefolgt von einem Zahlenformat (z. B. "4.175").

Falls der gefundene Zahlenstring einen Punkt als Tausendertrennzeichen enthält (und kein Komma), wird dieser entfernt, sodass "4.175" zu "4175" wird.

Re-Evaluierungsmodus:

Alle Zeilen mit einem "x" in Spalte A werden verarbeitet, und der komplette Infobox-Inhalt wird in der Konsole ausgegeben.

Spaltenanpassungen:

Normalmodus: Firmenname in Spalte A, Website in Spalte B; Ausgabe in G:K, Datum in N, Version in Q.

Re‑Evaluierungsmodus: Firmenname in Spalte B, Website in Spalte C; Ausgabe in H:L, Datum in O, Version in R.
2025-04-01 04:03:57 +00:00

354 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import time
import re
import gspread
import wikipedia
import requests
from bs4 import BeautifulSoup
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime
from difflib import SequenceMatcher
import csv
# ==================== KONFIGURATION ====================
class Config:
VERSION = "1.1.11" # Neue Version mit verbesserter Umsatz- und Mitarbeiterextraktion
LANG = "de"
CREDENTIALS_FILE = "service_account.json"
SHEET_URL = "https://docs.google.com/spreadsheets/d/1u_gHr9JUfmV1-iviRzbSe3575QEp7KLhK5jFV_gJcgo"
MAX_RETRIES = 3
RETRY_DELAY = 5
LOG_CSV = "gpt_antworten_log.csv"
SIMILARITY_THRESHOLD = 0.65
DEBUG = True
WIKIPEDIA_SEARCH_RESULTS = 5
HTML_PARSER = "html.parser"
# ==================== HELPER FUNCTIONS ====================
def retry_on_failure(func):
def wrapper(*args, **kwargs):
for attempt in range(Config.MAX_RETRIES):
try:
return func(*args, **kwargs)
except Exception as e:
print(f"⚠️ Fehler bei {func.__name__} (Versuch {attempt+1}): {str(e)[:100]}")
time.sleep(Config.RETRY_DELAY)
return None
return wrapper
def debug_print(message):
if Config.DEBUG:
print(f"[DEBUG] {message}")
def clean_text(text):
if not text:
return "k.A."
text = str(text)
text = re.sub(r'\[\d+\]', '', text)
text = re.sub(r'\s+', ' ', text).strip()
return text if text else "k.A."
def normalize_company_name(name):
if not name:
return ""
forms = [
r'gmbh', r'g\.m\.b\.h\.', r'ug', r'u\.g\.', r'ug \(haftungsbeschränkt\)',
r'u\.g\. \(haftungsbeschränkt\)', r'ag', r'a\.g\.', r'ohg', r'o\.h\.g\.',
r'kg', r'k\.g\.', r'gmbh & co\.?\s*kg', r'g\.m\.b\.h\. & co\.?\s*k\.g\.',
r'ag & co\.?\s*kg', r'a\.g\. & co\.?\s*k\.g\.', r'e\.k\.', r'e\.kfm\.',
r'e\.kfr\.', r'ltd\.', r'ltd & co\.?\s*kg', r's\.a r\.l\.', r'stiftung',
r'genossenschaft', r'ggmbh', r'gug', r'partg', r'partgmbb', r'kgaa', r'se',
r'og', r'o\.g\.', r'e\.u\.', r'ges\.n\.b\.r\.', r'genmbh', r'verein',
r'kollektivgesellschaft', r'kommanditgesellschaft', r'einzelfirma', r'sàrl',
r'sa', r'sagl', r'gmbh & co\.?\s*ohg', r'ag & co\.?\s*ohg', r'gmbh & co\.?\s*kgaa',
r'ag & co\.?\s*kgaa', r's\.a\.', r's\.p\.a\.', r'b\.v\.', r'n\.v\.'
]
pattern = r'\b(' + '|'.join(forms) + r')\b'
normalized = re.sub(pattern, '', name, flags=re.IGNORECASE)
normalized = re.sub(r'[\-]', ' ', normalized)
normalized = re.sub(r'\s+', ' ', normalized).strip()
return normalized.lower()
# ==================== GOOGLE SHEET HANDLER ====================
class GoogleSheetHandler:
def __init__(self):
self.sheet = None
self.sheet_values = []
self._connect()
def _connect(self):
scope = ["https://www.googleapis.com/auth/spreadsheets"]
creds = ServiceAccountCredentials.from_json_keyfile_name(Config.CREDENTIALS_FILE, scope)
self.sheet = gspread.authorize(creds).open_by_url(Config.SHEET_URL).sheet1
self.sheet_values = self.sheet.get_all_values()
def get_start_index(self):
filled_n = [row[13] if len(row) > 13 else '' for row in self.sheet_values[1:]]
return next((i + 1 for i, v in enumerate(filled_n, start=1) if not str(v).strip()), len(filled_n) + 1)
# Update-Aufrufe erfolgen separat.
# ==================== WIKIPEDIA SCRAPER ====================
class WikipediaScraper:
def __init__(self):
wikipedia.set_lang(Config.LANG)
def _get_full_domain(self, website):
if not website:
return ""
website = website.lower().strip()
website = re.sub(r'^https?:\/\/', '', website)
website = re.sub(r'^www\.', '', website)
return website.split('/')[0]
def _generate_search_terms(self, company_name, website):
terms = []
full_domain = self._get_full_domain(website)
if full_domain:
terms.append(full_domain)
normalized_name = normalize_company_name(company_name)
candidate = " ".join(normalized_name.split()[:2]).strip()
if candidate and candidate not in terms:
terms.append(candidate)
if normalized_name and normalized_name not in terms:
terms.append(normalized_name)
debug_print(f"Generierte Suchbegriffe: {terms}")
return terms
def _validate_article(self, page, company_name, website):
full_domain = self._get_full_domain(website)
domain_found = False
if full_domain:
try:
html_raw = requests.get(page.url).text
soup = BeautifulSoup(html_raw, Config.HTML_PARSER)
infobox = soup.find('table', class_=lambda c: c and 'infobox' in c.lower())
if infobox:
links = infobox.find_all('a', href=True)
for link in links:
href = link.get('href').lower()
if href.startswith('/wiki/datei:'):
continue
if full_domain in href:
debug_print(f"Definitiver Link-Match in Infobox gefunden: {href}")
domain_found = True
break
if not domain_found and hasattr(page, 'externallinks'):
for ext_link in page.externallinks:
if full_domain in ext_link.lower():
debug_print(f"Definitiver Link-Match in externen Links gefunden: {ext_link}")
domain_found = True
break
except Exception as e:
debug_print(f"Fehler beim Extrahieren von Links: {str(e)}")
normalized_title = normalize_company_name(page.title)
normalized_company = normalize_company_name(company_name)
similarity = SequenceMatcher(None, normalized_title, normalized_company).ratio()
debug_print(f"Ähnlichkeit (normalisiert): {similarity:.2f} ({normalized_title} vs {normalized_company})")
threshold = 0.60 if domain_found else Config.SIMILARITY_THRESHOLD
return similarity >= threshold
def extract_first_paragraph(self, page_url):
try:
response = requests.get(page_url)
soup = BeautifulSoup(response.text, Config.HTML_PARSER)
paragraphs = soup.find_all('p')
for p in paragraphs:
text = clean_text(p.get_text())
if len(text) > 50:
return text
return "k.A."
except Exception as e:
debug_print(f"Fehler beim Extrahieren des ersten Absatzes: {e}")
return "k.A."
def _extract_infobox_value(self, soup, target):
infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
if not infobox:
return "k.A."
keywords_map = {
'branche': ['branche', 'industrie', 'tätigkeit', 'geschäftsfeld', 'sektor', 'produkte', 'leistungen', 'aktivitäten', 'wirtschaftszweig'],
'umsatz': ['umsatz', 'jahresumsatz', 'konzernumsatz', 'gesamtumsatz', 'erlöse', 'umsatzerlöse', 'einnahmen', 'ergebnis', 'jahresergebnis'],
'mitarbeiter': ['mitarbeiter', 'beschäftigte', 'personal', 'mitarbeiterzahl', 'angestellte', 'belegschaft', 'personalstärke']
}
keywords = keywords_map.get(target, [])
for row in infobox.find_all('tr'):
header = row.find('th')
if header:
header_text = clean_text(header.get_text()).lower()
if any(kw in header_text for kw in keywords):
value = row.find('td')
if value:
raw_value = clean_text(value.get_text())
if target == 'branche':
clean_val = re.sub(r'\[.*?\]|\(.*?\)', '', raw_value)
return ' '.join(clean_val.split()).strip()
if target == 'umsatz':
raw = raw_value.lower()
match = re.search(r'([\d.,]+)', raw)
if match:
num_str = match.group(1)
# Falls Komma vorhanden: Punkte als Tausendertrennzeichen entfernen, Komma als Dezimaltrenner ersetzen
if ',' in num_str:
num_str = num_str.replace('.', '').replace(',', '.')
else:
num_str = num_str.replace(' ', '').replace('.', '')
try:
num = float(num_str)
except Exception as e:
debug_print(f"Umsatz-Umwandlungsfehler: {e} für {num_str}")
return raw_value.strip()
if 'mrd' in raw or 'milliarden' in raw:
num *= 1000
elif 'mio' in raw or 'millionen' in raw:
pass
else:
num /= 1e6
return str(int(round(num)))
return raw_value.strip()
if target == 'mitarbeiter':
raw = raw_value.lower()
# Spezifisch nach "mitarbeiterzahl" suchen, um den direkt folgenden Zahlenwert zu extrahieren.
match = re.search(r'(?:mitarbeiterzahl|mitarbeiter)[^\d]*([\d.,\s]+)', raw)
if match:
num_str = match.group(1)
# Entferne Tausendertrennzeichen (Punkte oder Leerzeichen)
num_str = num_str.replace(" ", "")
# Falls es nur Punkte und keine Kommas gibt, behandeln wir diese als Tausendertrennzeichen.
if '.' in num_str and ',' not in num_str:
num_str = num_str.replace('.', '')
# Entferne eventuell verbleibende nicht-numerische Zeichen.
num_str = re.sub(r'[^\d]', '', num_str)
if num_str:
debug_print(f"Mitarbeiterzahl gefunden: {num_str} in Text: {raw_value}")
return num_str
# Fallback: Falls keine spezifische Suche funktioniert, versuche die erste Zahl zu extrahieren.
match = re.search(r'(\d{1,3}(?:\.\d{3})+|\d+)', raw)
if match:
num_str = match.group(1)
if '.' in num_str and ',' not in num_str:
num_str = num_str.replace('.', '')
return num_str
return raw_value.strip()
return "k.A."
def extract_full_infobox(self, soup):
infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
if not infobox:
return "k.A."
return clean_text(infobox.get_text(separator=' | '))
def extract_fields_from_infobox_text(self, infobox_text, field_names):
result = {}
tokens = [token.strip() for token in infobox_text.split("|") if token.strip()]
for i, token in enumerate(tokens):
for field in field_names:
if token.lower() == field.lower():
j = i + 1
while j < len(tokens) and not tokens[j]:
j += 1
result[field] = tokens[j] if j < len(tokens) else "k.A."
return result
def extract_company_data(self, page_url):
if not page_url:
return {'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'full_infobox': 'k.A.'}
try:
response = requests.get(page_url)
soup = BeautifulSoup(response.text, Config.HTML_PARSER)
full_infobox = self.extract_full_infobox(soup)
extracted_fields = self.extract_fields_from_infobox_text(full_infobox, ['Branche', 'Umsatz', 'Mitarbeiter'])
branche_val = extracted_fields.get('Branche', self._extract_infobox_value(soup, 'branche'))
umsatz_val = extracted_fields.get('Umsatz', self._extract_infobox_value(soup, 'umsatz'))
mitarbeiter_val = extracted_fields.get('Mitarbeiter', self._extract_infobox_value(soup, 'mitarbeiter'))
first_paragraph = self.extract_first_paragraph(page_url)
return {
'url': page_url,
'first_paragraph': first_paragraph,
'branche': branche_val,
'umsatz': umsatz_val,
'mitarbeiter': mitarbeiter_val,
'full_infobox': full_infobox
}
except Exception as e:
debug_print(f"Extraktionsfehler: {str(e)}")
return {'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'full_infobox': 'k.A.'}
@retry_on_failure
def search_company_article(self, company_name, website):
search_terms = self._generate_search_terms(company_name, website)
for term in search_terms:
try:
results = wikipedia.search(term, results=Config.WIKIPEDIA_SEARCH_RESULTS)
debug_print(f"Suchergebnisse für '{term}': {results}")
for title in results:
try:
page = wikipedia.page(title, auto_suggest=False)
if self._validate_article(page, company_name, website):
return page
except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e:
debug_print(f"Seitenfehler: {str(e)}")
continue
except Exception as e:
debug_print(f"Suchfehler: {str(e)}")
continue
return None
# ==================== DATA PROCESSOR ====================
class DataProcessor:
def __init__(self):
self.sheet_handler = GoogleSheetHandler()
self.wiki_scraper = WikipediaScraper()
def process_rows(self, num_rows=None):
if MODE == "2":
print("Re-Evaluierungsmodus: Verarbeitung aller Zeilen mit 'x' in Spalte A.")
else:
start_index = self.sheet_handler.get_start_index()
print(f"Starte bei Zeile {start_index+1}")
for i, row in enumerate(self.sheet_handler.sheet_values[1:], start=2):
if MODE == "2":
if row[0].strip().lower() == "x":
self._process_single_row(i, row)
else:
if i >= self.sheet_handler.get_start_index():
self._process_single_row(i, row)
def _process_single_row(self, row_num, row_data):
if MODE == "2":
company_name = row_data[1] if len(row_data) > 1 else ""
website = row_data[2] if len(row_data) > 2 else ""
update_range = f"H{row_num}:L{row_num}"
dt_range = f"O{row_num}"
ver_range = f"R{row_num}"
else:
company_name = row_data[0] if len(row_data) > 0 else ""
website = row_data[1] if len(row_data) > 1 else ""
update_range = f"G{row_num}:K{row_num}"
dt_range = f"N{row_num}"
ver_range = f"Q{row_num}"
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Verarbeite Zeile {row_num}: {company_name}")
article = self.wiki_scraper.search_company_article(company_name, website)
if article:
company_data = self.wiki_scraper.extract_company_data(article.url)
else:
company_data = {'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'full_infobox': 'k.A.'}
self.sheet_handler.sheet.update(values=[[
company_data.get('url', 'k.A.'),
company_data.get('first_paragraph', 'k.A.'),
company_data.get('branche', 'k.A.'),
company_data.get('umsatz', 'k.A.'),
company_data.get('mitarbeiter', 'k.A.')
]], range_name=update_range)
current_dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.sheet_handler.sheet.update(values=[[current_dt]], range_name=dt_range)
self.sheet_handler.sheet.update(values=[[Config.VERSION]], range_name=ver_range)
print(f"✅ Aktualisiert: URL: {company_data.get('url', 'k.A.')}, Erster Absatz: {company_data.get('first_paragraph', 'k.A.')[:30]}..., Branche: {company_data.get('branche', 'k.A.')}, Umsatz: {company_data.get('umsatz', 'k.A.')}, Mitarbeiter: {company_data.get('mitarbeiter', 'k.A.')}")
if MODE == "2":
print("----- Vollständiger Infobox-Inhalt -----")
print(company_data.get("full_infobox", "k.A."))
print("----------------------------------------")
time.sleep(Config.RETRY_DELAY)
# ==================== MAIN ====================
if __name__ == "__main__":
mode_input = input("Wählen Sie den Modus: 1 für normalen Modus, 2 für Re-Evaluierungsmodus: ").strip()
MODE = "2" if mode_input == "2" else "1"
if MODE == "1":
try:
num_rows = int(input("Wieviele Zeilen sollen überprüft werden? "))
except Exception as e:
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
exit(1)
else:
num_rows = None # Im Re-Evaluierungsmodus werden alle markierten Zeilen verarbeitet.
processor = DataProcessor()
processor.process_rows(num_rows)
print("\n✅ Wikipedia-Auswertung abgeschlossen")