import os
import time
import re
import gspread
import wikipedia
import requests
import openai
from bs4 import BeautifulSoup
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime
from difflib import SequenceMatcher
import unicodedata
import csv  # reserved for the GPT answer log (Config.LOG_CSV); not used yet

# ==================== CONFIGURATION ====================
class Config:
    VERSION = "v1.2.4"  # v1.2.4: integrates the ChatGPT API for revenue estimation, new CRM vs. Wikipedia revenue comparison
    LANG = "de"
    CREDENTIALS_FILE = "service_account.json"
    SHEET_URL = "https://docs.google.com/spreadsheets/d/1u_gHr9JUfmV1-iviRzbSe3575QEp7KLhK5jFV_gJcgo"
    MAX_RETRIES = 3
    RETRY_DELAY = 5
    LOG_CSV = "gpt_antworten_log.csv"
    SIMILARITY_THRESHOLD = 0.65
    DEBUG = True
    WIKIPEDIA_SEARCH_RESULTS = 5
    HTML_PARSER = "html.parser"

# ==================== HELPER FUNCTIONS ====================
def retry_on_failure(func):
    """Retry decorator: runs func up to MAX_RETRIES times, returns None if all attempts fail."""
    def wrapper(*args, **kwargs):
        for attempt in range(Config.MAX_RETRIES):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                print(f"⚠️ Fehler bei {func.__name__} (Versuch {attempt+1}): {str(e)[:100]}")
                time.sleep(Config.RETRY_DELAY)
        return None
    return wrapper

def debug_print(message):
    if Config.DEBUG:
        print(f"[DEBUG] {message}")

def clean_text(text):
    """Normalize unicode, strip footnote markers like [1], and collapse whitespace."""
    if not text:
        return "k.A."
    text = unicodedata.normalize("NFKC", str(text))
    text = re.sub(r'\[\d+\]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text if text else "k.A."

def normalize_company_name(name):
    """Strip legal-form suffixes (GmbH, AG, KG, ...) so company names can be compared."""
    if not name:
        return ""
    forms = [
        r'gmbh', r'g\.m\.b\.h\.', r'ug', r'u\.g\.', r'ug \(haftungsbeschränkt\)',
        r'u\.g\. \(haftungsbeschränkt\)', r'ag', r'a\.g\.', r'ohg', r'o\.h\.g\.',
        r'kg', r'k\.g\.', r'gmbh & co\.?\s*kg', r'g\.m\.b\.h\. & co\.?\s*k\.g\.',
        r'ag & co\.?\s*kg', r'a\.g\. & co\.?\s*k\.g\.', r'e\.k\.', r'e\.kfm\.',
        r'e\.kfr\.', r'ltd\.', r'ltd & co\.?\s*kg', r's\.a r\.l\.', r'stiftung',
        r'genossenschaft', r'ggmbh', r'gug', r'partg', r'partgmbb', r'kgaa',
        r'se', r'og', r'o\.g\.', r'e\.u\.', r'ges\.n\.b\.r\.', r'genmbh',
        r'verein', r'kollektivgesellschaft', r'kommanditgesellschaft',
        r'einzelfirma', r'sàrl', r'sa', r'sagl', r'gmbh & co\.?\s*ohg',
        r'ag & co\.?\s*ohg', r'gmbh & co\.?\s*kgaa', r'ag & co\.?\s*kgaa',
        r's\.a\.', r's\.p\.a\.', r'b\.v\.', r'n\.v\.'
    ]
    # Sort alternatives longest-first: regex alternation is ordered, so without
    # this, 'gmbh' would match before 'gmbh & co. kg' ever gets a chance and
    # leave an orphaned '& co. kg' in the name.
    pattern = r'\b(' + '|'.join(sorted(forms, key=len, reverse=True)) + r')\b'
    normalized = re.sub(pattern, '', name, flags=re.IGNORECASE)
    normalized = re.sub(r'[\-–]', ' ', normalized)
    normalized = re.sub(r'\s+', ' ', normalized).strip()
    return normalized.lower()

def extract_numeric_value(raw_value, is_umsatz=False):
    """Parse a German-formatted number out of an infobox string.

    Revenue values (is_umsatz=True) are normalized to millions of euros.
    Returns the original string if conversion fails, or "k.A." if no digits are found.
    """
    raw_value = raw_value.strip()
    if not raw_value:
        return "k.A."
    raw = raw_value.lower().replace("\xa0", " ")
    match = re.search(r'([\d.,]+)', raw, flags=re.UNICODE)
    if not match or not match.group(1).strip():
        debug_print(f"Keine numerischen Zeichen gefunden im Rohtext: '{raw_value}'")
        return "k.A."
    num_str = match.group(1)
    if ',' in num_str:
        # German decimal notation: '.' is the thousands separator, ',' the decimal point.
        num_str = num_str.replace('.', '').replace(',', '.')
        try:
            num = float(num_str)
        except Exception as e:
            debug_print(f"Fehler bei der Umwandlung von '{num_str}' (Rohtext: '{raw_value}'): {e}")
            return raw_value
    else:
        num_str = num_str.replace(' ', '').replace('.', '')
        try:
            num = float(num_str)
        except Exception as e:
            debug_print(f"Fehler bei der Umwandlung von '{num_str}' (Rohtext: '{raw_value}'): {e}")
            return raw_value
    if is_umsatz:
        if "mrd" in raw or "milliarden" in raw:
            num *= 1000   # billions -> millions
        elif "mio" in raw or "millionen" in raw:
            pass          # already in millions
        else:
            num /= 1e6    # plain euro amount -> millions
    return str(int(round(num)))
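
# --- Hedged example (not part of the original script) -----------------------
# A minimal sanity check of extract_numeric_value on typical German infobox
# strings; the sample inputs are invented for illustration and the helper can
# be deleted without affecting the pipeline.
def _demo_extract_numeric_value():
    assert extract_numeric_value("1,5 Mrd. Euro", is_umsatz=True) == "1500"
    assert extract_numeric_value("450 Mio. Euro", is_umsatz=True) == "450"
    assert extract_numeric_value("2.300 Mitarbeiter") == "2300"
    assert extract_numeric_value("") == "k.A."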
def compare_umsatz_values(crm, wiki):
    """Compare CRM revenue against Wikipedia revenue (both in millions of euros)."""
    try:
        crm_val = float(crm)
        wiki_val = float(wiki)
    except Exception:
        return "Daten unvollständig"
    if crm_val == 0:
        return "CRM Umsatz 0"
    diff = abs(crm_val - wiki_val) / crm_val
    if diff < 0.1:
        return "OK"
    diff_mio = abs(crm_val - wiki_val)
    return f"Abweichung: {int(round(diff_mio))} Mio €"

def evaluate_umsatz_chatgpt(company_name, wiki_umsatz):
    """Ask ChatGPT for a revenue estimate (in millions of euros), given the Wikipedia value.

    Note: uses the module-level ChatCompletion interface, which requires openai<1.0.
    """
    try:
        with open("api_key.txt", "r") as f:
            api_key = f.read().strip()
    except Exception as e:
        debug_print(f"Fehler beim Lesen des API-Tokens: {e}")
        return "k.A."
    openai.api_key = api_key
    prompt = (f"Bitte schätze den Umsatz in Mio. Euro für das Unternehmen '{company_name}'. "
              f"Die Wikipedia-Daten zeigen: '{wiki_umsatz}'. "
              "Antworte nur mit der Zahl.")
    try:
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.0
        )
        result = response.choices[0].message.content.strip()
        try:
            value = float(result.replace(',', '.'))
            return str(int(round(value)))
        except Exception as conv_e:
            debug_print(f"Fehler bei der Verarbeitung der ChatGPT-Antwort '{result}': {conv_e}")
            return result
    except Exception as e:
        debug_print(f"Fehler beim Aufruf der ChatGPT API: {e}")
        return "k.A."
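
# --- Hedged sketch (not part of the original script) -------------------------
# If the project is migrated to openai>=1.0, where openai.ChatCompletion no
# longer exists, the equivalent call would look roughly like this:
#
#     from openai import OpenAI
#     client = OpenAI(api_key=api_key)
#     response = client.chat.completions.create(
#         model="gpt-3.5-turbo",
#         messages=[{"role": "user", "content": prompt}],
#         temperature=0.0,
#     )
#     result = response.choices[0].message.content.strip()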
# ==================== GOOGLE SHEET HANDLER ====================
class GoogleSheetHandler:
    def __init__(self):
        self.sheet = None
        self.sheet_values = []
        self._connect()

    def _connect(self):
        scope = ["https://www.googleapis.com/auth/spreadsheets"]
        creds = ServiceAccountCredentials.from_json_keyfile_name(Config.CREDENTIALS_FILE, scope)
        self.sheet = gspread.authorize(creds).open_by_url(Config.SHEET_URL).sheet1
        self.sheet_values = self.sheet.get_all_values()

    def get_start_index(self):
        """Return the 1-based sheet row of the first data row whose column N is still empty."""
        filled_n = [row[13] if len(row) > 13 else '' for row in self.sheet_values[1:]]
        # Data row i (enumerate start=1) corresponds to sheet row i+1; if every
        # row is filled, the next free row is one past the last data row.
        return next((i + 1 for i, v in enumerate(filled_n, start=1) if not str(v).strip()), len(filled_n) + 2)

# ==================== ALIGNMENT DEMO (mode 3) ====================
def alignment_demo(sheet):
    new_headers = [
        "Spalte A (ReEval Flag)", "Spalte B (Firmenname)", "Spalte C (Website)", "Spalte D (Ort)",
        "Spalte E (Beschreibung)", "Spalte F (Aktuelle Branche)", "Spalte G (Beschreibung Branche extern)",
        "Spalte H (Anzahl Techniker CRM)", "Spalte I (Umsatz CRM)", "Spalte J (Anzahl Mitarbeiter CRM)",
        "Spalte K (Vorschlag Wiki URL)", "Spalte L (Wikipedia URL)", "Spalte M (Wikipedia Absatz)",
        "Spalte N (Wikipedia Branche)", "Spalte O (Wikipedia Umsatz)", "Spalte P (Wikipedia Mitarbeiter)",
        "Spalte Q (Wikipedia Kategorien)", "Spalte R (Konsistenzprüfung)", "Spalte S (Begründung bei Inkonsistenz)",
        "Spalte T (Vorschlag Wiki Artikel ChatGPT)", "Spalte U (Begründung bei Abweichung)",
        "Spalte V (Vorschlag neue Branche)", "Spalte W (Konsistenzprüfung Branche)",
        "Spalte X (Begründung Abweichung Branche)", "Spalte Y (FSM Relevanz Ja / Nein)",
        "Spalte Z (Begründung für FSM Relevanz)", "Spalte AA (Schätzung Anzahl Mitarbeiter)",
        "Spalte AB (Konsistenzprüfung Mitarbeiterzahl)", "Spalte AC (Begründung für Abweichung Mitarbeiterzahl)",
        "Spalte AD (Einschätzung Anzahl Servicetechniker)",
        "Spalte AE (Begründung bei Abweichung Anzahl Servicetechniker)",
        "Spalte AF (Schätzung Umsatz ChatGPT)", "Spalte AG (Begründung für Abweichung Umsatz)",
        "Spalte AH (Timestamp letzte Prüfung)", "Spalte AI (Version)"
    ]
    header_range = "A11200:AI11200"
    sheet.update(values=[new_headers], range_name=header_range)
    print("Alignment-Demo abgeschlossen: Neue Spaltenüberschriften in Zeile 11200 geschrieben.")

# ==================== WIKIPEDIA SCRAPER ====================
class WikipediaScraper:
    def __init__(self):
        wikipedia.set_lang(Config.LANG)

    def _get_full_domain(self, website):
        """Reduce a website URL to its bare domain, e.g. 'https://www.foo.de/x' -> 'foo.de'."""
        if not website:
            return ""
        website = website.lower().strip()
        website = re.sub(r'^https?:\/\/', '', website)
        website = re.sub(r'^www\.', '', website)
        return website.split('/')[0]

    def _generate_search_terms(self, company_name, website):
        """Build Wikipedia search terms: domain first, then short and full normalized name."""
        terms = []
        full_domain = self._get_full_domain(website)
        if full_domain:
            terms.append(full_domain)
        normalized_name = normalize_company_name(company_name)
        candidate = " ".join(normalized_name.split()[:2]).strip()
        if candidate and candidate not in terms:
            terms.append(candidate)
        if normalized_name and normalized_name not in terms:
            terms.append(normalized_name)
        debug_print(f"Generierte Suchbegriffe: {terms}")
        return terms
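
    # --- Hedged example (not part of the original script) --------------------
    # With the invented CRM entry below, the helpers above would produce:
    #   _get_full_domain("https://www.muellerpumpen.de/service") -> "muellerpumpen.de"
    #   normalize_company_name("Müller Pumpen GmbH & Co. KG")    -> "müller pumpen"
    #   _generate_search_terms(...) -> ["muellerpumpen.de", "müller pumpen"]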
    def _validate_article(self, page, company_name, website):
        """Accept a Wikipedia page if its infobox or external links contain the
        company domain, or the normalized title is similar enough to the name."""
        full_domain = self._get_full_domain(website)
        domain_found = False
        if full_domain:
            try:
                html_raw = requests.get(page.url, timeout=10).text
                soup = BeautifulSoup(html_raw, Config.HTML_PARSER)
                infobox = soup.find('table', class_=lambda c: c and 'infobox' in c.lower())
                if infobox:
                    links = infobox.find_all('a', href=True)
                    for link in links:
                        href = link.get('href').lower()
                        if href.startswith('/wiki/datei:'):
                            continue
                        if full_domain in href:
                            debug_print(f"Definitiver Link-Match in Infobox gefunden: {href}")
                            domain_found = True
                            break
                # The wikipedia package exposes external links as `references`;
                # the original checked the nonexistent attribute `externallinks`,
                # so this fallback was silently dead.
                if not domain_found and hasattr(page, 'references'):
                    for ext_link in page.references:
                        if full_domain in ext_link.lower():
                            debug_print(f"Definitiver Link-Match in externen Links gefunden: {ext_link}")
                            domain_found = True
                            break
            except Exception as e:
                debug_print(f"Fehler beim Extrahieren von Links: {str(e)}")
        normalized_title = normalize_company_name(page.title)
        normalized_company = normalize_company_name(company_name)
        similarity = SequenceMatcher(None, normalized_title, normalized_company).ratio()
        debug_print(f"Ähnlichkeit (normalisiert): {similarity:.2f} ({normalized_title} vs {normalized_company})")
        # A confirmed domain match lowers the required name similarity.
        threshold = 0.60 if domain_found else Config.SIMILARITY_THRESHOLD
        return similarity >= threshold

    def extract_first_paragraph(self, page_url):
        try:
            response = requests.get(page_url, timeout=10)
            soup = BeautifulSoup(response.text, Config.HTML_PARSER)
            paragraphs = soup.find_all('p')
            for p in paragraphs:
                text = clean_text(p.get_text())
                if len(text) > 50:  # skip empty or trivially short lead paragraphs
                    return text
            return "k.A."
        except Exception as e:
            debug_print(f"Fehler beim Extrahieren des ersten Absatzes: {e}")
            return "k.A."

    def extract_categories(self, soup):
        cat_div = soup.find('div', id="mw-normal-catlinks")
        if cat_div:
            ul = cat_div.find('ul')
            if ul:
                cats = [clean_text(li.get_text()) for li in ul.find_all('li')]
                return ", ".join(cats)
        return "k.A."

    def _extract_infobox_value(self, soup, target):
        """Find an infobox value by matching row headers against a keyword list
        for 'branche', 'umsatz' or 'mitarbeiter'."""
        infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
        if not infobox:
            return "k.A."
        keywords_map = {
            'branche': ['branche', 'industrie', 'tätigkeit', 'geschäftsfeld', 'sektor',
                        'produkte', 'leistungen', 'aktivitäten', 'wirtschaftszweig'],
            'umsatz': ['umsatz', 'jahresumsatz', 'konzernumsatz', 'gesamtumsatz', 'erlöse',
                       'umsatzerlöse', 'einnahmen', 'ergebnis', 'jahresergebnis'],
            'mitarbeiter': ['mitarbeiter', 'beschäftigte', 'personal', 'mitarbeiterzahl',
                            'angestellte', 'belegschaft', 'personalstärke']
        }
        keywords = keywords_map.get(target, [])
        for row in infobox.find_all('tr'):
            header = row.find('th')
            if header:
                header_text = clean_text(header.get_text()).lower()
                if any(kw in header_text for kw in keywords):
                    value = row.find('td')
                    if value:
                        raw_value = clean_text(value.get_text())
                        if target == 'branche':
                            clean_val = re.sub(r'\[.*?\]|\(.*?\)', '', raw_value)
                            return ' '.join(clean_val.split()).strip()
                        if target == 'umsatz':
                            return extract_numeric_value(raw_value, is_umsatz=True)
                        if target == 'mitarbeiter':
                            return extract_numeric_value(raw_value, is_umsatz=False)
        return "k.A."

    def extract_full_infobox(self, soup):
        infobox = soup.find('table', class_=lambda c: c and any(kw in c.lower() for kw in ['infobox', 'vcard', 'unternehmen']))
        if not infobox:
            return "k.A."
        return clean_text(infobox.get_text(separator=' | '))

    def extract_fields_from_infobox_text(self, infobox_text, field_names):
        """Scan the ' | '-joined infobox text and take the token following a field label."""
        result = {}
        tokens = [token.strip() for token in infobox_text.split("|") if token.strip()]
        for i, token in enumerate(tokens):
            for field in field_names:
                # Keep the first occurrence only, so later tokens that merely
                # contain the label (e.g. 'Umsatzerlöse ...') don't overwrite it.
                # Tokens are pre-filtered to be non-empty, so the very next
                # token is the value.
                if field.lower() in token.lower() and field not in result:
                    j = i + 1
                    result[field] = tokens[j] if j < len(tokens) else "k.A."
        return result
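
    # --- Hedged example (not part of the original script) --------------------
    # extract_fields_from_infobox_text on an invented, flattened infobox string:
    #   text = "Branche | Maschinenbau | Umsatz | 450 Mio. Euro | Mitarbeiter | 2.300"
    #   scraper.extract_fields_from_infobox_text(text, ['Branche', 'Umsatz', 'Mitarbeiter'])
    #   -> {'Branche': 'Maschinenbau', 'Umsatz': '450 Mio. Euro', 'Mitarbeiter': '2.300'}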
    def extract_company_data(self, page_url):
        empty = {'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.',
                 'mitarbeiter': 'k.A.', 'categories': 'k.A.', 'full_infobox': 'k.A.'}
        if not page_url:
            return empty
        try:
            response = requests.get(page_url, timeout=10)
            soup = BeautifulSoup(response.text, Config.HTML_PARSER)
            full_infobox = self.extract_full_infobox(soup)
            extracted_fields = self.extract_fields_from_infobox_text(full_infobox, ['Branche', 'Umsatz', 'Mitarbeiter'])
            # Prefer the flattened-text extraction; fall back to row-wise infobox parsing.
            raw_branche = extracted_fields.get('Branche', self._extract_infobox_value(soup, 'branche'))
            raw_umsatz = extracted_fields.get('Umsatz', self._extract_infobox_value(soup, 'umsatz'))
            raw_mitarbeiter = extracted_fields.get('Mitarbeiter', self._extract_infobox_value(soup, 'mitarbeiter'))
            umsatz_val = extract_numeric_value(raw_umsatz, is_umsatz=True)
            mitarbeiter_val = extract_numeric_value(raw_mitarbeiter, is_umsatz=False)
            categories_val = self.extract_categories(soup)
            first_paragraph = self.extract_first_paragraph(page_url)
            return {
                'url': page_url,
                'first_paragraph': first_paragraph,
                'branche': raw_branche,
                'umsatz': umsatz_val,
                'mitarbeiter': mitarbeiter_val,
                'categories': categories_val,
                'full_infobox': full_infobox
            }
        except Exception as e:
            debug_print(f"Extraktionsfehler: {str(e)}")
            return empty

    @retry_on_failure
    def search_company_article(self, company_name, website):
        search_terms = self._generate_search_terms(company_name, website)
        for term in search_terms:
            try:
                results = wikipedia.search(term, results=Config.WIKIPEDIA_SEARCH_RESULTS)
                debug_print(f"Suchergebnisse für '{term}': {results}")
                for title in results:
                    try:
                        page = wikipedia.page(title, auto_suggest=False)
                        if self._validate_article(page, company_name, website):
                            return page
                    except (wikipedia.exceptions.DisambiguationError, wikipedia.exceptions.PageError) as e:
                        debug_print(f"Seitenfehler: {str(e)}")
                        continue
            except Exception as e:
                debug_print(f"Suchfehler: {str(e)}")
                continue
        return None

# ==================== DATA PROCESSOR ====================
class DataProcessor:
    def __init__(self):
        self.sheet_handler = GoogleSheetHandler()
        self.wiki_scraper = WikipediaScraper()

    def process_rows(self, num_rows=None):
        if MODE == "2":
            print("Re-Evaluierungsmodus: Verarbeitung aller Zeilen mit 'x' in Spalte A.")
        elif MODE == "3":
            print("Alignment-Demo-Modus: Schreibe neue Spaltenüberschriften in Zeile 11200.")
            alignment_demo(self.sheet_handler.sheet)
            return
        else:
            start_index = self.sheet_handler.get_start_index()
            print(f"Starte bei Zeile {start_index}")
        processed = 0
        for i, row in enumerate(self.sheet_handler.sheet_values[1:], start=2):
            if MODE == "2":
                if row and row[0].strip().lower() == "x":
                    self._process_single_row(i, row)
            else:
                # Use the start index computed once above instead of re-deriving it
                # each iteration, and honor the requested row limit, which the
                # original accepted but never applied.
                if i >= start_index:
                    if num_rows is not None and processed >= num_rows:
                        break
                    self._process_single_row(i, row)
                    processed += 1

    def _process_single_row(self, row_num, row_data):
        # Schema:
        #   B: company name, C: website
        #   Wikipedia data: columns K to Q
        #   ChatGPT revenue: column AF
        #   Existing CRM revenue: column I (index 8)
        #   Revenue comparison result: column AG
        #   Timestamp: column AH, version: column AI
        company_name = row_data[1] if len(row_data) > 1 else ""
        website = row_data[2] if len(row_data) > 2 else ""
        wiki_update_range = f"K{row_num}:Q{row_num}"
        chatgpt_range = f"AF{row_num}"
        abgleich_range = f"AG{row_num}"
        dt_range = f"AH{row_num}"
        ver_range = f"AI{row_num}"
        print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Verarbeite Zeile {row_num}: {company_name}")
        article = self.wiki_scraper.search_company_article(company_name, website)
        if article:
            company_data = self.wiki_scraper.extract_company_data(article.url)
        else:
            company_data = {
                'url': 'k.A.', 'first_paragraph': 'k.A.', 'branche': 'k.A.', 'umsatz': 'k.A.',
                'mitarbeiter': 'k.A.', 'categories': 'k.A.', 'full_infobox': 'k.A.'
            }
        wiki_values = [
            "k.A.",  # column K: suggested wiki URL (filled in a separate step)
            company_data.get('url', 'k.A.'),
            company_data.get('first_paragraph', 'k.A.'),
            company_data.get('branche', 'k.A.'),
            company_data.get('umsatz', 'k.A.'),
            company_data.get('mitarbeiter', 'k.A.'),
            company_data.get('categories', 'k.A.')
        ]
        self.sheet_handler.sheet.update(values=[wiki_values], range_name=wiki_update_range)
        # ChatGPT API: revenue estimate based on company name and Wikipedia revenue
        wiki_umsatz = company_data.get('umsatz', 'k.A.')
        if wiki_umsatz != "k.A.":
            chatgpt_umsatz = evaluate_umsatz_chatgpt(company_name, wiki_umsatz)
        else:
            chatgpt_umsatz = "k.A."
        self.sheet_handler.sheet.update(values=[[chatgpt_umsatz]], range_name=chatgpt_range)
        # Revenue comparison between CRM revenue (column I) and Wikipedia revenue
        crm_umsatz = row_data[8] if len(row_data) > 8 else "k.A."
        abgleich_result = compare_umsatz_values(crm_umsatz, company_data.get('umsatz', 'k.A.'))
        self.sheet_handler.sheet.update(values=[[abgleich_result]], range_name=abgleich_range)
        current_dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.sheet_handler.sheet.update(values=[[current_dt]], range_name=dt_range)
        self.sheet_handler.sheet.update(values=[[Config.VERSION]], range_name=ver_range)
        print(f"✅ Aktualisiert: URL: {company_data.get('url', 'k.A.')}, Absatz: {company_data.get('first_paragraph', 'k.A.')[:30]}..., "
              f"Branche: {company_data.get('branche', 'k.A.')}, Wikipedia Umsatz: {company_data.get('umsatz', 'k.A.')}, "
              f"Mitarbeiter: {company_data.get('mitarbeiter', 'k.A.')}, Kategorien: {company_data.get('categories', 'k.A.')}, "
              f"ChatGPT Umsatz: {chatgpt_umsatz}, Umsatz-Abgleich: {abgleich_result}")
        if MODE == "2":
            print("----- Vollständiger Infobox-Inhalt -----")
            print(company_data.get("full_infobox", "k.A."))
            print("----------------------------------------")
        time.sleep(Config.RETRY_DELAY)  # throttle writes to stay under the Sheets API rate limit

if __name__ == "__main__":
    mode_input = input("Wählen Sie den Modus: 1 für normalen Modus, 2 für Re-Evaluierungsmodus, 3 für Alignment-Demo: ").strip()
    if mode_input == "2":
        MODE = "2"
    elif mode_input == "3":
        MODE = "3"
    else:
        MODE = "1"
    if MODE == "1":
        try:
            num_rows = int(input("Wieviele Zeilen sollen überprüft werden? "))
        except ValueError:
            print("Ungültige Eingabe. Bitte eine Zahl eingeben.")
            raise SystemExit(1)
    else:
        num_rows = None
    processor = DataProcessor()
    processor.process_rows(num_rows)
    print(f"\n✅ Wikipedia-Auswertung abgeschlossen ({Config.VERSION})")
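
# ---------------------------------------------------------------------------
# Runtime prerequisites (summarised from the code above; the file names are
# the ones the script itself reads, the install line is an assumption about
# the package versions in use):
#   - service_account.json : Google service-account credentials (Config.CREDENTIALS_FILE)
#   - api_key.txt          : OpenAI API key, read by evaluate_umsatz_chatgpt()
#   - pip install gspread oauth2client wikipedia requests beautifulsoup4 "openai<1.0"
# ---------------------------------------------------------------------------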