Files
Brancheneinstufung2/brancheneinstufung.py
2025-04-07 19:01:03 +00:00

749 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import time
import re
import gspread
import wikipedia
import requests
import openai
from bs4 import BeautifulSoup
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime
from difflib import SequenceMatcher
import unicodedata
import csv
try:
import tiktoken
except ImportError:
tiktoken = None
# ==================== KONFIGURATION ====================
class Config:
VERSION = "v1.4.0" # Version 1.4.0
LANG = "de"
CREDENTIALS_FILE = "service_account.json"
SHEET_URL = "https://docs.google.com/spreadsheets/d/1u_gHr9JUfmV1-iviRzbSe3575QEp7KLhK5jFV_gJcgo"
MAX_RETRIES = 3
RETRY_DELAY = 5
LOG_CSV = "gpt_antworten_log.csv"
SIMILARITY_THRESHOLD = 0.65
DEBUG = True
WIKIPEDIA_SEARCH_RESULTS = 5
HTML_PARSER = "html.parser"
BATCH_SIZE = 10
TOKEN_MODEL = "gpt-3.5-turbo"
# ==================== RETRY-DECORATOR ====================
def retry_on_failure(func):
def wrapper(*args, **kwargs):
for attempt in range(Config.MAX_RETRIES):
try:
return func(*args, **kwargs)
except Exception as e:
print(f"⚠️ Fehler bei {func.__name__} (Versuch {attempt+1}): {str(e)[:100]}")
time.sleep(Config.RETRY_DELAY)
return None
return wrapper
# ==================== LOGGING & HELPER FUNCTIONS ====================
if not os.path.exists("Log"):
os.makedirs("Log")
timestamp = datetime.now().strftime('%d-%m-%Y_%H-%M')
version = Config.VERSION
modi_mapping = {"2": "ReEvaluierung", "3": "AlignmentDemo", "4": "WikiOnly", "5": "ChatGPTOnly", "51": "Verifizierung", "8": "BatchTokenCount"}
def debug_print(message):
if Config.DEBUG:
print(f"[DEBUG] {message}")
try:
with open(os.path.join("Log", f"{timestamp}_{version}.txt"), "a", encoding="utf-8") as f:
f.write(f"[DEBUG] {message}\n")
except Exception as e:
print(f"[DEBUG] Log-Schreibfehler: {e}")
def clean_text(text):
if not text:
return "k.A."
text = unicodedata.normalize("NFKC", str(text))
text = re.sub(r'\[\d+\]', '', text)
text = re.sub(r'\s+', ' ', text).strip()
return text if text else "k.A."
def normalize_company_name(name):
if not name:
return ""
forms = [
r'gmbh', r'g\.m\.b\.h\.', r'ug', r'u\.g\.', r'ug \(haftungsbeschränkt\)',
r'u\.g\. \(haftungsbeschränkt\)', r'ag', r'a\.g\.', r'ohg', r'o\.h\.g\.',
r'kg', r'k\.g\.', r'gmbh & co\.?\s*kg', r'g\.m\.b\.h\. & co\.?\s*k\.g\.',
r'ag & co\.?\s*kg', r'a\.g\. & co\.?\s*k\.g\.', r'e\.k\.', r'e\.kfm\.',
r'e\.kfr\.', r'ltd\.', r'ltd & co\.?\s*kg', r's\.a r\.l\.', r'stiftung',
r'genossenschaft', r'ggmbh', r'gug', r'partg', r'partgmbb', r'kgaa', r'se',
r'og', r'o\.g\.', r'e\.u\.', r'ges\.n\.b\.r\.', r'genmbh', r'verein',
r'kollektivgesellschaft', r'kommanditgesellschaft', r'einzelfirma', r'sàrl',
r'sa', r'sagl', r'gmbh & co\.?\s*ohg', r'ag & co\.?\s*ohg', r'gmbh & co\.?\s*kgaa',
r'ag & co\.?\s*kgaa', r's\.a\.', r's\.p\.a\.', r'b\.v\.', r'n\.v\.'
]
pattern = r'\b(' + '|'.join(forms) + r')\b'
normalized = re.sub(pattern, '', name, flags=re.IGNORECASE)
normalized = re.sub(r'[\-]', ' ', normalized)
normalized = re.sub(r'\s+', ' ', normalized).strip()
return normalized.lower()
def extract_numeric_value(raw_value, is_umsatz=False):
raw_value = raw_value.strip()
if not raw_value:
return "k.A."
raw_value = re.sub(r'\b(ca\.?|circa|über)\b', '', raw_value, flags=re.IGNORECASE)
raw = raw_value.lower().replace("\xa0", " ")
match = re.search(r'([\d.,]+)', raw, flags=re.UNICODE)
if not match or not match.group(1).strip():
debug_print(f"Keine numerischen Zeichen gefunden im Rohtext: '{raw_value}'")
return "k.A."
num_str = match.group(1)
if ',' in num_str:
num_str = num_str.replace('.', '').replace(',', '.')
try:
num = float(num_str)
except Exception as e:
debug_print(f"Fehler bei der Umwandlung von '{num_str}' (Rohtext: '{raw_value}'): {e}")
return raw_value
else:
num_str = num_str.replace(' ', '').replace('.', '')
try:
num = float(num_str)
except Exception as e:
debug_print(f"Fehler bei der Umwandlung von '{num_str}' (Rohtext: '{raw_value}'): {e}")
return raw_value
if is_umsatz:
if "mrd" in raw or "milliarden" in raw:
num *= 1000
elif "mio" in raw or "millionen" in raw:
pass
else:
num /= 1e6
return str(int(round(num)))
else:
return str(int(round(num)))
def compare_umsatz_values(crm, wiki):
debug_print(f"Vergleich CRM Umsatz: '{crm}' mit Wikipedia Umsatz: '{wiki}'")
try:
crm_val = float(crm)
wiki_val = float(wiki)
except Exception as e:
debug_print(f"Fehler beim Umwandeln der Werte: CRM='{crm}', Wiki='{wiki}': {e}")
return "Daten unvollständig"
if crm_val == 0:
return "CRM Umsatz 0"
diff = abs(crm_val - wiki_val) / crm_val
if diff < 0.1:
return "OK"
else:
diff_mio = abs(crm_val - wiki_val)
return f"Abweichung: {int(round(diff_mio))} Mio €"
def map_internal_technicians(value):
try:
num = int(value)
except Exception:
return "k.A."
if num < 50:
return "<50 Techniker"
elif num < 100:
return ">100 Techniker"
elif num < 200:
return ">200 Techniker"
else:
return ">500 Techniker"
# ==================== CHATGPT CALL WRAPPERS ====================
def chatgpt_call(prompt, input_text):
try:
response = openai.ChatCompletion.create(
model=Config.TOKEN_MODEL,
messages=[{"role": "user", "content": prompt}],
temperature=0.0
)
result = response.choices[0].message.content.strip()
tokens = 0
if tiktoken:
try:
enc = tiktoken.encoding_for_model(Config.TOKEN_MODEL)
tokens = len(enc.encode(prompt))
except Exception as e:
debug_print(f"Tokenzählung fehlgeschlagen: {e}")
return result, tokens
except Exception as e:
debug_print(f"Fehler im chatgpt_call: {e}")
return "k.A.", 0
def safe_chatgpt_call(prompt, input_text, wiki_value):
if wiki_value == "k.A.":
return "Skipped (k.A.)", 0
return chatgpt_call(prompt, input_text)
# ==================== PROMPT ÜBERSICHT ====================
def prompt_overview():
data = [
["Funktion", "Prompt"],
["Wiki Artikel Vorschlag", "Bitte schlage einen passenden Wikipedia-Artikel für das folgende Unternehmen vor: {company_name}"],
["Begründung Wiki Inkonsistenz", "Bitte begründe, warum die Angaben aus Wikipedia von den folgenden CRM-Daten abweichen: {crm_data}"],
["Mitarbeiter Schätzung", "Wie viele Mitarbeiter hat das folgende Unternehmen? Kontext: {wiki_text}"],
["Mitarbeiter Konsistenz", "Vergleiche die Mitarbeiterzahlen aus CRM ({crm_value}) und Wikipedia ({wiki_value}). Gibt es Abweichungen? Welche Zahl erscheint plausibler?"],
["Umsatz Schätzung", "Wie hoch ist der geschätzte Jahresumsatz des Unternehmens basierend auf den folgenden Angaben? Kontext: {crm_data}, Wikipedia: {wiki_text}"],
["Branchenvorschlag", "Welcher Branche gehört das folgende Unternehmen an? Kontext: {wiki_text}, {crm_data}"]
]
return data
def print_prompt_overview():
print("----- Prompt Übersicht -----")
prompts = prompt_overview()
for row in prompts:
print(f"{row[0]}: {row[1]}")
print("-----------------------------\n")
# ==================== INITIALISIERUNG DES LOGS ====================
def initialize_log(modus):
filename = f"{timestamp}_{version}_{modi_mapping.get(modus, f'Modus{modus}')}.txt"
logfile = open(filename, "w", encoding="utf-8")
logfile.write(f"Modus: {modus} - {modi_mapping.get(modus, f'Modus{modus}')}\n")
logfile.write(f"Version: {version}\n")
logfile.write(f"Timestamp: {timestamp}\n\n")
print_prompt_overview() # Ausgabe der Prompt Übersicht
return logfile
# ==================== WIKIPEDIA SCRAPER ====================
class WikipediaScraper:
def __init__(self):
wikipedia.set_lang(Config.LANG)
def _get_full_domain(self, website):
if not website:
return ""
website = website.lower().strip()
website = re.sub(r'^https?:\/\/', '', website)
website = re.sub(r'^www\.', '', website)
return website.split('/')[0]
def _generate_search_terms(self, company_name, website):
terms = []
full_domain = self._get_full_domain(website)
if full_domain:
terms.append(full_domain)
normalized_name = normalize_company_name(company_name)
candidate = " ".join(normalized_name.split()[:2]).strip()
if candidate and candidate not in terms:
terms.append(candidate)
if normalized_name and normalized_name not in terms:
terms.append(normalized_name)
debug_print(f"Generierte Suchbegriffe: {terms}")
return terms
def _validate_article(self, page, company_name, website):
full_domain = self._get_full_domain(website)
domain_found = False
if full_domain:
try:
html_raw = requests.get(page.url).text
soup = BeautifulSoup(html_raw, Config.HTML_PARSER)
infobox = soup.find('table', class_=lambda c: c and 'infobox' in c.lower())
if infobox:
links = infobox.find_all('a', href=True)
for link in links:
href = link.get('href').lower()
if href.startswith('/wiki/datei:'):
continue
if full_domain in href:
debug_print(f"Definitiver Link-Match in Infobox gefunden: {href}")
domain_found = True
break
if not domain_found and hasattr(page, 'externallinks'):
for ext_link in page.externallinks:
if full_domain in ext_link.lower():
debug_print(f"Definitiver Link-Match in externen Links gefunden: {ext_link}")
domain_found = True
break
except Exception as e:
debug_print(f"Fehler beim Extrahieren von Links: {str(e)}")
normalized_title = normalize_company_name(page.title)
normalized_company = normalize_company_name(company_name)
similarity = SequenceMatcher(None, normalized_title, normalized_company).ratio()
debug_print(f"Ähnlichkeit (normalisiert): {similarity:.2f} ({normalized_title} vs {normalized_company})")
threshold = 0.60 if domain_found else Config.SIMILARITY_THRESHOLD
return similarity >= threshold
def extract_first_paragraph(self, page_url):
try:
response = requests.get(page_url)
soup = BeautifulSoup(response.text, Config.HTML_PARSER)
paragraphs = soup.find_all('p')
for p in paragraphs:
text = clean_text(p.get_text())
if len(text) > 50:
return text
return "k.A."
except Exception as e:
debug_print(f"Fehler beim Extrahieren des ersten Absatzes: {e}")
return "k.A."
def extract_categories(self, soup):
cat_div = soup.find('div', id="mw-normal-catlinks")
if cat_div:
ul = cat_div.find('ul')
if ul:
cats = [clean_text(li.get_text()) for li in ul.find_all('li')]
return ", ".join(cats)
return "k.A."
def _extract_infobox_value(self, soup, target):
infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
if not infobox:
return "k.A."
keywords_map = {
'branche': ['branche', 'industrie', 'tätigkeit', 'geschäftsfeld', 'sektor', 'produkte', 'leistungen', 'aktivitäten', 'wirtschaftszweig'],
'umsatz': ['umsatz', 'jahresumsatz', 'konzernumsatz', 'gesamtumsatz', 'erlöse', 'umsatzerlöse', 'einnahmen', 'ergebnis', 'jahresergebnis'],
'mitarbeiter': ['mitarbeiter', 'beschäftigte', 'personal', 'mitarbeiterzahl', 'angestellte', 'belegschaft', 'personalstärke']
}
keywords = keywords_map.get(target, [])
for row in infobox.find_all('tr'):
header = row.find('th')
if header:
header_text = clean_text(header.get_text()).lower()
if any(kw in header_text for kw in keywords):
value = row.find('td')
if value:
raw_value = clean_text(value.get_text())
if target == 'branche':
clean_val = re.sub(r'\[.*?\]|\(.*?\)', '', raw_value)
return ' '.join(clean_val.split()).strip()
if target == 'umsatz':
return extract_numeric_value(raw_value, is_umsatz=True)
if target == 'mitarbeiter':
return extract_numeric_value(raw_value, is_umsatz=False)
return "k.A."
def extract_full_infobox(self, soup):
infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
if not infobox:
return "k.A."
return clean_text(infobox.get_text(separator=' | '))
def extract_fields_from_infobox_text(self, infobox_text, field_names):
result = {}
tokens = [token.strip() for token in infobox_text.split("|") if token.strip()]
for i, token in enumerate(tokens):
for field in field_names:
if field.lower() in token.lower():
j = i + 1
while j < len(tokens) and not tokens[j]:
j += 1
result[field] = tokens[j] if j < len(tokens) else "k.A."
return result
def extract_company_data(self, page_url):
if not page_url:
return {
'url': 'k.A.',
'first_paragraph': 'k.A.',
'branche': 'k.A.',
'umsatz': 'k.A.',
'mitarbeiter': 'k.A.',
'categories': 'k.A.',
'full_infobox': 'k.A.'
}
try:
response = requests.get(page_url)
soup = BeautifulSoup(response.text, Config.HTML_PARSER)
full_infobox = self.extract_full_infobox(soup)
extracted_fields = self.extract_fields_from_infobox_text(full_infobox, ['Branche', 'Umsatz', 'Mitarbeiter'])
raw_branche = extracted_fields.get('Branche', self._extract_infobox_value(soup, 'branche'))
raw_umsatz = extracted_fields.get('Umsatz', self._extract_infobox_value(soup, 'umsatz'))
raw_mitarbeiter = extracted_fields.get('Mitarbeiter', self._extract_infobox_value(soup, 'mitarbeiter'))
umsatz_val = extract_numeric_value(raw_umsatz, is_umsatz=True)
mitarbeiter_val = extract_numeric_value(raw_mitarbeiter, is_umsatz=False)
categories_val = self.extract_categories(soup)
first_paragraph = self.extract_first_paragraph(page_url)
return {
'url': page_url,
'first_paragraph': first_paragraph,
'branche': raw_branche,
'umsatz': umsatz_val,
'mitarbeiter': mitarbeiter_val,
'categories': categories_val,
'full_infobox': full_infobox
}
except Exception as e:
debug_print(f"Extraktionsfehler: {str(e)}")
return {
'url': 'k.A.',
'first_paragraph': 'k.A.',
'branche': 'k.A.',
'umsatz': 'k.A.',
'mitarbeiter': 'k.A.',
'categories': 'k.A.',
'full_infobox': 'k.A.'
}
@retry_on_failure
def search_company_article(self, company_name, website):
search_terms = self._generate_search_terms(company_name, website)
for term in search_terms:
try:
results = wikipedia.search(term, results=Config.WIKIPEDIA_SEARCH_RESULTS)
debug_print(f"Suchergebnisse für '{term}': {results}")
for title in results:
try:
page = wikipedia.page(title, auto_suggest=False)
if self._validate_article(page, company_name, website):
return page
except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e:
debug_print(f"Seitenfehler: {str(e)}")
continue
except Exception as e:
debug_print(f"Suchfehler: {str(e)}")
continue
return None
# ==================== GOOGLE SHEET HANDLER ====================
class GoogleSheetHandler:
def __init__(self):
self.sheet = None
self.sheet_values = []
self._connect()
def _connect(self):
scope = ["https://www.googleapis.com/auth/spreadsheets"]
creds = ServiceAccountCredentials.from_json_keyfile_name(Config.CREDENTIALS_FILE, scope)
self.sheet = gspread.authorize(creds).open_by_url(Config.SHEET_URL).sheet1
self.sheet_values = self.sheet.get_all_values()
def get_start_index(self):
filled_n = [row[13] if len(row) > 13 else '' for row in self.sheet_values[1:]]
return next((i + 1 for i, v in enumerate(filled_n, start=1) if not str(v).strip()), len(filled_n) + 1)
# ==================== ALIGNMENT DEMO ====================
def alignment_demo(sheet):
new_headers = [
[ # Spaltenname (Zeile 1)
"ReEval Flag", # A
"CRM Name", # B
"CRM Kurzform", # C
"CRM Website", # D
"CRM Ort", # E
"CRM Beschreibung", # F
"CRM Branche", # G
"CRM Beschreibung Branche extern", # H
"CRM Anzahl Techniker", # I
"CRM Umsatz", # J
"CRM Anzahl Mitarbeiter", # K
"CRM Vorschlag Wiki URL", # L
"Wiki URL", # M
"Wiki Absatz", # N
"Wiki Branche", # O
"Wiki Umsatz", # P
"Wiki Mitarbeiter", # Q
"Wiki Kategorien", # R
"Chat Wiki Konsistenzprüfung", # S
"Chat Begründung Wiki Inkonsistenz", # T
"Chat Vorschlag Wiki Artikel", # U
"Begründung bei Abweichung", # V
"Chat Vorschlag Branche", # W
"Chat Konsistenz Branche", # X
"Chat Begründung Abweichung Branche", # Y
"Chat Prüfung FSM Relevanz", # Z
"Chat Begründung für FSM Relevanz", # AA
"Chat Schätzung Anzahl Mitarbeiter", # AB
"Chat Konsistenzprüfung Mitarbeiterzahl", # AC
"Chat Begründung Abweichung Mitarbeiterzahl", # AD
"Chat Einschätzung Anzahl Servicetechniker", # AE
"Chat Begründung Abweichung Servicetechniker", # AF
"Chat Schätzung Umsatz", # AG
"Chat Begründung Abweichung Umsatz", # AH
"Linked Serviceleiter gefunden", # AI
"Linked It-Leiter gefunden", # AJ
"Linked Management gefunden", # AK
"Linked Disponent gefunden", # AL
"Contact Search Timestamp", # AM
"Wikipedia Timestamp", # AN
"Timestamp letzte Prüfung", # AO
"Version", # AP
"Tokens" # AQ
],
[ # Quelle der Daten
"CRM", "CRM", "CRM", "CRM", "CRM", "CRM", "CRM", "CRM", "CRM", "CRM",
"CRM", "CRM", "Wikipediascraper", "Wikipediascraper", "Wikipediascraper",
"Wikipediascraper", "Wikipediascraper", "Wikipediascraper", "Chat GPT API",
"Chat GPT API", "Chat GPT API", "Chat GPT API", "Chat GPT API", "Chat GPT API",
"Chat GPT API", "Chat GPT API", "Chat GPT API", "Chat GPT API", "Chat GPT API",
"Chat GPT API", "Chat GPT API", "Chat GPT API", "Chat GPT API", "LinkedIn (via SerpApi)",
"LinkedIn (via SerpApi)", "LinkedIn (via SerpApi)", "LinkedIn (via SerpApi)", "System",
"System", "System", "System", "System"
],
[ # Feldkategorie
"Prozess", "Firmenname", "Firmenname", "Website", "Ort", "Beschreibung (Text)",
"Branche", "Branche", "Anzahl Servicetechniker", "Umsatz", "Anzahl Mitarbeiter",
"Wikipedia Artikel URL", "Wikipedia Artikel", "Beschreibung (Text)", "Branche",
"Umsatz", "Anzahl Mitarbeiter", "Kategorien (Text)", "Verifizierung",
"Begründung bei Abweichung", "Wikipedia Artikel", "Wikipedia Artikel", "Branche",
"Branche", "Branche", "FSM Relevanz", "FSM Relevanz", "Anzahl Mitarbeiter",
"Anzahl Mitarbeiter", "Anzahl Mitarbeiter", "Anzahl Servicetechniker",
"Anzahl Servicetechniker", "Umsatz", "Umsatz", "Kontakte zur Firma",
"Kontakte zur Firma", "Kontakte zur Firma", "Kontakte zur Firma", "Timestamp",
"Timestamp", "Timestamp", "Version des Skripts die verwendet wurde", "ChatGPT Tokens"
],
[ # Kurzbeschreibung
"Systemspalte, irrelevant für den Prompt. Wird zur manuellen Neuprüfung genutzt.",
"Enthält den Firmennamen (CRM).", "Enthält die manuell gepflegte Kurzform.",
"Ermittelte Website des Unternehmens.", "Ermittelter Ort des Unternehmens.",
"Kurze Unternehmensbeschreibung.", "Aktuelle Branchenzuweisung (CRM).",
"Externe Branchenbeschreibung (z.B. von Dealfront).", "Recherchierte Anzahl Servicetechniker.",
"Recherchierter Umsatz in Mio. €.", "Recherchierte Anzahl Mitarbeiter.",
"Vorschlag für Wikipedia URL.", "Wikipedia URL aus laufender Suche.",
"Erster Absatz des Wikipedia-Artikels.", "Branche aus Wikipedia.",
"Umsatz laut Wikipedia.", "Mitarbeiterzahl laut Wikipedia.",
"Wikipedia Kategorien.", "\"OK\" oder \"X\" bei Wiki-Validierung.",
"Begründung bei Wiki Inkonsistenz.", "Neu recherchierter Wikipedia Artikel.",
"Begründung bei Abweichung.", "ChatGPT basierte Branchenzuordnung.",
"Vergleich CRM vs. ChatGPT Branche.", "Begründung bei Branchenabweichung.",
"Prüfung FSM-Relevanz (ChatGPT).", "Begründung zur FSM-Eignung.",
"Schätzung Mitarbeiterzahl (ChatGPT).", "Konsistenzprüfung Mitarbeiterzahl (ChatGPT).",
"Begründung bei Mitarbeiterabweichung.", "Schätzung Servicetechniker (ChatGPT).",
"Begründung bei Technikerabweichung.", "Schätzung Umsatz (ChatGPT).",
"Begründung bei Umsatzabweichung.", "LinkedIn Serviceleiter Kontakte.",
"LinkedIn IT-Leiter Kontakte.", "LinkedIn Management Kontakte.",
"LinkedIn Disponent Kontakte.", "Timestamp Kontaktsuche.",
"Timestamp Wikipedia-Abruf.", "Timestamp ChatGPT-Prüfung.",
"Skriptversion.", "Gesamte GPT-Tokens."
],
[ # Aufgabe / Funktion
"Datenquelle", "Datenquelle", "Datenquelle", "Datenquelle", "Datenquelle", "Datenquelle",
"Datenquelle", "Datenquelle", "Datenquelle", "Datenquelle", "Datenquelle", "Datenquelle",
"Wird durch Wikipedia Scraper bereitgestellt.", "Wird als Vergleich genutzt (Validierung).",
"Für finale Branchenermittlung.", "Zur Umsatzvalidierung.", "Zur Validierung der Mitarbeiterzahl.",
"Zur Kategorisierung des Wikipedia-Artikels.", "Prüfung, ob Wikipedia-Artikel passt.",
"Begründung bei Wiki Inkonsistenz.", "Neues Recherchieren, falls unpassend.", "Begründung bei Abweichung.",
"ChatGPT Branchenzuordnung.", "Vergleich CRM vs. ChatGPT Branche.", "Begründung bei Branchenabweichung.",
"Prüfung FSM-Relevanz (ChatGPT).", "Begründung zur FSM-Eignung.", "Schätzung Mitarbeiterzahl (ChatGPT).",
"Konsistenzprüfung Mitarbeiterzahl.", "Begründung bei Mitarbeiterabweichung.",
"Schätzung Servicetechniker (ChatGPT).", "Begründung bei Technikerabweichung.",
"Schätzung Umsatz (ChatGPT).", "Begründung bei Umsatzabweichung.",
"LinkedIn Suche Serviceleiter.", "LinkedIn Suche IT-Leiter.", "LinkedIn Suche Management.",
"LinkedIn Suche Disponent.", "Timestamp Kontaktsuche.", "Timestamp Wikipedia-Abruf.",
"Timestamp ChatGPT-Prüfung.", "Skriptversion.", "Gesamte GPT-Tokens."
]
]
header_range = "A1:AQ5"
sheet.update(values=new_headers, range_name=header_range)
print("Alignment-Demo abgeschlossen: Neues Spaltenschema in Zeilen A1 bis AQ5 geschrieben.")
def alignment_demo_full():
alignment_demo(GoogleSheetHandler().sheet)
gc = gspread.authorize(ServiceAccountCredentials.from_json_keyfile_name(
Config.CREDENTIALS_FILE, ["https://www.googleapis.com/auth/spreadsheets"]))
sh = gc.open_by_url(Config.SHEET_URL)
try:
contacts_sheet = sh.worksheet("Contacts")
except gspread.exceptions.WorksheetNotFound:
contacts_sheet = sh.add_worksheet(title="Contacts", rows="1000", cols="10")
header = ["Firmenname", "Website", "Kurzform", "Vorname", "Nachname", "Position", "Anrede", "E-Mail"]
contacts_sheet.update(values=[header], range_name="A1:H1")
debug_print("Neues Blatt 'Contacts' erstellt und Header eingetragen.")
alignment_demo(contacts_sheet)
debug_print("Alignment-Demo für Hauptblatt und Contacts abgeschlossen.")
# ==================== DATA PROCESSOR ====================
class DataProcessor:
def __init__(self):
self.sheet_handler = GoogleSheetHandler()
self.wiki_scraper = WikipediaScraper()
def process_rows(self, num_rows=None):
MODE = "default" # Ersetze mit tatsächlicher Moduswahl
if MODE == "2":
print("Re-Evaluierungsmodus: Verarbeitung aller Zeilen mit 'x' in Spalte A.")
for i, row in enumerate(self.sheet_handler.sheet_values[1:], start=2):
if row[0].strip().lower() == "x":
self._process_single_row(i, row)
elif MODE == "3":
print("Alignment-Demo-Modus: Schreibe neue Spaltenüberschriften in Hauptblatt und Contacts.")
alignment_demo_full()
elif MODE == "4":
for i, row in enumerate(self.sheet_handler.sheet_values[1:], start=2):
if len(row) <= 39 or row[39].strip() == "":
self._process_single_row(i, row, process_wiki=True, process_chatgpt=False)
elif MODE == "5":
for i, row in enumerate(self.sheet_handler.sheet_values[1:], start=2):
if len(row) <= 40 or row[40].strip() == "":
self._process_single_row(i, row, process_wiki=False, process_chatgpt=True)
elif MODE == "51":
for i, row in enumerate(self.sheet_handler.sheet_values[1:], start=2):
if len(row) <= 25 or row[24].strip() == "":
self._process_verification_row(i, row)
elif MODE == "8":
process_batch_token_count()
else:
start_index = self.sheet_handler.get_start_index()
print(f"Starte bei Zeile {start_index+1}")
rows_processed = 0
for i, row in enumerate(self.sheet_handler.sheet_values[1:], start=2):
if i < start_index:
continue
if num_rows is not None and rows_processed >= num_rows:
break
self._process_single_row(i, row)
rows_processed += 1
def _process_single_row(self, row_num, row_data, process_wiki=True, process_chatgpt=True):
total_tokens = 0
company_name = row_data[1] if len(row_data) > 1 else ""
website = row_data[3] if len(row_data) > 3 else ""
# Default-Initialisierung für company_data, falls Wiki-Auswertung übersprungen wird
company_data = {
'url': 'k.A.',
'first_paragraph': 'k.A.',
'branche': 'k.A.',
'umsatz': 'k.A.',
'mitarbeiter': 'k.A.',
'categories': 'k.A.',
'full_infobox': 'k.A.'
}
# Wiki-Daten werden in Spalten L bis R abgelegt
wiki_update_range = f"L{row_num}:R{row_num}"
dt_wiki_range = f"AN{row_num}"
dt_chat_range = f"AO{row_num}"
ver_range = f"AP{row_num}"
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Verarbeite Zeile {row_num}: {company_name}")
current_dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Wiki-Verarbeitung
if process_wiki:
if len(row_data) <= 39 or row_data[39].strip() == "":
if len(row_data) > 10 and row_data[10].strip() not in ["", "k.A."]:
wiki_url = row_data[10].strip()
try:
company_data = self.wiki_scraper.extract_company_data(wiki_url)
except Exception as e:
debug_print(f"Fehler beim Laden des vorgeschlagenen Wikipedia-Artikels: {e}")
article = self.wiki_scraper.search_company_article(company_name, website)
company_data = self.wiki_scraper.extract_company_data(article.url) if article else {
'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.',
'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.',
'full_infobox': 'k.A.'
}
else:
wiki_url = "k.A."
article = self.wiki_scraper.search_company_article(company_name, website)
company_data = self.wiki_scraper.extract_company_data(article.url) if article else {
'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.',
'umsatz': 'k.A.', 'mitarbeiter': 'k.A.', 'categories': 'k.A.',
'full_infobox': 'k.A.'
}
wiki_values = [
row_data[10] if len(row_data) > 10 and row_data[10].strip() not in ["", "k.A."] else "k.A.",
company_data.get('url', 'k.A.'),
company_data.get('first_paragraph', 'k.A.'),
company_data.get('branche', 'k.A.'),
company_data.get('umsatz', 'k.A.'),
company_data.get('mitarbeiter', 'k.A.'),
company_data.get('categories', 'k.A.')
]
self.sheet_handler.sheet.update(values=[wiki_values], range_name=wiki_update_range)
self.sheet_handler.sheet.update(values=[[current_dt]], range_name=dt_wiki_range)
else:
debug_print(f"Zeile {row_num}: Wikipedia-Timestamp bereits gesetzt überspringe Wiki-Auswertung.")
# ChatGPT-Verarbeitung
if process_chatgpt:
crm_umsatz = row_data[9] if len(row_data) > 9 else "k.A."
consistency_result = compare_umsatz_values(crm_umsatz, company_data.get('umsatz', 'k.A.'))
self.sheet_handler.sheet.update(values=[[consistency_result]], range_name=f"S{row_num}")
crm_data = ";".join(row_data[1:10])
wiki_data_str = ";".join(row_data[11:18])
prompt = ("Bitte überprüfe, ob die folgenden beiden Datensätze grundsätzlich zum gleichen Unternehmen gehören. "
f"CRM-Daten: {crm_data} | Wikipedia-Daten: {wiki_data_str}")
valid_result, tokens = safe_chatgpt_call(prompt, crm_data + " " + wiki_data_str, row_data[10] if len(row_data) > 10 else "k.A.")
total_tokens += tokens
if valid_result.strip().upper() == "OK":
wiki_consistency = "OK"
wiki_article_suggestion = ""
else:
wiki_consistency = "X"
wiki_article_suggestion = valid_result
self.sheet_handler.sheet.update(values=[[wiki_consistency]], range_name=f"T{row_num}")
self.sheet_handler.sheet.update(values=[[wiki_article_suggestion]], range_name=f"U{row_num}")
prompt_fsm = f"Bitte bewerte, ob das Unternehmen '{company_name}' für den Einsatz einer Field Service Management Lösung geeignet ist. Antworte mit 'Ja' oder 'Nein' und begründe kurz."
fsm_result, tokens = safe_chatgpt_call(prompt_fsm, company_name, row_data[10] if len(row_data) > 10 else "k.A.")
total_tokens += tokens
parts = fsm_result.split("-", 1)
fsm_suitability = parts[0].strip() if parts else fsm_result
fsm_justification = parts[1].strip() if len(parts) > 1 else ""
self.sheet_handler.sheet.update(values=[[fsm_suitability]], range_name=f"Z{row_num}")
self.sheet_handler.sheet.update(values=[[fsm_justification]], range_name=f"AA{row_num}")
prompt_st = f"Bitte schätze die Anzahl der Servicetechniker für das Unternehmen '{company_name}' ein. Antwortoptionen: '<50 Techniker', '>100 Techniker', '>200 Techniker', '>500 Techniker'."
st_estimate, tokens = safe_chatgpt_call(prompt_st, company_name, row_data[10] if len(row_data) > 10 else "k.A.")
total_tokens += tokens
self.sheet_handler.sheet.update(values=[[st_estimate]], range_name=f"AE{row_num}")
internal_value = row_data[7] if len(row_data) > 7 else "k.A."
internal_category = map_internal_technicians(internal_value) if internal_value != "k.A." else "k.A."
if internal_category != "k.A." and st_estimate != internal_category:
prompt_st_expl = f"Bitte erkläre, warum du für das Unternehmen '{company_name}' die Anzahl der Servicetechniker als '{st_estimate}' geschätzt hast."
st_explanation, tokens = safe_chatgpt_call(prompt_st_expl, company_name, row_data[10] if len(row_data) > 10 else "k.A.")
total_tokens += tokens
technician_explanation = st_explanation
else:
technician_explanation = "ok"
self.sheet_handler.sheet.update(values=[[technician_explanation]], range_name=f"AF{row_num}")
crm_mitarbeiter = row_data[10] if len(row_data) > 10 else "k.A."
wiki_mitarbeiter = company_data.get('mitarbeiter', "k.A.")
try:
crm_emp = float(crm_mitarbeiter)
wiki_emp = float(wiki_mitarbeiter)
diff = abs(crm_emp - wiki_emp) / ((crm_emp + wiki_emp) / 2) * 100
reason = "Beide Werte ähnlich" if diff < 30 else "Signifikante Abweichung"
mitarbeiter_result = f"CRM: {crm_mitarbeiter}, Wikipedia: {wiki_mitarbeiter}, Differenz: {diff:.2f}%, Einschätzung: {reason}"
except Exception as e:
mitarbeiter_result = "k.A."
self.sheet_handler.sheet.update(values=[[mitarbeiter_result]], range_name=f"AB{row_num}")
prompt_umsatz = f"Bitte schätze den Jahresumsatz (in Mio. €) für das Unternehmen '{company_name}' ein basierend auf den Daten: CRM: {crm_umsatz}, Wikipedia: {company_data.get('umsatz', 'k.A.')}. Antworte nur mit der Zahl."
umsatz_estimate, tokens = safe_chatgpt_call(prompt_umsatz, company_name, row_data[10] if len(row_data) > 10 else "k.A.")
total_tokens += tokens
self.sheet_handler.sheet.update(values=[[umsatz_estimate]], range_name=f"AG{row_num}")
self.sheet_handler.sheet.update(values=[[str(total_tokens)]], range_name=f"AQ{row_num}")
self.sheet_handler.sheet.update(values=[[current_dt]], range_name=dt_chat_range)
self.sheet_handler.sheet.update(values=[[current_dt]], range_name=ver_range)
self.sheet_handler.sheet.update(values=[[Config.VERSION]], range_name=ver_range)
debug_print(f"Zeile {row_num} verifiziert: URL: {company_data.get('url', 'k.A.')}, Branche: {company_data.get('branche', 'k.A.')}")
time.sleep(Config.RETRY_DELAY)
# ==================== NEUER MODUS: CONTACT RESEARCH (via SerpAPI) ====================
def process_contact_research():
debug_print("Starte Contact Research (Modus 6)...")
gc = gspread.authorize(ServiceAccountCredentials.from_json_keyfile_name(
Config.CREDENTIALS_FILE, ["https://www.googleapis.com/auth/spreadsheets"]))
sh = gc.open_by_url(Config.SHEET_URL)
main_sheet = sh.sheet1
data = main_sheet.get_all_values()
for i, row in enumerate(data[1:], start=2):
company_name = row[1] if len(row) > 1 else ""
search_name = row[2].strip() if len(row) > 2 and row[2].strip() not in ["", "k.A."] else company_name
website = row[3] if len(row) > 3 else ""
if not company_name or not website:
continue
count_service = count_linkedin_contacts(search_name, website, "Serviceleiter")
count_it = count_linkedin_contacts(search_name, website, "IT-Leiter")
count_management = count_linkedin_contacts(search_name, website, "Geschäftsführer")
count_disponent = count_linkedin_contacts(search_name, website, "Disponent")
current_dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
main_sheet.update(values=[[str(count_service)]], range_name=f"AI{i}")
main_sheet.update(values=[[str(count_it)]], range_name=f"AJ{i)")
main_sheet.update(values=[[str(count_management)]], range_name=f"AK{i}")
main_sheet.update(values=[[str(count_disponent)]], range_name=f"AL{i}")
main_sheet.update(values=[[current_dt]], range_name=f"AM{i}")
debug_print(f"Zeile {i}: Serviceleiter {count_service}, IT-Leiter {count_it}, Management {count_management}, Disponent {count_disponent} Contact Search Timestamp gesetzt.")
time.sleep(Config.RETRY_DELAY * 1.5)
debug_print("Contact Research abgeschlossen.")
# ==================== NEUER MODUS: CONTACTS (LinkedIn) ====================
def process_contacts():
debug_print("Starte LinkedIn-Kontaktsuche...")
gc = gspread.authorize(ServiceAccountCredentials.from_json_keyfile_name(
Config.CREDENTIALS_FILE, ["https://www.googleapis.com/auth/spreadsheets"]))
sh = gc.open_by_url(Config.SHEET_URL)
try:
contacts_sheet = sh.worksheet("Contacts")
except gspread.exceptions.WorksheetNotFound:
contacts_sheet = sh.add_worksheet(title="Contacts", rows="1000", cols="10")
header = ["Firmenname", "Website", "Kurzform", "Vorname", "Nachname", "Position", "Anrede", "E-Mail"]
contacts_sheet.update(values=[header], range_name="A1:H1")
debug_print("Neues Blatt 'Contacts' erstellt und Header eingetragen.")
main_sheet = sh.sheet1
data = main_sheet.get_all_values()
positions = ["Serviceleiter", "IT-Leiter", "Leiter After Sales", "Leiter Einsatzplanung"]
# Fortsetzung der Kontaktsuche...
debug_print("LinkedIn-Kontaktsuche abgeschlossen.")