This commit introduces the foundational elements for the new "Company Explorer" web application, marking a significant step away from the legacy Google Sheets / CLI system. Key changes include: - Project Structure: A new directory with separate (FastAPI) and (React/Vite) components. - Data Persistence: Migration from Google Sheets to a local SQLite database () using SQLAlchemy. - Core Utilities: Extraction and cleanup of essential helper functions (LLM wrappers, text utilities) into . - Backend Services: , , for AI-powered analysis, and logic. - Frontend UI: Basic React application with company table, import wizard, and dynamic inspector sidebar. - Docker Integration: Updated and for multi-stage builds and sideloading. - Deployment & Access: Integrated into central Nginx proxy and dashboard, accessible via . Lessons Learned & Fixed during development: - Frontend Asset Loading: Addressed issues with Vite's path and FastAPI's . - TypeScript Configuration: Added and . - Database Schema Evolution: Solved errors by forcing a new database file and correcting override. - Logging: Implemented robust file-based logging (). This new foundation provides a powerful and maintainable platform for future B2B robotics lead generation.
314 lines
11 KiB
Python
314 lines
11 KiB
Python
from fastapi import FastAPI, Depends, HTTPException, Query, BackgroundTasks
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import FileResponse
|
|
from sqlalchemy.orm import Session, joinedload
|
|
from typing import List, Optional, Dict, Any
|
|
from pydantic import BaseModel
|
|
from datetime import datetime
|
|
import os
|
|
import sys
|
|
|
|
from .config import settings
|
|
from .lib.logging_setup import setup_logging
|
|
|
|
# Setup Logging first
|
|
setup_logging()
|
|
import logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
from .database import init_db, get_db, Company, Signal, EnrichmentData
|
|
from .services.deduplication import Deduplicator
|
|
from .services.discovery import DiscoveryService
|
|
from .services.scraping import ScraperService
|
|
from .services.classification import ClassificationService
|
|
|
|
# Initialize App
app = FastAPI(
    title=settings.APP_NAME,
    version=settings.VERSION,
    description="Backend for Company Explorer (Robotics Edition)",
    # App is served behind a reverse proxy under the /ce prefix; root_path
    # makes generated docs/URLs proxy-aware.
    root_path="/ce"
)

# Wide-open CORS for the SPA frontend.
# NOTE(review): browsers reject allow_origins=["*"] combined with
# allow_credentials=True for credentialed requests — confirm whether
# credentials are actually required here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Service Singletons — created once at import time and shared by all
# requests/background tasks in this module.
scraper = ScraperService()
classifier = ClassificationService()
discovery = DiscoveryService()
|
# --- Pydantic Models ---
class CompanyCreate(BaseModel):
    """Request body for creating a single company record."""
    name: str
    city: Optional[str] = None
    country: str = "DE"  # ISO country code; defaults to Germany
    website: Optional[str] = None
|
|
class BulkImportRequest(BaseModel):
    """Request body for POST /api/companies/bulk: a bare list of company names."""
    names: List[str]
|
class AnalysisRequest(BaseModel):
    """Request body for the /api/enrich/* endpoints."""
    company_id: int
    force_scrape: bool = False  # accepted but not read anywhere in this file — TODO confirm intent
|
# --- Events ---
# NOTE(review): @app.on_event is deprecated in current FastAPI in favor of
# lifespan handlers — consider migrating when convenient.
@app.on_event("startup")
def on_startup():
    """Create/verify DB tables on startup; log failures without crashing."""
    logger.info("Startup Event: Initializing Database...")
    try:
        init_db()
        logger.info("Database initialized successfully.")
    except Exception as e:
        # Exception is swallowed: the app still starts so /api/health stays
        # reachable for diagnosis even when the DB is broken.
        logger.critical(f"Database init failed: {e}", exc_info=True)
|
# --- Routes ---

@app.get("/api/health")
def health_check():
    """Liveness probe: reports service status, version and configured DB URL."""
    payload = {
        "status": "ok",
        "version": settings.VERSION,
        "db": settings.DATABASE_URL,
    }
    return payload
