✦ In dieser Sitzung haben wir den End-to-End-Test der SuperOffice-Schnittstelle erfolgreich von der automatisierten Simulation bis zum produktiven Live-Lauf mit Echtdaten abgeschlossen.
50 lines
1.7 KiB
Python
50 lines
1.7 KiB
Python
import sqlite3
|
|
import json
|
|
import os
|
|
|
|
# Path of the SQLite job-queue database that inspect_queue() opens.
# It is a relative path, so it is resolved against the current working directory.
DB_PATH = "connector_queue.db"
|
|
|
|
def inspect_queue(db_path=None):
    """Print a summary of the connector job queue to stdout.

    Prints the number of jobs per status, followed by the ten most recently
    updated rows of the ``jobs`` table (id, status, event type, the entity
    referenced by the JSON payload, update timestamp and any error message).

    Args:
        db_path: Path of the SQLite database to inspect. When omitted, the
            module-level ``DB_PATH`` is used.

    Returns:
        None. All results are printed; errors are reported on stdout as well
        instead of being raised.
    """
    if db_path is None:
        db_path = DB_PATH

    if not os.path.exists(db_path):
        print(f"❌ Database not found at {db_path}")
        return

    print(f"🔍 Inspecting Queue: {db_path}")

    conn = None
    try:
        conn = sqlite3.connect(db_path)
        # Row objects allow access by column name below.
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        # Get stats: each result row is a (status, count) pair.
        cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
        stats = dict(cursor.fetchall())
        print(f"\n📊 Stats: {stats}")

        # Get recent jobs
        print("\n📝 Last 10 Jobs:")
        cursor.execute("SELECT id, event_type, status, error_msg, updated_at, payload FROM jobs ORDER BY updated_at DESC LIMIT 10")

        for row in cursor.fetchall():
            # A NULL or malformed payload must not abort the whole listing;
            # such a job is still shown, just without an identified entity.
            try:
                payload = json.loads(row['payload'])
            except (TypeError, ValueError):
                payload = None

            # Try to identify entity; ContactId takes precedence over PrimaryKey.
            entity = "Unknown"
            if isinstance(payload, dict):
                if "PrimaryKey" in payload:
                    entity = f"ID {payload['PrimaryKey']}"
                if "ContactId" in payload:
                    entity = f"Contact {payload['ContactId']}"

            print(f" - Job #{row['id']} [{row['status']}] {row['event_type']} ({entity})")
            print(f" Updated: {row['updated_at']}")
            if row['error_msg']:
                print(f" ❌ ERROR: {row['error_msg']}")
    except Exception as e:
        # Top-level boundary of a diagnostic tool: report and carry on.
        print(f"❌ Error reading DB: {e}")
    finally:
        # Release the connection on the error path too.
        if conn is not None:
            conn.close()
|
|
|
|
# Script entry point: run the inspection only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    inspect_queue()
|