Files
Brancheneinstufung2/company-explorer/backend/app.py
Floke c900e96b5f [31f88f42] Keine neuen Commits in dieser Session.
Keine neuen Commits in dieser Session.
2026-03-10 13:54:07 +00:00

1717 lines
58 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, Query, BackgroundTasks, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session, joinedload
from typing import List, Optional, Dict, Any
from pydantic import BaseModel
from datetime import datetime
import os
import sys
import uuid
import shutil
import re
from collections import Counter
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets
security = HTTPBasic()


async def authenticate_user(credentials: HTTPBasicCredentials = Depends(security)):
    """Validate HTTP Basic credentials against the API_USER / API_PASSWORD env vars.

    Args:
        credentials: Username/password pair extracted by the HTTPBasic scheme.

    Returns:
        The authenticated username.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when the
            username or the password does not match.
    """
    # NOTE(review): when the env vars are unset the well-known fallbacks
    # "default_user" / "default_password" are accepted — confirm that every
    # deployment sets API_USER and API_PASSWORD.
    expected_username = os.getenv("API_USER", "default_user").encode("utf-8")
    expected_password = os.getenv("API_PASSWORD", "default_password").encode("utf-8")

    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters, so both sides are compared as UTF-8 bytes. Both
    # comparisons are always evaluated to keep the timing independent of which
    # part is wrong.
    correct_username = secrets.compare_digest(
        credentials.username.encode("utf-8"), expected_username
    )
    correct_password = secrets.compare_digest(
        credentials.password.encode("utf-8"), expected_password
    )
    if not (correct_username and correct_password):
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username
from .config import settings
from .lib.logging_setup import setup_logging
# Setup Logging first
# Logging is configured here, ahead of the database/service imports that
# follow, and the module-level logger is created right after.
setup_logging()
import logging
logger = logging.getLogger(__name__)
from .database import init_db, get_db, Company, Signal, EnrichmentData, RoboticsCategory, Contact, Industry, JobRolePattern, ReportedMistake, MarketingMatrix, Persona, RawJobTitle
from .services.deduplication import Deduplicator
from .services.discovery import DiscoveryService
from .services.scraping import ScraperService
from .services.classification import ClassificationService
from .services.role_mapping import RoleMappingService
from .services.optimization import PatternOptimizationService
# Initialize App
# Title and version are taken from the shared settings object.
app = FastAPI(
    title=settings.APP_NAME,
    version=settings.VERSION,
    description="Backend for Company Explorer (Robotics Edition)"
)
# CORS: every origin, method and header is allowed, with credentials enabled.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Service Singletons
# Created once at import time and shared by the module.
scraper = ScraperService()
classifier = ClassificationService() # Now works without args
discovery = DiscoveryService()
# Global State for Long-Running Optimization Task
# Mutated by run_optimization_task() and returned as-is by the
# /api/job_roles/optimize-status endpoint.
optimization_status = {
    "state": "idle", # idle, processing, completed, error
    "progress": 0,
    "result": None,   # proposals produced by the optimizer once completed
    "error": None     # string form of the exception when state is "error"
}
# --- Pydantic Models ---
class CompanyCreate(BaseModel):
    """Request body for creating a single company via POST /api/companies."""
    name: str  # used for the duplicate check on creation
    city: Optional[str] = None
    country: str = "DE"
    website: Optional[str] = None
    crm_id: Optional[str] = None
class ContactCreate(BaseModel):
    """Request body for creating or updating a contact via POST /api/contacts."""
    company_id: int  # must reference an existing company
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    email: Optional[str] = None  # together with company_id identifies an existing contact
    job_title: Optional[str] = None  # mapped to a role when no role is given
    role: Optional[str] = None
    is_primary: bool = True
class BulkImportRequest(BaseModel):
    """Request body for POST /api/companies/bulk."""
    names: List[str]  # company names; blank entries are skipped by the endpoint
class AnalysisRequest(BaseModel):
    """Request body naming a company to analyse and whether to force a scrape."""
    company_id: int
    force_scrape: bool = False
class IndustryUpdateModel(BaseModel):
    """Request body carrying a new ``industry_ai`` value."""
    industry_ai: str
class ReportMistakeRequest(BaseModel):
    """Request body describing a reported mistake in a company field."""
    field_name: str  # the only mandatory part of a report
    wrong_value: Optional[str] = None
    corrected_value: Optional[str] = None
    source_url: Optional[str] = None
    quote: Optional[str] = None
    user_comment: Optional[str] = None
class CompanyMatchRequest(BaseModel):
    """Candidate company submitted to POST /api/match-company."""
    name: str
    website: Optional[str] = None
    city: Optional[str] = None
    country: Optional[str] = "Deutschland"
class ProvisioningRequest(BaseModel):
    """Request body for POST /api/provision/superoffice-contact."""
    so_contact_id: int  # matched (as string) against Company.crm_id
    so_person_id: Optional[int] = None  # when None only company-level data is returned
    crm_name: Optional[str] = None  # required to auto-create an unknown company
    crm_website: Optional[str] = None
    job_title: Optional[str] = None
    crm_industry_name: Optional[str] = None
    campaign_tag: Optional[str] = None # NEW: e.g. "messe_2026"
class ProvisioningResponse(BaseModel):
    """Response of the SuperOffice provisioning endpoint."""
    status: str  # the endpoint returns "processing" or "success"
    company_name: str
    website: Optional[str] = None
    vertical_name: Optional[str] = None
    role_name: Optional[str] = None
    opener: Optional[str] = None # Primary opener (Infrastructure/Cleaning)
    opener_secondary: Optional[str] = None # Secondary opener (Service/Logistics)
    summary: Optional[str] = None # NEW: AI Research Dossier
    texts: Dict[str, Optional[str]] = {}  # filled with subject / intro / social_proof
    unsubscribe_link: Optional[str] = None
    # Enrichment Data for Write-Back
    address_city: Optional[str] = None
    address_zip: Optional[str] = None
    address_street: Optional[str] = None
    address_country: Optional[str] = None
    vat_id: Optional[str] = None
class IndustryDetails(BaseModel):
    """Industry strategy fields attached to a company detail response."""
    pains: Optional[str] = None
    gains: Optional[str] = None
    priority: Optional[str] = None
    notes: Optional[str] = None
    ops_focus_secondary: bool = False
    class Config:
        # Lets the model be validated from an object's attributes
        # (get_company validates an Industry row this way).
        from_attributes = True
class MarketingMatrixUpdate(BaseModel):
    """Partial update for a marketing matrix entry; None fields are left unchanged."""
    subject: Optional[str] = None
    intro: Optional[str] = None
    social_proof: Optional[str] = None
class MarketingMatrixResponse(BaseModel):
    """One marketing matrix entry with resolved industry and persona names."""
    id: int
    industry_id: int
    persona_id: int
    campaign_tag: str  # required: every response built by hand must pass it
    industry_name: str  # endpoints fall back to "Unknown" when the relation is missing
    persona_name: str
    subject: Optional[str] = None
    intro: Optional[str] = None
    social_proof: Optional[str] = None
    updated_at: datetime
    class Config:
        # Allows validation from attribute-bearing objects.
        from_attributes = True
class ContactResponse(BaseModel):
    """Contact as embedded in the company detail response."""
    id: int
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    job_title: Optional[str] = None
    role: Optional[str] = None
    email: Optional[str] = None
    is_primary: bool
    class Config:
        # Allows validation from attribute-bearing objects.
        from_attributes = True
class EnrichmentDataResponse(BaseModel):
    """Enrichment record as embedded in the company detail response."""
    id: int
    source_type: str  # e.g. "wikipedia" (the value used by the override endpoint)
    content: Dict[str, Any]
    is_locked: bool
    wiki_verified_empty: bool
    updated_at: datetime
    class Config:
        # Allows validation from attribute-bearing objects.
        from_attributes = True
class CompanyDetailsResponse(BaseModel):
    """Full company view returned by GET /api/companies/{company_id}."""
    id: int
    name: str
    website: Optional[str] = None
    city: Optional[str] = None
    country: Optional[str] = None
    industry_ai: Optional[str] = None
    status: str
    # Metrics
    calculated_metric_name: Optional[str] = None
    calculated_metric_value: Optional[float] = None
    calculated_metric_unit: Optional[str] = None
    standardized_metric_value: Optional[float] = None
    standardized_metric_unit: Optional[str] = None
    metric_source: Optional[str] = None
    metric_proof_text: Optional[str] = None
    metric_source_url: Optional[str] = None
    metric_confidence: Optional[float] = None
    # Openers
    ai_opener: Optional[str] = None
    ai_opener_secondary: Optional[str] = None
    research_dossier: Optional[str] = None
    # Relations
    # industry_details is attached by the endpoint after validation.
    industry_details: Optional[IndustryDetails] = None
    contacts: List[ContactResponse] = []
    enrichment_data: List[EnrichmentDataResponse] = []
    class Config:
        # Allows validation directly from the Company ORM object.
        from_attributes = True
