feat(transcription): add meeting assistant micro-service v0.1.0

- Added FastAPI backend with FFmpeg and Gemini 2.0 integration
- Added React frontend with upload and meeting list
- Integrated into main docker-compose stack and dashboard
This commit is contained in:
2026-01-24 16:34:01 +00:00
parent b16babb032
commit 0858df6f25
25 changed files with 721 additions and 2 deletions

View File

@@ -0,0 +1,49 @@
import subprocess
import os
import logging
from ..config import settings
logger = logging.getLogger(__name__)
class FFmpegService:
    """Thin wrapper around the ffmpeg/ffprobe CLIs for chunking audio files."""

    def split_audio(self, input_path: str, meeting_id: int) -> list:
        """
        Split an audio file into fixed-length chunks using the ffmpeg
        segment muxer (stream copy, no re-encode).

        Args:
            input_path: Path to the source audio file.
            meeting_id: Used to namespace the per-meeting chunk directory.

        Returns:
            Sorted list of paths to the created ``.mp3`` chunk files.

        Raises:
            RuntimeError: If ffmpeg exits with a non-zero status.
        """
        output_dir = os.path.join(settings.UPLOAD_DIR, "chunks", str(meeting_id))
        os.makedirs(output_dir, exist_ok=True)
        output_pattern = os.path.join(output_dir, "chunk_%03d.mp3")
        # ffmpeg -i input.mp3 -f segment -segment_time 1800 -c copy chunk_%03d.mp3
        cmd = [
            "ffmpeg", "-i", input_path,
            "-f", "segment",
            "-segment_time", str(settings.CHUNK_DURATION_SEC),
            "-c", "copy",
            output_pattern,
        ]
        logger.info("Splitting %s into segments...", input_path)
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.error("FFmpeg Error: %s", result.stderr)
            # RuntimeError is an Exception subclass, so existing
            # `except Exception` callers still catch it.
            raise RuntimeError("Failed to split audio file.")
        # Sort so chunk_000, chunk_001, ... come back in playback order.
        chunks = sorted(
            os.path.join(output_dir, f)
            for f in os.listdir(output_dir)
            if f.endswith(".mp3")
        )
        logger.info("Created %d chunks.", len(chunks))
        return chunks

    def get_duration(self, input_path: str) -> float:
        """
        Return the duration of an audio file in seconds via ffprobe.

        Best-effort: returns 0.0 (and logs) on any probe failure rather
        than raising, so callers can treat duration as optional metadata.
        """
        cmd = [
            "ffprobe", "-v", "error", "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1", input_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.error("ffprobe Error: %s", result.stderr)
            return 0.0
        try:
            return float(result.stdout.strip())
        except ValueError:
            # ffprobe produced no parseable number (e.g. empty output).
            return 0.0

View File

@@ -0,0 +1,60 @@
import logging
from sqlalchemy.orm import Session
from .ffmpeg_service import FFmpegService
from .transcription_service import TranscriptionService
from ..database import Meeting, TranscriptChunk
from ..config import settings
logger = logging.getLogger(__name__)
def process_meeting_task(meeting_id: int, db_session_factory):
    """
    Background pipeline for a single meeting: split the uploaded audio
    into chunks, transcribe each chunk, and persist the results.

    Status transitions on the Meeting row:
    SPLITTING -> TRANSCRIBING -> COMPLETED, or ERROR on any failure.

    Args:
        meeting_id: Primary key of the Meeting to process.
        db_session_factory: Zero-arg callable returning a new SQLAlchemy
            Session. This task owns the session it creates and always
            closes it.
    """
    db = db_session_factory()
    try:
        # Query inside try/finally so the session is closed even if the
        # initial lookup itself raises.
        meeting = db.query(Meeting).filter(Meeting.id == meeting_id).first()
        if not meeting:
            logger.warning("Meeting %s not found; nothing to process.", meeting_id)
            return
        try:
            ffmpeg = FFmpegService()
            transcriber = TranscriptionService()

            # Phase 1: Split
            meeting.status = "SPLITTING"
            db.commit()
            meeting.duration_seconds = ffmpeg.get_duration(meeting.file_path)
            chunks = ffmpeg.split_audio(meeting.file_path, meeting.id)

            # Phase 2: Transcribe
            meeting.status = "TRANSCRIBING"
            db.commit()
            all_text = []
            for i, chunk_path in enumerate(chunks):
                # Chunk-local timestamps are rebased by this offset later.
                offset = i * settings.CHUNK_DURATION_SEC
                logger.info(
                    "Processing chunk %d/%d with offset %ds", i + 1, len(chunks), offset
                )
                result = transcriber.transcribe_chunk(chunk_path, offset)
                # Save chunk result; commit per chunk so progress survives
                # a crash mid-transcription.
                db_chunk = TranscriptChunk(
                    meeting_id=meeting.id,
                    chunk_index=i,
                    raw_text=result["raw_text"],
                )
                db.add(db_chunk)
                all_text.append(result["raw_text"])
                db.commit()

            # Phase 3: Finalize
            meeting.status = "COMPLETED"
            # Combine summary (first attempt - can be refined later with separate LLM call)
            # meeting.summary = ...
            db.commit()
            logger.info(f"Meeting {meeting.id} processing completed.")
        except Exception as e:
            logger.error(f"Error processing meeting {meeting_id}: {e}", exc_info=True)
            # Discard any failed/partial transaction before writing the
            # ERROR status, otherwise this commit can fail too.
            db.rollback()
            meeting.status = "ERROR"
            db.commit()
    finally:
        db.close()

View File

@@ -0,0 +1,58 @@
import os
import time
import logging
from google import genai
from google.genai import types
from ..config import settings
logger = logging.getLogger(__name__)
class TranscriptionService:
    """Transcribes audio chunks via the Gemini API (upload, poll, generate)."""

    # Bound the server-side processing wait so a stuck upload cannot
    # hang the worker forever.
    _POLL_TIMEOUT_SEC = 300
    _POLL_INTERVAL_SEC = 2

    def __init__(self):
        """Create a Gemini client; fails fast if no API key is configured."""
        if not settings.GEMINI_API_KEY:
            # RuntimeError is an Exception subclass; existing callers
            # catching Exception are unaffected.
            raise RuntimeError("Gemini API Key missing.")
        self.client = genai.Client(api_key=settings.GEMINI_API_KEY)

    def transcribe_chunk(self, file_path: str, offset_seconds: int = 0) -> dict:
        """
        Upload one audio chunk to Gemini and transcribe it with speaker
        labels and timestamps.

        Args:
            file_path: Local path of the audio chunk.
            offset_seconds: Start offset of this chunk within the full
                recording; echoed back so the caller can rebase the
                chunk-local timestamps.

        Returns:
            dict with keys "raw_text" (model transcript) and "offset".

        Raises:
            RuntimeError: If server-side processing does not finish
                within the poll timeout.
            Exception: If Gemini reports the uploaded file as FAILED.
        """
        logger.info(f"Uploading chunk {file_path} to Gemini...")
        # 1. Upload file
        media_file = self.client.files.upload(path=file_path)
        try:
            # 2. Wait for processing (usually fast for audio), with a deadline.
            deadline = time.monotonic() + self._POLL_TIMEOUT_SEC
            while media_file.state == "PROCESSING":
                if time.monotonic() > deadline:
                    raise RuntimeError("Timed out waiting for Gemini file processing.")
                time.sleep(self._POLL_INTERVAL_SEC)
                media_file = self.client.files.get(name=media_file.name)
            if media_file.state == "FAILED":
                raise Exception("File processing failed at Gemini.")
            # 3. Transcribe with Diarization and Timestamps
            prompt = """
            Transkribiere dieses Audio wortgetreu.
            Identifiziere die Sprecher (Sprecher A, Sprecher B, etc.).
            Gib das Ergebnis als strukturierte Liste mit Timestamps aus.
            Wichtig: Das Audio ist ein Teil eines größeren Gesprächs.
            Antworte NUR mit dem Transkript im Format:
            [MM:SS] Sprecher X: Text
            """
            logger.info(f"Generating transcription for {file_path}...")
            response = self.client.models.generate_content(
                model="gemini-2.0-flash",
                contents=[media_file, prompt],
                config=types.GenerateContentConfig(
                    temperature=0.1,  # Low temp for accuracy
                ),
            )
            return {
                "raw_text": response.text,
                "offset": offset_seconds,
            }
        finally:
            # Always release the remote file, even when transcription
            # failed, so Gemini storage does not accumulate orphans.
            self.client.files.delete(name=media_file.name)