Files
Brancheneinstufung2/brancheneinstufung.py
2025-03-30 12:10:55 +00:00

222 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import time
import csv
import re
import pandas as pd
import gspread
import openai
import wikipedia
from bs4 import BeautifulSoup
import requests
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime
# === KONFIGURATION ===
EXCEL = "Bestandsfirmen.xlsx"
SHEET_URL = "https://docs.google.com/spreadsheets/d/1u_gHr9JUfmV1-iviRzbSe3575QEp7KLhK5jFV_gJcgo"
CREDENTIALS = "service_account.json"
LANG = "de"
LOG_CSV = "gpt_antworten_log.csv"
DURCHLÄUFE = int(input("Wieviele Zeilen sollen überprüft werden? "))
MAX_RETRIES = 3
RETRY_DELAY = 5
# === OpenAI API-KEY LADEN ===
with open("api_key.txt", "r") as f:
openai.api_key = f.read().strip()
# === GOOGLE SHEET VERBINDUNG ===
scope = ["https://www.googleapis.com/auth/spreadsheets"]
creds = ServiceAccountCredentials.from_json_keyfile_name(CREDENTIALS, scope)
sheet = gspread.authorize(creds).open_by_url(SHEET_URL).sheet1
sheet_values = sheet.get_all_values()
# === STARTINDEX SUCHEN (Spalte N = Index 13) ===
filled_n = [row[13] if len(row) > 13 else '' for row in sheet_values[1:]]
start = next((i + 1 for i, v in enumerate(filled_n, start=1) if not str(v).strip()), len(filled_n) + 1)
print(f"Starte bei Zeile {start+1}")
wikipedia.set_lang(LANG)
# === SYSTEM PROMPT ===
branches = [
"Hersteller / Produzenten > Maschinenbau",
# ... (restliche Branchen wie zuvor)
"Gutachter / Versicherungen > Medizinische Gutachten"
]
system_prompt = {
"role": "system",
"content": (
"Du bist ein Experte für Brancheneinstufung und FSM-Potenzialbewertung. "
"Bitte beziehe dich ausschließlich auf das konkret genannte Unternehmen. Ähnlich klingende Firmennamen dürfen nicht verwendet werden.\n"
"FSM steht für Field Service Management Software zur Planung und Unterstützung mobiler Techniker.\n"
"Ziel ist es, Unternehmen mit >50 Technikern im Außeneinsatz zu identifizieren.\n\n"
"Struktur: Firmenname; Website; Ort; Aktuelle Einstufung; Beschreibung der Branche Extern\n\n"
"Gib deine Antwort im CSV-Format (1 Zeile, 8 Spalten, durch Semikolon getrennt):\n"
"Wikipedia-Branche;LinkedIn-Branche;Umsatz (Mio €);Empfohlene Neueinstufung;Begründung;FSM-Relevanz (Ja/Nein/k.A. mit Begründung);Techniker-Einschätzung;Techniker-Begründung\n\n"
"Falls ein Wikipedia-Link angegeben ist, vertraue ausschließlich den Angaben aus diesem Artikel für Branche und Umsatz.\n"
"Falls kein Wikipedia-Link existiert, gib für 'Wikipedia-Branche' und 'Umsatz (Mio €)' bitte 'k.A.' aus.\n\n"
"Ziel-Branchenschema:\n" + "\n".join(branches)
)
}
WHITELIST_KATEGORIEN = ["unternehmen", "hersteller", "produktion", "industrie",
"maschinenbau", "technik", "dienstleistungsunternehmen",
"chemie", "pharma", "elektrotechnik", "medizintechnik"]
def extract_domain_key(url):
"""Extrahiert den Domain-Schlüssel aus der Website-URL"""
if not url:
return ""
clean_url = url.replace("https://", "").replace("http://", "").split("/")[0]
parts = clean_url.split(".")
return parts[0] if len(parts) > 1 else ""
def parse_infobox(soup):
"""Extrahiert Branche und Umsatz aus der Wikipedia-Infobox"""
infobox = soup.find("table", class_=["infobox", "infobox vcard"])
branche = umsatz = ""
if infobox:
for row in infobox.find_all("tr"):
th = row.find("th")
td = row.find("td")
if not th or not td:
continue
# Branchenerkennung
if "branche" in th.text.lower():
branche = td.get_text(separator=" ", strip=True)
# Umsatzerkennung
if "umsatz" in th.text.lower():
umsatz_text = td.get_text(strip=True)
if "Mio" in umsatz_text:
match = re.search(r"(\d+[\d.,]*)\s*Mio", umsatz_text)
if match:
umsatz = match.group(1).replace(",", ".")
return branche, umsatz
def get_wikipedia_data(name, website_hint=""):
"""Sucht Wikipedia-Daten mit erweiterter Validierung"""
domain_key = extract_domain_key(website_hint)
search_terms = [name, domain_key] if domain_key else [name]
for term in search_terms:
if not term:
continue
for attempt in range(MAX_RETRIES):
try:
results = wikipedia.search(term, results=3)
for title in results:
try:
page = wikipedia.page(title, auto_suggest=False)
html = requests.get(page.url, timeout=10).text
# Validierung der Übereinstimmung
content_check = (
name.split()[0].lower() in page.content.lower() or
(domain_key and domain_key.lower() in html.lower())
)
if content_check:
soup = BeautifulSoup(html, "html.parser")
branche, umsatz = parse_infobox(soup)
# Fallback auf Kategorien
if not branche:
for category in page.categories:
if any(kw in category.lower() for kw in WHITELIST_KATEGORIEN):
branche = category
break
return page.url, branche or "k.A.", umsatz or "k.A."
except (wikipedia.exceptions.PageError,
wikipedia.exceptions.DisambiguationError,
requests.exceptions.RequestException):
continue
except Exception as e:
print(f"⚠️ Wikipedia-Fehler ({term}, Versuch {attempt+1}): {str(e)[:100]}")
time.sleep(RETRY_DELAY)
return "", "k.A.", "k.A."
def classify_company(row, wikipedia_url=""):
"""Verarbeitet die GPT-Klassifizierung mit Wikipedia-Integration"""
user_prompt = {
"role": "user",
"content": f"{row[0]};{row[1]};{row[2]};{row[4]};{row[5]}\nWikipedia-Link: {wikipedia_url}"
}
# GPT-Abfrage mit Retry-Logik
for attempt in range(MAX_RETRIES):
try:
response = openai.chat.completions.create(
model="gpt-3.5-turbo",
messages=[system_prompt, user_prompt],
temperature=0,
request_timeout=15
)
full_text = response.choices[0].message.content.strip()
break
except Exception as e:
print(f"⚠️ GPT-Fehler (Versuch {attempt+1}): {str(e)[:100]}")
time.sleep(RETRY_DELAY)
else:
print("❌ GPT 3x fehlgeschlagen setze auf Standardwerte")
full_text = "k.A.;k.A.;k.A.;k.A.;k.A.;k.A.;k.A.;k.A."
# Antwortverarbeitung
csv_line = next((l for l in full_text.splitlines() if ";" in l and not l.startswith("Wikipedia-Branche")), "")
parts = [v.strip().strip('"') for v in csv_line.split(";")] if csv_line else ["k.A."] * 8
parts += ["k.A."] * (8 - len(parts)) # Padding für fehlende Werte
# Logging
with open(LOG_CSV, "a", newline="", encoding="utf-8") as log:
writer = csv.writer(log, delimiter=";")
writer.writerow([datetime.now().strftime("%Y-%m-%d %H:%M:%S"), row[0], *parts, full_text])
return parts
# === HAUPTPROZESS ===
for i in range(start, min(start + DURCHLÄUFE, len(sheet_values))):
row = sheet_values[i]
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Verarbeite Zeile {i+1}: {row[0]}")
# Wikipedia-Datenabfrage
url, wiki_branche, umsatz = get_wikipedia_data(row[0], row[1])
# GPT-Klassifizierung
gpt_data = classify_company(row, url)
(wiki_gpt, linkedin, umsatz_gpt,
new_cat, reason, fsm, techniker, techniker_reason) = gpt_data
# Priorisierung der Wikipedia-Daten
final_wiki = wiki_branche if url else "k.A."
final_umsatz = umsatz if url else "k.A."
# Daten für Google Sheet
values = [
final_wiki, # G: Wikipedia-Branche
linkedin, # H: LinkedIn-Branche
final_umsatz, # I: Umsatz
new_cat, # J: Empfohlene Neueinstufung
reason, # K: Begründung
fsm, # L: FSM-Relevanz
url, # M: Wikipedia-URL
datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # N: Letzte Prüfung
techniker, # O: Techniker-Einschätzung
techniker_reason # P: Techniker-Begründung
]
# Google Sheet Update
sheet.update(f"G{i+1}:P{i+1}", [values])
print(f"✅ Aktualisiert: {values[:3]}...")
time.sleep(RETRY_DELAY)
print("\n✅ Alle Durchläufe erfolgreich abgeschlossen")