# --- Events ---
@app.on_event("startup")
def on_startup():
    """Initialise the database when the application starts.

    A failure is logged at CRITICAL level with its traceback and is not
    re-raised, so the application keeps starting.
    """
    logger.info("Startup Event: Initializing Database...")
    try:
        init_db()
    except Exception as exc:
        logger.critical(f"Database init failed: {exc}", exc_info=True)
    else:
        logger.info("Database initialized successfully.")
# --- Public Routes (No Auth) ---
from fastapi.responses import HTMLResponse
@app.get("/unsubscribe/{token}", response_class=HTMLResponse)
def unsubscribe_contact(token: str, db: Session = Depends(get_db)):
    """Public (unauthenticated) unsubscribe page.

    Looks up the contact by its unsubscribe token and marks it as
    "unsubscribed". An unknown token yields the error page with status 404;
    a known token always yields the success page, even when the contact was
    already unsubscribed.
    """
    success_html = """
<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<title>Abmeldung erfolgreich</title>
<style>
body { font-family: sans-serif; text-align: center; padding: 40px; }
h1 { color: #333; }
</style>
</head>
<body>
<h1>Sie wurden erfolgreich abgemeldet.</h1>
<p>Sie werden keine weiteren Marketing-E-Mails von uns erhalten.</p>
</body>
</html>
"""
    error_html = """
<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<title>Fehler bei der Abmeldung</title>
<style>
body { font-family: sans-serif; text-align: center; padding: 40px; }
h1 { color: #d9534f; }
</style>
</head>
<body>
<h1>Abmeldung fehlgeschlagen.</h1>
<p>Der von Ihnen verwendete Link ist ungültig oder abgelaufen. Bitte kontaktieren Sie uns bei Problemen direkt.</p>
</body>
</html>
"""
    contact = db.query(Contact).filter(Contact.unsubscribe_token == token).first()
    if not contact:
        logger.warning(f"Unsubscribe attempt with invalid token: {token}")
        return HTMLResponse(content=error_html, status_code=404)

    if contact.status == "unsubscribed":
        # Nothing to change; the visitor still gets the confirmation page.
        logger.info(f"Contact {contact.id} already unsubscribed, showing success page anyway.")
    else:
        contact.status = "unsubscribed"
        contact.updated_at = datetime.utcnow()
        db.commit()
        logger.info(f"Contact {contact.id} ({contact.email}) unsubscribed successfully via token.")
        # Here you would trigger the sync back to SuperOffice in a background task
        # background_tasks.add_task(sync_unsubscribe_to_superoffice, contact.id)
    return HTMLResponse(content=success_html, status_code=200)
# --- API Routes ---
@app.get("/api/health")
def health_check(username: str = Depends(authenticate_user)):
    """Report liveness together with the configured version and database URL."""
    # NOTE(review): DATABASE_URL may embed credentials — confirm it is safe to
    # return it to API clients.
    payload = {
        "status": "ok",
        "version": settings.VERSION,
        "db": settings.DATABASE_URL,
    }
    return payload
