Files
Brancheneinstufung2/brancheneinstufung.py
Floke e0ea9f8e4d v1.3.5 Erweiterung: FSM-Eignungsprüfung & Servicetechniker-Explanation, Sheet-Update-Check, automati
Wikipedia-Artikelvorschlag aus Spalte K wird bevorzugt genutzt.

Nach dem Schreiben der Wikipedia-Daten wird überprüft, ob das Update abgeschlossen ist (mit einer max. 5‑s‑Wartezeit) und es folgt eine 3‑s Pause.

Der FSM-Eignungsparser wurde angepasst, um auch freiere Antworten zu verarbeiten.

Bei Diskrepanzen in der Servicetechniker-Schätzung wird nun zusätzlich eine detaillierte Erklärung von ChatGPT angefordert.

Die Spalten AF und AG werden vorerst mit "XX" befüllt.

Alle Debug-Ausgaben werden automatisch in einer Log-Datei im Ordner "Log" mit Datum und Versionsnummer gespeichert.
2025-04-02 13:58:23 +00:00

787 lines
37 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import time
import re
import gspread
import wikipedia
import requests
import openai
from bs4 import BeautifulSoup
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime
from difflib import SequenceMatcher
import unicodedata
import csv
# ==================== KONFIGURATION ====================
class Config:
VERSION = "v1.3.5" # v1.3.5: FSM-Prüfung mit flexiblem Parser, Servicetechniker-Explanation, Log-Datei, Warten bis Sheet-Update.
LANG = "de"
CREDENTIALS_FILE = "service_account.json"
SHEET_URL = "https://docs.google.com/spreadsheets/d/1u_gHr9JUfmV1-iviRzbSe3575QEp7KLhK5jFV_gJcgo"
MAX_RETRIES = 3
RETRY_DELAY = 5
LOG_CSV = "gpt_antworten_log.csv"
SIMILARITY_THRESHOLD = 0.65
DEBUG = True
WIKIPEDIA_SEARCH_RESULTS = 5
HTML_PARSER = "html.parser"
# Log-Datei vorbereiten
if not os.path.exists("Log"):
os.makedirs("Log")
LOG_FILE = os.path.join("Log", f"{datetime.now().strftime('%d-%m-%Y_%H-%M')}_{Config.VERSION.replace('.', '')}.txt")
def debug_print(message):
if Config.DEBUG:
print(f"[DEBUG] {message}")
try:
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"[DEBUG] {message}\n")
except Exception as e:
print(f"[DEBUG] Log-Schreibfehler: {e}")
def clean_text(text):
if not text:
return "k.A."
text = unicodedata.normalize("NFKC", str(text))
text = re.sub(r'\[\d+\]', '', text)
text = re.sub(r'\s+', ' ', text).strip()
return text if text else "k.A."
def normalize_company_name(name):
if not name:
return ""
forms = [
r'gmbh', r'g\.m\.b\.h\.', r'ug', r'u\.g\.', r'ug \(haftungsbeschränkt\)',
r'u\.g\. \(haftungsbeschränkt\)', r'ag', r'a\.g\.', r'ohg', r'o\.h\.g\.',
r'kg', r'k\.g\.', r'gmbh & co\.?\s*kg', r'g\.m\.b\.h\. & co\.?\s*k\.g\.',
r'ag & co\.?\s*kg', r'a\.g\. & co\.?\s*k\.g\.', r'e\.k\.', r'e\.kfm\.',
r'e\.kfr\.', r'ltd\.', r'ltd & co\.?\s*kg', r's\.a r\.l\.', r'stiftung',
r'genossenschaft', r'ggmbh', r'gug', r'partg', r'partgmbb', r'kgaa', r'se',
r'og', r'o\.g\.', r'e\.u\.', r'ges\.n\.b\.r\.', r'genmbh', r'verein',
r'kollektivgesellschaft', r'kommanditgesellschaft', r'einzelfirma', r'sàrl',
r'sa', r'sagl', r'gmbh & co\.?\s*ohg', r'ag & co\.?\s*ohg', r'gmbh & co\.?\s*kgaa',
r'ag & co\.?\s*kgaa', r's\.a\.', r's\.p\.a\.', r'b\.v\.', r'n\.v\.'
]
pattern = r'\b(' + '|'.join(forms) + r')\b'
normalized = re.sub(pattern, '', name, flags=re.IGNORECASE)
normalized = re.sub(r'[\-]', ' ', normalized)
normalized = re.sub(r'\s+', ' ', normalized).strip()
return normalized.lower()
def extract_numeric_value(raw_value, is_umsatz=False):
raw_value = raw_value.strip()
if not raw_value:
return "k.A."
raw_value = re.sub(r'\b(ca\.?|circa|über)\b', '', raw_value, flags=re.IGNORECASE)
raw = raw_value.lower().replace("\xa0", " ")
match = re.search(r'([\d.,]+)', raw, flags=re.UNICODE)
if not match or not match.group(1).strip():
debug_print(f"Keine numerischen Zeichen gefunden im Rohtext: '{raw_value}'")
return "k.A."
num_str = match.group(1)
if ',' in num_str:
num_str = num_str.replace('.', '').replace(',', '.')
try:
num = float(num_str)
except Exception as e:
debug_print(f"Fehler bei der Umwandlung von '{num_str}' (Rohtext: '{raw_value}'): {e}")
return raw_value
else:
num_str = num_str.replace(' ', '').replace('.', '')
try:
num = float(num_str)
except Exception as e:
debug_print(f"Fehler bei der Umwandlung von '{num_str}' (Rohtext: '{raw_value}'): {e}")
return raw_value
if is_umsatz:
if "mrd" in raw or "milliarden" in raw:
num *= 1000
elif "mio" in raw or "millionen" in raw:
pass
else:
num /= 1e6
return str(int(round(num)))
else:
return str(int(round(num)))
def compare_umsatz_values(crm, wiki):
debug_print(f"Vergleich CRM Umsatz: '{crm}' mit Wikipedia Umsatz: '{wiki}'")
try:
crm_val = float(crm)
wiki_val = float(wiki)
except Exception as e:
debug_print(f"Fehler beim Umwandeln der Werte: CRM='{crm}', Wiki='{wiki}': {e}")
return "Daten unvollständig"
if crm_val == 0:
return "CRM Umsatz 0"
diff = abs(crm_val - wiki_val) / crm_val
if diff < 0.1:
return "OK"
else:
diff_mio = abs(crm_val - wiki_val)
return f"Abweichung: {int(round(diff_mio))} Mio €"
def evaluate_umsatz_chatgpt(company_name, wiki_umsatz):
try:
with open("api_key.txt", "r") as f:
api_key = f.read().strip()
except Exception as e:
debug_print(f"Fehler beim Lesen des API-Tokens: {e}")
return "k.A."
openai.api_key = api_key
prompt = (
f"Bitte schätze den Umsatz in Mio. Euro für das Unternehmen '{company_name}'. "
f"Die Wikipedia-Daten zeigen: '{wiki_umsatz}'. "
"Antworte nur mit der Zahl."
)
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
temperature=0.0
)
result = response.choices[0].message.content.strip()
debug_print(f"ChatGPT Umsatzschätzung: '{result}'")
try:
value = float(result.replace(',', '.'))
return str(int(round(value)))
except Exception as conv_e:
debug_print(f"Fehler bei der Verarbeitung der Umsatzschätzung '{result}': {conv_e}")
return result
except Exception as e:
debug_print(f"Fehler beim Aufruf der ChatGPT API für Umsatzschätzung: {e}")
return "k.A."
def validate_article_with_chatgpt(crm_data, wiki_data):
crm_headers = "Firmenname;Website;Ort;Beschreibung;Aktuelle Branche;Beschreibung Branche extern;Anzahl Techniker;Umsatz (CRM);Anzahl Mitarbeiter (CRM)"
wiki_headers = "Wikipedia URL;Wikipedia Absatz;Wikipedia Branche;Wikipedia Umsatz;Wikipedia Mitarbeiter;Wikipedia Kategorien"
prompt_text = (
"Bitte überprüfe, ob die folgenden beiden Datensätze zum gleichen Unternehmen gehören. "
"Die erste Zeile sind Daten aus unserem CRM-System, die zweite Zeile stammen aus Wikipedia. "
"Vergleiche insbesondere den Firmennamen, den Ort und die Branche. Unterschiede im Umsatz können toleriert werden, "
"solange sie im Rahmen von 10% liegen. Falls die Datensätze übereinstimmen, antworte ausschließlich mit 'OK'. "
"Falls nicht, nenne bitte den wichtigsten Grund (z.B. abweichender Firmenname oder Ort). \n\n"
f"CRM-Daten:\n{crm_headers}\n{crm_data}\n\n"
f"Wikipedia-Daten:\n{wiki_headers}\n{wiki_data}\n\n"
"Antwort: "
)
try:
with open("api_key.txt", "r") as f:
api_key = f.read().strip()
except Exception as e:
debug_print(f"Fehler beim Lesen des API-Tokens: {e}")
return "k.A."
openai.api_key = api_key
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "system", "content": prompt_text}],
temperature=0.0
)
result = response.choices[0].message.content.strip()
debug_print(f"Validierungsantwort ChatGPT: '{result}'")
return result
except Exception as e:
debug_print(f"Fehler beim Validierungs-API-Aufruf: {e}")
return "k.A."
# ==================== NEUE FUNKTION: FSM-EIGNUNGSPRÜFUNG ====================
def evaluate_fsm_suitability(company_name, company_data):
try:
with open("api_key.txt", "r") as f:
api_key = f.read().strip()
except Exception as e:
debug_print(f"Fehler beim Lesen des API-Tokens (FSM): {e}")
return {"suitability": "k.A.", "justification": "k.A."}
openai.api_key = api_key
prompt = (
f"Bitte bewerte, ob das Unternehmen '{company_name}' für den Einsatz einer Field Service Management Lösung geeignet ist. "
"Berücksichtige, dass ein Unternehmen mit einem technischen Außendienst, idealerweise mit über 50 Technikern und "
"Disponenten, die mit der Planung mobiler Ressourcen beschäftigt sind, als geeignet gilt. Nutze dabei verifizierte "
"Wikipedia-Daten und deine eigene Einschätzung. Antworte ausschließlich mit 'Ja' oder 'Nein' und gib eine kurze Begründung."
)
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "system", "content": prompt}],
temperature=0.0
)
result = response.choices[0].message.content.strip()
debug_print(f"FSM-Eignungsantwort ChatGPT: '{result}'")
# Flexibler Parser: Falls keine Zeilen mit ":" vorhanden sind, nimm den ersten Satz.
suitability = "k.A."
justification = ""
lines = result.split("\n")
if len(lines) == 1:
parts = result.split(" ", 1)
suitability = parts[0].strip()
justification = parts[1].strip() if len(parts) > 1 else ""
else:
for line in lines:
if line.lower().startswith("eignung:"):
suitability = line.split(":", 1)[1].strip()
elif line.lower().startswith("begründung:"):
justification = line.split(":", 1)[1].strip()
if suitability not in ["Ja", "Nein"]:
parts = result.split(" ", 1)
suitability = parts[0].strip()
justification = " ".join(result.split()[1:]).strip()
return {"suitability": suitability, "justification": justification}
except Exception as e:
debug_print(f"Fehler beim Aufruf der ChatGPT API für FSM-Eignungsprüfung: {e}")
return {"suitability": "k.A.", "justification": "k.A."}
# ==================== NEUE FUNKTION: SCHÄTZUNG DER ANZAHL SERVICETECHNIKER ====================
def evaluate_servicetechnicians_estimate(company_name, company_data):
try:
with open("api_key.txt", "r") as f:
api_key = f.read().strip()
except Exception as e:
debug_print(f"Fehler beim Lesen des API-Tokens (Servicetechniker): {e}")
return "k.A."
openai.api_key = api_key
prompt = (
f"Bitte schätze auf Basis öffentlich zugänglicher Informationen (vor allem verifizierte Wikipedia-Daten) "
f"die Anzahl der Servicetechniker des Unternehmens '{company_name}' ein. "
"Gib die Antwort ausschließlich in einer der folgenden Kategorien aus: "
"'<50 Techniker', '>100 Techniker', '>200 Techniker', '>500 Techniker'."
)
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "system", "content": prompt}],
temperature=0.0
)
result = response.choices[0].message.content.strip()
debug_print(f"Schätzung Servicetechniker ChatGPT: '{result}'")
return result
except Exception as e:
debug_print(f"Fehler beim Aufruf der ChatGPT API für Servicetechniker-Schätzung: {e}")
return "k.A."
def evaluate_servicetechnicians_explanation(company_name, st_estimate, company_data):
try:
with open("api_key.txt", "r") as f:
api_key = f.read().strip()
except Exception as e:
debug_print(f"Fehler beim Lesen des API-Tokens (ST-Erklärung): {e}")
return "k.A."
openai.api_key = api_key
prompt = (
f"Bitte erkläre, warum du für das Unternehmen '{company_name}' die Anzahl der Servicetechniker als '{st_estimate}' geschätzt hast. "
"Berücksichtige dabei öffentlich zugängliche Informationen wie Branche, Umsatz, Mitarbeiterzahl und andere relevante Daten."
)
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "system", "content": prompt}],
temperature=0.0
)
result = response.choices[0].message.content.strip()
debug_print(f"Servicetechniker-Erklärung ChatGPT: '{result}'")
return result
except Exception as e:
debug_print(f"Fehler beim Aufruf der ChatGPT API für Servicetechniker-Erklärung: {e}")
return "k.A."
def map_internal_technicians(value):
try:
num = int(value)
except Exception:
return "k.A."
if num < 50:
return "<50 Techniker"
elif num < 100:
return ">100 Techniker"
elif num < 200:
return ">200 Techniker"
else:
return ">500 Techniker"
# ==================== WARTEN BIS ZELLE AKTUALISIERT IST ====================
def wait_for_sheet_update(sheet, cell, expected_value, timeout=5):
start_time = time.time()
while time.time() - start_time < timeout:
try:
current_value = sheet.acell(cell).value
if current_value == expected_value:
return True
except Exception as e:
debug_print(f"Fehler beim Lesen von Zelle {cell}: {e}")
time.sleep(0.5)
return False
# ==================== BRANCHENABGLEICH PER CHATGPT ====================
def load_target_branches():
try:
with open("ziel_Branchenschema.csv", "r", encoding="utf-8") as csvfile:
reader = csv.reader(csvfile)
branches = [row[0] for row in reader if row]
return branches
except Exception as e:
debug_print(f"Fehler beim Laden des Ziel-Branchenschemas: {e}")
return []
focus_branches = [
"Gutachter / Versicherungen > Baugutachter",
"Gutachter / Versicherungen > Technische Gutachten",
"Gutachter / Versicherungen > Versicherungsgutachten",
"Gutachter / Versicherungen > Medizinische Gutachten",
"Hersteller / Produzenten > Anlagenbau",
"Hersteller / Produzenten > Automaten (Vending, Slot)",
"Hersteller / Produzenten > Gebäudetechnik Allgemein",
"Hersteller / Produzenten > Gebäudetechnik Heizung, Lüftung, Klima",
"Hersteller / Produzenten > Maschinenbau",
"Hersteller / Produzenten > Medizintechnik",
"Service provider (Dienstleister) > Aufzüge und Rolltreppen",
"Service provider (Dienstleister) > Feuer- und Sicherheitssysteme",
"Service provider (Dienstleister) > Servicedienstleister / Reparatur ohne Produktion",
"Service provider (Dienstleister) > Facility Management",
"Versorger > Telekommunikation"
]
def evaluate_branche_chatgpt(crm_branche, beschreibung, wiki_branche, wiki_kategorien):
target_branches = load_target_branches()
target_branches_str = "\n".join(target_branches)
focus_branches_str = "\n".join(focus_branches)
try:
with open("api_key.txt", "r") as f:
api_key = f.read().strip()
except Exception as e:
debug_print(f"Fehler beim Lesen des API-Tokens (Branche): {e}")
return {"branch": "k.A.", "consistency": "k.A.", "justification": "k.A."}
openai.api_key = api_key
system_prompt = (
"Du bist ein Experte im Field Service Management. Deine Aufgabe ist es, ein Unternehmen basierend auf folgenden Angaben einer Branche zuzuordnen.\n\n"
f"CRM-Branche (Spalte F): {crm_branche}\n"
f"Branchenbeschreibung (Spalte G): {beschreibung}\n"
f"Wikipedia-Branche (Spalte N): {wiki_branche}\n"
f"Wikipedia-Kategorien (Spalte Q): {wiki_kategorien}\n\n"
"Das Ziel-Branchenschema umfasst ALLE gültigen Branchen, also sowohl Fokusbranchen als auch weitere, z.B. 'Housing > Sozialbau Unternehmen'.\n"
"Das vollständige Ziel-Branchenschema lautet:\n"
f"{target_branches_str}\n\n"
"Falls das Unternehmen mehreren Branchen zugeordnet werden könnte, wähle bitte bevorzugt eine Branche aus der folgenden Fokusliste, sofern zutreffend:\n"
f"{focus_branches_str}\n\n"
"Gewichtung der Angaben:\n"
"1. Wikipedia-Branche (Spalte N) zusammen mit Wikipedia-Kategorien (Spalte Q) (höchste Priorität, wenn verifiziert)\n"
"2. Branchenbeschreibung (Spalte G)\n"
"3. CRM-Branche (Spalte F)\n\n"
"Ordne das Unternehmen exakt einer der oben genannten Branchen zu (es dürfen keine zusätzlichen Branchen erfunden werden). "
"Bitte antworte in folgendem Format (ohne zusätzliche Informationen):\n"
"Branche: <vorgeschlagene Branche>\n"
"Übereinstimmung: <ok oder X>\n"
"Begründung: <kurze Begründung, falls abweichend, ansonsten leer>"
)
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "system", "content": system_prompt}],
temperature=0.0
)
result = response.choices[0].message.content.strip()
debug_print(f"Branchenabgleich ChatGPT Antwort: '{result}'")
branch = "k.A."
consistency = "k.A."
justification = ""
for line in result.split("\n"):
if line.lower().startswith("branche:"):
branch = line.split(":", 1)[1].strip()
elif line.lower().startswith("übereinstimmung:"):
consistency = line.split(":", 1)[1].strip()
elif line.lower().startswith("begründung:"):
justification = line.split(":", 1)[1].strip()
return {"branch": branch, "consistency": consistency, "justification": justification}
except Exception as e:
debug_print(f"Fehler beim Aufruf der ChatGPT API für Branchenabgleich: {e}")
return {"branch": "k.A.", "consistency": "k.A.", "justification": "k.A."}
# ==================== GOOGLE SHEET HANDLER ====================
class GoogleSheetHandler:
def __init__(self):
self.sheet = None
self.sheet_values = []
self._connect()
def _connect(self):
scope = ["https://www.googleapis.com/auth/spreadsheets"]
creds = ServiceAccountCredentials.from_json_keyfile_name(Config.CREDENTIALS_FILE, scope)
self.sheet = gspread.authorize(creds).open_by_url(Config.SHEET_URL).sheet1
self.sheet_values = self.sheet.get_all_values()
def get_start_index(self):
filled_n = [row[13] if len(row) > 13 else '' for row in self.sheet_values[1:]]
return next((i + 1 for i, v in enumerate(filled_n, start=1) if not str(v).strip()), len(filled_n) + 1)
# ==================== ALIGNMENT DEMO (Modus 3) ====================
def alignment_demo(sheet):
new_headers = [
"Spalte A (ReEval Flag)",
"Spalte B (Firmenname)",
"Spalte C (Website)",
"Spalte D (Ort)",
"Spalte E (Beschreibung)",
"Spalte F (Aktuelle Branche)",
"Spalte G (Beschreibung Branche extern)",
"Spalte H (Anzahl Techniker CRM)",
"Spalte I (Umsatz CRM)",
"Spalte J (Anzahl Mitarbeiter CRM)",
"Spalte K (Vorschlag Wiki URL)",
"Spalte L (Wikipedia URL)",
"Spalte M (Wikipedia Absatz)",
"Spalte N (Wikipedia Branche)",
"Spalte O (Wikipedia Umsatz)",
"Spalte P (Wikipedia Mitarbeiter)",
"Spalte Q (Wikipedia Kategorien)",
"Spalte R (Konsistenzprüfung)",
"Spalte S (Begründung bei Inkonsistenz)",
"Spalte T (Vorschlag Wiki Artikel ChatGPT)",
"Spalte U (Begründung bei Abweichung)",
"Spalte V (Vorschlag neue Branche)",
"Spalte W (Konsistenzprüfung Branche)",
"Spalte X (Begründung Abweichung Branche)",
"Spalte Y (FSM Relevanz Ja / Nein)",
"Spalte Z (Begründung für FSM Relevanz)",
"Spalte AA (Schätzung Anzahl Mitarbeiter)",
"Spalte AB (Konsistenzprüfung Mitarbeiterzahl)",
"Spalte AC (Begründung für Abweichung Mitarbeiterzahl)",
"Spalte AD (Einschätzung Anzahl Servicetechniker)",
"Spalte AE (Begründung bei Abweichung Anzahl Servicetechniker)",
"Spalte AF (Schätzung Umsatz ChatGPT)",
"Spalte AG (Begründung für Abweichung Umsatz)",
"Spalte AH (Timestamp letzte Prüfung)",
"Spalte AI (Version)"
]
header_range = "A11200:AI11200"
sheet.update(values=[new_headers], range_name=header_range)
print("Alignment-Demo abgeschlossen: Neue Spaltenüberschriften in Zeile 11200 geschrieben.")
# ==================== WIKIPEDIA SCRAPER ====================
class WikipediaScraper:
def __init__(self):
wikipedia.set_lang(Config.LANG)
def _get_full_domain(self, website):
if not website:
return ""
website = website.lower().strip()
website = re.sub(r'^https?:\/\/', '', website)
website = re.sub(r'^www\.', '', website)
return website.split('/')[0]
def _generate_search_terms(self, company_name, website):
terms = []
full_domain = self._get_full_domain(website)
if full_domain:
terms.append(full_domain)
normalized_name = normalize_company_name(company_name)
candidate = " ".join(normalized_name.split()[:2]).strip()
if candidate and candidate not in terms:
terms.append(candidate)
if normalized_name and normalized_name not in terms:
terms.append(normalized_name)
debug_print(f"Generierte Suchbegriffe: {terms}")
return terms
def _validate_article(self, page, company_name, website):
full_domain = self._get_full_domain(website)
domain_found = False
if full_domain:
try:
html_raw = requests.get(page.url).text
soup = BeautifulSoup(html_raw, Config.HTML_PARSER)
infobox = soup.find('table', class_=lambda c: c and 'infobox' in c.lower())
if infobox:
links = infobox.find_all('a', href=True)
for link in links:
href = link.get('href').lower()
if href.startswith('/wiki/datei:'):
continue
if full_domain in href:
debug_print(f"Definitiver Link-Match in Infobox gefunden: {href}")
domain_found = True
break
if not domain_found and hasattr(page, 'externallinks'):
for ext_link in page.externallinks:
if full_domain in ext_link.lower():
debug_print(f"Definitiver Link-Match in externen Links gefunden: {ext_link}")
domain_found = True
break
except Exception as e:
debug_print(f"Fehler beim Extrahieren von Links: {str(e)}")
normalized_title = normalize_company_name(page.title)
normalized_company = normalize_company_name(company_name)
similarity = SequenceMatcher(None, normalized_title, normalized_company).ratio()
debug_print(f"Ähnlichkeit (normalisiert): {similarity:.2f} ({normalized_title} vs {normalized_company})")
threshold = 0.60 if domain_found else Config.SIMILARITY_THRESHOLD
return similarity >= threshold
def extract_first_paragraph(self, page_url):
try:
response = requests.get(page_url)
soup = BeautifulSoup(response.text, Config.HTML_PARSER)
paragraphs = soup.find_all('p')
for p in paragraphs:
text = clean_text(p.get_text())
if len(text) > 50:
return text
return "k.A."
except Exception as e:
debug_print(f"Fehler beim Extrahieren des ersten Absatzes: {e}")
return "k.A."
def extract_categories(self, soup):
cat_div = soup.find('div', id="mw-normal-catlinks")
if cat_div:
ul = cat_div.find('ul')
if ul:
cats = [clean_text(li.get_text()) for li in ul.find_all('li')]
return ", ".join(cats)
return "k.A."
def _extract_infobox_value(self, soup, target):
infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
if not infobox:
return "k.A."
keywords_map = {
'branche': ['branche', 'industrie', 'tätigkeit', 'geschäftsfeld', 'sektor', 'produkte', 'leistungen', 'aktivitäten', 'wirtschaftszweig'],
'umsatz': ['umsatz', 'jahresumsatz', 'konzernumsatz', 'gesamtumsatz', 'erlöse', 'umsatzerlöse', 'einnahmen', 'ergebnis', 'jahresergebnis'],
'mitarbeiter': ['mitarbeiter', 'beschäftigte', 'personal', 'mitarbeiterzahl', 'angestellte', 'belegschaft', 'personalstärke']
}
keywords = keywords_map.get(target, [])
for row in infobox.find_all('tr'):
header = row.find('th')
if header:
header_text = clean_text(header.get_text()).lower()
if any(kw in header_text for kw in keywords):
value = row.find('td')
if value:
raw_value = clean_text(value.get_text())
if target == 'branche':
clean_val = re.sub(r'\[.*?\]|\(.*?\)', '', raw_value)
return ' '.join(clean_val.split()).strip()
if target == 'umsatz':
return extract_numeric_value(raw_value, is_umsatz=True)
if target == 'mitarbeiter':
return extract_numeric_value(raw_value, is_umsatz=False)
return "k.A."
def extract_full_infobox(self, soup):
infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
if not infobox:
return "k.A."
return clean_text(infobox.get_text(separator=' | '))
def extract_fields_from_infobox_text(self, infobox_text, field_names):
result = {}
tokens = [token.strip() for token in infobox_text.split("|") if token.strip()]
for i, token in enumerate(tokens):
for field in field_names:
if field.lower() in token.lower():
j = i + 1
while j < len(tokens) and not tokens[j]:
j += 1
result[field] = tokens[j] if j < len(tokens) else "k.A."
return result
def extract_company_data(self, page_url):
if not page_url:
return {
'url': 'k.A.',
'first_paragraph': 'k.A.',
'branche': 'k.A.',
'umsatz': 'k.A.',
'mitarbeiter': 'k.A.',
'categories': 'k.A.',
'full_infobox': 'k.A.'
}
try:
response = requests.get(page_url)
soup = BeautifulSoup(response.text, Config.HTML_PARSER)
full_infobox = self.extract_full_infobox(soup)
extracted_fields = self.extract_fields_from_infobox_text(full_infobox, ['Branche', 'Umsatz', 'Mitarbeiter'])
raw_branche = extracted_fields.get('Branche', self._extract_infobox_value(soup, 'branche'))
raw_umsatz = extracted_fields.get('Umsatz', self._extract_infobox_value(soup, 'umsatz'))
raw_mitarbeiter = extracted_fields.get('Mitarbeiter', self._extract_infobox_value(soup, 'mitarbeiter'))
umsatz_val = extract_numeric_value(raw_umsatz, is_umsatz=True)
mitarbeiter_val = extract_numeric_value(raw_mitarbeiter, is_umsatz=False)
categories_val = self.extract_categories(soup)
first_paragraph = self.extract_first_paragraph(page_url)
return {
'url': page_url,
'first_paragraph': first_paragraph,
'branche': raw_branche,
'umsatz': umsatz_val,
'mitarbeiter': mitarbeiter_val,
'categories': categories_val,
'full_infobox': full_infobox
}
except Exception as e:
debug_print(f"Extraktionsfehler: {str(e)}")
return {
'url': 'k.A.',
'first_paragraph': 'k.A.',
'branche': 'k.A.',
'umsatz': 'k.A.',
'mitarbeiter': 'k.A.',
'categories': 'k.A.',
'full_infobox': 'k.A.'
}
@retry_on_failure
def search_company_article(self, company_name, website):
# Zuerst prüfen: Gibt es in Spalte K bereits einen Wikipedia-Vorschlag?
search_terms = self._generate_search_terms(company_name, website)
for term in search_terms:
try:
results = wikipedia.search(term, results=Config.WIKIPEDIA_SEARCH_RESULTS)
debug_print(f"Suchergebnisse für '{term}': {results}")
for title in results:
try:
page = wikipedia.page(title, auto_suggest=False)
if self._validate_article(page, company_name, website):
return page
except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e:
debug_print(f"Seitenfehler: {str(e)}")
continue
except Exception as e:
debug_print(f"Suchfehler: {str(e)}")
continue
return None
# ==================== DATA PROCESSOR ====================
class DataProcessor:
def __init__(self):
self.sheet_handler = GoogleSheetHandler()
self.wiki_scraper = WikipediaScraper()
def process_rows(self, num_rows=None):
if MODE == "2":
print("Re-Evaluierungsmodus: Verarbeitung aller Zeilen mit 'x' in Spalte A.")
elif MODE == "3":
print("Alignment-Demo-Modus: Schreibe neue Spaltenüberschriften in Zeile 11200.")
alignment_demo(self.sheet_handler.sheet)
return
else:
start_index = self.sheet_handler.get_start_index()
print(f"Starte bei Zeile {start_index+1}")
for i, row in enumerate(self.sheet_handler.sheet_values[1:], start=2):
if MODE == "2":
if row[0].strip().lower() == "x":
self._process_single_row(i, row)
else:
if i >= self.sheet_handler.get_start_index():
self._process_single_row(i, row)
def _process_single_row(self, row_num, row_data):
company_name = row_data[1] if len(row_data) > 1 else ""
website = row_data[2] if len(row_data) > 2 else ""
wiki_update_range = f"K{row_num}:Q{row_num}"
chatgpt_range = f"AF{row_num}"
abgleich_range = f"AG{row_num}"
valid_range = f"R{row_num}"
dt_range = f"AH{row_num}"
ver_range = f"AI{row_num}"
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Verarbeite Zeile {row_num}: {company_name}")
# Prüfen: Wikipedia-Vorschlag in Spalte K?
if len(row_data) > 10 and row_data[10].strip() not in ["", "k.A."]:
wiki_url = row_data[10].strip()
try:
company_data = self.wiki_scraper.extract_company_data(wiki_url)
except Exception as e:
debug_print(f"Fehler beim Laden des vorgeschlagenen Wikipedia-Artikels: {e}")
article = self.wiki_scraper.search_company_article(company_name, website)
company_data = self.wiki_scraper.extract_company_data(article.url) if article else {
'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.',
'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.',
'full_infobox': 'k.A.'
}
else:
article = self.wiki_scraper.search_company_article(company_name, website)
company_data = self.wiki_scraper.extract_company_data(article.url) if article else {
'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.',
'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.',
'full_infobox': 'k.A.'
}
wiki_values = [
row_data[10] if len(row_data) > 10 and row_data[10].strip() not in ["", "k.A."] else "k.A.",
company_data.get('url', 'k.A.'),
company_data.get('first_paragraph', 'k.A.'),
company_data.get('branche', 'k.A.'),
company_data.get('umsatz', 'k.A.'),
company_data.get('mitarbeiter', 'k.A.'),
company_data.get('categories', 'k.A.')
]
self.sheet_handler.sheet.update(values=[wiki_values], range_name=wiki_update_range)
# Warten, bis das Update im Sheet übernommen wurde (prüfe Zelle K{row_num})
wait_for_sheet_update(self.sheet_handler.sheet, f"K{row_num}", wiki_values[0])
time.sleep(3)
# Umsatz-Schätzung (Spalte AF soll "XX" erhalten)
self.sheet_handler.sheet.update(values=[["XX"]], range_name=chatgpt_range)
# Umsatz-Abgleich (Spalte AG)
crm_umsatz = row_data[8] if len(row_data) > 8 else "k.A."
abgleich_result = compare_umsatz_values(crm_umsatz, company_data.get('umsatz', 'k.A.'))
self.sheet_handler.sheet.update(values=[[abgleich_result]], range_name=abgleich_range)
# Validierung
crm_data = ";".join(row_data[1:10])
wiki_data = ";".join(row_data[11:17])
valid_result = validate_article_with_chatgpt(crm_data, wiki_data)
self.sheet_handler.sheet.update(values=[[valid_result]], range_name=valid_range)
# Branchenabgleich
crm_branche = row_data[5] if len(row_data) > 5 else "k.A."
beschreibung_branche = row_data[6] if len(row_data) > 6 else "k.A."
wiki_branche = company_data.get('branche', 'k.A.')
wiki_kategorien = company_data.get('categories', 'k.A.')
branche_result = evaluate_branche_chatgpt(crm_branche, beschreibung_branche, wiki_branche, wiki_kategorien)
branche_v_range = f"V{row_num}"
branche_w_range = f"W{row_num}"
branche_x_range = f"X{row_num}"
self.sheet_handler.sheet.update(values=[[branche_result["branch"]]], range_name=branche_v_range)
self.sheet_handler.sheet.update(values=[[branche_result["consistency"]]], range_name=branche_w_range)
self.sheet_handler.sheet.update(values=[[branche_result["justification"]]], range_name=branche_x_range)
# FSM-Eignungsprüfung (Spalte Y/Z)
fsm_result = evaluate_fsm_suitability(company_name, company_data)
self.sheet_handler.sheet.update(values=[[fsm_result["suitability"]]], range_name=f"Y{row_num}")
self.sheet_handler.sheet.update(values=[[fsm_result["justification"]]], range_name=f"Z{row_num}")
# Servicetechniker-Schätzung (Spalte AD) und Vergleich (Spalte AE)
st_estimate = evaluate_servicetechnicians_estimate(company_name, company_data)
self.sheet_handler.sheet.update(values=[[st_estimate]], range_name=f"AD{row_num}")
internal_value = row_data[7] if len(row_data) > 7 else "k.A."
internal_category = map_internal_technicians(internal_value) if internal_value != "k.A." else "k.A."
if internal_category != "k.A." and st_estimate != internal_category:
# Hole detaillierte Erklärung von ChatGPT, warum die Schätzung so ist.
explanation = evaluate_servicetechnicians_explanation(company_name, st_estimate, company_data)
discrepancy = explanation
else:
discrepancy = "ok"
self.sheet_handler.sheet.update(values=[[discrepancy]], range_name=f"AE{row_num}")
# Spalten AF und AG sollen "XX" enthalten
self.sheet_handler.sheet.update(values=[["XX"]], range_name="AF" + str(row_num))
self.sheet_handler.sheet.update(values=[["XX"]], range_name="AG" + str(row_num))
# Timestamp und Version
current_dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.sheet_handler.sheet.update(values=[[current_dt]], range_name=dt_range)
self.sheet_handler.sheet.update(values=[[Config.VERSION]], range_name=ver_range)
debug_print(f"✅ Aktualisiert: URL: {company_data.get('url', 'k.A.')}, "
f"Branche: {company_data.get('branche', 'k.A.')}, Umsatz-Abgleich: {abgleich_result}, "
f"Validierung: {valid_result}, Branchenvorschlag: {branche_result['branch']}, "
f"FSM: {fsm_result['suitability']}, Servicetechniker-Schätzung: {st_estimate}")
time.sleep(Config.RETRY_DELAY)
if __name__ == "__main__":
mode_input = input("Wählen Sie den Modus: 1 für normalen Modus, 2 für Re-Evaluierungsmodus, 3 für Alignment-Demo: ").strip()
if mode_input == "2":
MODE = "2"
elif mode_input == "3":
MODE = "3"
else:
MODE = "1"
if MODE == "1":
try:
num_rows = int(input("Wieviele Zeilen sollen überprüft werden? "))
except Exception as e:
print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
exit(1)
else:
num_rows = None
processor = DataProcessor()
processor.process_rows(num_rows)
print(f"\n✅ Wikipedia-Auswertung abgeschlossen ({Config.VERSION})")