"""SQLite persistence layer for incoming leads.

All functions open a short-lived connection to ``DB_PATH`` and close it
again before returning, even when a statement fails.
"""

import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime

# Robust path handling:
# 1. Get the directory where this script (db.py) lives
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 2. Define data directory and db path relative to it
DATA_DIR = os.path.join(BASE_DIR, 'data')
DB_PATH = os.path.join(DATA_DIR, 'leads.db')


def _connect():
    """Return a context manager yielding a connection to ``DB_PATH``.

    The connection is closed when the ``with`` block exits, whether it exits
    normally or through an exception. Nothing is committed automatically;
    callers commit explicitly.
    """
    return closing(sqlite3.connect(DB_PATH))


def init_db():
    """Create the ``leads`` table if needed and apply column migrations.

    Safe to call repeatedly: the table is created with ``IF NOT EXISTS`` and
    each migration only runs when its column is missing.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(DATA_DIR, exist_ok=True)

    with _connect() as conn:
        # 'source' is declared last so a fresh table has the same column
        # order as a legacy table that received it through ALTER TABLE.
        conn.execute('''
            CREATE TABLE IF NOT EXISTS leads (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source_id TEXT UNIQUE,
                received_at TIMESTAMP,
                company_name TEXT,
                contact_name TEXT,
                email TEXT,
                phone TEXT,
                raw_body TEXT,
                lead_metadata TEXT,
                enrichment_data TEXT,
                status TEXT DEFAULT 'new',
                response_draft TEXT,
                sent_at TIMESTAMP,
                source TEXT
            )
        ''')

        # Simple migration check: add columns that older databases lack.
        # Row index 1 of PRAGMA table_info is the column name.
        columns = {row[1] for row in conn.execute("PRAGMA table_info(leads)")}

        if 'lead_metadata' not in columns:
            print("Migrating DB: Adding lead_metadata column...")
            conn.execute('ALTER TABLE leads ADD COLUMN lead_metadata TEXT')

        if 'source' not in columns:
            print("Migrating DB: Adding source column...")
            conn.execute('ALTER TABLE leads ADD COLUMN source TEXT')

        conn.commit()


def insert_lead(lead_data):
    """Insert a new lead row built from ``lead_data``.

    Args:
        lead_data: Mapping read with ``.get``. ``id`` is stored as
            ``source_id``; ``company``, ``contact``, ``email``, ``phone``,
            ``raw_body`` and ``source`` go to their own columns; the
            remaining descriptive fields are serialized as JSON into
            ``lead_metadata``. ``received_at`` defaults to the current
            local time when missing or falsy.

    Returns:
        True if the row was inserted, False if the insert violated an
        integrity constraint (e.g. a duplicate ``source_id``).
    """
    # Ensure DB exists before inserting (if ingest runs before init)
    if not os.path.exists(DB_PATH):
        init_db()

    # Extract metadata fields
    meta = {
        'area': lead_data.get('area'),
        'purpose': lead_data.get('purpose'),
        'zip': lead_data.get('zip'),
        'city': lead_data.get('city'),
        'role': lead_data.get('role'),
        'salutation': lead_data.get('salutation'),
        'phone': lead_data.get('phone'),
        'cleaning_functions': lead_data.get('cleaning_functions'),
        'is_free_mail': lead_data.get('is_free_mail', False),
        'is_low_quality': lead_data.get('is_low_quality', False),
    }

    # Use provided received_at or default to now
    received_at = lead_data.get('received_at') or datetime.now()
    if isinstance(received_at, datetime):
        # Convert explicitly rather than relying on sqlite3's default
        # datetime adapter (deprecated since Python 3.12). isoformat(" ")
        # yields the same text that adapter stored.
        received_at = received_at.isoformat(" ")

    params = (
        lead_data.get('id'),
        received_at,
        lead_data.get('company'),
        lead_data.get('contact'),
        lead_data.get('email'),
        lead_data.get('phone'),
        lead_data.get('raw_body'),
        json.dumps(meta),
        'new',
        lead_data.get('source'),
    )

    with _connect() as conn:
        try:
            conn.execute('''
                INSERT INTO leads (source_id, received_at, company_name, contact_name, email, phone, raw_body, lead_metadata, status, source)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', params)
            conn.commit()
        except sqlite3.IntegrityError:
            return False
    return True


def update_lead_metadata(lead_id, meta_data):
    """Helper to update metadata for existing leads (repair).

    ``meta_data`` is serialized with ``json.dumps`` and replaces the stored
    ``lead_metadata`` of the row whose ``id`` equals ``lead_id``.
    """
    with _connect() as conn:
        conn.execute(
            'UPDATE leads SET lead_metadata = ? WHERE id = ?',
            (json.dumps(meta_data), lead_id),
        )
        conn.commit()


def get_leads():
    """Return all leads as a list of dicts, newest ``received_at`` first.

    Initializes the database first when the database file does not exist.
    """
    if not os.path.exists(DB_PATH):
        init_db()

    with _connect() as conn:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            'SELECT * FROM leads ORDER BY received_at DESC'
        ).fetchall()
    return [dict(row) for row in rows]


def update_lead_status(lead_id, status, response_draft=None):
    """Set a lead's status and, optionally, its response draft.

    The draft is only written when ``response_draft`` is truthy; ``None``
    and the empty string both leave the stored draft untouched.
    """
    with _connect() as conn:
        if response_draft:
            conn.execute(
                'UPDATE leads SET status = ?, response_draft = ? WHERE id = ?',
                (status, response_draft, lead_id),
            )
        else:
            conn.execute(
                'UPDATE leads SET status = ? WHERE id = ?',
                (status, lead_id),
            )
        conn.commit()


def update_lead_draft(lead_id, draft_text):
    """Saves a generated email draft to the database."""
    with _connect() as conn:
        conn.execute(
            'UPDATE leads SET response_draft = ? WHERE id = ?',
            (draft_text, lead_id),
        )
        conn.commit()


def reset_lead(lead_id):
    """Resets a lead to 'new' status and clears enrichment data."""
    with _connect() as conn:
        # Single quotes: in SQL, double quotes denote an identifier, and
        # SQLite only tolerates "new" as a string through a legacy quirk.
        conn.execute(
            "UPDATE leads SET status = 'new', enrichment_data = NULL WHERE id = ?",
            (lead_id,),
        )
        conn.commit()