fix(transcription): [2f388f42] finalize and fix AI insights feature

This commit resolves all outstanding issues with the AI Insights feature.

- Corrects the transcript formatting logic to properly handle the database JSON structure, ensuring the AI receives the correct context.
- Fixes the Gemini API client by using the correct model name ('gemini-2.0-flash') and the proper client initialization.
- Updates the container configuration to securely pass the API key as an environment variable.
- Cleans up the codebase by removing temporary debugging endpoints.
- Adds a script for programmatic updates.
- Updates documentation with troubleshooting insights from the implementation process.
This commit is contained in:
2026-01-26 08:53:13 +00:00
parent 771b06c1bc
commit 9019a801ed
39 changed files with 2254 additions and 80 deletions

View File

@@ -0,0 +1,133 @@
import os
import logging
import random
import time
from functools import wraps

# --- AI switch: Google Generative AI (dual-library support) ---
# Simplified, self-contained version for the transcription tool.
HAS_NEW_GENAI = False
HAS_OLD_GENAI = False

# Preferred: the new "google-genai" SDK (v1.0+).
try:
    from google import genai
    from google.genai import types
except ImportError:
    logging.warning("Library 'google.genai' not found. Trying fallback.")
else:
    HAS_NEW_GENAI = True
    logging.info("Library 'google.genai' (v1.0+) loaded.")

# Fallback: the legacy "google-generativeai" package.
try:
    import google.generativeai as old_genai
except ImportError:
    logging.warning("Library 'google.generativeai' not found.")
else:
    HAS_OLD_GENAI = True
    logging.info("Library 'google.generativeai' (Legacy) loaded.")

# True when at least one of the two client libraries is importable.
HAS_GEMINI = HAS_NEW_GENAI or HAS_OLD_GENAI
# A simple retry decorator, as the global one is not available
def retry_on_failure(func):
    """Retry *func* up to 3 times with exponential backoff and jitter.

    A simple local replacement for the project-wide retry decorator, which
    is not available here. Waits 5s, 10s (plus up to 1s of random jitter)
    between attempts and re-raises the last exception once all attempts
    are exhausted.
    """
    max_retries = 3
    base_delay = 5  # seconds

    @wraps(func)
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    logging.warning(f"Retrying attempt {attempt + 1}/{max_retries} for '{func.__name__}'...")
                return func(*args, **kwargs)
            except Exception as e:
                if attempt < max_retries - 1:
                    # Exponential backoff with jitter to avoid hammering the API.
                    wait_time = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    # Log the failure before sleeping (previously retries slept silently).
                    logging.warning(f"'{func.__name__}' failed ({e}); retrying in {wait_time:.1f}s.")
                    time.sleep(wait_time)
                else:
                    # Out of attempts: bare raise preserves the original traceback.
                    raise
    return wrapper
def _get_gemini_api_key():
"""Gets the Gemini API key from environment variables."""
api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("OPENAI_API_KEY")
if not api_key:
raise ValueError("GEMINI_API_KEY or OPENAI_API_KEY environment variable not set.")
return api_key
@retry_on_failure
def call_gemini_flash(prompt: str, system_instruction: str = None, temperature: float = 0.3, json_mode: bool = False):
    """
    Call a Gemini Flash model and return the generated text (stripped).

    Tries the legacy 'google-generativeai' client first (noted as preferred
    in the original helpers.py), then falls back to the new 'google-genai'
    client.

    Args:
        prompt: The user prompt to send to the model.
        system_instruction: Optional system-level instruction.
        temperature: Sampling temperature (default 0.3).
        json_mode: When True, request an 'application/json' response.

    Raises:
        ImportError: if neither Google client library is installed.
        ValueError: if no API key is configured (via _get_gemini_api_key).
        RuntimeError: if both client paths fail or are unavailable.
    """
    logger = logging.getLogger(__name__)
    api_key = _get_gemini_api_key()

    if not HAS_GEMINI:
        raise ImportError("No Google Generative AI library is available (google-genai or google-generativeai).")

    # The legacy library was noted as preferred in the original helpers.py.
    if HAS_OLD_GENAI:
        try:
            old_genai.configure(api_key=api_key)
            generation_config = {
                "temperature": temperature,
                "top_p": 0.95,
                "top_k": 40,
                "max_output_tokens": 8192,
            }
            if json_mode:
                generation_config["response_mime_type"] = "application/json"
            model = old_genai.GenerativeModel(
                model_name="gemini-1.5-flash",  # Using 1.5 as it's the modern standard
                generation_config=generation_config,
                system_instruction=system_instruction
            )
            response = model.generate_content([prompt])
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error with legacy GenAI Lib: {e}")
            if not HAS_NEW_GENAI:
                raise
            # Fall through to the new library if the legacy path failed.

    # Fallback: the new client-based google-genai API.
    if HAS_NEW_GENAI:
        try:
            client = genai.Client(api_key=api_key)
            config = {
                "temperature": temperature,
                "top_p": 0.95,
                "top_k": 40,
                "max_output_tokens": 8192,
            }
            if json_mode:
                config["response_mime_type"] = "application/json"
            # FIX: google-genai accepts only 'user'/'model' roles in `contents`;
            # a {'role': 'system', ...} message is rejected by the API. The
            # system instruction belongs in the request config instead.
            if system_instruction:
                config["system_instruction"] = system_instruction
            response = client.models.generate_content(
                model="models/gemini-2.0-flash-001",  # the project's standard model
                contents=[{'role': 'user', 'parts': [{'text': prompt}]}],
                config=config
            )
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error with new GenAI Lib: {e}")
            raise
    raise RuntimeError("Both Gemini libraries failed or are unavailable.")

