"""Console health dashboard for the GTM lead engine.

Reports the reachability of the Company Explorer and SuperOffice APIs,
account counts from the local Company Explorer SQLite database and from
SuperOffice, and the number of accounts logged as processed during the
last hour.
"""

import json
import os
import sqlite3
import sys
from contextlib import closing
from datetime import datetime, timedelta, timezone

import requests

# --- Configuration ---
DB_PATH = "/home/node/clawd/repos/brancheneinstufung2/company_explorer_local.db"
CE_API_URL = "http://192.168.178.6:8090/ce/api"
# SO_API_URL = "..." # To be added
THROUGHPUT_LOG_PATH = "/home/node/clawd/repos/brancheneinstufung2/logs/throughput.log"

# The SuperOffice connector is imported from the "connector-superoffice"
# directory next to this file, so that directory must be on sys.path before
# the import below runs.
sys.path.append(
    os.path.join(os.path.dirname(os.path.abspath(__file__)), "connector-superoffice")
)
from superoffice_client import SuperOfficeClient  # noqa: E402


class GtmHealthCheck:
    """Collect health/volume metrics and print them as a console dashboard.

    Every collector method is best-effort: failures are converted into a
    status string or an ``error`` entry instead of raising, so that one
    broken backend does not prevent the rest of the dashboard from rendering.
    """

    def __init__(self):
        self.db_path = DB_PATH
        self.ce_api_url = CE_API_URL
        self.api_user = os.getenv("COMPANY_EXPLORER_API_USER", "admin")
        # NOTE(review): hard-coded fallback credential. Kept for backward
        # compatibility; prefer always setting COMPANY_EXPLORER_API_PASSWORD.
        self.api_password = os.getenv("COMPANY_EXPLORER_API_PASSWORD", "gemini")

    def get_ce_stats(self):
        """Fetch statistics from the local Company Explorer database.

        Returns:
            dict: ``{"total": int, "in_progress": int}`` on success. On
            failure both counts are 0 and an additional ``"error"`` key holds
            the reason. ``in_progress`` counts rows whose ``status`` differs
            from ``'ENRICHED'`` (rows with a NULL status are not matched by
            that SQL comparison).
        """
        if not os.path.exists(self.db_path):
            return {"total": 0, "in_progress": 0, "error": "DB file not found."}

        try:
            # closing() guarantees the connection is released even when a
            # query fails; sqlite3's own context manager only handles
            # transactions and does not close the connection.
            with closing(sqlite3.connect(self.db_path)) as conn:
                cursor = conn.cursor()

                cursor.execute("SELECT COUNT(*) FROM companies")
                total = cursor.fetchone()[0]

                cursor.execute(
                    "SELECT COUNT(*) FROM companies WHERE status != 'ENRICHED'"
                )
                in_progress = cursor.fetchone()[0]

            return {"total": total, "in_progress": in_progress}
        except Exception as e:
            return {"total": 0, "in_progress": 0, "error": str(e)}

    def check_ce_api_health(self):
        """Check whether the Company Explorer API is reachable and healthy.

        Returns:
            str: ``"[HEALTHY]"`` when ``/health`` answers 200 with a JSON
            object whose ``status`` is ``"ok"``; ``"[ERROR - Status <code>]"``
            for any other answer (including a 200 with an unusable body);
            ``"[BUSY/TIMEOUT]"`` on timeout; ``"[UNREACHABLE]"`` on any other
            request failure.
        """
        try:
            response = requests.get(
                f"{self.ce_api_url}/health",
                auth=(self.api_user, self.api_password),
                timeout=5,
            )
        except requests.exceptions.Timeout:
            return "[BUSY/TIMEOUT]"
        except requests.exceptions.RequestException:
            return "[UNREACHABLE]"

        if response.status_code == 200:
            try:
                payload = response.json()
            except ValueError:
                # The server answered, but not with JSON (e.g. a proxy error
                # page). That is an unhealthy service, not an unreachable one.
                payload = None
            if isinstance(payload, dict) and payload.get("status") == "ok":
                return "[HEALTHY]"
        return f"[ERROR - Status {response.status_code}]"

    def get_so_stats(self):
        """Fetch the approximate total number of companies from SuperOffice.

        Only the first result page is requested, so the count is exact only
        when everything fits on that page.

        Returns:
            dict: ``{"total": str}`` where the value is the exact count,
            ``"> <count>"`` when more pages exist, ``"API Error"`` on a
            request failure, or ``"Error"`` on any other failure.
        """
        try:
            client = SuperOfficeClient()
            # We query the first page with a page size of 200 (a common default)
            query_string = "Contact?$top=200"

            # Directly use the requests logic from the client's search method
            # for a single page
            url = f"{client.base_url}/{query_string}"
            response = requests.get(url, headers=client.headers, timeout=15)
            response.raise_for_status()
            data = response.json()

            count_on_page = len(data.get('value', []))

            # Check for a next link to determine if there are more pages
            if 'odata.nextLink' in data or 'next_page_url' in data:
                return {"total": f"> {count_on_page}"}  # More than one page
            return {"total": str(count_on_page)}  # Exact number if only one page
        except requests.exceptions.RequestException:
            return {"total": "API Error"}
        except Exception:
            return {"total": "Error"}

    def check_so_api_health(self):
        """Check whether the SuperOffice API is reachable.

        Returns:
            str: ``"[HEALTHY]"`` on HTTP 200; ``"[ERROR - Status <code>]"``
            for any other status; ``"[BUSY/TIMEOUT]"`` on timeout;
            ``"[UNREACHABLE]"`` on any other request failure;
            ``"[ERROR - Client unavailable]"`` when the SuperOffice client
            fails for a non-request reason.
        """
        try:
            client = SuperOfficeClient()
            # A simple request to the base URL should suffice as a health check
            response = requests.get(client.base_url, headers=client.headers, timeout=10)
            if response.status_code == 200:
                return "[HEALTHY]"
            return f"[ERROR - Status {response.status_code}]"
        except requests.exceptions.Timeout:
            return "[BUSY/TIMEOUT]"
        except requests.exceptions.RequestException:
            return "[UNREACHABLE]"
        except Exception:
            # Mirrors get_so_stats(): a client that cannot be constructed or
            # used must not take the whole dashboard down.
            return "[ERROR - Client unavailable]"

    @staticmethod
    def _parse_log_timestamp(raw):
        """Parse an ISO-8601 timestamp string into a timezone-aware datetime.

        Timestamps without an offset are interpreted as UTC. A trailing
        ``Z`` is accepted on all Python versions.

        Raises:
            ValueError: if ``raw`` is not a valid ISO-8601 timestamp.
        """
        if raw.endswith("Z"):
            # datetime.fromisoformat() only understands "Z" from Python 3.11.
            raw = raw[:-1] + "+00:00"
        stamp = datetime.fromisoformat(raw)
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone.utc)
        return stamp

    def get_throughput(self):
        """Count the accounts logged as processed within the last hour.

        Each log line is expected to start with an ISO-8601 timestamp,
        optionally followed by comma-separated fields. Lines whose first
        field is not a valid timestamp are ignored.

        Returns:
            int | str: the number of matching lines, ``0`` when the log file
            does not exist, or ``"Log Error"`` when the file cannot be
            processed.
        """
        log_file = THROUGHPUT_LOG_PATH
        if not os.path.exists(log_file):
            return 0

        count = 0
        # Aware "now" so that both naive (assumed UTC) and offset-carrying
        # log timestamps can be compared without raising TypeError.
        one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)

        try:
            with open(log_file, "r") as f:
                for line in f:
                    first_field = line.strip().split(',', 1)[0]
                    try:
                        timestamp = self._parse_log_timestamp(first_field)
                    except ValueError:
                        continue  # Ignore malformed lines
                    if timestamp >= one_hour_ago:
                        count += 1
            return count
        except Exception:
            return "Log Error"

    def render_dashboard(self):
        """Collect all metrics and print the dashboard to the console."""
        ce_stats = self.get_ce_stats()
        ce_health = self.check_ce_api_health()
        so_stats = self.get_so_stats()
        so_health = self.check_so_api_health()
        throughput = self.get_throughput()

        timestamp = datetime.now().strftime("%d.%m.%y %H:%M")

        print("=======================================")
        print(f"GTM Lead Engine - Status ({timestamp})")
        print("=======================================")

        print("\n[+] Schnittstellen:")
        print(f" - SuperOffice API: {so_health}")
        print(f" - Company Explorer: {ce_health}")

        print("\n[+] Account Trichter:")
        print(f" - SuperOffice Gesamt: {so_stats.get('total')}")

        if 'error' in ce_stats:
            print(f" - Im Company Explorer: Error ({ce_stats['error']})")
        else:
            print(f" - Im Company Explorer: {ce_stats.get('total')}")
            print(f" - In Bearbeitung: {ce_stats.get('in_progress')}")

        print("\n[+] Durchsatz (Letzte Stunde):")
        print(f" - Verarbeitet: {throughput} Accounts")
        print("\n")


if __name__ == "__main__":
    checker = GtmHealthCheck()
    checker.render_dashboard()