Umsatz-Extraktion:
Der Zahlenstring wird robust normalisiert, indem Tausendertrennzeichen (Punkte, Leerzeichen) entfernt und das Komma als Dezimaltrenner berücksichtigt wird.
Falls keine Einheit ("mio" oder "mrd") angegeben ist, wird der Wert als Euro angenommen und durch 1.000.000 geteilt.
Mitarbeiter-Extraktion:
Die Regex wurde erweitert, um gezielt nach dem Begriff "mitarbeiterzahl" (oder "mitarbeiter") zu suchen und alle Ziffern (einschließlich Tausendertrennzeichen) zu extrahieren, sodass z. B. aus "4.175" der Wert "4175" wird.
Re-Evaluierungsmodus:
Im Modus "2" werden alle Zeilen mit "x" in Spalte A verarbeitet, und der komplette Infobox-Inhalt wird in der Konsole ausgegeben.
Spaltenanpassungen:
Normalmodus: Firmenname in Spalte A, Website in Spalte B; Ausgabe in G:K, Datum in N, Version in Q.
Re‑Evaluierungsmodus: Firmenname in Spalte B, Website in Spalte C; Ausgabe in H:L, Datum in O, Version in R.
346 lines
17 KiB
Python
346 lines
17 KiB
Python
import os
|
||
import time
|
||
import re
|
||
import gspread
|
||
import wikipedia
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
from oauth2client.service_account import ServiceAccountCredentials
|
||
from datetime import datetime
|
||
from difflib import SequenceMatcher
|
||
import csv
|
||
|
||
# ==================== KONFIGURATION ====================
|
||
class Config:
|
||
VERSION = "1.1.10" # Neue Version mit verbesserter Umsatz- und Mitarbeiterextraktion
|
||
LANG = "de"
|
||
CREDENTIALS_FILE = "service_account.json"
|
||
SHEET_URL = "https://docs.google.com/spreadsheets/d/1u_gHr9JUfmV1-iviRzbSe3575QEp7KLhK5jFV_gJcgo"
|
||
MAX_RETRIES = 3
|
||
RETRY_DELAY = 5
|
||
LOG_CSV = "gpt_antworten_log.csv"
|
||
SIMILARITY_THRESHOLD = 0.65
|
||
DEBUG = True
|
||
WIKIPEDIA_SEARCH_RESULTS = 5
|
||
HTML_PARSER = "html.parser"
|
||
|
||
# ==================== HELPER FUNCTIONS ====================
|
||
def retry_on_failure(func):
|
||
def wrapper(*args, **kwargs):
|
||
for attempt in range(Config.MAX_RETRIES):
|
||
try:
|
||
return func(*args, **kwargs)
|
||
except Exception as e:
|
||
print(f"⚠️ Fehler bei {func.__name__} (Versuch {attempt+1}): {str(e)[:100]}")
|
||
time.sleep(Config.RETRY_DELAY)
|
||
return None
|
||
return wrapper
|
||
|
||
def debug_print(message):
|
||
if Config.DEBUG:
|
||
print(f"[DEBUG] {message}")
|
||
|
||
def clean_text(text):
|
||
if not text:
|
||
return "k.A."
|
||
text = str(text)
|
||
text = re.sub(r'\[\d+\]', '', text)
|
||
text = re.sub(r'\s+', ' ', text).strip()
|
||
return text if text else "k.A."
|
||
|
||
def normalize_company_name(name):
|
||
if not name:
|
||
return ""
|
||
forms = [
|
||
r'gmbh', r'g\.m\.b\.h\.', r'ug', r'u\.g\.', r'ug \(haftungsbeschränkt\)',
|
||
r'u\.g\. \(haftungsbeschränkt\)', r'ag', r'a\.g\.', r'ohg', r'o\.h\.g\.',
|
||
r'kg', r'k\.g\.', r'gmbh & co\.?\s*kg', r'g\.m\.b\.h\. & co\.?\s*k\.g\.',
|
||
r'ag & co\.?\s*kg', r'a\.g\. & co\.?\s*k\.g\.', r'e\.k\.', r'e\.kfm\.',
|
||
r'e\.kfr\.', r'ltd\.', r'ltd & co\.?\s*kg', r's\.a r\.l\.', r'stiftung',
|
||
r'genossenschaft', r'ggmbh', r'gug', r'partg', r'partgmbb', r'kgaa', r'se',
|
||
r'og', r'o\.g\.', r'e\.u\.', r'ges\.n\.b\.r\.', r'genmbh', r'verein',
|
||
r'kollektivgesellschaft', r'kommanditgesellschaft', r'einzelfirma', r'sàrl',
|
||
r'sa', r'sagl', r'gmbh & co\.?\s*ohg', r'ag & co\.?\s*ohg', r'gmbh & co\.?\s*kgaa',
|
||
r'ag & co\.?\s*kgaa', r's\.a\.', r's\.p\.a\.', r'b\.v\.', r'n\.v\.'
|
||
]
|
||
pattern = r'\b(' + '|'.join(forms) + r')\b'
|
||
normalized = re.sub(pattern, '', name, flags=re.IGNORECASE)
|
||
normalized = re.sub(r'[\-–]', ' ', normalized)
|
||
normalized = re.sub(r'\s+', ' ', normalized).strip()
|
||
return normalized.lower()
|
||
|
||
# ==================== GOOGLE SHEET HANDLER ====================
|
||
class GoogleSheetHandler:
|
||
def __init__(self):
|
||
self.sheet = None
|
||
self.sheet_values = []
|
||
self._connect()
|
||
def _connect(self):
|
||
scope = ["https://www.googleapis.com/auth/spreadsheets"]
|
||
creds = ServiceAccountCredentials.from_json_keyfile_name(Config.CREDENTIALS_FILE, scope)
|
||
self.sheet = gspread.authorize(creds).open_by_url(Config.SHEET_URL).sheet1
|
||
self.sheet_values = self.sheet.get_all_values()
|
||
def get_start_index(self):
|
||
filled_n = [row[13] if len(row) > 13 else '' for row in self.sheet_values[1:]]
|
||
return next((i + 1 for i, v in enumerate(filled_n, start=1) if not str(v).strip()), len(filled_n) + 1)
|
||
# Update-Aufrufe erfolgen separat.
|
||
|
||
# ==================== WIKIPEDIA SCRAPER ====================
|
||
class WikipediaScraper:
|
||
def __init__(self):
|
||
wikipedia.set_lang(Config.LANG)
|
||
def _get_full_domain(self, website):
|
||
if not website:
|
||
return ""
|
||
website = website.lower().strip()
|
||
website = re.sub(r'^https?:\/\/', '', website)
|
||
website = re.sub(r'^www\.', '', website)
|
||
return website.split('/')[0]
|
||
def _generate_search_terms(self, company_name, website):
|
||
terms = []
|
||
full_domain = self._get_full_domain(website)
|
||
if full_domain:
|
||
terms.append(full_domain)
|
||
normalized_name = normalize_company_name(company_name)
|
||
candidate = " ".join(normalized_name.split()[:2]).strip()
|
||
if candidate and candidate not in terms:
|
||
terms.append(candidate)
|
||
if normalized_name and normalized_name not in terms:
|
||
terms.append(normalized_name)
|
||
debug_print(f"Generierte Suchbegriffe: {terms}")
|
||
return terms
|
||
def _validate_article(self, page, company_name, website):
|
||
full_domain = self._get_full_domain(website)
|
||
domain_found = False
|
||
if full_domain:
|
||
try:
|
||
html_raw = requests.get(page.url).text
|
||
soup = BeautifulSoup(html_raw, Config.HTML_PARSER)
|
||
infobox = soup.find('table', class_=lambda c: c and 'infobox' in c.lower())
|
||
if infobox:
|
||
links = infobox.find_all('a', href=True)
|
||
for link in links:
|
||
href = link.get('href').lower()
|
||
if href.startswith('/wiki/datei:'):
|
||
continue
|
||
if full_domain in href:
|
||
debug_print(f"Definitiver Link-Match in Infobox gefunden: {href}")
|
||
domain_found = True
|
||
break
|
||
if not domain_found and hasattr(page, 'externallinks'):
|
||
for ext_link in page.externallinks:
|
||
if full_domain in ext_link.lower():
|
||
debug_print(f"Definitiver Link-Match in externen Links gefunden: {ext_link}")
|
||
domain_found = True
|
||
break
|
||
except Exception as e:
|
||
debug_print(f"Fehler beim Extrahieren von Links: {str(e)}")
|
||
normalized_title = normalize_company_name(page.title)
|
||
normalized_company = normalize_company_name(company_name)
|
||
similarity = SequenceMatcher(None, normalized_title, normalized_company).ratio()
|
||
debug_print(f"Ähnlichkeit (normalisiert): {similarity:.2f} ({normalized_title} vs {normalized_company})")
|
||
threshold = 0.60 if domain_found else Config.SIMILARITY_THRESHOLD
|
||
return similarity >= threshold
|
||
def extract_first_paragraph(self, page_url):
|
||
try:
|
||
response = requests.get(page_url)
|
||
soup = BeautifulSoup(response.text, Config.HTML_PARSER)
|
||
paragraphs = soup.find_all('p')
|
||
for p in paragraphs:
|
||
text = clean_text(p.get_text())
|
||
if len(text) > 50:
|
||
return text
|
||
return "k.A."
|
||
except Exception as e:
|
||
debug_print(f"Fehler beim Extrahieren des ersten Absatzes: {e}")
|
||
return "k.A."
|
||
def _extract_infobox_value(self, soup, target):
|
||
infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
|
||
if not infobox:
|
||
return "k.A."
|
||
keywords_map = {
|
||
'branche': ['branche', 'industrie', 'tätigkeit', 'geschäftsfeld', 'sektor', 'produkte', 'leistungen', 'aktivitäten', 'wirtschaftszweig'],
|
||
'umsatz': ['umsatz', 'jahresumsatz', 'konzernumsatz', 'gesamtumsatz', 'erlöse', 'umsatzerlöse', 'einnahmen', 'ergebnis', 'jahresergebnis'],
|
||
'mitarbeiter': ['mitarbeiter', 'beschäftigte', 'personal', 'mitarbeiterzahl', 'angestellte', 'belegschaft', 'personalstärke']
|
||
}
|
||
keywords = keywords_map.get(target, [])
|
||
for row in infobox.find_all('tr'):
|
||
header = row.find('th')
|
||
if header:
|
||
header_text = clean_text(header.get_text()).lower()
|
||
if any(kw in header_text for kw in keywords):
|
||
value = row.find('td')
|
||
if value:
|
||
raw_value = clean_text(value.get_text())
|
||
if target == 'branche':
|
||
clean_val = re.sub(r'\[.*?\]|\(.*?\)', '', raw_value)
|
||
return ' '.join(clean_val.split()).strip()
|
||
if target == 'umsatz':
|
||
raw = raw_value.lower()
|
||
match = re.search(r'([\d.,]+)', raw)
|
||
if match:
|
||
num_str = match.group(1)
|
||
if ',' in num_str:
|
||
num_str = num_str.replace('.', '').replace(',', '.')
|
||
else:
|
||
num_str = num_str.replace(' ', '').replace('.', '')
|
||
try:
|
||
num = float(num_str)
|
||
except Exception as e:
|
||
debug_print(f"Umsatz-Umwandlungsfehler: {e} für {num_str}")
|
||
return raw_value.strip()
|
||
if 'mrd' in raw or 'milliarden' in raw:
|
||
num *= 1000
|
||
elif 'mio' in raw or 'millionen' in raw:
|
||
pass
|
||
else:
|
||
num /= 1e6
|
||
return str(int(round(num)))
|
||
return raw_value.strip()
|
||
if target == 'mitarbeiter':
|
||
raw = raw_value.lower()
|
||
# Suche nach "mitarbeiterzahl" oder "mitarbeiter" gefolgt von Zahlen (inkl. Tausendertrennzeichen)
|
||
match = re.search(r'(?:mitarbeiterzahl|mitarbeiter)[^\d]*([\d.,\s]+)', raw)
|
||
if match:
|
||
num_str = match.group(1)
|
||
num_str = num_str.replace(" ", "")
|
||
# Entferne alle Zeichen, die keine Ziffern sind
|
||
num_str = re.sub(r'[^\d]', '', num_str)
|
||
if num_str:
|
||
debug_print(f"Mitarbeiterzahl gefunden: {num_str} in Text: {raw_value}")
|
||
return num_str
|
||
# Fallback: Kombiniere alle gefundenen Zahlen
|
||
numbers = re.findall(r'\d+', raw)
|
||
if numbers:
|
||
return "".join(numbers)
|
||
return raw_value.strip()
|
||
return "k.A."
|
||
def extract_full_infobox(self, soup):
|
||
infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
|
||
if not infobox:
|
||
return "k.A."
|
||
return clean_text(infobox.get_text(separator=' | '))
|
||
def extract_fields_from_infobox_text(self, infobox_text, field_names):
|
||
result = {}
|
||
tokens = [token.strip() for token in infobox_text.split("|") if token.strip()]
|
||
for i, token in enumerate(tokens):
|
||
for field in field_names:
|
||
if token.lower() == field.lower():
|
||
j = i + 1
|
||
while j < len(tokens) and not tokens[j]:
|
||
j += 1
|
||
result[field] = tokens[j] if j < len(tokens) else "k.A."
|
||
return result
|
||
def extract_company_data(self, page_url):
|
||
if not page_url:
|
||
return {'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'full_infobox': 'k.A.'}
|
||
try:
|
||
response = requests.get(page_url)
|
||
soup = BeautifulSoup(response.text, Config.HTML_PARSER)
|
||
full_infobox = self.extract_full_infobox(soup)
|
||
extracted_fields = self.extract_fields_from_infobox_text(full_infobox, ['Branche', 'Umsatz', 'Mitarbeiter'])
|
||
branche_val = extracted_fields.get('Branche', self._extract_infobox_value(soup, 'branche'))
|
||
umsatz_val = extracted_fields.get('Umsatz', self._extract_infobox_value(soup, 'umsatz'))
|
||
mitarbeiter_val = extracted_fields.get('Mitarbeiter', self._extract_infobox_value(soup, 'mitarbeiter'))
|
||
first_paragraph = self.extract_first_paragraph(page_url)
|
||
return {
|
||
'url': page_url,
|
||
'first_paragraph': first_paragraph,
|
||
'branche': branche_val,
|
||
'umsatz': umsatz_val,
|
||
'mitarbeiter': mitarbeiter_val,
|
||
'full_infobox': full_infobox
|
||
}
|
||
except Exception as e:
|
||
debug_print(f"Extraktionsfehler: {str(e)}")
|
||
return {'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'full_infobox': 'k.A.'}
|
||
@retry_on_failure
|
||
def search_company_article(self, company_name, website):
|
||
search_terms = self._generate_search_terms(company_name, website)
|
||
for term in search_terms:
|
||
try:
|
||
results = wikipedia.search(term, results=Config.WIKIPEDIA_SEARCH_RESULTS)
|
||
debug_print(f"Suchergebnisse für '{term}': {results}")
|
||
for title in results:
|
||
try:
|
||
page = wikipedia.page(title, auto_suggest=False)
|
||
if self._validate_article(page, company_name, website):
|
||
return page
|
||
except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e:
|
||
debug_print(f"Seitenfehler: {str(e)}")
|
||
continue
|
||
except Exception as e:
|
||
debug_print(f"Suchfehler: {str(e)}")
|
||
continue
|
||
return None
|
||
|
||
# ==================== DATA PROCESSOR ====================
|
||
class DataProcessor:
|
||
def __init__(self):
|
||
self.sheet_handler = GoogleSheetHandler()
|
||
self.wiki_scraper = WikipediaScraper()
|
||
def process_rows(self, num_rows=None):
|
||
if MODE == "2":
|
||
print("Re-Evaluierungsmodus: Verarbeitung aller Zeilen mit 'x' in Spalte A.")
|
||
else:
|
||
start_index = self.sheet_handler.get_start_index()
|
||
print(f"Starte bei Zeile {start_index+1}")
|
||
for i, row in enumerate(self.sheet_handler.sheet_values[1:], start=2):
|
||
if MODE == "2":
|
||
if row[0].strip().lower() == "x":
|
||
self._process_single_row(i, row)
|
||
else:
|
||
if i >= self.sheet_handler.get_start_index():
|
||
self._process_single_row(i, row)
|
||
def _process_single_row(self, row_num, row_data):
|
||
if MODE == "2":
|
||
company_name = row_data[1] if len(row_data) > 1 else ""
|
||
website = row_data[2] if len(row_data) > 2 else ""
|
||
update_range = f"H{row_num}:L{row_num}"
|
||
dt_range = f"O{row_num}"
|
||
ver_range = f"R{row_num}"
|
||
else:
|
||
company_name = row_data[0] if len(row_data) > 0 else ""
|
||
website = row_data[1] if len(row_data) > 1 else ""
|
||
update_range = f"G{row_num}:K{row_num}"
|
||
dt_range = f"N{row_num}"
|
||
ver_range = f"Q{row_num}"
|
||
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Verarbeite Zeile {row_num}: {company_name}")
|
||
article = self.wiki_scraper.search_company_article(company_name, website)
|
||
if article:
|
||
company_data = self.wiki_scraper.extract_company_data(article.url)
|
||
else:
|
||
company_data = {'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'full_infobox': 'k.A.'}
|
||
self.sheet_handler.sheet.update(values=[[
|
||
company_data.get('url', 'k.A.'),
|
||
company_data.get('first_paragraph', 'k.A.'),
|
||
company_data.get('branche', 'k.A.'),
|
||
company_data.get('umsatz', 'k.A.'),
|
||
company_data.get('mitarbeiter', 'k.A.')
|
||
]], range_name=update_range)
|
||
current_dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
self.sheet_handler.sheet.update(values=[[current_dt]], range_name=dt_range)
|
||
self.sheet_handler.sheet.update(values=[[Config.VERSION]], range_name=ver_range)
|
||
print(f"✅ Aktualisiert: URL: {company_data.get('url', 'k.A.')}, Erster Absatz: {company_data.get('first_paragraph', 'k.A.')[:30]}..., Branche: {company_data.get('branche', 'k.A.')}, Umsatz: {company_data.get('umsatz', 'k.A.')}, Mitarbeiter: {company_data.get('mitarbeiter', 'k.A.')}")
|
||
if MODE == "2":
|
||
print("----- Vollständiger Infobox-Inhalt -----")
|
||
print(company_data.get("full_infobox", "k.A."))
|
||
print("----------------------------------------")
|
||
time.sleep(Config.RETRY_DELAY)
|
||
|
||
# ==================== MAIN ====================
|
||
if __name__ == "__main__":
|
||
mode_input = input("Wählen Sie den Modus: 1 für normalen Modus, 2 für Re-Evaluierungsmodus: ").strip()
|
||
MODE = "2" if mode_input == "2" else "1"
|
||
if MODE == "1":
|
||
try:
|
||
num_rows = int(input("Wieviele Zeilen sollen überprüft werden? "))
|
||
except Exception as e:
|
||
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
|
||
exit(1)
|
||
else:
|
||
num_rows = None # Im Re-Evaluierungsmodus werden alle markierten Zeilen verarbeitet.
|
||
processor = DataProcessor()
|
||
processor.process_rows(num_rows)
|
||
print("\n✅ Wikipedia-Auswertung abgeschlossen")
|