Files
Brancheneinstufung2/content-engine/content_db_manager.py

179 lines
5.9 KiB
Python

import sqlite3
import json
import os
import logging
from datetime import datetime
DB_PATH = os.environ.get('DB_PATH', 'content_engine.db')
GTM_DB_PATH = os.environ.get('GTM_DB_PATH', 'gtm_projects.db')
def get_db_connection(path=DB_PATH):
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row
return conn
def init_db():
conn = get_db_connection()
cursor = conn.cursor()
# Projects table
cursor.execute('''
CREATE TABLE IF NOT EXISTS content_projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
gtm_project_id TEXT,
category TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
gtm_data_snapshot TEXT, -- Full JSON snapshot of GTM data at import
seo_strategy TEXT, -- JSON blob
site_structure TEXT, -- JSON blob
metadata TEXT -- JSON blob
)
''')
# Content Assets table
cursor.execute('''
CREATE TABLE IF NOT EXISTS content_assets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id INTEGER,
asset_type TEXT NOT NULL, -- 'website_section', 'linkedin', 'email', 'pr'
section_key TEXT, -- e.g., 'hero', 'features', 'faq'
title TEXT,
content TEXT, -- Markdown content
status TEXT DEFAULT 'draft',
keywords TEXT, -- JSON list of used keywords
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES content_projects (id) ON DELETE CASCADE
)
''')
conn.commit()
conn.close()
logging.info(f"Database initialized at {DB_PATH}")
# --- GTM READ ACCESS ---
def get_all_gtm_projects():
"""Lists all available GTM projects."""
if not os.path.exists(GTM_DB_PATH):
logging.warning(f"GTM DB not found at {GTM_DB_PATH}")
return []
conn = get_db_connection(GTM_DB_PATH)
try:
query = """
SELECT
id,
name,
updated_at,
json_extract(data, '$.phases.phase1_result.category') AS productCategory
FROM gtm_projects
ORDER BY updated_at DESC
"""
projects = [dict(row) for row in conn.execute(query).fetchall()]
return projects
finally:
conn.close()
def get_gtm_project_data(gtm_id):
"""Retrieves full data for a GTM project."""
conn = get_db_connection(GTM_DB_PATH)
try:
row = conn.execute("SELECT data FROM gtm_projects WHERE id = ?", (gtm_id,)).fetchone()
return json.loads(row['data']) if row else None
finally:
conn.close()
# --- CONTENT ENGINE OPERATIONS ---
def import_gtm_project(gtm_id):
"""Imports a GTM project as a new Content Engine project."""
gtm_data = get_gtm_project_data(gtm_id)
if not gtm_data:
return None
name = gtm_data.get('name', 'Imported Project')
# Phase 1 has the category
phase1 = gtm_data.get('phases', {}).get('phase1_result', {})
if isinstance(phase1, str): phase1 = json.loads(phase1)
category = phase1.get('category', 'Unknown')
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
"INSERT INTO content_projects (name, gtm_project_id, category, gtm_data_snapshot) VALUES (?, ?, ?, ?)",
(name, gtm_id, category, json.dumps(gtm_data))
)
project_id = cursor.lastrowid
conn.commit()
conn.close()
return {"id": project_id, "name": name, "category": category}
def get_all_content_projects():
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT id, name, category, created_at, gtm_project_id FROM content_projects ORDER BY updated_at DESC")
projects = [dict(row) for row in cursor.fetchall()]
conn.close()
return projects
def get_content_project(project_id):
conn = get_db_connection()
row = conn.execute("SELECT * FROM content_projects WHERE id = ?", (project_id,)).fetchone()
conn.close()
if row:
d = dict(row)
if d['gtm_data_snapshot']: d['gtm_data_snapshot'] = json.loads(d['gtm_data_snapshot'])
if d['seo_strategy']: d['seo_strategy'] = json.loads(d['seo_strategy'])
return d
return None
def save_seo_strategy(project_id, strategy_dict):
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
"UPDATE content_projects SET seo_strategy = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(json.dumps(strategy_dict), project_id)
)
conn.commit()
conn.close()
def save_content_asset(project_id, asset_type, section_key, title, content, keywords=None):
conn = get_db_connection()
cursor = conn.cursor()
# Check if exists (upsert logic for sections)
cursor.execute(
"SELECT id FROM content_assets WHERE project_id = ? AND asset_type = ? AND section_key = ?",
(project_id, asset_type, section_key)
)
existing = cursor.fetchone()
if existing:
cursor.execute(
"UPDATE content_assets SET title = ?, content = ?, keywords = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(title, content, json.dumps(keywords) if keywords else None, existing['id'])
)
asset_id = existing['id']
else:
cursor.execute(
"INSERT INTO content_assets (project_id, asset_type, section_key, title, content, keywords) VALUES (?, ?, ?, ?, ?, ?)",
(project_id, asset_type, section_key, title, content, json.dumps(keywords) if keywords else None)
)
asset_id = cursor.lastrowid
conn.commit()
conn.close()
return asset_id
def get_project_assets(project_id):
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM content_assets WHERE project_id = ?", (project_id,))
assets = [dict(row) for row in cursor.fetchall()]
conn.close()
return assets
if __name__ == "__main__":
init_db()