feat(content): implement Content Engine MVP (v1.0) with GTM integration
This commit is contained in:
179
content-engine/content_db_manager.py
Normal file
179
content-engine/content_db_manager.py
Normal file
@@ -0,0 +1,179 @@
|
||||
import sqlite3
|
||||
import json
|
||||
import os
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
DB_PATH = os.environ.get('DB_PATH', 'content_engine.db')
|
||||
GTM_DB_PATH = os.environ.get('GTM_DB_PATH', 'gtm_projects.db')
|
||||
|
||||
def get_db_connection(path=DB_PATH):
|
||||
conn = sqlite3.connect(path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
def init_db():
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Projects table
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS content_projects (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
gtm_project_id TEXT,
|
||||
category TEXT,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
gtm_data_snapshot TEXT, -- Full JSON snapshot of GTM data at import
|
||||
seo_strategy TEXT, -- JSON blob
|
||||
site_structure TEXT, -- JSON blob
|
||||
metadata TEXT -- JSON blob
|
||||
)
|
||||
''')
|
||||
|
||||
# Content Assets table
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS content_assets (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
project_id INTEGER,
|
||||
asset_type TEXT NOT NULL, -- 'website_section', 'linkedin', 'email', 'pr'
|
||||
section_key TEXT, -- e.g., 'hero', 'features', 'faq'
|
||||
title TEXT,
|
||||
content TEXT, -- Markdown content
|
||||
status TEXT DEFAULT 'draft',
|
||||
keywords TEXT, -- JSON list of used keywords
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
FOREIGN KEY (project_id) REFERENCES content_projects (id) ON DELETE CASCADE
|
||||
)
|
||||
''')
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
logging.info(f"Database initialized at {DB_PATH}")
|
||||
|
||||
# --- GTM READ ACCESS ---
|
||||
|
||||
def get_all_gtm_projects():
|
||||
"""Lists all available GTM projects."""
|
||||
if not os.path.exists(GTM_DB_PATH):
|
||||
logging.warning(f"GTM DB not found at {GTM_DB_PATH}")
|
||||
return []
|
||||
|
||||
conn = get_db_connection(GTM_DB_PATH)
|
||||
try:
|
||||
query = """
|
||||
SELECT
|
||||
id,
|
||||
name,
|
||||
updated_at,
|
||||
json_extract(data, '$.phases.phase1_result.category') AS productCategory
|
||||
FROM gtm_projects
|
||||
ORDER BY updated_at DESC
|
||||
"""
|
||||
projects = [dict(row) for row in conn.execute(query).fetchall()]
|
||||
return projects
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def get_gtm_project_data(gtm_id):
|
||||
"""Retrieves full data for a GTM project."""
|
||||
conn = get_db_connection(GTM_DB_PATH)
|
||||
try:
|
||||
row = conn.execute("SELECT data FROM gtm_projects WHERE id = ?", (gtm_id,)).fetchone()
|
||||
return json.loads(row['data']) if row else None
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
# --- CONTENT ENGINE OPERATIONS ---
|
||||
|
||||
def import_gtm_project(gtm_id):
|
||||
"""Imports a GTM project as a new Content Engine project."""
|
||||
gtm_data = get_gtm_project_data(gtm_id)
|
||||
if not gtm_data:
|
||||
return None
|
||||
|
||||
name = gtm_data.get('name', 'Imported Project')
|
||||
# Phase 1 has the category
|
||||
phase1 = gtm_data.get('phases', {}).get('phase1_result', {})
|
||||
if isinstance(phase1, str): phase1 = json.loads(phase1)
|
||||
category = phase1.get('category', 'Unknown')
|
||||
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"INSERT INTO content_projects (name, gtm_project_id, category, gtm_data_snapshot) VALUES (?, ?, ?, ?)",
|
||||
(name, gtm_id, category, json.dumps(gtm_data))
|
||||
)
|
||||
project_id = cursor.lastrowid
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return {"id": project_id, "name": name, "category": category}
|
||||
|
||||
def get_all_content_projects():
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT id, name, category, created_at, gtm_project_id FROM content_projects ORDER BY updated_at DESC")
|
||||
projects = [dict(row) for row in cursor.fetchall()]
|
||||
conn.close()
|
||||
return projects
|
||||
|
||||
def get_content_project(project_id):
|
||||
conn = get_db_connection()
|
||||
row = conn.execute("SELECT * FROM content_projects WHERE id = ?", (project_id,)).fetchone()
|
||||
conn.close()
|
||||
if row:
|
||||
d = dict(row)
|
||||
if d['gtm_data_snapshot']: d['gtm_data_snapshot'] = json.loads(d['gtm_data_snapshot'])
|
||||
if d['seo_strategy']: d['seo_strategy'] = json.loads(d['seo_strategy'])
|
||||
return d
|
||||
return None
|
||||
|
||||
def save_seo_strategy(project_id, strategy_dict):
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"UPDATE content_projects SET seo_strategy = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
|
||||
(json.dumps(strategy_dict), project_id)
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
def save_content_asset(project_id, asset_type, section_key, title, content, keywords=None):
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
# Check if exists (upsert logic for sections)
|
||||
cursor.execute(
|
||||
"SELECT id FROM content_assets WHERE project_id = ? AND asset_type = ? AND section_key = ?",
|
||||
(project_id, asset_type, section_key)
|
||||
)
|
||||
existing = cursor.fetchone()
|
||||
|
||||
if existing:
|
||||
cursor.execute(
|
||||
"UPDATE content_assets SET title = ?, content = ?, keywords = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
|
||||
(title, content, json.dumps(keywords) if keywords else None, existing['id'])
|
||||
)
|
||||
asset_id = existing['id']
|
||||
else:
|
||||
cursor.execute(
|
||||
"INSERT INTO content_assets (project_id, asset_type, section_key, title, content, keywords) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(project_id, asset_type, section_key, title, content, json.dumps(keywords) if keywords else None)
|
||||
)
|
||||
asset_id = cursor.lastrowid
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return asset_id
|
||||
|
||||
def get_project_assets(project_id):
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT * FROM content_assets WHERE project_id = ?", (project_id,))
|
||||
assets = [dict(row) for row in cursor.fetchall()]
|
||||
conn.close()
|
||||
return assets
|
||||
|
||||
if __name__ == "__main__":
|
||||
init_db()
|
||||
Reference in New Issue
Block a user