Files
Brancheneinstufung2/company-explorer/backend/scripts/migrate_db.py
Floke 57360496f8 feat(Explorer): Enhance metric extraction, source transparency, and UI display
- **Standardization & Formula Logic:** Fixed NameError/SyntaxError in formula parser; added support for comments and capitalized placeholders.
- **Source URL Tracking:** Extended DB schema and cascade logic to store and track specific source URLs.
- **Frontend & UI:**
  - Added 'Standardized Potential' display in Inspector.
  - Added clickable source link with icon.
  - Fixed Settings tab layout collapse (flex-shrink-0).
- **Export Capabilities:**
  - Single-company JSON export now includes full quantitative metadata.
  - New global CSV export endpoint /api/companies/export.
- **System Integrity:**
  - Fixed Notion sync typo ('Stanardization').
  - Corrected Nginx proxy routing and FastAPI route ordering.
  - Ensured DB persistence via explicit docker-compose volume mapping.
2026-01-24 09:56:59 +00:00

85 lines
2.9 KiB
Python

import sqlite3
import sys
import os
import logging
# Make the `backend` package importable when this file is run directly as a
# script: append the directory two levels above this script's own directory
# to sys.path. This must happen before the `backend.config` import below.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from backend.config import settings
# Configure root logging once for the script (timestamp - level - message).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Derive the SQLite file path from the SQLAlchemy-style URL by stripping the
# "sqlite:///" scheme. str.replace removes every occurrence of that substring,
# and a URL without it is passed through unchanged.
# NOTE(review): presumably DATABASE_URL is always a sqlite:/// URL here — confirm.
DB_FILE = settings.DATABASE_URL.replace("sqlite:///", "")
def get_db_connection():
    """Open a new sqlite3 connection to the file named by DB_FILE and return it.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection = sqlite3.connect(DB_FILE)
    return connection
def get_table_columns(cursor, table_name):
    """Return the column names of *table_name*, in declaration order.

    Args:
        cursor: An open sqlite3 cursor on the database to inspect.
        table_name: Name of the table whose columns are listed.

    Returns:
        A list of column-name strings; empty if the table does not exist
        (PRAGMA table_info yields no rows in that case).
    """
    # PRAGMA arguments cannot be bound as parameters, so quote the name as an
    # SQL identifier (doubling embedded double quotes). This keeps names that
    # contain spaces, reserved words or quotes from breaking the statement.
    quoted_name = '"' + table_name.replace('"', '""') + '"'
    cursor.execute(f"PRAGMA table_info({quoted_name})")
    # Each row is (cid, name, type, notnull, dflt_value, pk); index 1 is the name.
    return [row[1] for row in cursor.fetchall()]
def _add_missing_columns(cursor, table_name, wanted_columns):
    """Add every column from *wanted_columns* that *table_name* lacks.

    Args:
        cursor: sqlite3 cursor used to inspect the table and run ALTER TABLE.
        table_name: Name of the table to bring up to date.
        wanted_columns: Mapping of column name -> SQLite column type; columns
            are added in the mapping's iteration order.
    """
    logger.info(f"Checking '{table_name}' table schema...")
    existing = set(get_table_columns(cursor, table_name))
    for col, col_type in wanted_columns.items():
        if col in existing:
            continue
        logger.info(f"Adding column '{col}' to '{table_name}' table...")
        cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {col} {col_type}")


def migrate_tables():
    """
    Adds new columns to existing tables to support v0.7.0 features.

    Columns that already exist are skipped, so running the migration again
    on an up-to-date database adds nothing. Any exception raised while
    migrating is logged (with traceback) and followed by a rollback; it is
    not re-raised. The connection is always closed.
    """
    logger.info(f"Connecting to database at {DB_FILE} to run migrations...")
    conn = get_db_connection()
    cursor = conn.cursor()

    # 1. INDUSTRIES table. Each column name appears exactly once; the
    #    previous literal listed "scraper_search_term" twice.
    industries_columns = {
        "metric_type": "TEXT",
        "scraper_search_term": "TEXT",
        "standardization_logic": "TEXT",
        "proxy_factor": "FLOAT",
        "scraper_keywords": "TEXT",
    }

    # 2. COMPANIES table (new for v0.7.0).
    companies_columns = {
        "status": "TEXT",  # Added to fix missing column error
        "calculated_metric_name": "TEXT",
        "calculated_metric_value": "FLOAT",
        "calculated_metric_unit": "TEXT",
        "standardized_metric_value": "FLOAT",
        "standardized_metric_unit": "TEXT",
        "metric_source": "TEXT",
        "metric_source_url": "TEXT",
    }

    try:
        _add_missing_columns(cursor, "industries", industries_columns)
        _add_missing_columns(cursor, "companies", companies_columns)
        conn.commit()
        logger.info("All migrations completed successfully.")
    except Exception as e:
        logger.error(f"An error occurred during migration: {e}", exc_info=True)
        # NOTE(review): with the sqlite3 module's default transaction handling,
        # ALTER TABLE statements are likely committed as they run, so this
        # rollback may not undo columns that were already added — confirm.
        conn.rollback()
    finally:
        conn.close()
# Script entry point: run the migration only when the database file exists;
# otherwise log an error and do nothing.
if __name__ == "__main__":
    if os.path.exists(DB_FILE):
        migrate_tables()
    else:
        logger.error(f"Database file not found at {DB_FILE}.")