Files
Brancheneinstufung2/debug_connector_status.py
Floke 11d2bc03bf [30e88f42] ✦ In dieser Sitzung haben wir den End-to-End-Test der SuperOffice-Schnittstelle erfolgreich von der automatisierten Simulation bis zum produktiven Live-Lauf
✦ In dieser Sitzung haben wir den End-to-End-Test der SuperOffice-Schnittstelle erfolgreich von der automatisierten Simulation bis zum produktiven Live-Lauf
  mit Echtdaten abgeschlossen.
2026-02-22 08:20:28 +00:00

50 lines
1.7 KiB
Python

import sqlite3
import json
import os
DB_PATH = "connector_queue.db"
def inspect_queue():
if not os.path.exists(DB_PATH):
print(f"❌ Database not found at {DB_PATH}")
return
print(f"🔍 Inspecting Queue: {DB_PATH}")
try:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Get stats
cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
stats = dict(cursor.fetchall())
print(f"\n📊 Stats: {stats}")
# Get recent jobs
print("\n📝 Last 10 Jobs:")
cursor.execute("SELECT id, event_type, status, error_msg, updated_at, payload FROM jobs ORDER BY updated_at DESC LIMIT 10")
rows = cursor.fetchall()
for row in rows:
payload = json.loads(row['payload'])
# Try to identify entity
entity = "Unknown"
if "PrimaryKey" in payload: entity = f"ID {payload['PrimaryKey']}"
if "ContactId" in payload: entity = f"Contact {payload['ContactId']}"
print(f" - Job #{row['id']} [{row['status']}] {row['event_type']} ({entity})")
print(f" Updated: {row['updated_at']}")
if row['error_msg']:
print(f" ❌ ERROR: {row['error_msg']}")
# Print payload details relevant to syncing
if row['status'] == 'COMPLETED':
pass # Maybe less interesting if success, but user says it didn't sync
conn.close()
except Exception as e:
print(f"❌ Error reading DB: {e}")
if __name__ == "__main__":
inspect_queue()