Files
Brancheneinstufung2/show_status.py
2026-02-17 19:44:42 +00:00

158 lines
6.1 KiB
Python

import sqlite3
import os
import requests
import json
from datetime import datetime, timedelta
# --- Configuration ---
DB_PATH = "/home/node/clawd/repos/brancheneinstufung2/company_explorer_local.db"
CE_API_URL = "http://192.168.178.6:8090/ce/api"
# SO_API_URL = "..." # To be added
import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "connector-superoffice"))
from superoffice_client import SuperOfficeClient
class GtmHealthCheck:
def __init__(self):
self.db_path = DB_PATH
self.ce_api_url = CE_API_URL
self.api_user = os.getenv("COMPANY_EXPLORER_API_USER", "admin")
self.api_password = os.getenv("COMPANY_EXPLORER_API_PASSWORD", "gemini")
def get_ce_stats(self):
"""Holt Statistiken aus der lokalen Company Explorer DB."""
if not os.path.exists(self.db_path):
return {"total": 0, "in_progress": 0, "error": "DB file not found."}
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM companies")
total = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM companies WHERE status != 'ENRICHED'")
in_progress = cursor.fetchone()[0]
conn.close()
return {"total": total, "in_progress": in_progress}
except Exception as e:
return {"total": 0, "in_progress": 0, "error": str(e)}
def check_ce_api_health(self):
"""Prüft die Erreichbarkeit der Company Explorer API."""
try:
response = requests.get(
f"{self.ce_api_url}/health",
auth=(self.api_user, self.api_password),
timeout=5
)
if response.status_code == 200 and response.json().get("status") == "ok":
return "[HEALTHY]"
return f"[ERROR - Status {response.status_code}]"
except requests.exceptions.Timeout:
return "[BUSY/TIMEOUT]"
except requests.exceptions.RequestException:
return "[UNREACHABLE]"
def get_so_stats(self):
"""Holt die ungefähre Gesamtanzahl der Firmen aus der SuperOffice API."""
try:
client = SuperOfficeClient()
# We query the first page with a page size of 200 (a common default)
query_string = "Contact?$top=200"
# Directly use the requests logic from the client's search method for a single page
url = f"{client.base_url}/{query_string}"
response = requests.get(url, headers=client.headers, timeout=15)
response.raise_for_status()
data = response.json()
count_on_page = len(data.get('value', []))
# Check for a next link to determine if there are more pages
if 'odata.nextLink' in data or 'next_page_url' in data:
return {"total": f"> {count_on_page}"} # More than one page
else:
return {"total": str(count_on_page)} # Exact number if only one page
except requests.exceptions.RequestException as e:
return {"total": f"API Error"}
except Exception as e:
return {"total": f"Error"}
def check_so_api_health(self):
"""Prüft die Erreichbarkeit der SuperOffice API."""
try:
client = SuperOfficeClient()
# A simple request to the base URL should suffice as a health check
response = requests.get(client.base_url, headers=client.headers, timeout=10)
if response.status_code == 200:
return "[HEALTHY]"
return f"[ERROR - Status {response.status_code}]"
except requests.exceptions.Timeout:
return "[BUSY/TIMEOUT]"
except requests.exceptions.RequestException:
return "[UNREACHABLE]"
def get_throughput(self):
"""Zählt die verarbeiteten Accounts der letzten Stunde aus dem Log."""
log_file = "/home/node/clawd/repos/brancheneinstufung2/logs/throughput.log"
if not os.path.exists(log_file):
return 0
count = 0
one_hour_ago = datetime.utcnow() - timedelta(hours=1)
try:
with open(log_file, "r") as f:
for line in f:
parts = line.strip().split(',')
if len(parts) >= 1:
try:
timestamp = datetime.fromisoformat(parts[0])
if timestamp >= one_hour_ago:
count += 1
except ValueError:
continue # Ignore malformed lines
return count
except Exception:
return "Log Error"
def render_dashboard(self):
"""Stellt das Dashboard auf der Konsole dar."""
ce_stats = self.get_ce_stats()
ce_health = self.check_ce_api_health()
so_stats = self.get_so_stats()
so_health = self.check_so_api_health()
throughput = self.get_throughput()
timestamp = datetime.now().strftime("%d.%m.%y %H:%M")
print("=======================================")
print(f"GTM Lead Engine - Status ({timestamp})")
print("=======================================")
print("\n[+] Schnittstellen:")
print(f" - SuperOffice API: {so_health}")
print(f" - Company Explorer: {ce_health}")
print("\n[+] Account Trichter:")
print(f" - SuperOffice Gesamt: {so_stats.get('total')}")
if 'error' in ce_stats:
print(f" - Im Company Explorer: Error ({ce_stats['error']})")
else:
print(f" - Im Company Explorer: {ce_stats.get('total')}")
print(f" - In Bearbeitung: {ce_stats.get('in_progress')}")
print("\n[+] Durchsatz (Letzte Stunde):")
print(f" - Verarbeitet: {throughput} Accounts")
print("\n")
if __name__ == "__main__":
checker = GtmHealthCheck()
checker.render_dashboard()