View File

@@ -4,36 +4,56 @@ from sqlalchemy.orm import Session
from .. import database
from .. import prompt_library
# Add project root to path to allow importing from 'helpers'
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')))
from helpers import call_gemini_flash
import logging
from sqlalchemy.orm import Session
from .. import database
from .. import prompt_library
from ..lib.gemini_client import call_gemini_flash
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _format_transcript(chunks: list[database.TranscriptChunk]) -> str:
"""
Formats the transcript chunks into a single, human-readable string.
Example: "[00:00:01] Speaker A: Hello world."
Formats the transcript chunks into a single, human-readable string,
sorted chronologically using the absolute_seconds timestamp.
"""
full_transcript = []
# Sort chunks by their index to ensure correct order
sorted_chunks = sorted(chunks, key=lambda c: c.chunk_index)
for chunk in sorted_chunks:
all_messages = []
for chunk in chunks:
if not chunk.json_content:
continue
for item in chunk.json_content:
# json_content can be a list of dicts
# The content can be a list of dicts, or sometimes a list containing a list of dicts
content_list = chunk.json_content
if content_list and isinstance(content_list[0], list):
content_list = content_list[0]
for item in content_list:
if isinstance(item, dict):
speaker = item.get('speaker', 'Unknown')
start_time = item.get('start', 0)
text = item.get('line', '')
all_messages.append(item)
# Format timestamp from seconds to HH:MM:SS
hours, remainder = divmod(int(start_time), 3600)
minutes, seconds = divmod(remainder, 60)
timestamp = f"{hours:02}:{minutes:02}:{seconds:02}"
# Sort all messages from all chunks chronologically
# Use a default of 0 for absolute_seconds if the key is missing
sorted_messages = sorted(all_messages, key=lambda msg: msg.get('absolute_seconds', 0))
full_transcript.append(f"[{timestamp}] {speaker}: {text}")
full_transcript = []
for msg in sorted_messages:
speaker = msg.get('speaker', 'Unknown')
text = msg.get('text', '') # Changed from 'line' to 'text' to match the JSON
# Use the reliable absolute_seconds for timestamp calculation
absolute_seconds = msg.get('absolute_seconds', 0)
try:
time_in_seconds = float(absolute_seconds)
hours, remainder = divmod(int(time_in_seconds), 3600)
minutes, seconds = divmod(remainder, 60)
timestamp = f"{hours:02}:{minutes:02}:{seconds:02}"
except (ValueError, TypeError):
timestamp = "00:00:00"
full_transcript.append(f"[{timestamp}] {speaker}: {text}")
return "\n".join(full_transcript)
@@ -62,7 +82,10 @@ def generate_insight(db: Session, meeting_id: int, insight_type: str) -> databas
).first()
if existing_insight:
return existing_insight
# Before returning, let's delete it so user can regenerate
db.delete(existing_insight)
db.commit()
# 2. Get the meeting and its transcript
meeting = db.query(database.Meeting).filter(database.Meeting.id == meeting_id).first()
@@ -74,14 +97,15 @@ def generate_insight(db: Session, meeting_id: int, insight_type: str) -> databas
# 3. Format the transcript and select the prompt
transcript_text = _format_transcript(meeting.chunks)
if not transcript_text.strip():
raise ValueError(f"Transcript for meeting {meeting_id} is empty.")
# This can happen if all chunks are empty or malformed
raise ValueError(f"Formatted transcript for meeting {meeting_id} is empty or could not be processed.")
prompt_template = get_prompt_by_type(insight_type)
final_prompt = prompt_template.format(transcript_text=transcript_text)
# 4. Call the AI model
# Update meeting status
meeting.status = "ANALYZING"
db.commit()
@@ -105,6 +129,5 @@ def generate_insight(db: Session, meeting_id: int, insight_type: str) -> databas
except Exception as e:
meeting.status = "ERROR"
db.commit()
# Log the error properly in a real application
print(f"Error generating insight for meeting {meeting_id}: {e}")
logger.error(f"Error generating insight for meeting {meeting_id}: {e}")
raise