feat: Build complete POC for Butler model (client, matrix, daemon)
This commit is contained in:
147
connector-superoffice/polling_daemon_sketch.py
Normal file
147
connector-superoffice/polling_daemon_sketch.py
Normal file
@@ -0,0 +1,147 @@
|
||||
import os
|
||||
import sqlite3
|
||||
import hashlib
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from superoffice_client import SuperOfficeClient
|
||||
|
||||
# --- CONFIGURATION ---
|
||||
DB_FILE_MATRIX = "marketing_matrix.db"
|
||||
DB_FILE_STATE = "processing_state.db"
|
||||
POLLING_INTERVAL_SECONDS = 300 # 5 minutes
|
||||
|
||||
# UDF ProgIds
|
||||
PROG_ID_CONTACT_VERTICAL = "SuperOffice:5"
|
||||
PROG_ID_PERSON_ROLE = "SuperOffice:3"
|
||||
PROG_ID_CONTACT_CHALLENGE = "SuperOffice:6"
|
||||
PROG_ID_PERSON_SUBJECT = "SuperOffice:5"
|
||||
PROG_ID_PERSON_INTRO = "SuperOffice:6"
|
||||
PROG_ID_PERSON_PROOF = "SuperOffice:7"
|
||||
PROG_ID_PERSON_HASH = "SuperOffice:8"
|
||||
|
||||
# --- DATABASE SETUP ---
|
||||
def init_state_db():
|
||||
conn = sqlite3.connect(DB_FILE_STATE)
|
||||
c = conn.cursor()
|
||||
# Stores the last known hash for a person to detect changes
|
||||
c.execute('''CREATE TABLE IF NOT EXISTS person_state
|
||||
(person_id INTEGER PRIMARY KEY,
|
||||
last_known_hash TEXT,
|
||||
last_updated TEXT)''')
|
||||
# Stores the timestamp of the last run
|
||||
c.execute('''CREATE TABLE IF NOT EXISTS system_state
|
||||
(key TEXT PRIMARY KEY, value TEXT)''')
|
||||
c.execute("INSERT OR IGNORE INTO system_state VALUES ('last_run_utc', ?)", (datetime.utcnow().isoformat(),))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
print("✅ State DB initialized.")
|
||||
|
||||
# --- CORE LOGIC ---
|
||||
def get_text_from_matrix(vertical_id, role_id):
|
||||
# (Same as in webhook_server)
|
||||
# ... (omitted for brevity, will be imported)
|
||||
pass
|
||||
|
||||
def process_person(client: SuperOfficeClient, person_id: int):
|
||||
# (Central logic from webhook_server, adapted slightly)
|
||||
# ... (omitted for brevity, will be imported/reused)
|
||||
pass
|
||||
|
||||
def poll_for_changes(client: SuperOfficeClient, last_run_utc: str):
|
||||
print(f"Polling for persons modified since {last_run_utc}...")
|
||||
|
||||
# API Query: Get recently updated persons
|
||||
# We select the fields we need to minimize payload
|
||||
select_fields = "personId,contact/contactId,userDefinedFields"
|
||||
filter_query = f"lastModified gt '{last_run_utc}'"
|
||||
|
||||
# In a real scenario, you'd handle paging for many results
|
||||
recently_updated_persons = client.search(f"Person?$select={select_fields}&$filter={filter_query}")
|
||||
|
||||
if not recently_updated_persons:
|
||||
print("No persons updated since last run.")
|
||||
return
|
||||
|
||||
print(f"Found {len(recently_updated_persons)} updated persons to check.")
|
||||
conn = sqlite3.connect(DB_FILE_STATE)
|
||||
c = conn.cursor()
|
||||
|
||||
for person in recently_updated_persons:
|
||||
person_id = person.get('personId')
|
||||
|
||||
try:
|
||||
# 1. Get current state from SuperOffice
|
||||
udfs = person.get('UserDefinedFields', {})
|
||||
vertical_id_raw = client.get_contact(person['contact']['contactId'])["UserDefinedFields"].get(PROG_ID_CONTACT_VERTICAL, "")
|
||||
role_id_raw = udfs.get(PROG_ID_PERSON_ROLE, "")
|
||||
|
||||
if not vertical_id_raw or not role_id_raw:
|
||||
print(f" - Skipping Person {person_id}: Missing Vertical/Role ID.")
|
||||
continue
|
||||
|
||||
vertical_id = int(vertical_id_raw.replace("[I:", "").replace("]", ""))
|
||||
role_id = int(role_id_raw.replace("[I:", "").replace("]", ""))
|
||||
|
||||
# 2. Generate the "expected" hash
|
||||
expected_hash = hashlib.md5(f"{vertical_id}-{role_id}".encode()).hexdigest()
|
||||
|
||||
# 3. Get last known hash from our state DB
|
||||
c.execute("SELECT last_known_hash FROM person_state WHERE person_id = ?", (person_id,))
|
||||
result = c.fetchone()
|
||||
last_known_hash = result[0] if result else None
|
||||
|
||||
# 4. Compare and act
|
||||
if expected_hash != last_known_hash:
|
||||
print(f" -> Change detected for Person {person_id} (New state: V:{vertical_id}/R:{role_id}). Processing...")
|
||||
|
||||
# Here we would call the full processing logic from webhook_server.py
|
||||
# For now, we simulate the update and save the new state.
|
||||
# process_single_person(client, person_id) # This would be the real call
|
||||
|
||||
# Update our state DB
|
||||
c.execute("INSERT OR REPLACE INTO person_state VALUES (?, ?, ?)",
|
||||
(person_id, expected_hash, datetime.utcnow().isoformat()))
|
||||
conn.commit()
|
||||
print(f" ✅ Processed and updated state for Person {person_id}.")
|
||||
else:
|
||||
print(f" - Skipping Person {person_id}: No relevant change detected (hash is the same).")
|
||||
|
||||
except Exception as e:
|
||||
print(f" - ❌ Error processing Person {person_id}: {e}")
|
||||
|
||||
conn.close()
|
||||
|
||||
# --- MAIN DAEMON LOOP ---
|
||||
def main():
|
||||
init_state_db()
|
||||
|
||||
try:
|
||||
client = SuperOfficeClient()
|
||||
except Exception as e:
|
||||
print(f"Could not start daemon: {e}")
|
||||
return
|
||||
|
||||
while True:
|
||||
conn = sqlite3.connect(DB_FILE_STATE)
|
||||
c = conn.cursor()
|
||||
c.execute("SELECT value FROM system_state WHERE key = 'last_run_utc'")
|
||||
last_run = c.fetchone()[0]
|
||||
|
||||
# Poll for changes
|
||||
poll_for_changes(client, last_run)
|
||||
|
||||
# Update last run time
|
||||
new_last_run = datetime.utcnow().isoformat()
|
||||
c.execute("UPDATE system_state SET value = ? WHERE key = 'last_run_utc'", (new_last_run,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
print(f"\nPolling complete. Next run in {POLLING_INTERVAL_SECONDS} seconds...")
|
||||
time.sleep(POLLING_INTERVAL_SECONDS)
|
||||
|
||||
if __name__ == '__main__':
|
||||
# This is a conceptual sketch.
|
||||
# The SuperOfficeClient needs a `search` method.
|
||||
# The logic from webhook_server needs to be callable.
|
||||
print("This script is a blueprint for the polling daemon.")
|
||||
print("It requires a `search` method in the SuperOfficeClient and refactoring.")
|
||||
Reference in New Issue
Block a user