import logging
|
|
import json
|
|
from sqlalchemy.orm import Session
|
|
from .ffmpeg_service import FFmpegService
|
|
from .transcription_service import TranscriptionService
|
|
from ..database import Meeting, TranscriptChunk
|
|
from ..config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def parse_time_to_seconds(time_str):
    """Convert a clock-style timestamp ("MM:SS" or "HH:MM:SS") to total seconds.

    Best-effort parser for model-emitted timestamps: any unparsable input
    (wrong number of fields, non-numeric parts, or a non-string value) yields 0
    rather than raising.

    Args:
        time_str: Timestamp string such as "05:30" or "01:05:30".

    Returns:
        int: Total seconds represented by the timestamp, or 0 on failure.
    """
    try:
        parts = time_str.split(':')
        if len(parts) == 2:  # MM:SS
            return int(parts[0]) * 60 + int(parts[1])
        elif len(parts) == 3:  # HH:MM:SS
            return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
    # Narrowed from a bare `except:` (which also swallowed KeyboardInterrupt /
    # SystemExit): ValueError covers non-numeric fields, AttributeError /
    # TypeError cover non-string input such as None.
    except (ValueError, TypeError, AttributeError):
        return 0
    return 0
|
|
|
|
def clean_json_string(text):
    """Strip Markdown code-fence markers from a model response.

    Model output is sometimes wrapped in ```json ... ``` (or bare ``` ... ```)
    fences; remove at most one leading fence and one trailing fence and return
    the trimmed payload.

    Args:
        text: Raw response text, possibly fenced.

    Returns:
        str: The fenced content with surrounding whitespace removed.
    """
    stripped = text.strip()
    # Check the longer marker first so "```json" is not half-consumed by "```".
    for fence in ("```json", "```"):
        if stripped.startswith(fence):
            stripped = stripped[len(fence):]
            break
    if stripped.endswith("```"):
        stripped = stripped[:-len("```")]
    return stripped.strip()
