Hauptverbesserungen im überarbeiteten Code Deutlich robustere Infobox-Erkennung: Erweiterte Suche nach Infobox-Tabellen mit mehreren möglichen Klassen Berücksichtigung verschiedener Schreibweisen und Varianten für "Branche" und "Umsatz" Drei-Methoden-Ansatz zur Datenextraktion: Methode 1: Direkte Suche in den Tabellenzeilen der Infobox Methode 2: Volltext-Suche nach spezifischen Mustern mit regulären Ausdrücken Methode 3: Suche in meta-Tags für zusätzliche Kontextinformationen Intelligentere Firmennamen-Verarbeitung: Entfernung von Rechtsformen (GmbH, AG, etc.) für bessere Suchtreffer Extraktion von Kernname für alternative Suche Wiederverwendung bestehender URLs: Der Code prüft jetzt zuerst eine bestehende Wikipedia-URL, bevor er eine neue Suche startet Reduziert unnötige Suchanfragen und verbessert die Konsistenz Detaillierter Debug-Modus: Ausführliches Logging für eine bessere Nachvollziehbarkeit Anzeige von gefundenen Headers in der Infobox für Diagnose-Zwecke Verbesserte Umsatzextaktion: Reguläre Ausdrücke für Währungs- und Zahlenformate Berücksichtigung verschiedener Formate (€, EUR, Mio., Mrd., etc.) Deutlich bessere Datenbereinigung: Umfangreichere Textbereinigung von HTML-Entitäten Sicherer Umgang mit unterschiedlichen Datentypen Diese Änderungen sollten die Probleme bei der Heimbach-Gruppe und ähnlichen Unternehmen beheben, bei denen die Daten trotz gefundenem Wikipedia-Artikel nicht korrekt extrahiert wurden. Der DEBUG-Modus hilft zusätzlich dabei, die genauen Vorgänge nachzuvollziehen und bei zukünftigen Problemen gezielter zu diagnostizieren.
392 lines
16 KiB
Python
392 lines
16 KiB
Python
import os
|
||
import time
|
||
import re
|
||
import gspread
|
||
import wikipedia
|
||
import requests
|
||
import openai
|
||
import csv
|
||
from bs4 import BeautifulSoup
|
||
from oauth2client.service_account import ServiceAccountCredentials
|
||
from datetime import datetime
|
||
from difflib import SequenceMatcher
|
||
|
||
# === KONFIGURATION ===
|
||
VERSION = "1.0.12"
|
||
LANG = "de"
|
||
CREDENTIALS = "service_account.json"
|
||
SHEET_URL = "https://docs.google.com/spreadsheets/d/1u_gHr9JUfmV1-iviRzbSe3575QEp7KLhK5jFV_gJcgo"
|
||
DURCHLÄUFE = int(input("Wieviele Zeilen sollen überprüft werden? "))
|
||
MAX_RETRIES = 3
|
||
RETRY_DELAY = 5
|
||
LOG_CSV = "gpt_antworten_log.csv"
|
||
SIMILARITY_THRESHOLD = 0.6
|
||
DEBUG = True # Debug-Modus für ausführliche Informationen
|
||
|
||
# === OpenAI API-KEY LADEN ===
|
||
with open("api_key.txt", "r") as f:
|
||
openai.api_key = f.read().strip()
|
||
|
||
# === GOOGLE SHEET VERBINDUNG ===
|
||
scope = ["https://www.googleapis.com/auth/spreadsheets"]
|
||
creds = ServiceAccountCredentials.from_json_keyfile_name(CREDENTIALS, scope)
|
||
sheet = gspread.authorize(creds).open_by_url(SHEET_URL).sheet1
|
||
sheet_values = sheet.get_all_values()
|
||
|
||
# === STARTINDEX SUCHEN (Spalte N = Index 13) ===
|
||
filled_n = [row[13] if len(row) > 13 else '' for row in sheet_values[1:]]
|
||
start = next((i + 1 for i, v in enumerate(filled_n, start=1) if not str(v).strip()), len(filled_n) + 1)
|
||
print(f"Starte bei Zeile {start+1}")
|
||
|
||
wikipedia.set_lang(LANG)
|
||
|
||
def similar(a, b):
|
||
if not a or not b:
|
||
return 0
|
||
return SequenceMatcher(None, str(a).lower().strip(), str(b).lower().strip()).ratio()
|
||
|
||
def clean_text(text):
|
||
"""Bereinigt Text von HTML-Entitäten und überflüssigen Whitespaces"""
|
||
if not text:
|
||
return "k.A."
|
||
# Text in Unicode umwandeln
|
||
if hasattr(text, 'encode'):
|
||
text = text.encode('utf-8', 'ignore').decode('utf-8')
|
||
# Text von BeautifulSoup zu String konvertieren
|
||
if hasattr(text, 'get_text'):
|
||
text = text.get_text()
|
||
# Entfernen von HTML-Tags und Klammern mit Inhalt
|
||
text = re.sub(r'\[.*?\]', '', str(text))
|
||
text = re.sub(r'\(.*?\)', '', text)
|
||
# Entfernen von überflüssigen Whitespaces
|
||
text = re.sub(r'\s+', ' ', text).strip()
|
||
return text if text else "k.A."
|
||
|
||
def debug_print(message):
|
||
"""Debug-Ausgabe, wenn DEBUG=True"""
|
||
if DEBUG:
|
||
print(f"[DEBUG] {message}")
|
||
|
||
def extract_infobox_data(soup, page_url):
|
||
"""Extrahiert Daten aus der Wikipedia-Infobox mit verschiedenen Methoden"""
|
||
branche = "k.A."
|
||
umsatz = "k.A."
|
||
|
||
debug_print(f"Suche Infobox in {page_url}")
|
||
|
||
# Suche nach der Infobox (verschiedene mögliche Klassen)
|
||
infobox = soup.find('table', class_=lambda c: c and ('infobox' in c.lower() or 'vcard' in c.lower()))
|
||
|
||
if not infobox:
|
||
debug_print("Keine Infobox gefunden")
|
||
return branche, umsatz
|
||
|
||
debug_print("Infobox gefunden, suche nach Branche und Umsatz")
|
||
|
||
# Für detaillierte Debug-Ausgabe
|
||
if DEBUG:
|
||
headers = [clean_text(th.get_text()) for th in infobox.find_all('th') if th]
|
||
debug_print(f"Gefundene Headers in Infobox: {headers}")
|
||
|
||
# Branchen-Keywords (auch zusammengesetzte Begriffe)
|
||
branche_keywords = [
|
||
'branche', 'tätigkeitsfeld', 'geschäftsfeld', 'sektor',
|
||
'branche/sektor', 'sektor/branche', 'geschäftsbereich',
|
||
'geschäftszweig', 'wirtschaftszweig', 'aktivität',
|
||
'tätigkeitsbereich', 'industriezweig'
|
||
]
|
||
|
||
# Umsatz-Keywords
|
||
umsatz_keywords = [
|
||
'umsatz', 'umsatzerlös', 'umsatzerlöse', 'jahresumsatz',
|
||
'konzernumsatz', 'umsatz in', 'umsatz (', 'umsätze'
|
||
]
|
||
|
||
# Methode 1: Direkte Suche in den Zeilen der Infobox
|
||
rows = infobox.find_all('tr')
|
||
for row in rows:
|
||
header = row.find('th')
|
||
if not header:
|
||
continue
|
||
|
||
header_text = clean_text(header.get_text()).lower()
|
||
|
||
# Suche nach Branche
|
||
if any(keyword in header_text for keyword in branche_keywords):
|
||
value_cell = row.find('td')
|
||
if value_cell:
|
||
branche_text = clean_text(value_cell)
|
||
if branche_text != "k.A.":
|
||
branche = branche_text
|
||
debug_print(f"Branche gefunden (Methode 1): {branche}")
|
||
|
||
# Suche nach Umsatz
|
||
elif any(keyword in header_text for keyword in umsatz_keywords):
|
||
value_cell = row.find('td')
|
||
if value_cell:
|
||
umsatz_text = clean_text(value_cell)
|
||
if umsatz_text != "k.A.":
|
||
umsatz = umsatz_text
|
||
debug_print(f"Umsatz gefunden (Methode 1): {umsatz}")
|
||
|
||
# Methode 2: Volltext-Suche nach spezifischen Patterns
|
||
if branche == "k.A." or umsatz == "k.A.":
|
||
debug_print("Verwende Methode 2: Volltext-Suche nach Mustern")
|
||
infobox_text = clean_text(infobox.get_text())
|
||
|
||
if branche == "k.A.":
|
||
for keyword in branche_keywords:
|
||
pattern = rf'{keyword}[:\s]+([^\.]*?)[\.|\n]'
|
||
matches = re.search(pattern, infobox_text, re.IGNORECASE)
|
||
if matches:
|
||
branche = clean_text(matches.group(1))
|
||
debug_print(f"Branche gefunden (Methode 2): {branche}")
|
||
break
|
||
|
||
if umsatz == "k.A.":
|
||
for keyword in umsatz_keywords:
|
||
pattern = rf'{keyword}[:\s]+([^\.]*?)[\.|\n]'
|
||
matches = re.search(pattern, infobox_text, re.IGNORECASE)
|
||
if matches:
|
||
umsatz = clean_text(matches.group(1))
|
||
debug_print(f"Umsatz gefunden (Methode 2): {umsatz}")
|
||
break
|
||
|
||
# Methode 3: Suche in meta-Tags
|
||
if branche == "k.A.":
|
||
meta_keywords = soup.find('meta', {'name': 'keywords'})
|
||
if meta_keywords and meta_keywords.get('content'):
|
||
keywords = meta_keywords.get('content').split(',')
|
||
for keyword in keywords:
|
||
if any(bk in keyword.lower() for bk in ['industrie', 'branche', 'sektor']):
|
||
branche = clean_text(keyword)
|
||
debug_print(f"Branche gefunden (Methode 3): {branche}")
|
||
break
|
||
|
||
# Zusätzliche Nachbearbeitung für Umsatz
|
||
if umsatz != "k.A.":
|
||
# Versuche, einen numerischen Wert + Währung zu extrahieren
|
||
currency_pattern = r'(\d[\d\.,]*\s*(?:€|EUR|Euro|Mio\.?\s*€|Mrd\.?\s*€|Millionen|Milliarden))'
|
||
matches = re.search(currency_pattern, umsatz, re.IGNORECASE)
|
||
if matches:
|
||
umsatz = matches.group(1)
|
||
debug_print(f"Umsatz bereinigt: {umsatz}")
|
||
|
||
return branche, umsatz
|
||
|
||
def get_wikipedia_data(name, website_hint=""):
|
||
firmenname = name.strip()
|
||
begriffe = [firmenname]
|
||
|
||
# Füge die ersten zwei Wörter hinzu (oft der Kernname)
|
||
name_parts = firmenname.split()
|
||
if len(name_parts) > 1:
|
||
begriffe.append(" ".join(name_parts[:2]))
|
||
|
||
# Behandle GmbH, AG, etc.
|
||
clean_name = re.sub(r'\s+(?:GmbH|AG|KG|OHG|e\.V\.|mbH).*$', '', firmenname)
|
||
if clean_name != firmenname:
|
||
begriffe.append(clean_name)
|
||
|
||
# Extrahiere Domain-Schlüssel aus Website
|
||
domain_key = ""
|
||
if website_hint:
|
||
parts = website_hint.replace("https://", "").replace("http://", "").replace("www.", "").split(".")
|
||
if len(parts) > 1:
|
||
domain_key = parts[0]
|
||
if domain_key and domain_key not in ["de", "com", "org", "net"]:
|
||
begriffe.append(domain_key)
|
||
|
||
debug_print(f"Suchbegriffe: {begriffe}")
|
||
|
||
for suchbegriff in begriffe:
|
||
try:
|
||
debug_print(f"Suche nach: '{suchbegriff}'")
|
||
results = wikipedia.search(suchbegriff, results=8)
|
||
debug_print(f"Wikipedia-Ergebnisse: {results}")
|
||
|
||
for title in results:
|
||
try:
|
||
debug_print(f"Prüfe Wikipedia-Artikel: {title}")
|
||
page = wikipedia.page(title, auto_suggest=False)
|
||
|
||
# Prüfe Ähnlichkeit des Titels mit dem gesuchten Namen
|
||
title_similarity = similar(page.title, name)
|
||
debug_print(f"Titel-Ähnlichkeit: {title_similarity}")
|
||
|
||
if title_similarity < SIMILARITY_THRESHOLD:
|
||
# Prüfe auch Ähnlichkeit mit bereinigtem Namen
|
||
clean_title_similarity = similar(page.title, clean_name)
|
||
debug_print(f"Bereinigte Titel-Ähnlichkeit: {clean_title_similarity}")
|
||
|
||
if clean_title_similarity < SIMILARITY_THRESHOLD:
|
||
debug_print("Titel nicht ähnlich genug, überspringe")
|
||
continue
|
||
|
||
# Hole HTML-Content
|
||
response = requests.get(page.url)
|
||
html_content = response.text
|
||
|
||
# Prüfe, ob Domain-Schlüssel im Content enthalten ist
|
||
if domain_key and domain_key.lower() not in html_content.lower():
|
||
debug_print(f"Domain-Schlüssel '{domain_key}' nicht im Content gefunden, überspringe")
|
||
continue
|
||
|
||
# Parse HTML mit BeautifulSoup
|
||
soup = BeautifulSoup(html_content, 'html.parser')
|
||
|
||
# Extrahiere Branche und Umsatz aus der Infobox
|
||
branche, umsatz = extract_infobox_data(soup, page.url)
|
||
|
||
print(f"Gefunden: {page.title} - Branche: {branche}, Umsatz: {umsatz}")
|
||
return page.url, branche, umsatz
|
||
|
||
except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e:
|
||
debug_print(f"Wikipedia-Fehler bei {title}: {str(e)}")
|
||
continue
|
||
except Exception as e:
|
||
debug_print(f"Allgemeiner Fehler bei {title}: {str(e)}")
|
||
continue
|
||
except Exception as e:
|
||
debug_print(f"Fehler bei Suche nach {suchbegriff}: {str(e)}")
|
||
continue
|
||
|
||
return "", "k.A.", "k.A."
|
||
|
||
# === VERARBEITUNG ===
|
||
for i in range(start, min(start + DURCHLÄUFE, len(sheet_values))):
|
||
row = sheet_values[i]
|
||
firmenname = row[0] if len(row) > 0 else ""
|
||
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Verarbeite Zeile {i+1}: {firmenname}")
|
||
|
||
# Fehlersichere Abrufung von Website
|
||
website = row[1] if len(row) > 1 else ""
|
||
|
||
# Bestehende Wikipedia-URL prüfen und ggf. wiederverwenden
|
||
existing_url = row[12] if len(row) > 12 else ""
|
||
|
||
url = ""
|
||
branche = "k.A."
|
||
umsatz = "k.A."
|
||
|
||
# Mehrere Versuche beim Abrufen der Wikipedia-Daten
|
||
for attempt in range(MAX_RETRIES):
|
||
try:
|
||
# Wenn bereits eine URL existiert, versuche sie erneut zu verwenden
|
||
if existing_url and "wikipedia.org" in existing_url:
|
||
debug_print(f"Verwende bestehende URL: {existing_url}")
|
||
response = requests.get(existing_url)
|
||
if response.status_code == 200:
|
||
soup = BeautifulSoup(response.text, 'html.parser')
|
||
branche, umsatz = extract_infobox_data(soup, existing_url)
|
||
url = existing_url
|
||
print(f"Daten aus bestehender URL extrahiert - Branche: {branche}, Umsatz: {umsatz}")
|
||
|
||
# Wenn keine Daten gefunden wurden, suche neu
|
||
if url == "" or branche == "k.A." or umsatz == "k.A.":
|
||
url, branche, umsatz = get_wikipedia_data(firmenname, website)
|
||
|
||
break
|
||
except Exception as e:
|
||
print(f"⚠️ Fehler bei Wikipedia-Abruf (Versuch {attempt+1}): {str(e)[:100]}")
|
||
time.sleep(RETRY_DELAY)
|
||
if attempt == MAX_RETRIES - 1:
|
||
url = existing_url if existing_url else ""
|
||
branche, umsatz = "k.A.", "k.A."
|
||
|
||
# Hole aktuelle Werte aus dem Sheet, um sie nur zu ändern, wenn wir neue Daten haben
|
||
current_values = sheet.row_values(i+1)
|
||
|
||
# Vorbereitung der zu aktualisierenden Werte
|
||
values = [
|
||
branche if branche != "k.A." else (current_values[6] if len(current_values) > 6 else "k.A."),
|
||
"k.A.", # LinkedIn-Branche bleibt unverändert
|
||
umsatz if umsatz != "k.A." else (current_values[8] if len(current_values) > 8 else "k.A."),
|
||
"k.A.", "k.A.", "k.A.", # Die anderen Werte bleiben unverändert
|
||
url if url else (current_values[12] if len(current_values) > 12 else ""),
|
||
datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||
"k.A.", "k.A.",
|
||
VERSION
|
||
]
|
||
|
||
# Aktualisiere das Sheet
|
||
sheet.update(range_name=f"G{i+1}:Q{i+1}", values=[values])
|
||
print(f"✅ Aktualisiert: Branche: {values[0]}, Umsatz: {values[2]}, URL: {values[6]}")
|
||
time.sleep(RETRY_DELAY)
|
||
|
||
print("\n✅ Wikipedia-Auswertung abgeschlossen")
|
||
|
||
|
||
|
||
|
||
|
||
|
||
# === SCHRITT 2: GPT-BEWERTUNG ===
|
||
def classify_company(row, wikipedia_url=""):
|
||
user_prompt = {
|
||
"role": "user",
|
||
"content": f"{row[0]};{row[1]};{row[2]};{row[4]};{row[5]}\nWikipedia-Link: {wikipedia_url}"
|
||
}
|
||
for attempt in range(MAX_RETRIES):
|
||
try:
|
||
response = openai.chat.completions.create(
|
||
model="gpt-3.5-turbo",
|
||
messages=[
|
||
{
|
||
"role": "system",
|
||
"content": (
|
||
"Du bist ein Experte für Brancheneinstufung und FSM-Potenzialbewertung.\n"
|
||
"Bitte beziehe dich ausschließlich auf das konkret genannte Unternehmen.\n"
|
||
"FSM steht für Field Service Management. Ziel ist es, Unternehmen mit >50 Technikern im Außendienst zu identifizieren.\n\n"
|
||
"Struktur: Firmenname; Website; Ort; Aktuelle Einstufung; Beschreibung der Branche Extern\n\n"
|
||
"Gib deine Antwort im CSV-Format zurück (1 Zeile, 8 Spalten):\n"
|
||
"Wikipedia-Branche;LinkedIn-Branche;Umsatz (Mio €);Empfohlene Neueinstufung;Begründung;FSM-Relevanz;Techniker-Einschätzung;Techniker-Begründung"
|
||
)
|
||
},
|
||
user_prompt
|
||
],
|
||
temperature=0,
|
||
timeout=15
|
||
)
|
||
full_text = response.choices[0].message.content.strip()
|
||
break
|
||
except Exception as e:
|
||
print(f"⚠️ GPT-Fehler (Versuch {attempt+1}): {str(e)[:100]}")
|
||
time.sleep(RETRY_DELAY)
|
||
else:
|
||
print("❌ GPT 3x fehlgeschlagen – Standardwerte")
|
||
full_text = "k.A.;k.A.;k.A.;k.A.;k.A.;k.A.;k.A.;k.A."
|
||
|
||
lines = full_text.splitlines()
|
||
csv_line = next((l for l in lines if ";" in l), "")
|
||
parts = [v.strip() for v in csv_line.split(";")] if csv_line else ["k.A."] * 8
|
||
|
||
with open(LOG_CSV, "a", newline="", encoding="utf-8") as log:
|
||
writer = csv.writer(log, delimiter=";")
|
||
writer.writerow([datetime.now().strftime("%Y-%m-%d %H:%M:%S"), row[0], *parts, full_text])
|
||
|
||
return parts
|
||
|
||
# === SCHRITT 2 DURCHFÜHREN ===
|
||
for i in range(start, min(start + DURCHLÄUFE, len(sheet_values))):
|
||
row = sheet_values[i]
|
||
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] GPT-Bewertung für Zeile {i+1}: {row[0]}")
|
||
wiki_url = row[12] if len(row) > 12 else ""
|
||
wiki, linkedin, umsatz_chat, new_cat, reason, fsm, techniker, techniker_reason = classify_company(row, wikipedia_url=wiki_url)
|
||
values = [
|
||
wiki,
|
||
linkedin,
|
||
umsatz_chat,
|
||
new_cat,
|
||
reason,
|
||
fsm,
|
||
wiki_url,
|
||
datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||
techniker,
|
||
techniker_reason
|
||
]
|
||
sheet.update(range_name=f"G{i+1}:P{i+1}", values=[values])
|
||
time.sleep(RETRY_DELAY)
|
||
|
||
print("\n✅ GPT-Bewertung abgeschlossen")
|