|
@app.get("/api/companies")
def list_companies(
    skip: int = 0,
    limit: int = 50,
    search: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """Paginated company listing with optional case-insensitive name search.

    Returns {"total": <matching row count>, "items": <current page>}.
    """
    try:
        base_query = db.query(Company)
        if search:
            pattern = f"%{search}%"
            base_query = base_query.filter(Company.name.ilike(pattern))

        total = base_query.count()
        # Newest entries first (descending primary key).
        page = (
            base_query.order_by(Company.id.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )
        return {"total": total, "items": page}
    except Exception as e:
        logger.error(f"List Companies Error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
@app.get("/api/companies/{company_id}")
def get_company(company_id: int, db: Session = Depends(get_db)):
    """Fetch a single company by id (signals eagerly loaded) or raise 404."""
    record = (
        db.query(Company)
        .options(joinedload(Company.signals))
        .filter(Company.id == company_id)
        .first()
    )
    if record is None:
        raise HTTPException(status_code=404, detail="Company not found")
    return record
|
@app.post("/api/companies/bulk")
def bulk_import_names(req: BulkImportRequest, db: Session = Depends(get_db)):
    """
    Quick import for testing. Just a list of names.

    For each non-blank name: skip if an exact-name row exists, skip if the
    fuzzy deduplicator reports a >95 score match, otherwise insert with
    status "NEW". Single commit at the end; rollback + 500 on any failure.
    """
    logger.info(f"Starting bulk import of {len(req.names)} names.")
    try:
        added, skipped = 0, 0

        # Fuzzy deduplicator is optional: if it cannot be constructed we
        # fall back to exact-name matching only.
        try:
            dedup = Deduplicator(db)
            logger.info("Deduplicator initialized.")
        except Exception as e:
            logger.warning(f"Deduplicator init failed: {e}")
            dedup = None

        for raw_name in req.names:
            clean_name = raw_name.strip()
            if not clean_name:
                continue

            # 1. Simple Deduplication (Exact Name)
            if db.query(Company).filter(Company.name == clean_name).first():
                skipped += 1
                continue

            # 2. Smart Deduplication (if available)
            if dedup:
                matches = dedup.find_duplicates({"name": clean_name})
                if matches and matches[0]['score'] > 95:
                    logger.info(f"Duplicate found for {clean_name}: {matches[0]['name']}")
                    skipped += 1
                    continue

            # 3. Create
            db.add(Company(name=clean_name, status="NEW"))
            added += 1

        db.commit()
        logger.info(f"Import success. Added: {added}, Skipped: {skipped}")
        return {"added": added, "skipped": skipped}
    except Exception as e:
        logger.error(f"Bulk Import Failed: {e}", exc_info=True)
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))
|
@app.post("/api/enrich/discover")
def discover_company(req: AnalysisRequest, background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
    """
    Triggers Stage 1: Discovery (Website Search + Wikipedia Search)

    Validates the company exists, then queues `run_discovery_task` on
    FastAPI's BackgroundTasks so the request returns immediately.

    Raises:
        HTTPException 404: unknown company_id.
        HTTPException 500: any unexpected failure while queueing.
    """
    try:
        company = db.query(Company).filter(Company.id == req.company_id).first()
        if not company:
            raise HTTPException(404, "Company not found")

        # Run in background
        background_tasks.add_task(run_discovery_task, company.id)

        return {"status": "queued", "message": f"Discovery started for {company.name}"}
    except HTTPException:
        # Bug fix: the generic handler below used to catch the 404 above and
        # re-raise it as a 500 with detail "404: Company not found".
        # HTTP errors must propagate unchanged.
        raise
    except Exception as e:
        logger.error(f"Discovery Error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
def run_discovery_task(company_id: int):
    """
    Background worker for Stage 1 discovery.

    Opens its own DB session (the request-scoped session is gone by the time
    this runs), fills in a missing website URL, upserts the company's
    Wikipedia URL as EnrichmentData, and promotes status NEW -> DISCOVERED
    once a usable website is known. Single commit; rollback on any error.
    """
    # New Session for Background Task
    from .database import SessionLocal
    db = SessionLocal()
    try:
        company = db.query(Company).filter(Company.id == company_id).first()
        if not company: return  # row vanished between queueing and execution

        logger.info(f"Running Discovery Task for {company.name}")

        # 1. Website Search — only when missing or set to the "k.A." placeholder
        #    (presumably German "keine Angabe" / not specified — TODO confirm)
        if not company.website or company.website == "k.A.":
            found_url = discovery.find_company_website(company.name, company.city)
            if found_url and found_url != "k.A.":
                company.website = found_url
                logger.info(f"-> Found URL: {found_url}")

        # 2. Wikipedia Search — always re-run; the attempt is timestamped even
        #    if no URL is found.
        wiki_url = discovery.find_wikipedia_url(company.name)
        company.last_wiki_search_at = datetime.utcnow()

        # Upsert: at most one "wikipedia_url" EnrichmentData row per company.
        existing_wiki = db.query(EnrichmentData).filter(
            EnrichmentData.company_id == company.id,
            EnrichmentData.source_type == "wikipedia_url"
        ).first()

        if not existing_wiki:
            db.add(EnrichmentData(company_id=company.id, source_type="wikipedia_url", content={"url": wiki_url}))
        else:
            existing_wiki.content = {"url": wiki_url}
            existing_wiki.updated_at = datetime.utcnow()

        # Promote status only from NEW, and only when a real website is set.
        if company.status == "NEW" and company.website and company.website != "k.A.":
            company.status = "DISCOVERED"

        db.commit()
        logger.info(f"Discovery finished for {company.id}")
    except Exception as e:
        logger.error(f"Background Task Error: {e}", exc_info=True)
        db.rollback()
    finally:
        db.close()
|
@app.post("/api/enrich/analyze")
def analyze_company(req: AnalysisRequest, background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
    """Queue Stage 2 (website scrape + robotics classification) for a company."""
    company = db.query(Company).filter(Company.id == req.company_id).first()
    if company is None:
        raise HTTPException(404, "Company not found")

    # A website must be known (and not the "k.A." placeholder) before analysis.
    # NOTE: reported as a 200 payload rather than an HTTP error — existing
    # frontend contract.
    has_website = bool(company.website) and company.website != "k.A."
    if not has_website:
        return {"error": "No website to analyze. Run Discovery first."}

    background_tasks.add_task(run_analysis_task, company.id, company.website)
    return {"status": "queued"}
|
def run_analysis_task(company_id: int, url: str):
    """
    Background worker for Stage 2 analysis.

    Scrapes `url`, upserts the result as "website_scrape" EnrichmentData,
    then (if text was obtained) runs the robotics-potential classifier and
    replaces all of the company's Signal rows with the fresh results.
    Runs in its own DB session; one commit at the end, rollback on error.
    """
    from .database import SessionLocal
    db = SessionLocal()
    try:
        company = db.query(Company).filter(Company.id == company_id).first()
        if not company: return  # row vanished between queueing and execution

        logger.info(f"Running Analysis Task for {company.name}")

        # 1. Scrape Website
        scrape_result = scraper.scrape_url(url)

        # Save Scrape Data — upsert: at most one "website_scrape" row per company.
        existing_scrape_data = db.query(EnrichmentData).filter(
            EnrichmentData.company_id == company.id,
            EnrichmentData.source_type == "website_scrape"
        ).first()

        if "text" in scrape_result and scrape_result["text"]:
            if not existing_scrape_data:
                db.add(EnrichmentData(company_id=company.id, source_type="website_scrape", content=scrape_result))
            else:
                existing_scrape_data.content = scrape_result
                existing_scrape_data.updated_at = datetime.utcnow()
        elif "error" in scrape_result:
            # Scrape failure is non-fatal: logged, and classification below is
            # skipped because there is no text.
            logger.warning(f"Scraping failed for {company.name}: {scrape_result['error']}")

        # 2. Classify Robotics Potential — only when scraping produced text.
        if "text" in scrape_result and scrape_result["text"]:
            analysis = classifier.analyze_robotics_potential(
                company_name=company.name,
                website_text=scrape_result["text"]
            )

            if "error" in analysis:
                logger.error(f"Robotics classification failed for {company.name}: {analysis['error']}")
            else:
                industry = analysis.get("industry")
                if industry:
                    company.industry_ai = industry

                # Delete old signals — full replacement, not a merge.
                db.query(Signal).filter(Signal.company_id == company.id).delete()

                # Save new signals — one Signal per potential category, with a
                # coarse High/Medium/Low label derived from the numeric score.
                potentials = analysis.get("potentials", {})
                for signal_type, data in potentials.items():
                    new_signal = Signal(
                        company_id=company.id,
                        signal_type=f"robotics_{signal_type}_potential",
                        confidence=data.get("score", 0),
                        value="High" if data.get("score", 0) > 70 else "Medium" if data.get("score", 0) > 30 else "Low",
                        proof_text=data.get("reason")
                    )
                    db.add(new_signal)

                # Status advances to ENRICHED only on successful classification.
                company.status = "ENRICHED"
                company.last_classification_at = datetime.utcnow()
                logger.info(f"Robotics analysis complete for {company.name}.")

        db.commit()
        logger.info(f"Analysis finished for {company.id}")
    except Exception as e:
        logger.error(f"Analyze Task Error: {e}", exc_info=True)
        db.rollback()
    finally:
        db.close()
|
# --- Serve Frontend ---
# Priority 1: Container Path (outside of /app volume)
static_path = "/frontend_static"

# Priority 2: Local Dev Path (relative to this file)
if not os.path.exists(static_path):
    static_path = os.path.join(os.path.dirname(__file__), "../static")

if os.path.exists(static_path):
    logger.info(f"Serving frontend from {static_path}")
    # html=True serves index.html at "/" (SPA entry point). Mounted after all
    # route definitions so the /api/* endpoints above take precedence.
    app.mount("/", StaticFiles(directory=static_path, html=True), name="static")
else:
    # API still works without the frontend bundle; only a warning is emitted.
    logger.warning(f"Frontend static files not found at {static_path} or local fallback.")
|
if __name__ == "__main__":
    # Dev entry point: run the app directly with auto-reload enabled.
    import uvicorn
    uvicorn.run("backend.app:app", host="0.0.0.0", port=8000, reload=True)