@app.post("/api/match-company/reload")
async def reload_matching_service(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """
    Rebuild the Deduplicator stored on ``app.state`` from the current DB session.

    Intended to be called after major imports or SuperOffice syncs. Returns the
    number of reference records the new instance holds; any failure is logged
    and reported as HTTP 500.
    """
    try:
        fresh_deduplicator = Deduplicator(db)
        app.state.deduplicator = fresh_deduplicator
        record_count = len(fresh_deduplicator.reference_data)
        return {"status": "success", "records_loaded": record_count}
    except Exception as exc:
        logger.error(f"Failed to reload matching service: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/api/match-company")
async def match_company(request: CompanyMatchRequest, db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """
    Centralized account matching.

    Runs the candidate (name, website, city, country) through the Deduplicator
    kept on ``app.state`` and returns the query, a match flag, the first match
    and the full match list. Failures are logged with their traceback and
    reported as HTTP 500.
    """
    try:
        # The Deduplicator is created on first use and then reused.
        if not hasattr(app.state, 'deduplicator'):
            logger.info("Initializing Deduplicator for the first time...")
            app.state.deduplicator = Deduplicator(db)

        # Candidate dict in the shape the service expects
        candidate = {
            'name': request.name,
            'website': request.website,
            'city': request.city,
            'country': request.country
        }
        matches = app.state.deduplicator.find_duplicates(candidate)

        top_match = matches[0] if matches else None
        return {
            "query": candidate,
            "match_found": len(matches) > 0,
            "best_match": top_match,
            "all_matches": matches
        }
    except Exception as exc:
        logger.error(f"Error in company matching: {exc}")
        import traceback
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Matching failed: {str(exc)}")
@app.post("/api/provision/superoffice-contact", response_model=ProvisioningResponse)
def provision_superoffice_contact(
    req: ProvisioningRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Provision company/person data for a SuperOffice contact.

    Flow:
      1. Look up the company by ``crm_id == str(so_contact_id)``; an unknown
         company is created (``crm_name`` required) and discovery is scheduled.
      2. Companies still in status NEW/DISCOVERED get analysis or discovery
         scheduled and the call returns ``status="processing"``.
      3. The CRM snapshot (name/website) is refreshed; a name change on an
         ENRICHED company resets it to NEW and schedules a fresh discovery.
      4. Without ``so_person_id`` only company-level data is returned.
      5. Otherwise the person is looked up or created, the job title is mapped
         to a role, marketing texts are fetched from the matrix and an
         unsubscribe link is built.

    Raises:
        HTTPException: 400 when the company is unknown and ``crm_name`` is missing.
    """
    # 1. Find Company (via SO ID)
    company = db.query(Company).filter(Company.crm_id == str(req.so_contact_id)).first()
    if not company:
        # AUTO-CREATE Logic
        if not req.crm_name:
            # Cannot create without name. Should ideally not happen if Connector does its job.
            raise HTTPException(400, "Cannot create company: crm_name missing")
        company = Company(
            name=req.crm_name,
            crm_id=str(req.so_contact_id),
            crm_name=req.crm_name,
            crm_website=req.crm_website,
            status="NEW"
        )
        db.add(company)
        db.commit()
        db.refresh(company)
        logger.info(f"Auto-created company {company.name} from SuperOffice request.")
        # Trigger Discovery
        background_tasks.add_task(run_discovery_task, company.id)
        # Nothing to deliver yet; the caller is told to come back later.
        return ProvisioningResponse(
            status="processing",
            company_name=company.name
        )
    # 1b. Check Status & Progress
    # If NEW or DISCOVERED, we are not ready to provide texts.
    if company.status in ["NEW", "DISCOVERED"]:
        # If we have a website, ensure analysis is triggered
        # ("k.A." is treated as "no website known").
        if company.status == "DISCOVERED" or (company.website and company.website != "k.A."):
            background_tasks.add_task(run_analysis_task, company.id)
        elif company.status == "NEW":
            # Ensure discovery runs
            background_tasks.add_task(run_discovery_task, company.id)
        return ProvisioningResponse(
            status="processing",
            company_name=company.name
        )
    # 1c. Update CRM Snapshot Data (The Double Truth)
    changed = False
    name_changed_significantly = False
    if req.crm_name and req.crm_name != company.crm_name:
        logger.info(f"CRM Name Change detected for ID {company.crm_id}: {company.crm_name} -> {req.crm_name}")
        company.crm_name = req.crm_name
        # If the name changes, we should potentially re-evaluate the whole company
        # especially if the status was already ENRICHED
        if company.status == "ENRICHED":
            name_changed_significantly = True
        changed = True
    if req.crm_website:
        if company.crm_website != req.crm_website:
            company.crm_website = req.crm_website
            changed = True
    # ...
    if changed:
        company.updated_at = datetime.utcnow()
        if name_changed_significantly:
            logger.info(f"Triggering FRESH discovery for {company.name} due to CRM name change.")
            company.status = "NEW"
            # We don't change the internal 'name' yet, Discovery will do that or we keep it as anchor.
            # But we must clear old results to avoid stale data.
            company.industry_ai = None
            company.ai_opener = None
            company.ai_opener_secondary = None
            background_tasks.add_task(run_discovery_task, company.id)
        db.commit()
        # If we just triggered a fresh discovery, tell the worker to wait.
        if name_changed_significantly:
            return ProvisioningResponse(
                status="processing",
                company_name=company.crm_name
            )
    # 2. Find Contact (Person)
    if req.so_person_id is None:
        # Just a company sync, but return all company-level metadata
        return ProvisioningResponse(
            status="success",
            company_name=company.name,
            website=company.website,
            vertical_name=company.industry_ai,
            opener=company.ai_opener,
            opener_secondary=company.ai_opener_secondary,
            address_city=company.city,
            address_street=company.street,
            address_zip=company.zip_code,
            address_country=company.country,
            vat_id=company.crm_vat
        )
    person = db.query(Contact).filter(Contact.so_person_id == req.so_person_id).first()
    # Auto-Create/Update Person
    if not person:
        # New persons get a fresh random unsubscribe token.
        person = Contact(
            company_id=company.id,
            so_contact_id=req.so_contact_id,
            so_person_id=req.so_person_id,
            status="ACTIVE",
            unsubscribe_token=str(uuid.uuid4())
        )
        db.add(person)
        logger.info(f"Created new person {req.so_person_id} for company {company.name}")
    # Update Job Title & Role logic
    if req.job_title and req.job_title != person.job_title:
        person.job_title = req.job_title
        # New, service-based classification
        role_mapping_service = RoleMappingService(db)
        found_role = role_mapping_service.get_role_for_job_title(req.job_title)
        if found_role != person.role:
            logger.info(f"Role Change for {person.so_person_id} via Mapping Service: {person.role} -> {found_role}")
            person.role = found_role
        if not found_role:
            # If no role was found, we log it for future pattern mining
            role_mapping_service.add_or_update_unclassified_title(req.job_title)
    db.commit()
    db.refresh(person)
    # 3. Determine Role
    role_name = person.role
    # 4. Determine Vertical (Industry)
    vertical_name = company.industry_ai
    # 5. Fetch Texts from Matrix
    # Texts stay None unless both an industry and a persona row can be resolved.
    texts = {"subject": None, "intro": None, "social_proof": None}
    if vertical_name and role_name:
        industry_obj = db.query(Industry).filter(Industry.name == vertical_name).first()
        persona_obj = db.query(Persona).filter(Persona.name == role_name).first()
        if industry_obj and persona_obj:
            # Try to find a campaign-specific entry first
            matrix_entry = db.query(MarketingMatrix).filter(
                MarketingMatrix.industry_id == industry_obj.id,
                MarketingMatrix.persona_id == persona_obj.id,
                MarketingMatrix.campaign_tag == req.campaign_tag
            ).first()
            # Fallback to standard if no specific entry is found
            if not matrix_entry:
                matrix_entry = db.query(MarketingMatrix).filter(
                    MarketingMatrix.industry_id == industry_obj.id,
                    MarketingMatrix.persona_id == persona_obj.id,
                    MarketingMatrix.campaign_tag == "standard"
                ).first()
            if matrix_entry:
                texts["subject"] = matrix_entry.subject
                texts["intro"] = matrix_entry.intro
                texts["social_proof"] = matrix_entry.social_proof
    # 6. Construct Unsubscribe Link
    # Persons without a stored token get no link.
    unsubscribe_link = None
    if person and person.unsubscribe_token:
        unsubscribe_link = f"{settings.APP_BASE_URL.rstrip('/')}/unsubscribe/{person.unsubscribe_token}"
    return ProvisioningResponse(
        status="success",
        company_name=company.name,
        website=company.website,
        vertical_name=vertical_name,
        role_name=role_name,
        opener=company.ai_opener,
        opener_secondary=company.ai_opener_secondary,
        summary=company.research_dossier,
        texts=texts,
        unsubscribe_link=unsubscribe_link,
        address_city=company.city,
        address_street=company.street,
        address_zip=company.zip_code,
        address_country=company.country,
        # TODO: Add VAT field to Company model if not present, for now using crm_vat if available
        vat_id=company.crm_vat
    )
@app.get("/api/companies")
def list_companies(
    skip: int = 0,
    limit: int = 50,
    search: Optional[str] = None,
    sort_by: Optional[str] = Query("name_asc"),
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Return one page of companies plus the total count.

    ``search`` filters by a case-insensitive substring of the name.
    ``sort_by`` accepts "updated_desc" and "created_desc"; anything else sorts
    by name ascending. Each returned company carries a
    ``has_pending_mistakes`` flag. Any failure is logged and reported as 500.
    """
    try:
        base_query = db.query(Company)
        if search:
            base_query = base_query.filter(Company.name.ilike(f"%{search}%"))
        # Count before ordering/paging so it reflects all matches.
        total = base_query.count()

        ordering = {
            "updated_desc": Company.updated_at.desc(),
            "created_desc": Company.id.desc(),
        }.get(sort_by, Company.name.asc())  # Default: name_asc
        items = base_query.order_by(ordering).offset(skip).limit(limit).all()

        # One query for the whole page instead of one per company.
        page_ids = [company.id for company in items]
        flagged_ids = set()
        if page_ids:
            pending_rows = db.query(ReportedMistake.company_id).filter(
                ReportedMistake.company_id.in_(page_ids),
                ReportedMistake.status == 'PENDING'
            ).distinct().all()
            flagged_ids = {row[0] for row in pending_rows}

        # Attach the flag to each company object
        for company in items:
            company.has_pending_mistakes = company.id in flagged_ids
        return {"total": total, "items": items}
    except Exception as exc:
        logger.error(f"List Companies Error: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/api/companies/export")
def export_companies_csv(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """
    Stream every company, ordered by name, as a CSV attachment.

    The payload starts with a UTF-8 BOM so that Excel detects the encoding.
    """
    import io
    import csv
    from fastapi.responses import StreamingResponse

    header = [
        "ID", "Name", "Website", "City", "Country", "AI Industry",
        "Metric Name", "Metric Value", "Metric Unit", "Standardized Value (m2)",
        "Source", "Source URL", "Confidence", "Proof Text"
    ]
    buffer = io.StringIO()
    buffer.write('\ufeff')  # UTF-8 BOM for Excel
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(header)

    all_companies = db.query(Company).order_by(Company.name.asc()).all()
    csv_writer.writerows(
        [
            row.id, row.name, row.website, row.city, row.country, row.industry_ai,
            row.calculated_metric_name,
            row.calculated_metric_value,
            row.calculated_metric_unit,
            row.standardized_metric_value,
            row.metric_source,
            row.metric_source_url,
            row.metric_confidence,
            row.metric_proof_text,
        ]
        for row in all_companies
    )

    buffer.seek(0)
    disposition = f"attachment; filename=company_export_{datetime.utcnow().strftime('%Y-%m-%d')}.csv"
    return StreamingResponse(
        buffer,
        media_type="text/csv",
        headers={"Content-Disposition": disposition}
    )
@app.get("/api/companies/{company_id}", response_model=CompanyDetailsResponse)
def get_company(company_id: int, db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """Return one company with its enrichment data, contacts and industry details.

    Raises:
        HTTPException: 404 when no company has the given id.
    """
    company = (
        db.query(Company)
        .options(
            joinedload(Company.enrichment_data),
            joinedload(Company.contacts)
        )
        .filter(Company.id == company_id)
        .first()
    )
    if not company:
        raise HTTPException(404, detail="Company not found")

    # Industry details (strategy) are resolved by name from the Industry table.
    industry_row = None
    if company.industry_ai:
        industry_row = db.query(Industry).filter(Industry.name == company.industry_ai).first()
    industry_details = IndustryDetails.model_validate(industry_row) if industry_row else None

    # The ORM object is validated into the response schema first; the extra
    # 'industry_details' field is then attached explicitly.
    response_data = CompanyDetailsResponse.model_validate(company)
    response_data.industry_details = industry_details
    return response_data
@app.post("/api/companies")
def create_company(company: CompanyCreate, db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """Create a company with status "NEW".

    Raises:
        HTTPException: 400 when a company with the same name already exists.
    """
    already_registered = db.query(Company).filter(Company.name == company.name).first()
    if already_registered:
        raise HTTPException(status_code=400, detail="Company already registered")

    created = Company(
        name=company.name,
        city=company.city,
        country=company.country,
        website=company.website,
        crm_id=company.crm_id,
        status="NEW"
    )
    db.add(created)
    db.commit()
    # Reload so that database-generated values are present in the response.
    db.refresh(created)
    return created
@app.post("/api/companies/bulk")
def bulk_import_companies(req: BulkImportRequest, background_tasks: BackgroundTasks, db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """Create a "NEW" company for every name that is not known yet.

    Names are stripped; blank names, names repeated within the request and
    names that already exist in the database are skipped. Everything is
    committed in one transaction.

    Returns:
        ``{"status": "success", "imported": <number of companies created>}``.
    """
    imported_count = 0
    # Names handled in this request. Without this, a name repeated in the same
    # batch could be inserted twice, because the pending row is only visible
    # to the existence query when the session autoflushes.
    seen_names = set()
    for raw_name in req.names:
        name = raw_name.strip()
        if not name or name in seen_names:
            continue
        seen_names.add(name)
        exists = db.query(Company).filter(Company.name == name).first()
        if exists:
            continue
        db.add(Company(name=name, status="NEW"))
        imported_count += 1
        # Optional: Auto-trigger discovery
        # background_tasks.add_task(run_discovery_task, new_company.id)
    db.commit()
    return {"status": "success", "imported": imported_count}
@app.post("/api/contacts")
def create_contact_endpoint(contact: ContactCreate, db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """Create a contact, or update the one with the same e-mail in the same company.

    When a job title is given without a role, the role is derived through the
    RoleMappingService; titles that cannot be mapped are recorded for later
    pattern mining.

    Raises:
        HTTPException: 404 when the referenced company does not exist.
    """
    parent_company = db.query(Company).filter(Company.id == contact.company_id).first()
    if not parent_company:
        raise HTTPException(status_code=404, detail="Company not found")

    # Automatic role mapping (only when the caller did not supply a role)
    final_role = contact.role
    if contact.job_title and not final_role:
        mapper = RoleMappingService(db)
        mapped_role = mapper.get_role_for_job_title(contact.job_title)
        if not mapped_role:
            # Record the unclassified title for future mining
            mapper.add_or_update_unclassified_title(contact.job_title)
        else:
            final_role = mapped_role

    # A contact with the same e-mail inside the same company is updated in place.
    existing = None
    if contact.email:
        existing = db.query(Contact).filter(
            Contact.company_id == contact.company_id,
            Contact.email == contact.email
        ).first()
    if existing:
        existing.first_name = contact.first_name
        existing.last_name = contact.last_name
        existing.job_title = contact.job_title
        existing.role = final_role
        db.commit()
        db.refresh(existing)
        return existing

    new_contact = Contact(
        company_id=contact.company_id,
        first_name=contact.first_name,
        last_name=contact.last_name,
        email=contact.email,
        job_title=contact.job_title,
        role=final_role,
        is_primary=contact.is_primary,
        status="ACTIVE",
        unsubscribe_token=str(uuid.uuid4())
    )
    db.add(new_contact)
    db.commit()
    db.refresh(new_contact)
    return new_contact
@app.post("/api/companies/{company_id}/override/wikipedia")
def override_wikipedia(company_id: int, url: str, background_tasks: BackgroundTasks, db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """Store a manually chosen Wikipedia URL for a company and lock the record.

    The company's "wikipedia" enrichment record is created or overwritten with
    the given URL and marked as locked. When the URL starts with "http", a
    re-evaluation task is scheduled.

    Raises:
        HTTPException: 404 when the company does not exist.
    """
    company = db.query(Company).filter(Company.id == company_id).first()
    if not company:
        raise HTTPException(404, detail="Company not found")

    # The 'url' parameter carries the new URL; it is stored as given, even
    # when it is empty.
    wiki_data = {"url": url, "full_text": None, "manual_override": True}

    wiki_record = db.query(EnrichmentData).filter(
        EnrichmentData.company_id == company_id,
        EnrichmentData.source_type == "wikipedia"
    ).first()
    if wiki_record:
        wiki_record.content = wiki_data
        wiki_record.is_locked = True
    else:
        db.add(EnrichmentData(
            company_id=company_id,
            source_type="wikipedia",
            content=wiki_data,
            is_locked=True
        ))
    db.commit()

    # Re-evaluate only when the URL looks usable
    looks_like_url = bool(url) and url.startswith("http")
    if looks_like_url:
        background_tasks.add_task(run_wikipedia_reevaluation_task, company.id)
    return {"status": "updated"}
@app.get("/api/robotics/categories")
def list_robotics_categories(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """Return all robotics categories."""
    categories = db.query(RoboticsCategory).all()
    return categories
@app.get("/api/industries")
def list_industries(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """Return all industries."""
    industries = db.query(Industry).all()
    return industries
@app.get("/api/job_roles")
def list_job_roles(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """Return all job role patterns ordered by ascending priority."""
    patterns_query = db.query(JobRolePattern).order_by(JobRolePattern.priority.asc())
    return patterns_query.all()
# --- Marketing Matrix Endpoints ---
@app.get("/api/matrix", response_model=List[MarketingMatrixResponse])
def get_marketing_matrix(
    industry_id: Optional[int] = Query(None),
    persona_id: Optional[int] = Query(None),
    campaign_tag: Optional[str] = Query(None),
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Return marketing matrix entries, optionally filtered.

    Each of ``industry_id``, ``persona_id`` and ``campaign_tag`` narrows the
    result when it is given (a falsy value disables the filter). Industry and
    persona names fall back to "Unknown" when the relation is missing.
    """
    query = db.query(MarketingMatrix).options(
        joinedload(MarketingMatrix.industry),
        joinedload(MarketingMatrix.persona)
    )
    optional_filters = (
        (MarketingMatrix.industry_id, industry_id),
        (MarketingMatrix.persona_id, persona_id),
        (MarketingMatrix.campaign_tag, campaign_tag),
    )
    for column, wanted in optional_filters:
        if wanted:
            query = query.filter(column == wanted)

    # Map ORM rows to the response model
    response = []
    for entry in query.all():
        response.append(
            MarketingMatrixResponse(
                id=entry.id,
                industry_id=entry.industry_id,
                persona_id=entry.persona_id,
                campaign_tag=entry.campaign_tag,
                industry_name=entry.industry.name if entry.industry else "Unknown",
                persona_name=entry.persona.name if entry.persona else "Unknown",
                subject=entry.subject,
                intro=entry.intro,
                social_proof=entry.social_proof,
                updated_at=entry.updated_at
            )
        )
    return response
@app.get("/api/matrix/export")
def export_matrix_csv(
    industry_id: Optional[int] = Query(None),
    persona_id: Optional[int] = Query(None),
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Stream the marketing matrix as a CSV attachment, optionally filtered by
    industry and/or persona.

    The payload starts with a UTF-8 BOM so that Excel detects the encoding.
    """
    import io
    import csv
    from fastapi.responses import StreamingResponse

    matrix_query = db.query(MarketingMatrix).options(
        joinedload(MarketingMatrix.industry),
        joinedload(MarketingMatrix.persona)
    )
    if industry_id:
        matrix_query = matrix_query.filter(MarketingMatrix.industry_id == industry_id)
    if persona_id:
        matrix_query = matrix_query.filter(MarketingMatrix.persona_id == persona_id)
    entries = matrix_query.all()

    buffer = io.StringIO()
    buffer.write('\ufeff')  # UTF-8 BOM for Excel
    csv_writer = csv.writer(buffer)
    csv_writer.writerow([
        "ID", "Industry", "Persona", "Subject", "Intro", "Social Proof", "Last Updated"
    ])
    csv_writer.writerows(
        [
            entry.id,
            entry.industry.name if entry.industry else "Unknown",
            entry.persona.name if entry.persona else "Unknown",
            entry.subject,
            entry.intro,
            entry.social_proof,
            entry.updated_at.strftime('%Y-%m-%d %H:%M:%S') if entry.updated_at else "-",
        ]
        for entry in entries
    )

    buffer.seek(0)
    filename = f"marketing_matrix_{datetime.utcnow().strftime('%Y-%m-%d')}.csv"
    return StreamingResponse(
        buffer,
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename={filename}"}
    )
@app.put("/api/matrix/{entry_id}", response_model=MarketingMatrixResponse)
def update_matrix_entry(
    entry_id: int,
    data: MarketingMatrixUpdate,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Update the texts of one marketing matrix entry.

    Only fields that are not None in ``data`` are written; ``updated_at`` is
    always refreshed.

    Returns:
        The updated entry, with industry and persona names resolved
        ("Unknown" when the relation is missing).

    Raises:
        HTTPException: 404 when no entry has the given id.
    """
    entry = db.query(MarketingMatrix).options(
        joinedload(MarketingMatrix.industry),
        joinedload(MarketingMatrix.persona)
    ).filter(MarketingMatrix.id == entry_id).first()
    if not entry:
        raise HTTPException(status_code=404, detail="Matrix entry not found")

    for field in ("subject", "intro", "social_proof"):
        new_value = getattr(data, field)
        if new_value is not None:
            setattr(entry, field, new_value)
    entry.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(entry)

    return MarketingMatrixResponse(
        id=entry.id,
        industry_id=entry.industry_id,
        persona_id=entry.persona_id,
        # campaign_tag is a required field of the response model; leaving it
        # out made every successful update fail validation after the commit.
        campaign_tag=entry.campaign_tag,
        industry_name=entry.industry.name if entry.industry else "Unknown",
        persona_name=entry.persona.name if entry.persona else "Unknown",
        subject=entry.subject,
        intro=entry.intro,
        social_proof=entry.social_proof,
        updated_at=entry.updated_at
    )
@app.get("/api/matrix/personas")
def list_personas(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """Return all personas."""
    personas = db.query(Persona).all()
    return personas
class JobRolePatternCreate(BaseModel):
    """Request body for creating or replacing a job role pattern."""
    pattern_type: str  # the optimizer endpoint stores "regex" here
    pattern_value: str
    role: str
    priority: int = 100
class JobRolePatternResponse(BaseModel):
    """Job role pattern as returned by the create/update endpoints."""
    id: int
    pattern_type: str
    pattern_value: str
    role: str
    priority: int
    is_active: bool
    created_by: str  # the endpoints in this module write "user" or "optimizer"
    created_at: datetime
    updated_at: datetime
    class Config:
        # Allows validation directly from the JobRolePattern ORM object.
        from_attributes = True
class ClassificationResponse(BaseModel):
    """Response of the batch classification trigger endpoint."""
    status: str
    processed: int
    new_patterns: int
class OptimizationProposal(BaseModel):
    """A proposed regex pattern for a target role, with its covered patterns and titles."""
    target_role: str
    regex: str
    explanation: str
    priority: int
    covered_pattern_ids: List[int]
    covered_titles: List[str]
    false_positives: List[str]
class ApplyOptimizationRequest(BaseModel):
    """Request body for POST /api/job_roles/apply-optimization."""
    target_role: str
    regex: str  # stored as a new pattern of type "regex" unless it already exists
    priority: int
    ids_to_delete: List[int]  # ids of the patterns to delete
def run_optimization_task():
    """Background job: generate pattern optimization proposals.

    Progress is published through the module-level ``optimization_status``
    dict: "processing" while running, then "completed" with the proposals in
    ``result``, or "error" with the exception text in ``error``. The job opens
    its own database session and always closes it.
    """
    optimization_status.update(state="processing", result=None, error=None)

    from .database import SessionLocal
    session = SessionLocal()
    try:
        proposals = PatternOptimizationService(session).generate_proposals()
        optimization_status["result"] = proposals
        optimization_status["state"] = "completed"
    except Exception as exc:
        logger.error(f"Optimization task failed: {exc}", exc_info=True)
        optimization_status["state"] = "error"
        optimization_status["error"] = str(exc)
    finally:
        session.close()
@app.post("/api/job_roles/optimize-start")
def start_pattern_optimization(
    background_tasks: BackgroundTasks,
    username: str = Depends(authenticate_user)
):
    """
    Starts the optimization analysis in the background.

    Returns ``{"status": "already_running"}`` when a run is in progress,
    otherwise schedules the task and returns ``{"status": "started"}``.
    """
    if optimization_status["state"] == "processing":
        return {"status": "already_running"}
    # Mark the run as in progress before scheduling it. The task itself only
    # sets this once it starts executing, so a second request arriving in
    # between would otherwise schedule a duplicate run.
    optimization_status["state"] = "processing"
    background_tasks.add_task(run_optimization_task)
    return {"status": "started"}
@app.get("/api/job_roles/optimize-status")
def get_pattern_optimization_status(
    username: str = Depends(authenticate_user)
):
    """
    Polling endpoint: returns the shared optimization status dict
    (state, progress, result, error).
    """
    current_status = optimization_status
    return current_status
@app.post("/api/job_roles/apply-optimization")
def apply_pattern_optimization(
    req: ApplyOptimizationRequest,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Applies a proposal: creates the new regex pattern (unless a pattern with
    the same value already exists) and deletes the patterns listed in
    ``ids_to_delete``. Both changes are committed together.

    Returns:
        ``{"status": "success", "message": ...}`` where the message states
        whether the regex was created or already existed.
    """
    # 1. Create new Regex Pattern (skipped when the value is already stored)
    existing = db.query(JobRolePattern).filter(JobRolePattern.pattern_value == req.regex).first()
    created = existing is None
    if created:
        db.add(JobRolePattern(
            pattern_type="regex",
            pattern_value=req.regex,
            role=req.target_role,
            priority=req.priority,
            created_by="optimizer"
        ))
        logger.info(f"Optimization: Created new regex {req.regex} for {req.target_role}")

    # 2. Delete covered Exact Patterns
    removed = len(req.ids_to_delete)
    if req.ids_to_delete:
        db.query(JobRolePattern).filter(JobRolePattern.id.in_(req.ids_to_delete)).delete(synchronize_session=False)
        logger.info(f"Optimization: Deleted {removed} obsolete patterns.")
    db.commit()

    # Only claim that a regex was created when one actually was.
    if created:
        message = f"Created regex and removed {removed} old patterns."
    else:
        message = f"Regex already existed; removed {removed} old patterns."
    return {"status": "success", "message": message}
@app.post("/api/job_roles", response_model=JobRolePatternResponse)
def create_job_role(
    job_role: JobRolePatternCreate,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Create a job role pattern marked as created by "user"."""
    pattern = JobRolePattern(
        pattern_type=job_role.pattern_type,
        pattern_value=job_role.pattern_value,
        role=job_role.role,
        priority=job_role.priority,
        created_by="user"
    )
    db.add(pattern)
    db.commit()
    # Reload so that database-generated values are present in the response.
    db.refresh(pattern)
    return pattern
@app.put("/api/job_roles/{role_id}", response_model=JobRolePatternResponse)
def update_job_role(
    role_id: int,
    job_role: JobRolePatternCreate,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Replace type, value, role and priority of an existing job role pattern.

    Raises:
        HTTPException: 404 when no pattern has the given id.
    """
    pattern = db.query(JobRolePattern).filter(JobRolePattern.id == role_id).first()
    if not pattern:
        raise HTTPException(status_code=404, detail="Job role not found")

    for field in ("pattern_type", "pattern_value", "role", "priority"):
        setattr(pattern, field, getattr(job_role, field))
    pattern.updated_at = datetime.utcnow()

    db.commit()
    db.refresh(pattern)
    return pattern
@app.delete("/api/job_roles/{role_id}")
def delete_job_role(
    role_id: int,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """Delete one job role pattern.

    Raises:
        HTTPException: 404 when no pattern has the given id.
    """
    pattern = db.query(JobRolePattern).filter(JobRolePattern.id == role_id).first()
    if not pattern:
        raise HTTPException(status_code=404, detail="Job role not found")

    db.delete(pattern)
    db.commit()
    return {"status": "deleted"}
@app.post("/api/job_roles/classify-batch", response_model=ClassificationResponse)
def classify_batch_job_roles(
    background_tasks: BackgroundTasks,
    username: str = Depends(authenticate_user)
):
    """
    Queues the batch classification of unmapped job titles as a background task.

    The response is sent before any classification has run, so both
    counters are reported as zero.
    """
    queued_response = {"status": "queued", "processed": 0, "new_patterns": 0}
    background_tasks.add_task(run_batch_classification_task)
    return queued_response
@app.get("/api/job_roles/raw")
def list_raw_job_titles(
    limit: int = 100,
    unmapped_only: bool = True,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Lists RawJobTitle rows ordered by their count, highest first.

    With unmapped_only set (the default) only rows whose is_mapped flag is
    False are returned. At most `limit` rows are returned.
    """
    titles = db.query(RawJobTitle)
    if unmapped_only:
        # SQLAlchemy column comparison; this must stay an expression, not `is False`.
        titles = titles.filter(RawJobTitle.is_mapped == False)

    by_frequency = titles.order_by(RawJobTitle.count.desc())
    return by_frequency.limit(limit).all()
@app.get("/api/job_roles/suggestions")
def get_job_role_suggestions(db: Session = Depends(get_db), username: str = Depends(authenticate_user)):
    """
    Suggests keywords per role from the job titles of already classified contacts.

    For every role, returns the ten most frequent words (longer than three
    characters) found in the job titles of contacts holding that role, as a
    list of {"word": ..., "count": ...} entries.
    """
    classified_contacts = db.query(Contact).filter(Contact.role != None, Contact.job_title != None).all()

    # Group the job titles by the role assigned to each contact.
    titles_by_role = {}
    for contact in classified_contacts:
        titles_by_role.setdefault(contact.role, []).append(contact.job_title)

    result = {}
    for role_name, job_titles in titles_by_role.items():
        keyword_counts = Counter()
        for job_title in job_titles:
            # Replace punctuation with spaces and lowercase before splitting into words.
            normalized = re.sub(r'[^\w\s]', ' ', job_title).lower()
            # Words of three characters or fewer are ignored.
            keyword_counts.update(word for word in normalized.split() if len(word) > 3)
        result[role_name] = [
            {"word": word, "count": occurrences}
            for word, occurrences in keyword_counts.most_common(10)
        ]
    return result
@app.get("/api/mistakes")
def list_reported_mistakes(
    status: Optional[str] = Query(None),
    company_id: Optional[int] = Query(None),
    skip: int = 0,
    limit: int = 50,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Returns one page of reported mistakes, newest first, with the filtered total.

    Optional filters: `status` (compared in upper case) and `company_id`.
    The related company is eagerly loaded for every item.
    """
    criteria = []
    if status:
        criteria.append(ReportedMistake.status == status.upper())
    if company_id:
        criteria.append(ReportedMistake.company_id == company_id)

    mistakes = db.query(ReportedMistake).options(joinedload(ReportedMistake.company))
    if criteria:
        mistakes = mistakes.filter(*criteria)

    # Count before paging so `total` reflects every matching row.
    total = mistakes.count()
    newest_first = mistakes.order_by(ReportedMistake.created_at.desc())
    page = newest_first.offset(skip).limit(limit).all()
    return {"total": total, "items": page}
class MistakeUpdateStatusRequest(BaseModel):
    """Request body carrying the new status for a reported mistake."""

    # Expected values: PENDING, APPROVED, REJECTED
    status: str
@app.put("/api/mistakes/{mistake_id}")
def update_reported_mistake_status(
    mistake_id: int,
    request: MistakeUpdateStatusRequest,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Sets the status of a reported mistake.

    The status is compared and stored in upper case. Raises a 404
    HTTPException when the mistake does not exist and a 400 HTTPException
    when the status is not PENDING, APPROVED, or REJECTED.
    """
    mistake = db.query(ReportedMistake).filter(ReportedMistake.id == mistake_id).first()
    if not mistake:
        raise HTTPException(404, detail="Reported mistake not found")

    allowed_statuses = ("PENDING", "APPROVED", "REJECTED")
    new_status = request.status.upper()
    if new_status not in allowed_statuses:
        raise HTTPException(400, detail="Invalid status. Must be PENDING, APPROVED, or REJECTED.")

    mistake.status = new_status
    mistake.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(mistake)

    logger.info(f"Updated status for mistake {mistake_id} to {mistake.status}")
    return {"status": "success", "mistake": mistake}
# --- Database Management ---
@app.get("/api/admin/database/download")
def download_database(username: str = Depends(authenticate_user)):
    """
    Sends the SQLite database file as a download with a timestamped filename.

    Raises a 404 HTTPException when the database file is missing.
    """
    db_path = "/app/companies_v3_fixed_2.db"
    if os.path.exists(db_path):
        timestamp = datetime.utcnow().strftime('%Y-%m-%d_%H-%M')
        return FileResponse(
            db_path,
            media_type="application/octet-stream",
            filename=f"companies_backup_{timestamp}.db",
        )

    raise HTTPException(status_code=404, detail="Database file not found")
@app.post("/api/admin/database/upload")
async def upload_database(
    file: UploadFile = File(...),
    username: str = Depends(authenticate_user)
):
    """
    Replaces the SQLite database file with the uploaded file.

    A timestamped copy of the current file is made first. If anything fails
    and that copy exists, it is copied back over the database path before a
    500 HTTPException is raised.
    """
    db_path = "/app/companies_v3_fixed_2.db"
    timestamp = datetime.utcnow().strftime('%Y-%m-%d_%H-%M-%S')
    backup_path = f"{db_path}.bak.{timestamp}"

    try:
        # Keep a copy of the current database so a failed write can be undone.
        if os.path.exists(db_path):
            shutil.copy2(db_path, backup_path)
            logger.info(f"Created database backup at {backup_path}")

        # NOTE(review): the upload is written straight over the database path
        # and its content is not inspected here.
        with open(db_path, "wb") as target:
            shutil.copyfileobj(file.file, target)

        logger.info(f"Database replaced via upload by user {username}")
        return {
            "status": "success",
            "message": "Database uploaded successfully. Please restart the container to apply changes.",
        }
    except Exception as e:
        logger.error(f"Database upload failed: {e}", exc_info=True)
        # Put the previous database back if a backup was taken.
        if os.path.exists(backup_path):
            shutil.copy2(backup_path, db_path)
            logger.warning("Restored database from backup due to upload failure.")
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
# --- Regex Testing ---
class RegexTestRequest(BaseModel):
    """Request body for trying a job role pattern against a sample string."""

    # The pattern text to evaluate.
    pattern: str
    # How the pattern is interpreted: regex, exact, startswith
    pattern_type: str = "regex"
    # The sample string the pattern is checked against.
    test_string: str
@app.post("/api/job_roles/test-pattern")
def test_job_role_pattern(req: RegexTestRequest, username: str = Depends(authenticate_user)):
    """
    Tests if a given pattern matches a test string.

    The test string is lowercased and stripped before matching. Supported
    pattern types:
      - "regex":      re.search with re.IGNORECASE
      - "exact":      case-insensitive equality
      - "startswith": case-insensitive prefix check
    Any other pattern type never matches.

    Returns {"match": bool}. On failure returns {"match": False, "error": ...}
    instead of raising, so an invalid regex is reported in the response body.
    """
    try:
        normalized_test = req.test_string.lower().strip()

        if req.pattern_type == "regex":
            # Compile the regex source as written (only stripped). Lowercasing
            # it would turn \S, \D, \W and \B into their opposites and break
            # \A / \Z. Case-insensitivity comes from re.IGNORECASE.
            # NOTE(review): confirm the production matcher treats regex
            # patterns the same way, so this endpoint mirrors it.
            regex_source = req.pattern.strip()
            is_match = re.search(regex_source, normalized_test, re.IGNORECASE) is not None
        else:
            literal = req.pattern.lower().strip()
            if req.pattern_type == "exact":
                is_match = literal == normalized_test
            elif req.pattern_type == "startswith":
                is_match = normalized_test.startswith(literal)
            else:
                is_match = False

        return {"match": is_match}
    except re.error as e:
        return {"match": False, "error": f"Invalid Regex: {str(e)}"}
    except Exception as e:
        logger.error(f"Pattern test error: {e}")
        return {"match": False, "error": str(e)}
@app.post("/api/enrich/discover")
def discover_company(
    req: AnalysisRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Queues the discovery background task for the requested company.

    Raises a 404 HTTPException when the company does not exist.
    """
    target = db.query(Company).filter(Company.id == req.company_id).first()
    if not target:
        raise HTTPException(404, "Company not found")

    background_tasks.add_task(run_discovery_task, target.id)
    return {"status": "queued"}
@app.post("/api/enrich/analyze")
def analyze_company(
    req: AnalysisRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Queues the analysis background task for the requested company.

    Raises a 404 HTTPException when the company does not exist. When the
    company has no website (empty or the placeholder "k.A."), nothing is
    queued and an error payload is returned instead.
    """
    target = db.query(Company).filter(Company.id == req.company_id).first()
    if not target:
        raise HTTPException(404, "Company not found")

    has_website = bool(target.website) and target.website != "k.A."
    if has_website:
        background_tasks.add_task(run_analysis_task, target.id)
        return {"status": "queued"}

    return {"error": "No website to analyze. Run Discovery first."}
@app.put("/api/companies/{company_id}/industry")
def update_company_industry(
    company_id: int,
    data: IndustryUpdateModel,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Stores a new industry for a company and queues metric re-extraction.

    Raises a 404 HTTPException when the company does not exist.
    """
    target = db.query(Company).filter(Company.id == company_id).first()
    if not target:
        raise HTTPException(404, detail="Company not found")

    # Persist the new industry before the background task is queued.
    target.industry_ai = data.industry_ai
    target.updated_at = datetime.utcnow()
    db.commit()

    # The metrics are re-extracted after the response has been sent.
    background_tasks.add_task(run_metric_reextraction_task, target.id)

    response = {"status": "updated", "industry_ai": target.industry_ai}
    return response
@app.post("/api/companies/{company_id}/reevaluate-wikipedia")
def reevaluate_wikipedia(
    company_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Queues the Wikipedia re-evaluation background task for a company.

    Raises a 404 HTTPException when the company does not exist.
    """
    target = db.query(Company).filter(Company.id == company_id).first()
    if not target:
        raise HTTPException(404, detail="Company not found")

    background_tasks.add_task(run_wikipedia_reevaluation_task, target.id)
    return {"status": "queued"}
@app.delete("/api/companies/{company_id}")
def delete_company(
    company_id: int,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Deletes a company together with its enrichment data, signals and contacts.

    Raises a 404 HTTPException when the company does not exist.
    """
    target = db.query(Company).filter(Company.id == company_id).first()
    if not target:
        raise HTTPException(404, detail="Company not found")

    # Remove dependent rows explicitly rather than relying on cascades.
    # NOTE(review): ReportedMistake rows also carry a company_id and are not
    # removed here - confirm whether a cascade covers them.
    for related_model in (EnrichmentData, Signal, Contact):
        db.query(related_model).filter(related_model.company_id == company_id).delete()

    db.delete(target)
    db.commit()
    return {"status": "deleted"}
@app.post("/api/companies/{company_id}/override/website")
def override_website(
    company_id: int,
    url: str,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Manually sets the website of a company to the given URL.

    Raises a 404 HTTPException when the company does not exist.
    """
    target = db.query(Company).filter(Company.id == company_id).first()
    if not target:
        raise HTTPException(404, detail="Company not found")

    target.website = url
    target.updated_at = datetime.utcnow()
    db.commit()

    response = {"status": "updated", "website": target.website}
    return response
@app.post("/api/companies/{company_id}/override/impressum")
def override_impressum(
    company_id: int,
    url: str,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Stores a manual, locked impressum URL for a company.

    The URL is kept in an EnrichmentData row with source_type
    "impressum_override"; an existing row is updated, otherwise a new one is
    created. Raises a 404 HTTPException when the company does not exist.
    """
    target = db.query(Company).filter(Company.id == company_id).first()
    if not target:
        raise HTTPException(404, detail="Company not found")

    override_content = {"url": url}
    override_row = db.query(EnrichmentData).filter(
        EnrichmentData.company_id == company_id,
        EnrichmentData.source_type == "impressum_override"
    ).first()

    if override_row:
        # Re-point the existing override and make sure it is locked.
        override_row.content = override_content
        override_row.is_locked = True
    else:
        db.add(EnrichmentData(
            company_id=company_id,
            source_type="impressum_override",
            content=override_content,
            is_locked=True
        ))

    db.commit()
    return {"status": "updated"}
@app.post("/api/companies/{company_id}/report-mistake")
def report_company_mistake(
    company_id: int,
    request: ReportMistakeRequest,
    db: Session = Depends(get_db),
    username: str = Depends(authenticate_user)
):
    """
    Records a user-reported mistake for a company.

    Raises a 404 HTTPException when the company does not exist. Returns the
    id of the newly created ReportedMistake.
    """
    target = db.query(Company).filter(Company.id == company_id).first()
    if not target:
        raise HTTPException(404, detail="Company not found")

    # These fields are copied unchanged from the request onto the new record.
    copied_fields = (
        "field_name",
        "wrong_value",
        "corrected_value",
        "source_url",
        "quote",
        "user_comment",
    )
    report_values = {name: getattr(request, name) for name in copied_fields}
    report = ReportedMistake(company_id=company_id, **report_values)

    db.add(report)
    db.commit()
    db.refresh(report)

    logger.info(f"Reported mistake for company {company_id}: {request.field_name} -> {request.corrected_value}")
    return {"status": "success", "mistake_id": report.id}
def run_wikipedia_reevaluation_task(company_id: int):
    """
    Background task: re-evaluates the Wikipedia metric of one company.

    Opens its own database session. Does nothing when the company is missing,
    logs a warning when the company's industry is not found, and logs any
    exception rather than propagating it.
    """
    from .database import SessionLocal

    session = SessionLocal()
    try:
        company = session.query(Company).filter(Company.id == company_id).first()
        if not company:
            return

        logger.info(f"Re-evaluating Wikipedia metric for {company.name} (Industry: {company.industry_ai})")
        industry = session.query(Industry).filter(Industry.name == company.industry_ai).first()
        if not industry:
            logger.warning(f"Industry '{company.industry_ai}' not found for re-evaluation.")
            return

        classifier.reevaluate_wikipedia_metric(company, session, industry)
        logger.info(f"Wikipedia metric re-evaluation complete for {company.name}")
    except Exception as e:
        logger.error(f"Wikipedia Re-evaluation Task Error: {e}", exc_info=True)
    finally:
        session.close()
def run_metric_reextraction_task(company_id: int):
    """
    Background task: re-extracts the metrics of one company for its industry.

    Opens its own database session. Does nothing when the company is missing
    and logs a warning when no Industry row matches the company's
    industry_ai. On success the company status is set to "ENRICHED" and
    committed. Exceptions are logged rather than propagated.
    """
    from .database import SessionLocal
    db = SessionLocal()
    try:
        company = db.query(Company).filter(Company.id == company_id).first()
        if not company:
            return

        logger.info(f"Re-extracting metrics for {company.name} (Industry: {company.industry_ai})")

        # Let the database find the matching industry instead of loading every
        # Industry row and scanning in Python.
        industry = db.query(Industry).filter(Industry.name == company.industry_ai).first()
        if not industry:
            logger.warning(f"Industry '{company.industry_ai}' not found for re-extraction.")
            return

        classifier.extract_metrics_for_industry(company, db, industry)
        company.status = "ENRICHED"
        db.commit()
        logger.info(f"Metric re-extraction complete for {company.name}")
    except Exception as e:
        logger.error(f"Metric Re-extraction Task Error: {e}", exc_info=True)
    finally:
        db.close()
def run_discovery_task(company_id: int):
    """
    Background task: discovers the website and Wikipedia data of one company.

    Opens its own database session. The website is searched only when the
    company has none (empty or the placeholder "k.A."). The Wikipedia entry is
    refreshed unless an existing entry is locked. A company in status "NEW"
    that ends up with a website is moved to "DISCOVERED". Exceptions are
    logged rather than propagated.
    """
    from .database import SessionLocal

    session = SessionLocal()
    try:
        company = session.query(Company).filter(Company.id == company_id).first()
        if not company:
            return

        # 1. Website Search
        if not company.website or company.website == "k.A.":
            candidate_url = discovery.find_company_website(company.name, company.city)
            if candidate_url and candidate_url != "k.A.":
                company.website = candidate_url

        # 2. Wikipedia Search
        wiki_entry = session.query(EnrichmentData).filter(
            EnrichmentData.company_id == company.id,
            EnrichmentData.source_type == "wikipedia"
        ).first()

        # A locked entry is left untouched.
        if not (wiki_entry and wiki_entry.is_locked):
            wiki_url = discovery.find_wikipedia_url(company.name, website=company.website, city=company.city)
            if wiki_url and wiki_url != "k.A.":
                wiki_data = discovery.extract_wikipedia_data(wiki_url)
            else:
                # Nothing usable was found; store just the lookup result.
                wiki_data = {"url": wiki_url}

            if wiki_entry:
                wiki_entry.content = wiki_data
                wiki_entry.updated_at = datetime.utcnow()
            else:
                session.add(EnrichmentData(company_id=company.id, source_type="wikipedia", content=wiki_data))

        if company.status == "NEW" and company.website and company.website != "k.A.":
            company.status = "DISCOVERED"

        session.commit()
    except Exception as e:
        logger.error(f"Discovery Task Error: {e}", exc_info=True)
    finally:
        session.close()
def run_analysis_task(company_id: int):
    """
    Background task: scrapes a company's website and classifies the company.

    Opens its own database session. The scrape result is stored in (or
    updates) the "website_scrape" EnrichmentData row unless that row is
    locked. The company is then handed to the classifier and marked
    "ENRICHED". Exceptions are logged as critical rather than propagated.
    """
    from .database import SessionLocal

    session = SessionLocal()
    try:
        company = session.query(Company).filter(Company.id == company_id).first()
        if not company:
            logger.error(f"Analysis Task: Company with ID {company_id} not found.")
            return

        logger.info(f"--- [BACKGROUND TASK] Starting for {company.name} ---")

        # --- 1. Scrape Website (if not locked) ---
        scrape_entry = session.query(EnrichmentData).filter(
            EnrichmentData.company_id == company.id,
            EnrichmentData.source_type == "website_scrape"
        ).first()

        if scrape_entry and scrape_entry.is_locked:
            logger.info("Website scrape is locked. Skipping.")
        else:
            logger.info(f"Scraping website for {company.name}...")
            scrape_result = scraper.scrape_url(company.website)
            if scrape_entry:
                scrape_entry.content = scrape_result
                scrape_entry.updated_at = datetime.utcnow()
                logger.info("Updated existing website_scrape entry.")
            else:
                session.add(EnrichmentData(company_id=company.id, source_type="website_scrape", content=scrape_result))
                logger.info("Created new website_scrape entry.")
            # Save the scrape before classification starts.
            session.commit()

        # --- 2. Classify Industry & Metrics ---
        logger.info(f"Handing over to ClassificationService for {company.name}...")
        classifier.classify_company_potential(company, session)

        company.status = "ENRICHED"
        session.commit()
        logger.info(f"--- [BACKGROUND TASK] Successfully finished for {company.name} ---")
    except Exception:
        logger.critical(f"--- [BACKGROUND TASK] CRITICAL ERROR for Company ID {company_id} ---", exc_info=True)
    finally:
        session.close()
def _strip_json_code_fence(text: str) -> str:
    """Returns *text* without a surrounding Markdown code fence (``` or ```json)."""
    stripped = text.strip()
    if stripped.startswith("```"):
        stripped = stripped[3:]
        if stripped[:4].lower() == "json":
            stripped = stripped[4:]
        # Only drop the closing fence when it is actually there, so that no
        # character of the JSON payload is ever cut off.
        if stripped.endswith("```"):
            stripped = stripped[:-3]
    return stripped.strip()


def run_batch_classification_task():
    """
    Background task: classifies all unmapped raw job titles with the LLM.

    Titles are sent in batches of 50 together with the names of all Personas.
    For every title the LLM maps to a known role, an exact JobRolePattern is
    created (unless one with the same pattern_value exists) and the title is
    marked as mapped. Each batch is committed on its own.

    A batch whose LLM reply cannot be parsed into a JSON object is logged and
    skipped; the remaining batches are still processed. Any other error is
    logged as critical and the session is rolled back.
    """
    from .database import SessionLocal
    from .lib.core_utils import call_gemini_flash
    import json

    db = SessionLocal()
    logger.info("--- [BACKGROUND TASK] Starting Batch Job Title Classification ---")
    BATCH_SIZE = 50
    try:
        personas = db.query(Persona).all()
        available_roles = [p.name for p in personas]
        if not available_roles:
            logger.error("No Personas found. Aborting classification task.")
            return

        unmapped_titles = db.query(RawJobTitle).filter(RawJobTitle.is_mapped == False).all()
        if not unmapped_titles:
            logger.info("No unmapped titles to process.")
            return

        logger.info(f"Found {len(unmapped_titles)} unmapped titles. Processing in batches of {BATCH_SIZE}.")
        for i in range(0, len(unmapped_titles), BATCH_SIZE):
            batch = unmapped_titles[i:i + BATCH_SIZE]
            title_strings = [item.title for item in batch]
            prompt = f'''You are an expert in B2B contact segmentation. Classify the following job titles into one of the provided roles: {', '.join(available_roles)}. Respond ONLY with a valid JSON object mapping the title to the role. Use "Influencer" as a fallback. Titles: {json.dumps(title_strings)}'''

            response_text = ""
            try:
                response_text = call_gemini_flash(prompt, json_mode=True)
                classifications = json.loads(_strip_json_code_fence(response_text))
                # Valid JSON that is not an object (e.g. a list) cannot be used
                # as a title -> role mapping; treat it like a parse failure so
                # only this batch is skipped.
                if not isinstance(classifications, dict):
                    raise ValueError("LLM response is not a JSON object")
            except Exception as e:
                logger.error(f"LLM response error for batch, skipping. Error: {e}. Response: {response_text}")
                continue

            new_patterns = 0
            for title_obj in batch:
                original_title = title_obj.title
                assigned_role = classifications.get(original_title)
                # Titles without a known role stay unmapped.
                if not (assigned_role and assigned_role in available_roles):
                    continue

                pattern_exists = db.query(JobRolePattern).filter(
                    JobRolePattern.pattern_value == original_title
                ).first()
                if not pattern_exists:
                    db.add(JobRolePattern(pattern_type='exact', pattern_value=original_title, role=assigned_role, priority=90, created_by='llm_batch'))
                    new_patterns += 1
                title_obj.is_mapped = True

            db.commit()
            logger.info(f"Batch {i//BATCH_SIZE + 1} complete. Created {new_patterns} new patterns.")
    except Exception:
        logger.critical("--- [BACKGROUND TASK] CRITICAL ERROR during classification ---", exc_info=True)
        db.rollback()
    finally:
        db.close()
        logger.info("--- [BACKGROUND TASK] Finished Batch Job Title Classification ---")
# --- Serve Frontend ---
# Candidate locations for the built frontend are tried in order.
static_path = "/frontend_static"
if not os.path.exists(static_path):
    static_path = os.path.join(os.path.dirname(__file__), "../../frontend/dist")
if not os.path.exists(static_path):
    static_path = os.path.join(os.path.dirname(__file__), "../static")
logger.info(f"Static files path: {static_path} (Exists: {os.path.exists(static_path)})")

if os.path.exists(static_path):
    index_file = os.path.join(static_path, "index.html")
    # Canonical root, with trailing separator, used to confine the files the
    # fallback route is allowed to serve.
    static_root = os.path.realpath(static_path)
    static_root_prefix = static_root.rstrip(os.sep) + os.sep

    # Mount assets specifically first
    assets_path = os.path.join(static_path, "assets")
    if os.path.exists(assets_path):
        app.mount("/assets", StaticFiles(directory=assets_path), name="assets")

    @app.get("/")
    async def serve_index():
        """Serves the frontend entry page."""
        return FileResponse(index_file)

    # Catch-all for SPA routing (any path not matched by API or assets)
    @app.get("/{full_path:path}")
    async def spa_fallback(full_path: str):
        """
        Serves a static file when one matches the path, otherwise index.html.

        Paths starting with "api/" get a 404. The requested path comes
        straight from the URL, so it is resolved and must lie inside the
        static directory; anything that escapes it (".." segments, absolute
        paths, symlinks pointing outside) falls through to index.html.
        """
        # Allow API calls to fail naturally with 404
        if full_path.startswith("api/"):
            raise HTTPException(status_code=404)

        # If it's a file that exists, serve it (e.g. favicon, robots.txt)
        try:
            file_path = os.path.realpath(os.path.join(static_root, full_path))
        except (OSError, ValueError):
            # e.g. an embedded null byte in the requested path
            file_path = ""
        if file_path.startswith(static_root_prefix) and os.path.isfile(file_path):
            return FileResponse(file_path)

        # Otherwise, serve index.html for SPA routing
        return FileResponse(index_file)
else:
    @app.get("/")
    def root_no_frontend():
        """Reports that the API is up but no frontend build was found."""
        return {"message": "Company Explorer API is running, but frontend was not found.", "path_tried": static_path}
if __name__ == "__main__":
    # Direct execution: start uvicorn with auto-reload, listening on all
    # interfaces on port 8000.
    import uvicorn

    uvicorn.run(
        "backend.app:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )