diff --git a/brancheneinstufung.py b/brancheneinstufung.py index 054d4729..b4576a7b 100644 --- a/brancheneinstufung.py +++ b/brancheneinstufung.py @@ -7,13 +7,12 @@ import requests import openai import csv from bs4 import BeautifulSoup -from lxml import html as lh from oauth2client.service_account import ServiceAccountCredentials from datetime import datetime from difflib import SequenceMatcher # === KONFIGURATION === -VERSION = "1.1.0" +VERSION = "1.0.12" LANG = "de" CREDENTIALS = "service_account.json" SHEET_URL = "https://docs.google.com/spreadsheets/d/1u_gHr9JUfmV1-iviRzbSe3575QEp7KLhK5jFV_gJcgo" @@ -22,6 +21,7 @@ MAX_RETRIES = 3 RETRY_DELAY = 5 LOG_CSV = "gpt_antworten_log.csv" SIMILARITY_THRESHOLD = 0.6 +DEBUG = True # Debug-Modus für ausführliche Informationen # === OpenAI API-KEY LADEN === with open("api_key.txt", "r") as f: @@ -41,98 +41,214 @@ print(f"Starte bei Zeile {start+1}") wikipedia.set_lang(LANG) def similar(a, b): - return SequenceMatcher(None, a.lower(), b.lower()).ratio() + if not a or not b: + return 0 + return SequenceMatcher(None, str(a).lower().strip(), str(b).lower().strip()).ratio() def clean_text(text): """Bereinigt Text von HTML-Entitäten und überflüssigen Whitespaces""" if not text: return "k.A." + # Text in Unicode umwandeln + if hasattr(text, 'encode'): + text = text.encode('utf-8', 'ignore').decode('utf-8') + # Text von BeautifulSoup zu String konvertieren + if hasattr(text, 'get_text'): + text = text.get_text() # Entfernen von HTML-Tags und Klammern mit Inhalt - text = re.sub(r'\[.*?\]', '', text) + text = re.sub(r'\[.*?\]', '', str(text)) text = re.sub(r'\(.*?\)', '', text) # Entfernen von überflüssigen Whitespaces text = re.sub(r'\s+', ' ', text).strip() return text if text else "k.A." -def extract_infobox_data(soup): - """Extrahiert Daten aus der Wikipedia-Infobox mit BeautifulSoup""" +def debug_print(message): + """Debug-Ausgabe, wenn DEBUG=True""" + if DEBUG: + print(f"[DEBUG] {message}") + +def extract_infobox_data(soup, page_url): + """Extrahiert Daten aus der Wikipedia-Infobox mit verschiedenen Methoden""" branche = "k.A." umsatz = "k.A." - # Suche nach der Infobox (table mit class=infobox) - infobox = soup.find('table', class_='infobox') + debug_print(f"Suche Infobox in {page_url}") + + # Suche nach der Infobox (verschiedene mögliche Klassen) + infobox = soup.find('table', class_=lambda c: c and ('infobox' in c.lower() or 'vcard' in c.lower())) + if not infobox: + debug_print("Keine Infobox gefunden") return branche, umsatz - # Durchsuche alle Zeilen der Infobox + debug_print("Infobox gefunden, suche nach Branche und Umsatz") + + # Für detaillierte Debug-Ausgabe + if DEBUG: + headers = [clean_text(th.get_text()) for th in infobox.find_all('th') if th] + debug_print(f"Gefundene Headers in Infobox: {headers}") + + # Branchen-Keywords (auch zusammengesetzte Begriffe) + branche_keywords = [ + 'branche', 'tätigkeitsfeld', 'geschäftsfeld', 'sektor', + 'branche/sektor', 'sektor/branche', 'geschäftsbereich', + 'geschäftszweig', 'wirtschaftszweig', 'aktivität', + 'tätigkeitsbereich', 'industriezweig' + ] + + # Umsatz-Keywords + umsatz_keywords = [ + 'umsatz', 'umsatzerlös', 'umsatzerlöse', 'jahresumsatz', + 'konzernumsatz', 'umsatz in', 'umsatz (', 'umsätze' + ] + + # Methode 1: Direkte Suche in den Zeilen der Infobox rows = infobox.find_all('tr') for row in rows: - # Überprüfe, ob die Zeile einen Header (th) enthält header = row.find('th') if not header: continue - header_text = header.get_text().lower() + header_text = clean_text(header.get_text()).lower() # Suche nach Branche - if any(term in header_text for term in ['branche', 'tätigkeitsfeld', 'geschäftsfeld', 'sektor']): + if any(keyword in header_text for keyword in branche_keywords): value_cell = row.find('td') if value_cell: - branche = clean_text(value_cell.get_text()) + branche_text = clean_text(value_cell) + if branche_text != "k.A.": + branche = branche_text + debug_print(f"Branche gefunden (Methode 1): {branche}") # Suche nach Umsatz - elif 'umsatz' in header_text: + elif any(keyword in header_text for keyword in umsatz_keywords): value_cell = row.find('td') if value_cell: - umsatz_text = value_cell.get_text() - # Versuche, den Umsatz zu extrahieren (z.B. "123,4 Mio. €") - umsatz = clean_text(umsatz_text) + umsatz_text = clean_text(value_cell) + if umsatz_text != "k.A.": + umsatz = umsatz_text + debug_print(f"Umsatz gefunden (Methode 1): {umsatz}") + + # Methode 2: Volltext-Suche nach spezifischen Patterns + if branche == "k.A." or umsatz == "k.A.": + debug_print("Verwende Methode 2: Volltext-Suche nach Mustern") + infobox_text = clean_text(infobox.get_text()) + + if branche == "k.A.": + for keyword in branche_keywords: + pattern = rf'{keyword}[:\s]+([^\.]*?)[\.|\n]' + matches = re.search(pattern, infobox_text, re.IGNORECASE) + if matches: + branche = clean_text(matches.group(1)) + debug_print(f"Branche gefunden (Methode 2): {branche}") + break + + if umsatz == "k.A.": + for keyword in umsatz_keywords: + pattern = rf'{keyword}[:\s]+([^\.]*?)[\.|\n]' + matches = re.search(pattern, infobox_text, re.IGNORECASE) + if matches: + umsatz = clean_text(matches.group(1)) + debug_print(f"Umsatz gefunden (Methode 2): {umsatz}") + break + + # Methode 3: Suche in meta-Tags + if branche == "k.A.": + meta_keywords = soup.find('meta', {'name': 'keywords'}) + if meta_keywords and meta_keywords.get('content'): + keywords = meta_keywords.get('content').split(',') + for keyword in keywords: + if any(bk in keyword.lower() for bk in ['industrie', 'branche', 'sektor']): + branche = clean_text(keyword) + debug_print(f"Branche gefunden (Methode 3): {branche}") + break + + # Zusätzliche Nachbearbeitung für Umsatz + if umsatz != "k.A.": + # Versuche, einen numerischen Wert + Währung zu extrahieren + currency_pattern = r'(\d[\d\.,]*\s*(?:€|EUR|Euro|Mio\.?\s*€|Mrd\.?\s*€|Millionen|Milliarden))' + matches = re.search(currency_pattern, umsatz, re.IGNORECASE) + if matches: + umsatz = matches.group(1) + debug_print(f"Umsatz bereinigt: {umsatz}") return branche, umsatz def get_wikipedia_data(name, website_hint=""): - begriffe = [name.strip(), " ".join(name.split()[:2])] + firmenname = name.strip() + begriffe = [firmenname] + + # Füge die ersten zwei Wörter hinzu (oft der Kernname) + name_parts = firmenname.split() + if len(name_parts) > 1: + begriffe.append(" ".join(name_parts[:2])) + + # Behandle GmbH, AG, etc. + clean_name = re.sub(r'\s+(?:GmbH|AG|KG|OHG|e\.V\.|mbH).*$', '', firmenname) + if clean_name != firmenname: + begriffe.append(clean_name) + + # Extrahiere Domain-Schlüssel aus Website domain_key = "" if website_hint: - parts = website_hint.replace("https://", "").replace("http://", "").split(".") + parts = website_hint.replace("https://", "").replace("http://", "").replace("www.", "").split(".") if len(parts) > 1: domain_key = parts[0] - if domain_key not in ["www", "de", "com"]: # Ignoriere generische Domains + if domain_key and domain_key not in ["de", "com", "org", "net"]: begriffe.append(domain_key) - + + debug_print(f"Suchbegriffe: {begriffe}") + for suchbegriff in begriffe: try: - results = wikipedia.search(suchbegriff, results=5) + debug_print(f"Suche nach: '{suchbegriff}'") + results = wikipedia.search(suchbegriff, results=8) + debug_print(f"Wikipedia-Ergebnisse: {results}") + for title in results: try: + debug_print(f"Prüfe Wikipedia-Artikel: {title}") page = wikipedia.page(title, auto_suggest=False) # Prüfe Ähnlichkeit des Titels mit dem gesuchten Namen - if similar(page.title, name) < SIMILARITY_THRESHOLD: - continue + title_similarity = similar(page.title, name) + debug_print(f"Titel-Ähnlichkeit: {title_similarity}") - # Hole HTML-Content und überprüfe Domain-Schlüssel + if title_similarity < SIMILARITY_THRESHOLD: + # Prüfe auch Ähnlichkeit mit bereinigtem Namen + clean_title_similarity = similar(page.title, clean_name) + debug_print(f"Bereinigte Titel-Ähnlichkeit: {clean_title_similarity}") + + if clean_title_similarity < SIMILARITY_THRESHOLD: + debug_print("Titel nicht ähnlich genug, überspringe") + continue + + # Hole HTML-Content response = requests.get(page.url) html_content = response.text + + # Prüfe, ob Domain-Schlüssel im Content enthalten ist if domain_key and domain_key.lower() not in html_content.lower(): + debug_print(f"Domain-Schlüssel '{domain_key}' nicht im Content gefunden, überspringe") continue # Parse HTML mit BeautifulSoup soup = BeautifulSoup(html_content, 'html.parser') # Extrahiere Branche und Umsatz aus der Infobox - branche, umsatz = extract_infobox_data(soup) + branche, umsatz = extract_infobox_data(soup, page.url) print(f"Gefunden: {page.title} - Branche: {branche}, Umsatz: {umsatz}") return page.url, branche, umsatz - except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError): + except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e: + debug_print(f"Wikipedia-Fehler bei {title}: {str(e)}") continue except Exception as e: - print(f"Fehler bei {title}: {str(e)}") + debug_print(f"Allgemeiner Fehler bei {title}: {str(e)}") continue except Exception as e: - print(f"Fehler bei Suche nach {suchbegriff}: {str(e)}") + debug_print(f"Fehler bei Suche nach {suchbegriff}: {str(e)}") continue return "", "k.A.", "k.A." @@ -140,21 +256,43 @@ def get_wikipedia_data(name, website_hint=""): # === VERARBEITUNG === for i in range(start, min(start + DURCHLÄUFE, len(sheet_values))): row = sheet_values[i] - print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Verarbeite Zeile {i+1}: {row[0]}") + firmenname = row[0] if len(row) > 0 else "" + print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Verarbeite Zeile {i+1}: {firmenname}") # Fehlersichere Abrufung von Website website = row[1] if len(row) > 1 else "" + # Bestehende Wikipedia-URL prüfen und ggf. wiederverwenden + existing_url = row[12] if len(row) > 12 else "" + + url = "" + branche = "k.A." + umsatz = "k.A." + # Mehrere Versuche beim Abrufen der Wikipedia-Daten for attempt in range(MAX_RETRIES): try: - url, branche, umsatz = get_wikipedia_data(row[0], website) + # Wenn bereits eine URL existiert, versuche sie erneut zu verwenden + if existing_url and "wikipedia.org" in existing_url: + debug_print(f"Verwende bestehende URL: {existing_url}") + response = requests.get(existing_url) + if response.status_code == 200: + soup = BeautifulSoup(response.text, 'html.parser') + branche, umsatz = extract_infobox_data(soup, existing_url) + url = existing_url + print(f"Daten aus bestehender URL extrahiert - Branche: {branche}, Umsatz: {umsatz}") + + # Wenn keine Daten gefunden wurden, suche neu + if url == "" or branche == "k.A." or umsatz == "k.A.": + url, branche, umsatz = get_wikipedia_data(firmenname, website) + break except Exception as e: print(f"⚠️ Fehler bei Wikipedia-Abruf (Versuch {attempt+1}): {str(e)[:100]}") time.sleep(RETRY_DELAY) if attempt == MAX_RETRIES - 1: - url, branche, umsatz = "", "k.A.", "k.A." + url = existing_url if existing_url else "" + branche, umsatz = "k.A.", "k.A." # Hole aktuelle Werte aus dem Sheet, um sie nur zu ändern, wenn wir neue Daten haben current_values = sheet.row_values(i+1) @@ -183,13 +321,6 @@ print("\n✅ Wikipedia-Auswertung abgeschlossen") - - - - - - - # === SCHRITT 2: GPT-BEWERTUNG === def classify_company(row, wikipedia_url=""): user_prompt = {