|
|
|
|
def process_meeting_task(meeting_id: int, db_session_factory):
    """Run the full processing pipeline for one meeting in the background.

    Pipeline: split the audio into fixed-length chunks (status "SPLITTING"),
    transcribe each chunk and persist one TranscriptChunk row per chunk
    (status "TRANSCRIBING"), then mark the meeting "COMPLETED".  Any
    unexpected failure flips the status to "ERROR".  The DB session is always
    closed before returning.

    Args:
        meeting_id: Primary key of the Meeting row to process.
        db_session_factory: Zero-argument callable producing a new SQLAlchemy
            session (e.g. a sessionmaker).
    """
    db = db_session_factory()
    meeting = db.query(Meeting).filter(Meeting.id == meeting_id).first()
    if not meeting:
        # BUG FIX: the original returned here without closing the session,
        # leaking a connection for every unknown meeting_id (the close in
        # `finally` below only covers the try block).
        db.close()
        return

    try:
        ffmpeg = FFmpegService()
        transcriber = TranscriptionService()

        # Phase 1: Split
        meeting.status = "SPLITTING"
        db.commit()

        meeting.duration_seconds = ffmpeg.get_duration(meeting.file_path)
        chunks = ffmpeg.split_audio(meeting.file_path, meeting.id)

        # Phase 2: Transcribe
        meeting.status = "TRANSCRIBING"
        db.commit()

        all_text = []
        for i, chunk_path in enumerate(chunks):
            # Chunks are fixed length, so chunk i starts i * CHUNK_DURATION_SEC
            # into the recording.
            offset = i * settings.CHUNK_DURATION_SEC
            # Lazy %-style args: skip string formatting when INFO is disabled.
            logger.info("Processing chunk %d/%d with offset %ds", i + 1, len(chunks), offset)

            result = transcriber.transcribe_chunk(chunk_path, offset)

            # Parse JSON and adjust timestamps.  Best effort: a chunk whose
            # JSON cannot be parsed keeps its raw text but gets an empty
            # json_content rather than aborting the whole meeting.
            json_data = []
            try:
                cleaned_text = clean_json_string(result["raw_text"])
                raw_json = json.loads(cleaned_text)

                # Check for wrapped structure (e.g. {"items": [...]}) if schema enforced it
                if isinstance(raw_json, dict) and "items" in raw_json:
                    raw_json = raw_json["items"]  # Extract inner list

                if isinstance(raw_json, list):
                    for entry in raw_json:
                        # Translate the chunk-relative "time" field into an
                        # absolute position within the whole recording.
                        seconds = parse_time_to_seconds(entry.get("time", "00:00"))
                        absolute_seconds = seconds + offset
                        entry["absolute_seconds"] = absolute_seconds

                        h = int(absolute_seconds // 3600)
                        m = int((absolute_seconds % 3600) // 60)
                        s = int(absolute_seconds % 60)
                        entry["display_time"] = f"{h:02}:{m:02}:{s:02}"
                        json_data.append(entry)
            # Narrowed from `except Exception`: json.JSONDecodeError is a
            # ValueError; TypeError/AttributeError/KeyError cover entries that
            # are not the expected dict shape.
            except (ValueError, TypeError, AttributeError, KeyError) as e:
                logger.error(f"JSON Parsing failed for chunk {i}: {e}. Raw text start: {result['raw_text'][:100]}")

            # Save chunk result
            db_chunk = TranscriptChunk(
                meeting_id=meeting.id,
                chunk_index=i,
                raw_text=result["raw_text"],
                json_content=json_data
            )
            db.add(db_chunk)
            all_text.append(result["raw_text"])
            # Commit per chunk so progress survives a mid-run crash.
            db.commit()

        # Phase 3: Finalize
        meeting.status = "COMPLETED"
        # Combine summary (first attempt - can be refined later with separate LLM call)
        # meeting.summary = ...
        db.commit()
        logger.info("Meeting %s processing completed.", meeting.id)

    except Exception as e:
        logger.error(f"Error processing meeting {meeting_id}: {e}", exc_info=True)
        # BUG FIX: if the failure happened inside commit()/flush(), the session
        # is left in a failed state and the status update below would itself
        # raise; roll back first so the ERROR status can actually be persisted.
        db.rollback()
        meeting.status = "ERROR"
        db.commit()
    finally:
        db.close()
|
|
|
|
def retry_meeting_task(meeting_id: int, db_session_factory):
    """Retry transcription using existing chunk files on disk.

    Avoids re-splitting the original file: validates the chunk directory,
    deletes this meeting's previous TranscriptChunk rows (status "RETRYING"),
    re-transcribes every .mp3 chunk in order, and marks the meeting
    "COMPLETED" (or "ERROR" on failure).  The DB session is always closed
    before returning.

    Args:
        meeting_id: Primary key of the Meeting row to retry.
        db_session_factory: Zero-argument callable producing a new SQLAlchemy
            session (e.g. a sessionmaker).
    """
    db = db_session_factory()
    meeting = db.query(Meeting).filter(Meeting.id == meeting_id).first()
    if not meeting:
        # BUG FIX: the original returned here without closing the session,
        # leaking a connection for every unknown meeting_id.
        db.close()
        return

    try:
        import os
        import re
        transcriber = TranscriptionService()

        # 0. Validate Chunk Directory
        chunk_dir = os.path.join(settings.UPLOAD_DIR, "chunks", str(meeting_id))
        if not os.path.exists(chunk_dir):
            logger.error("Chunk directory not found for meeting %s", meeting_id)
            meeting.status = "ERROR"
            db.commit()
            return

        def _natural_key(path):
            # BUG FIX: plain lexicographic sorting puts "chunk_10.mp3" before
            # "chunk_2.mp3", scrambling chunk order (and therefore every
            # timestamp offset below) once there are more than 9 chunks —
            # unless the filenames are zero-padded, in which case this natural
            # sort is identical to the old behavior.  TODO(review): confirm
            # FFmpegService's chunk naming scheme.
            name = os.path.basename(path)
            return [int(tok) if tok.isdigit() else tok for tok in re.split(r"(\d+)", name)]

        chunks = sorted(
            (os.path.join(chunk_dir, f) for f in os.listdir(chunk_dir) if f.endswith(".mp3")),
            key=_natural_key,
        )
        if not chunks:
            logger.error("No chunks found for meeting %s", meeting_id)
            meeting.status = "ERROR"
            db.commit()
            return

        # Phase 1: Clear Old Chunks
        meeting.status = "RETRYING"
        db.query(TranscriptChunk).filter(TranscriptChunk.meeting_id == meeting_id).delete()
        db.commit()

        # Phase 2: Transcribe
        all_text = []
        for i, chunk_path in enumerate(chunks):
            # Chunks are fixed length, so chunk i starts i * CHUNK_DURATION_SEC
            # into the recording.
            offset = i * settings.CHUNK_DURATION_SEC
            logger.info("Retrying chunk %d/%d with offset %ds", i + 1, len(chunks), offset)

            result = transcriber.transcribe_chunk(chunk_path, offset)

            # Parse JSON and Adjust Timestamps (Same logic as process_meeting_task)
            json_data = []
            try:
                # With response_schema, raw_text SHOULD be valid JSON directly,
                # but keep clean_json_string in case specific models deviate.
                cleaned_text = clean_json_string(result["raw_text"])
                raw_json = json.loads(cleaned_text)

                # Check for wrapped structure (e.g. {"items": [...]}) if schema enforced it
                if isinstance(raw_json, dict) and "items" in raw_json:
                    raw_json = raw_json["items"]  # Extract inner list

                if isinstance(raw_json, list):
                    for entry in raw_json:
                        seconds = parse_time_to_seconds(entry.get("time", "00:00"))
                        absolute_seconds = seconds + offset
                        entry["absolute_seconds"] = absolute_seconds

                        h = int(absolute_seconds // 3600)
                        m = int((absolute_seconds % 3600) // 60)
                        s = int(absolute_seconds % 60)
                        entry["display_time"] = f"{h:02}:{m:02}:{s:02}"
                        json_data.append(entry)
            # Narrowed from `except Exception`: json.JSONDecodeError is a
            # ValueError; TypeError/AttributeError/KeyError cover entries that
            # are not the expected dict shape.
            except (ValueError, TypeError, AttributeError, KeyError) as e:
                logger.error(f"JSON Parsing failed for chunk {i}: {e}. Raw: {result['raw_text'][:100]}")

            # Save chunk result
            db_chunk = TranscriptChunk(
                meeting_id=meeting.id,
                chunk_index=i,
                raw_text=result["raw_text"],
                json_content=json_data
            )
            db.add(db_chunk)
            all_text.append(result["raw_text"])
            # Commit per chunk so progress survives a mid-run crash.
            db.commit()

        # Phase 3: Finalize
        meeting.status = "COMPLETED"
        db.commit()
        logger.info("Meeting %s retry completed.", meeting.id)

    except Exception as e:
        logger.error(f"Error retrying meeting {meeting_id}: {e}", exc_info=True)
        # BUG FIX: roll back first — after a failed commit()/flush() the
        # session is unusable and the status update would itself raise.
        db.rollback()
        meeting.status = "ERROR"
        db.commit()
    finally:
        db.close()
|