Files
Brancheneinstufung2/transcription-tool/backend/app.py
Floke 771b06c1bc feat(transcription): [2f388f42] integrate prompt database and AI insights
Implements the core functionality for the AI-powered analysis of meeting transcripts in the Transcription Tool.

This commit introduces a new 'AI Insights' feature that allows users to generate various summaries and analyses from a transcript on demand.

- Creates a  to manage and version different AI prompts for tasks like generating meeting minutes, extracting action items, and creating sales summaries.
- Adds a new  responsible for orchestrating the analysis process: fetching the transcript, calling the Gemini API with the appropriate prompt, and caching the results in the database.
- Extends the FastAPI backend with a new endpoint  to trigger the insight generation.
- Updates the React frontend () with a new 'AI Insights' panel, including buttons to trigger the analyses and a modal to display the results.
- Updates the documentation () to reflect the new features, API endpoints, and version.
2026-01-26 07:43:24 +00:00

193 lines
6.1 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, UploadFile, File, BackgroundTasks, Body
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session, joinedload
from typing import List, Dict, Any
import os
import shutil
import uuid
from datetime import datetime
from .config import settings
from .database import init_db, get_db, Meeting, TranscriptChunk, AnalysisResult, SessionLocal
from .services.orchestrator import process_meeting_task
from .services.insights_service import generate_insight
# Initialize FastAPI App
app = FastAPI(
title=settings.APP_NAME,
version=settings.VERSION,
root_path="/tr"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.on_event("startup")
def startup_event():
init_db()
@app.get("/api/health")
def health():
return {"status": "ok", "version": settings.VERSION}
@app.get("/api/meetings")
def list_meetings(db: Session = Depends(get_db)):
return db.query(Meeting).order_by(Meeting.created_at.desc()).all()
@app.get("/api/meetings/{meeting_id}")
def get_meeting(meeting_id: int, db: Session = Depends(get_db)):
meeting = db.query(Meeting).options(
joinedload(Meeting.chunks),
joinedload(Meeting.analysis_results) # Eager load analysis results
).filter(Meeting.id == meeting_id).first()
if not meeting:
raise HTTPException(404, detail="Meeting not found")
# Sort chunks by index
meeting.chunks.sort(key=lambda x: x.chunk_index)
return meeting
@app.put("/api/chunks/{chunk_id}")
def update_chunk(chunk_id: int, payload: Dict[str, Any] = Body(...), db: Session = Depends(get_db)):
chunk = db.query(TranscriptChunk).filter(TranscriptChunk.id == chunk_id).first()
if not chunk:
raise HTTPException(404, detail="Chunk not found")
# Update JSON content (e.g. after editing/deleting lines)
if "json_content" in payload:
chunk.json_content = payload["json_content"]
db.commit()
return {"status": "updated"}
@app.post("/api/upload")
async def upload_audio(
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
db: Session = Depends(get_db)
):
# 1. Save File
file_id = str(uuid.uuid4())
ext = os.path.splitext(file.filename)[1]
filename = f"{file_id}{ext}"
file_path = os.path.join(settings.UPLOAD_DIR, filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# 2. Create DB Entry
meeting = Meeting(
title=file.filename,
filename=filename,
file_path=file_path,
status="UPLOADED"
)
db.add(meeting)
db.commit()
db.refresh(meeting)
# 3. Trigger Processing in Background
background_tasks.add_task(process_meeting_task, meeting.id, SessionLocal)
return meeting
from pydantic import BaseModel
class InsightRequest(BaseModel):
insight_type: str
@app.post("/api/meetings/{meeting_id}/insights")
def create_insight(meeting_id: int, payload: InsightRequest, db: Session = Depends(get_db)):
"""
Triggers the generation of a specific insight (e.g., meeting minutes, action items).
If the insight already exists, it returns the stored result.
Otherwise, it generates, stores, and returns the new insight.
"""
try:
insight = generate_insight(db, meeting_id, payload.insight_type)
return insight
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
# For unexpected errors, return a generic 500 error
print(f"ERROR: Unexpected error in create_insight: {e}")
raise HTTPException(status_code=500, detail="An internal error occurred while generating the insight.")
class RenameRequest(BaseModel):
old_name: str
new_name: str
@app.post("/api/meetings/{meeting_id}/rename_speaker")
def rename_speaker_globally(meeting_id: int, payload: RenameRequest, db: Session = Depends(get_db)):
meeting = db.query(Meeting).filter(Meeting.id == meeting_id).first()
if not meeting:
raise HTTPException(404, detail="Meeting not found")
count = 0
# Iterate over all chunks directly from DB relation
for chunk in meeting.chunks:
if not chunk.json_content:
continue
modified = False
new_content = []
for msg in chunk.json_content:
if msg.get("speaker") == payload.old_name:
msg["speaker"] = payload.new_name
modified = True
count += 1
new_content.append(msg)
if modified:
# Force update of the JSON field
chunk.json_content = list(new_content)
db.add(chunk)
db.commit()
return {"status": "updated", "rows_affected": count}
@app.delete("/api/meetings/{meeting_id}")
def delete_meeting(meeting_id: int, db: Session = Depends(get_db)):
meeting = db.query(Meeting).filter(Meeting.id == meeting_id).first()
if not meeting:
raise HTTPException(404, detail="Meeting not found")
# 1. Delete Files
try:
if os.path.exists(meeting.file_path):
os.remove(meeting.file_path)
# Delete chunks dir
chunk_dir = os.path.join(settings.UPLOAD_DIR, "chunks", str(meeting_id))
if os.path.exists(chunk_dir):
shutil.rmtree(chunk_dir)
except Exception as e:
print(f"Error deleting files: {e}")
# 2. Delete DB Entry (Cascade deletes chunks/analyses)
db.delete(meeting)
db.commit()
return {"status": "deleted"}
# Serve Frontend
# This must be the last route definition to avoid catching API routes
static_path = "/frontend_static"
if not os.path.exists(static_path):
# Fallback for local development if not in Docker
static_path = os.path.join(os.path.dirname(__file__), "../frontend/dist")
if os.path.exists(static_path):
app.mount("/", StaticFiles(directory=static_path, html=True), name="static")
if __name__ == "__main__":
import uvicorn
uvicorn.run("backend.app:app", host="0.0.0.0", port=8001, reload=True)