Files
Brancheneinstufung2/lead-engine/db.py
Floke b1081e8cc3 [31388f42] Feat: Persist drafts and enhance UI warnings
This commit introduces two key improvements to the Lead Engine:

1.  **Persistent Email Drafts:**
    - Adds a new  function to .
    - Modifies  to save generated email replies directly to the  column in the database, ensuring they persist across sessions.
    - Removes the previous session-based state for drafts.

2.  **Enhanced UI Visibility:**
    - Adds a warning icon (⚠️) directly to the lead expander's title if a lead is flagged as low-quality, making it easier to spot.
2026-03-02 19:32:07 +00:00

145 lines
4.6 KiB
Python

import sqlite3
import json
import os
from datetime import datetime
# Robust path handling:
# 1. Get the directory where this script (db.py) lives
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 2. Define data directory and db path relative to it
DATA_DIR = os.path.join(BASE_DIR, 'data')
DB_PATH = os.path.join(DATA_DIR, 'leads.db')
def init_db():
# Ensure data directory exists
if not os.path.exists(DATA_DIR):
os.makedirs(DATA_DIR)
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''
CREATE TABLE IF NOT EXISTS leads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_id TEXT UNIQUE,
received_at TIMESTAMP,
company_name TEXT,
contact_name TEXT,
email TEXT,
phone TEXT,
raw_body TEXT,
lead_metadata TEXT,
enrichment_data TEXT,
status TEXT DEFAULT 'new',
response_draft TEXT,
sent_at TIMESTAMP
)
''')
# Simple migration check: add 'lead_metadata' if not exists
c.execute("PRAGMA table_info(leads)")
columns = [row[1] for row in c.fetchall()]
if 'lead_metadata' not in columns:
print("Migrating DB: Adding lead_metadata column...")
c.execute('ALTER TABLE leads ADD COLUMN lead_metadata TEXT')
if 'source' not in columns:
print("Migrating DB: Adding source column...")
c.execute('ALTER TABLE leads ADD COLUMN source TEXT')
conn.commit()
conn.close()
def insert_lead(lead_data):
# Ensure DB exists before inserting (if ingest runs before init)
if not os.path.exists(DB_PATH):
init_db()
# Extract metadata fields
meta = {
'area': lead_data.get('area'),
'purpose': lead_data.get('purpose'),
'zip': lead_data.get('zip'),
'city': lead_data.get('city'),
'role': lead_data.get('role'),
'salutation': lead_data.get('salutation'),
'phone': lead_data.get('phone'),
'cleaning_functions': lead_data.get('cleaning_functions'),
'is_free_mail': lead_data.get('is_free_mail', False),
'is_low_quality': lead_data.get('is_low_quality', False)
}
# Use provided received_at or default to now
received_at = lead_data.get('received_at') or datetime.now()
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
try:
c.execute('''
INSERT INTO leads (source_id, received_at, company_name, contact_name, email, phone, raw_body, lead_metadata, status, source)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
lead_data.get('id'),
received_at,
lead_data.get('company'),
lead_data.get('contact'),
lead_data.get('email'),
lead_data.get('phone'),
lead_data.get('raw_body'),
json.dumps(meta),
'new',
lead_data.get('source') # Added source
))
conn.commit()
return True
except sqlite3.IntegrityError:
return False
finally:
conn.close()
def update_lead_metadata(lead_id, meta_data):
"""Helper to update metadata for existing leads (repair)"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('UPDATE leads SET lead_metadata = ? WHERE id = ?', (json.dumps(meta_data), lead_id))
conn.commit()
conn.close()
def get_leads():
if not os.path.exists(DB_PATH):
init_db()
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('SELECT * FROM leads ORDER BY received_at DESC')
rows = c.fetchall()
conn.close()
return [dict(row) for row in rows]
def update_lead_status(lead_id, status, response_draft=None):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
if response_draft:
c.execute('UPDATE leads SET status = ?, response_draft = ? WHERE id = ?', (status, response_draft, lead_id))
else:
c.execute('UPDATE leads SET status = ? WHERE id = ?', (status, lead_id))
conn.commit()
conn.close()
def update_lead_draft(lead_id, draft_text):
"""Saves a generated email draft to the database."""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('UPDATE leads SET response_draft = ? WHERE id = ?', (draft_text, lead_id))
conn.commit()
conn.close()
def reset_lead(lead_id):
"""Resets a lead to 'new' status and clears enrichment data."""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('UPDATE leads SET status = "new", enrichment_data = NULL WHERE id = ?', (lead_id,))
conn.commit()
conn.close()