diff --git a/CLAUDE.md b/CLAUDE.md index 1e55c229..b16d1e8c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -86,130 +86,54 @@ cp .env.template .env # Configure environment variables sudo rm -rf backends/advanced/data/ ``` -### Testing Infrastructure +### Running Tests -#### Local Test Scripts -The project includes simplified test scripts that mirror CI workflows: +#### Quick Commands +All test operations are managed through a simple Makefile interface: -```bash -# Run all tests from project root -./run-test.sh [advanced-backend|speaker-recognition|all] - -# Advanced backend tests only -./run-test.sh advanced-backend - -# Speaker recognition tests only -./run-test.sh speaker-recognition - -# Run all test suites (default) -./run-test.sh all -``` - -#### Advanced Backend Integration Tests - -**Three Test Execution Modes:** - -1. **No-API Tests** (Fast, No External Dependencies) ```bash cd tests -# Run tests without API keys (excludes requires-api-keys tag) -./run-no-api-tests.sh +# Full test workflow (recommended) +make test # Start containers + run all tests -# ~70% of test suite -# Uses mock services config -# No DEEPGRAM_API_KEY or OPENAI_API_KEY required -# Fast feedback (~10-15 minutes) -``` +# Or step by step +make start # Start test containers (with health checks) +make test-all # Run all test suites +make stop # Stop containers (preserves volumes) -2. **Full Tests with API Keys** (Comprehensive) -```bash -cd tests +# Run specific test suites +make test-endpoints # API endpoint tests (~40 tests, fast) +make test-integration # End-to-end workflows (~15 tests, slower) +make test-infra # Infrastructure resilience (~5 tests) -# Requires .env file with DEEPGRAM_API_KEY and OPENAI_API_KEY -cp setup/.env.test.template setup/.env.test # Configure API keys +# Quick iteration (reuse existing containers) +make test-quick # Run tests without restarting containers +``` -# Run full integration test suite (100% of tests) -./run-robot-tests.sh +#### Container Management +All container operations automatically preserve logs before cleanup: -# Leave test containers running for debugging -CLEANUP_CONTAINERS=false ./run-robot-tests.sh +```bash +make start # Start test containers +make stop # Stop containers (keep volumes) +make restart # Restart without rebuild +make rebuild # Rebuild images + restart (for code changes) +make containers-clean # SAVES LOGS → removes everything +make status # Show container health +make logs SERVICE= # View specific service logs ``` -3. 
**API-Only Tests** (Optional) -```bash -cd tests +**Log Preservation:** All cleanup operations save container logs to `tests/logs/YYYY-MM-DD_HH-MM-SS/` -# Run only tests that require API keys -./run-api-tests.sh +#### Test Environment -# ~30% of test suite -# Only E2E tests with transcription/memory extraction -``` +Test services use isolated ports and database: +- **Ports:** Backend (8001), MongoDB (27018), Redis (6380), Qdrant (6337/6338) +- **Database:** `test_db` (separate from production) +- **Credentials:** `test-admin@example.com` / `test-admin-password-123` -#### Test Separation by API Requirements - -Tests are separated into two categories: - -- **No API Keys Required** (~70%): Endpoint tests, infrastructure tests, basic integration - - Uses `configs/mock-services.yml` - - Runs on all PRs by default - - Fast CI feedback - -- **API Keys Required** (~30%): Full E2E tests with transcription and memory extraction - - Uses `configs/deepgram-openai.yml` - - Tagged with `requires-api-keys` - - Runs on dev/main branches or when PR labeled with `test-with-api-keys` - -#### Test Configuration Flags -- **CLEANUP_CONTAINERS** (default: false): Automatically stop and remove test containers after test completion - - Set to `true` for cleanup: `CLEANUP_CONTAINERS=true ./run-robot-tests.sh` -- **CONFIG_FILE**: Choose test configuration - - `configs/mock-services.yml` - No API keys (default for run-no-api-tests.sh) - - `configs/deepgram-openai.yml` - With API keys (default for run-robot-tests.sh) - - `configs/parakeet-ollama.yml` - Fully local (no external APIs) - -#### Test Environment Variables -Tests use isolated test environment with overridden credentials: -- **Test Database**: `test_db` (MongoDB on port 27018, separate from production) -- **Test Ports**: Backend (8001), Qdrant (6337/6338), WebUI (3001) -- **Test Credentials**: - - `AUTH_SECRET_KEY`: test-jwt-signing-key-for-integration-tests - - `ADMIN_EMAIL`: test-admin@example.com - - `ADMIN_PASSWORD`: test-admin-password-123 -- **API Keys**: Loaded from `.env` file (DEEPGRAM_API_KEY, OPENAI_API_KEY) -- **Test Settings**: `DISABLE_SPEAKER_RECOGNITION=true` to prevent segment duplication - -#### Test Script Features -- **Environment Compatibility**: Works with both local .env files and CI environment variables -- **Isolated Test Environment**: Separate ports and database prevent conflicts with running services -- **Automatic Cleanup**: Configurable via CLEANUP_CONTAINERS flag (default: false for faster re-runs) -- **Colored Output**: Clear progress indicators and error reporting -- **Timeout Protection**: 30-minute timeout for test execution -- **Fresh Testing**: Clean database and containers for each test run -- **API Key Separation**: Ability to run tests with or without external API dependencies - -#### GitHub Workflows - -**Three workflows handle test execution:** - -1. **`robot-tests.yml`** - PR Tests (No API Keys) - - Triggers: All pull requests - - Execution: Excludes `requires-api-keys` tests (~70% of suite) - - No secrets required - - Fast feedback for contributors - -2. **`full-tests-with-api.yml`** - Dev/Main Tests (Full Suite) - - Triggers: Push to dev/main branches - - Execution: All tests including API-dependent (~100% of suite) - - Requires: DEEPGRAM_API_KEY, OPENAI_API_KEY - - Comprehensive validation before deployment - -3. 
**`pr-tests-with-api.yml`** - Label-Triggered PR Tests - - Triggers: PR with `test-with-api-keys` label - - Execution: Full test suite before merge - - Requires: DEEPGRAM_API_KEY, OPENAI_API_KEY - - Useful for testing API integration changes +**For complete test documentation, see `tests/README.md`** ### Mobile App Development ```bash @@ -571,12 +495,11 @@ tailscale ip -4 - **Docker**: Primary deployment method with docker-compose ### Testing Strategy -- **Local Test Scripts**: Simplified scripts (`./run-test.sh`) mirror CI workflows for local development -- **End-to-End Integration**: Robot Framework tests (`tests/integration/integration_test.robot`) validate complete audio processing pipeline -- **Speaker Recognition Tests**: `test_speaker_service_integration.py` validates speaker identification +- **Makefile-Based**: All test operations through simple `make` commands (`make test`, `make start`, `make stop`) +- **Log Preservation**: Container logs always saved before cleanup (never lose debugging info) +- **End-to-End Integration**: Robot Framework validates complete audio processing pipeline - **Environment Flexibility**: Tests work with both local .env files and CI environment variables -- **Automated Cleanup**: Test containers are automatically removed after execution -- **CI/CD Integration**: GitHub Actions use the same local test scripts for consistency +- **CI/CD Integration**: Same test logic locally and in GitHub Actions ### Code Style - **Python**: Black formatter with 100-character line length, isort for imports @@ -603,14 +526,10 @@ The system includes comprehensive health checks: - Memory debug system for transcript processing monitoring ### Integration Test Infrastructure -- **Unified Test Scripts**: Local `./run-test.sh` scripts mirror GitHub Actions workflows -- **Test Environment**: `docker-compose-test.yml` provides isolated services on separate ports -- **Test Database**: Uses `test_db` database with isolated collections -- **Service Ports**: Backend (8001), MongoDB (27018), Qdrant (6335/6336), WebUI (5174) -- **Test Credentials**: Auto-generated `.env.test` files with secure test configurations -- **Ground Truth**: Expected transcript established via `scripts/test_deepgram_direct.py` -- **AI Validation**: OpenAI-powered transcript similarity comparison -- **Test Audio**: 4-minute glass blowing tutorial (`extras/test-audios/DIY*mono*.wav`) +- **Makefile Interface**: Simple `make` commands for all operations (see `tests/README.md`) +- **Test Environment**: `docker-compose-test.yml` with isolated services on separate ports +- **Test Database**: Uses `test_db` database (separate from production) +- **Log Preservation**: All cleanup operations save logs to `tests/logs/` automatically - **CI Compatibility**: Same test logic runs locally and in GitHub Actions ### Cursor Rule Integration diff --git a/backends/advanced/.env.template b/backends/advanced/.env.template index 752f140c..88617688 100644 --- a/backends/advanced/.env.template +++ b/backends/advanced/.env.template @@ -1,263 +1,62 @@ # ======================================== -# GETTING STARTED +# Chronicle Backend - Secrets Only # ======================================== +# This file contains ONLY secret values (API keys, passwords, tokens). +# All other configuration is in config/config.yml. +# +# Setup: # 1. Copy this file to .env: cp .env.template .env -# 2. Fill in your API keys below (at minimum: DEEPGRAM_API_KEY, OPENAI_API_KEY) -# 3. Run: docker compose up --build -d -# 4. 
For testing: ./run-test.sh (requires API keys to be set) - -# This key is used to sign your JWT token, just make it random and long -AUTH_SECRET_KEY= - -# This is the password for the admin user -ADMIN_PASSWORD= - -# Admin email (defaults to admin@example.com if not set) -ADMIN_EMAIL=admin@example.com +# 2. Fill in your API keys and secrets below +# 3. Configure non-secret settings in config/config.yml +# 4. Run: docker compose up --build -d # ======================================== -# LLM CONFIGURATION (Standard) +# Authentication Secrets # ======================================== -# LLM Provider: "openai" or "ollama" (default: openai) -LLM_PROVIDER=openai +# JWT signing key (generate a long random string) +AUTH_SECRET_KEY= -# OpenAI or OpenAI-compatible API configuration -OPENAI_API_KEY=your-openai-key-here -OPENAI_BASE_URL=https://api.openai.com/v1 -OPENAI_MODEL=gpt-4o-mini +# Admin account password +ADMIN_PASSWORD= -# For Ollama (OpenAI-compatible mode): -# LLM_PROVIDER=ollama -# OLLAMA_BASE_URL=dummy -# OLLAMA_BASE_URL=http://ollama:11434/v1 -# OLLAMA_MODEL=llama3.1:latest -# OLLAMA_EMBEDDER_MODEL=nomic-embed-text:latest +# Admin email address +ADMIN_EMAIL=admin@example.com # ======================================== -# CHAT INTERFACE CONFIGURATION (Optional) +# LLM API Keys # ======================================== -# Chat-specific LLM model (defaults to OPENAI_MODEL if not set) -# CHAT_LLM_MODEL=gpt-4o-mini - -# Chat temperature for more conversational responses (defaults to 0.7) -# CHAT_TEMPERATURE=0.7 +# OpenAI API key (or OpenAI-compatible provider) +OPENAI_API_KEY= # ======================================== -# SPEECH-TO-TEXT CONFIGURATION (API Keys Only) +# Transcription API Keys # ======================================== -# Provider selection is in config.yml (defaults.stt) -# Deepgram (cloud-based, recommended) +# Deepgram API key (for cloud-based transcription) DEEPGRAM_API_KEY= -# Note: Parakeet ASR URL configured in config.yml - -# ======================================== -# SPEECH DETECTION CONFIGURATION -# ======================================== - -# Speech detection settings for conversation creation (speech-driven architecture) -# Only meaningful speech creates conversations - silence/noise is filtered out - -# Minimum words required to create a conversation (default: 5) -SPEECH_DETECTION_MIN_WORDS=5 - -# Minimum word confidence threshold (0.0-1.0, default: 0.5) -# Used for both conversation creation and speech gap analysis -SPEECH_DETECTION_MIN_CONFIDENCE=0.5 - -# Batch transcription monitoring (for batch providers like Parakeet) -TRANSCRIPTION_BUFFER_SECONDS=120 # Trigger transcription every N seconds - -# Auto-stop thresholds -SPEECH_INACTIVITY_THRESHOLD_SECONDS=60 # Close conversation after N seconds of no speech - -# Speaker enrollment filter (default: false) -# When enabled, only creates conversations when enrolled speakers are detected -# Requires speaker recognition service to be running and speakers to be enrolled -# Set to "true" to enable, "false" or omit to disable -RECORD_ONLY_ENROLLED_SPEAKERS=false - -# ======================================== -# DATABASE CONFIGURATION -# ======================================== - -# MongoDB for conversations and user data (defaults to mongodb://mongo:27017) -MONGODB_URI=mongodb://mongo:27017 - -# MongoDB database name (new installations use 'chronicle', legacy installations use 'friend-lite') -MONGODB_DATABASE=chronicle - -# Qdrant for vector memory storage (defaults to qdrant) -QDRANT_BASE_URL=qdrant - - # 
======================================== -# MEMORY PROVIDER CONFIGURATION +# Speaker Recognition # ======================================== -# Memory Provider: "chronicle" (default), "openmemory_mcp", or "mycelia" -# -# Chronicle (default): In-house memory system with full control -# - Custom LLM-powered extraction with individual fact storage -# - Smart deduplication and memory updates (ADD/UPDATE/DELETE) -# - Direct Qdrant vector storage -# - No external dependencies -# -# OpenMemory MCP: Delegates to external OpenMemory MCP server -# - Professional memory processing with cross-client compatibility -# - Works with Claude Desktop, Cursor, Windsurf, etc. -# - Web UI at http://localhost:8765 -# - Requires external server setup -# -# Mycelia: Full-featured personal memory timeline -# - Voice, screenshots, and text capture -# - Timeline UI with waveform playback -# - Conversation extraction and semantic search -# - OAuth federation for cross-instance sharing -# - Requires Mycelia server setup (extras/mycelia) -# -# See MEMORY_PROVIDERS.md for detailed comparison -MEMORY_PROVIDER=chronicle - -# ---------------------------------------- -# OpenMemory MCP Configuration -# (Only needed if MEMORY_PROVIDER=openmemory_mcp) -# ---------------------------------------- -# First start the external server: -# cd extras/openmemory-mcp && docker compose up -d -# -# OPENMEMORY_MCP_URL=http://host.docker.internal:8765 -# OPENMEMORY_CLIENT_NAME=chronicle -# OPENMEMORY_USER_ID=openmemory -# OPENMEMORY_TIMEOUT=30 - -# ---------------------------------------- -# Mycelia Configuration -# (Only needed if MEMORY_PROVIDER=mycelia) -# ---------------------------------------- -# First start Mycelia: -# cd extras/mycelia && docker compose up -d redis mongo mongo-search -# cd extras/mycelia/backend && deno task dev -# -# IMPORTANT: JWT_SECRET in Mycelia backend/.env must match AUTH_SECRET_KEY above -# MYCELIA_URL=http://host.docker.internal:5173 -# MYCELIA_DB=mycelia # Database name (use mycelia_test for test environment) -# MYCELIA_TIMEOUT=30 +# Hugging Face token (for PyAnnote speaker recognition models) +HF_TOKEN= # ======================================== -# OPTIONAL FEATURES +# Optional Services # ======================================== -NEO4J_HOST=neo4j-mem0 -NEO4J_USER=neo4j +# Neo4j password (if using Neo4j for graph memory) NEO4J_PASSWORD= -# Debug directory for troubleshooting -DEBUG_DIR=./data/debug_dir - -# Ngrok for external access (if using ngrok from docker-compose) -# NGROK_AUTHTOKEN= - -# Speaker recognition service -# HF_TOKEN= -# SPEAKER_SERVICE_URL=http://speaker-recognition:8001 - -# Speaker recognition chunking configuration (for large files) -# Files longer than SPEAKER_CHUNK_THRESHOLD will be split into smaller segments -# for processing to avoid memory issues -SPEAKER_CHUNK_THRESHOLD=1500 # 25 minutes - chunk files larger than this (seconds) -SPEAKER_CHUNK_SIZE=900 # 15 minutes - size of each chunk (seconds) -SPEAKER_CHUNK_OVERLAP=30 # 30 seconds - overlap between chunks for continuity - -# Audio processing settings -# NEW_CONVERSATION_TIMEOUT_MINUTES=1.5 -# AUDIO_CROPPING_ENABLED=true -# MIN_SPEECH_SEGMENT_DURATION=1.0 -# CROPPING_CONTEXT_PADDING=0.1 - -# ======================================== -# SPEECH-DRIVEN CONVERSATIONS CONFIGURATION -# ======================================== - -# Note: File rotation for long sessions is not yet implemented -# Audio sessions currently create single files that grow until the session ends - - -# ======================================== -# PUBLIC ACCESS 
CONFIGURATION -# ======================================== -# These settings control how the browser accesses the backend for audio playback - -# The IP address or hostname where your backend is publicly accessible from the browser -# Examples: -# - For local development: localhost or 127.0.0.1 -# - For LAN access: your machine's IP (e.g., 192.168.1.100) -# - For VPN/Tailscale access: your VPN IP (e.g., 100.64.x.x for Tailscale) -# - For internet access: your domain or public IP (e.g., friend.example.com) -# Note: This must be accessible from your browser, not from the Docker container -HOST_IP=localhost +# Langfuse API keys (for LLM observability) +LANGFUSE_PUBLIC_KEY= +LANGFUSE_SECRET_KEY= -# Backend API port (where audio files are served) -BACKEND_PUBLIC_PORT=8000 +# Tailscale auth key (for remote service access) +TS_AUTHKEY= -# WebUI port (defaults to 5173 for Vite dev server) -WEBUI_PORT=5173 - -# CORS origins (comma-separated list of allowed origins for browser requests) -# Note: Tailscale IPs (100.x.x.x) are automatically supported via regex -# For HTTPS access, add HTTPS origins after running ./init.sh -# Examples: -# - Local HTTP: http://localhost:5173,http://127.0.0.1:5173 -# - Local HTTPS: https://localhost,https://127.0.0.1 -# - Tailscale HTTPS: https://100.x.x.x -# - Custom: http://192.168.1.100:5173,https://192.168.1.100 -CORS_ORIGINS=http://localhost:5173,http://localhost:3000,http://127.0.0.1:5173,http://127.0.0.1:3000 - -# Memory settings -# MEM0_TELEMETRY=False - -# Langfuse settings -LANGFUSE_PUBLIC_KEY="" -LANGFUSE_SECRET_KEY="" -LANGFUSE_HOST="http://x.x.x.x:3002" -LANGFUSE_ENABLE_TELEMETRY=False - -# ======================================== -# TAILSCALE CONFIGURATION (Optional) -# ======================================== -# Required for accessing remote services on Tailscale network (e.g., Home Assistant plugin) -# -# To enable Tailscale Docker integration: -# 1. Get auth key from: https://login.tailscale.com/admin/settings/keys -# 2. Set TS_AUTHKEY below -# 3. Start Tailscale: docker compose --profile tailscale up -d -# -# The Tailscale container provides proxy access to remote services at: -# http://host.docker.internal:18123 (proxies to Home Assistant on Tailscale) -# -TS_AUTHKEY=your-tailscale-auth-key-here - -# ======================================== -# HOME ASSISTANT PLUGIN (Optional) -# ======================================== -# Required for Home Assistant voice control via wake word (e.g., "Hey Vivi, turn off the lights") -# -# To get a long-lived access token: -# 1. Go to Home Assistant → Profile → Security tab -# 2. Scroll to "Long-lived access tokens" -# 3. Click "Create Token" -# 4. Copy the token and paste it below -# -# Configuration in config/plugins.yml: -# - Enable the homeassistant plugin -# - Set ha_url to your Home Assistant URL -# - Set ha_token to ${HA_TOKEN} (reads from this variable) -# -# SECURITY: This token grants full access to your Home Assistant. 
-# - Never commit .env or config/plugins.yml to version control -# - Rotate the token if it's ever exposed -# -HA_TOKEN= \ No newline at end of file +# Home Assistant long-lived access token (for voice control plugin) +HA_TOKEN= diff --git a/backends/advanced/cleanup.sh b/backends/advanced/cleanup.sh new file mode 100755 index 00000000..041e6364 --- /dev/null +++ b/backends/advanced/cleanup.sh @@ -0,0 +1,15 @@ +#!/bin/bash +# Wrapper script for cleanup_state.py +# Usage: ./cleanup.sh --backup --export-audio +# +# This script runs the cleanup_state.py script inside the chronicle-backend container +# to handle data ownership and permissions correctly. +# +# Examples: +# ./cleanup.sh --dry-run # Preview what would be deleted +# ./cleanup.sh --backup # Cleanup with metadata backup +# ./cleanup.sh --backup --export-audio # Full backup including audio +# ./cleanup.sh --backup --force # Skip confirmation prompts + +cd "$(dirname "$0")" +docker compose exec chronicle-backend uv run python src/scripts/cleanup_state.py "$@" diff --git a/backends/advanced/diarization_config.json.template b/backends/advanced/diarization_config.json.template deleted file mode 100644 index d760df85..00000000 --- a/backends/advanced/diarization_config.json.template +++ /dev/null @@ -1,9 +0,0 @@ -{ - "diarization_source": "pyannote", - "similarity_threshold": 0.15, - "min_duration": 0.5, - "collar": 2.0, - "min_duration_off": 1.5, - "min_speakers": 2, - "max_speakers": 6 -} \ No newline at end of file diff --git a/backends/advanced/docker-compose-test.yml b/backends/advanced/docker-compose-test.yml index 43a7f3b7..09cd04ca 100644 --- a/backends/advanced/docker-compose-test.yml +++ b/backends/advanced/docker-compose-test.yml @@ -16,8 +16,7 @@ services: - ./data/test_audio_chunks:/app/audio_chunks - ./data/test_debug_dir:/app/debug # Fixed: mount to /app/debug for plugin database - ./data/test_data:/app/data - - ../../config:/app/config # Mount config directory with defaults.yml - - ${CONFIG_FILE:-../../config/config.yml}:/app/config.yml # Override main config (for test-specific configs) + - ../../config:/app/config # Mount config directory with defaults.yml and config.yml - ${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/plugins.yml # Mount test plugins config environment: # Override with test-specific settings @@ -169,8 +168,7 @@ services: - ./data/test_audio_chunks:/app/audio_chunks - ./data/test_debug_dir:/app/debug # Fixed: mount to /app/debug for plugin database - ./data/test_data:/app/data - - ../../config:/app/config # Mount config directory with defaults.yml - - ${CONFIG_FILE:-../../config/config.yml}:/app/config.yml # Override main config (for test-specific configs) + - ../../config:/app/config # Mount config directory with defaults.yml and config.yml - ${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/plugins.yml # Mount test plugins config environment: # Same environment as backend diff --git a/backends/advanced/docker-compose.yml b/backends/advanced/docker-compose.yml index ed9e8356..c5d718a3 100644 --- a/backends/advanced/docker-compose.yml +++ b/backends/advanced/docker-compose.yml @@ -115,6 +115,33 @@ services: condition: service_started restart: unless-stopped + # Annotation Cron Scheduler + # Runs periodic jobs for AI-powered annotation suggestions: + # - Daily: Surface potential errors in transcripts/memories + # - Weekly: Fine-tune error detection models using user feedback + # Set DEV_MODE=true in .env for 1-minute intervals (testing) + annotation-cron: + build: + context: . 
+ dockerfile: Dockerfile + target: prod + command: ["uv", "run", "python", "-m", "advanced_omi_backend.cron"] + container_name: chronicle-annotation-cron + env_file: + - .env + environment: + - MONGODB_URI=mongodb://mongo:27017 + - DEV_MODE=${DEV_MODE:-false} + - OPENAI_API_KEY=${OPENAI_API_KEY} + - LLM_PROVIDER=${LLM_PROVIDER:-openai} + - OLLAMA_BASE_URL=${OLLAMA_BASE_URL} + depends_on: + mongo: + condition: service_healthy + restart: unless-stopped + profiles: + - annotation # Optional profile - enable with: docker compose --profile annotation up + webui: build: context: ./webui diff --git a/backends/advanced/pyproject.toml b/backends/advanced/pyproject.toml index aa26a9b2..c5d17b00 100644 --- a/backends/advanced/pyproject.toml +++ b/backends/advanced/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "httpx>=0.28.0,<1.0.0", "fastapi-users[beanie]>=14.0.1", "PyYAML>=6.0.1", + "omegaconf>=2.3.0", "langfuse>=3.3.0", "spacy>=3.8.2", "en-core-web-sm @ https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl", diff --git a/backends/advanced/src/advanced_omi_backend/app_factory.py b/backends/advanced/src/advanced_omi_backend/app_factory.py index cf44ba34..79f893c0 100644 --- a/backends/advanced/src/advanced_omi_backend/app_factory.py +++ b/backends/advanced/src/advanced_omi_backend/app_factory.py @@ -57,10 +57,11 @@ async def lifespan(app: FastAPI): from advanced_omi_backend.models.audio_chunk import AudioChunkDocument from advanced_omi_backend.models.user import User from advanced_omi_backend.models.waveform import WaveformData + from advanced_omi_backend.models.annotation import Annotation await init_beanie( database=config.db, - document_models=[User, Conversation, AudioChunkDocument, WaveformData], + document_models=[User, Conversation, AudioChunkDocument, WaveformData, Annotation], ) application_logger.info("Beanie initialized for all document models") except Exception as e: diff --git a/backends/advanced/src/advanced_omi_backend/config.py b/backends/advanced/src/advanced_omi_backend/config.py index 543baeeb..20a08afe 100644 --- a/backends/advanced/src/advanced_omi_backend/config.py +++ b/backends/advanced/src/advanced_omi_backend/config.py @@ -1,273 +1,99 @@ """ Configuration management for Chronicle backend. -Currently contains diarization settings because they were used in multiple places -causing circular imports. Other configurations can be moved here as needed. +Uses OmegaConf for unified YAML configuration with environment variable interpolation. +Secrets are stored in .env files, all other config in config/config.yml. 
""" -import json import logging import os -import shutil -import yaml -from dataclasses import dataclass, asdict +from dataclasses import dataclass from pathlib import Path from typing import Optional +from omegaconf import OmegaConf + +from advanced_omi_backend.config_loader import ( + get_backend_config, + get_config_dir, + load_config, +) +from advanced_omi_backend.config_loader import reload_config as reload_omegaconf_config +from advanced_omi_backend.config_loader import ( + save_config_section, +) + logger = logging.getLogger(__name__) # Data directory paths DATA_DIR = Path(os.getenv("DATA_DIR", "/app/data")) CHUNK_DIR = Path("./audio_chunks") # Mounted to ./data/audio_chunks by Docker -# Default diarization settings -DEFAULT_DIARIZATION_SETTINGS = { - "diarization_source": "pyannote", - "similarity_threshold": 0.15, - "min_duration": 0.5, - "collar": 2.0, - "min_duration_off": 1.5, - "min_speakers": 2, - "max_speakers": 6 -} - -# Default speech detection settings -DEFAULT_SPEECH_DETECTION_SETTINGS = { - "min_words": 10, # Minimum words to create conversation (increased from 5) - "min_confidence": 0.7, # Word confidence threshold (increased from 0.5) - "min_duration": 10.0, # Minimum speech duration in seconds (increased from 2.0) -} - -# Default conversation stop settings -DEFAULT_CONVERSATION_STOP_SETTINGS = { - "transcription_buffer_seconds": 120, # Periodic transcription interval (2 minutes) - "speech_inactivity_threshold": 60, # Speech gap threshold for closure (1 minute) -} - -# Default audio storage settings -DEFAULT_AUDIO_STORAGE_SETTINGS = { - "audio_base_path": "/app/data", # Main audio directory (where volume is mounted) - "audio_chunks_path": "/app/audio_chunks", # Full path to audio chunks subfolder -} - -# Global cache for diarization settings -_diarization_settings = None - - -def get_diarization_config_path(): - """Get the path to the diarization config file.""" - # Try different locations in order of preference - # 1. Data directory (for persistence across container restarts) - data_path = Path("/app/data/diarization_config.json") - if data_path.parent.exists(): - return data_path - - # 2. App root directory - app_path = Path("/app/diarization_config.json") - if app_path.parent.exists(): - return app_path - - # 3. Local development path - local_path = Path("diarization_config.json") - return local_path - # ============================================================================ -# Configuration Merging System (for defaults.yml + config.yml) +# Configuration Functions (OmegaConf-based) # ============================================================================ -def get_config_dir() -> Path: +def get_config_yml_path() -> Path: """ - Get config directory path. Single source of truth for config location. - Matches root config_manager.py logic. + Get path to config.yml file. Returns: - Path to config directory + Path to config.yml """ - config_dir = os.getenv("CONFIG_DIR", "/app/config") - return Path(config_dir) - - -def get_config_yml_path() -> Path: - """Get path to config.yml file.""" return get_config_dir() / "config.yml" - -def get_defaults_yml_path() -> Path: - """Get path to defaults.yml file.""" - return get_config_dir() / "defaults.yml" - - -def get_defaults_config_path(): - """ - Get the path to the defaults config file. - - DEPRECATED: Use get_defaults_yml_path() instead. - Kept for backward compatibility. 
- """ - defaults_path = get_defaults_yml_path() - return defaults_path if defaults_path.exists() else None - - -def merge_configs(defaults: dict, overrides: dict) -> dict: +def get_config(force_reload: bool = False) -> dict: """ - Deep merge two configuration dictionaries. + Get merged configuration using OmegaConf. - Override values take precedence over defaults. - Lists are replaced (not merged). + Wrapper around load_config() from config_loader for backward compatibility. Args: - defaults: Default configuration values - overrides: User-provided overrides + force_reload: If True, reload from disk even if cached Returns: - Merged configuration dictionary + Merged configuration dictionary with all settings """ - result = defaults.copy() + cfg = load_config(force_reload=force_reload) + return OmegaConf.to_container(cfg, resolve=True) - for key, value in overrides.items(): - if key in result and isinstance(result[key], dict) and isinstance(value, dict): - # Recursively merge dictionaries - result[key] = merge_configs(result[key], value) - else: - # Override (lists, scalars, new keys) - result[key] = value - return result +def reload_config(): + """Reload configuration from disk (invalidate cache).""" + return reload_omegaconf_config() -# Global cache for merged config -_config_cache: Optional[dict] = None +# ============================================================================ +# Diarization Settings (OmegaConf-based) +# ============================================================================ +def get_diarization_settings() -> dict: + """ + Get diarization settings using OmegaConf. -def get_config(force_reload: bool = False) -> dict: + Returns: + Dict with diarization configuration (resolved from YAML + env vars) """ - Get merged configuration from defaults.yml + config.yml. + cfg = get_backend_config('diarization') + return OmegaConf.to_container(cfg, resolve=True) + - Priority order: config.yml > environment variables > defaults.yml +def save_diarization_settings(settings: dict) -> bool: + """ + Save diarization settings to config.yml using OmegaConf. 
Args: - force_reload: If True, reload from disk even if cached + settings: Dict with diarization settings to save Returns: - Merged configuration dictionary with all settings + True if saved successfully, False otherwise """ - global _config_cache - - if _config_cache is not None and not force_reload: - return _config_cache - - # Load defaults - defaults_path = get_defaults_yml_path() - defaults = {} - if defaults_path.exists(): - try: - with open(defaults_path, 'r') as f: - defaults = yaml.safe_load(f) or {} - logger.info(f"Loaded defaults from {defaults_path}") - except Exception as e: - logger.warning(f"Could not load defaults from {defaults_path}: {e}") - - # Load user config - config_path = get_config_yml_path() - user_config = {} - if config_path.exists(): - try: - with open(config_path, 'r') as f: - user_config = yaml.safe_load(f) or {} - logger.info(f"Loaded config from {config_path}") - except Exception as e: - logger.error(f"Error loading config from {config_path}: {e}") - - # Merge configurations - merged = merge_configs(defaults, user_config) - - # Resolve environment variables (lazy import to avoid circular dependency) - try: - from advanced_omi_backend.model_registry import _deep_resolve_env - merged = _deep_resolve_env(merged) - except ImportError: - # If model_registry not available, skip env resolution - # (will be resolved when model_registry loads the config) - logger.warning("Could not import _deep_resolve_env, environment variables may not be resolved") - - # Cache result - _config_cache = merged - - return merged - - -def reload_config(): - """Reload configuration from disk (invalidate cache).""" - global _config_cache - _config_cache = None - return get_config(force_reload=True) - - -def load_diarization_settings_from_file(): - """Load diarization settings from file or create from template.""" - global _diarization_settings - - config_path = get_diarization_config_path() - template_path = Path("/app/diarization_config.json.template") - - # If no template, try local development path - if not template_path.exists(): - template_path = Path("diarization_config.json.template") - - # If config doesn't exist, try to copy from template - if not config_path.exists(): - if template_path.exists(): - try: - # Ensure parent directory exists - config_path.parent.mkdir(parents=True, exist_ok=True) - shutil.copy(template_path, config_path) - logger.info(f"Created diarization config from template at {config_path}") - except Exception as e: - logger.warning(f"Could not copy template to {config_path}: {e}") - - # Load from file if it exists - if config_path.exists(): - try: - with open(config_path, 'r') as f: - _diarization_settings = json.load(f) - logger.info(f"Loaded diarization settings from {config_path}") - return _diarization_settings - except Exception as e: - logger.error(f"Error loading diarization settings from {config_path}: {e}") - - # Fall back to defaults - _diarization_settings = DEFAULT_DIARIZATION_SETTINGS.copy() - logger.info("Using default diarization settings") - return _diarization_settings - - -def save_diarization_settings_to_file(settings): - """Save diarization settings to file.""" - global _diarization_settings - - config_path = get_diarization_config_path() - - try: - # Ensure parent directory exists - config_path.parent.mkdir(parents=True, exist_ok=True) - - # Write settings to file - with open(config_path, 'w') as f: - json.dump(settings, f, indent=2) - - # Update cache - _diarization_settings = settings - - logger.info(f"Saved diarization settings to 
{config_path}") - return True - except Exception as e: - logger.error(f"Error saving diarization settings to {config_path}: {e}") - return False + return save_config_section('backend.diarization', settings) # ============================================================================ -# Cleanup Settings (JSON file-based with in-memory caching) +# Cleanup Settings (OmegaConf-based) # ============================================================================ @dataclass @@ -276,123 +102,78 @@ class CleanupSettings: auto_cleanup_enabled: bool = False retention_days: int = 30 -# Global cache for cleanup settings -_cleanup_settings: Optional[CleanupSettings] = None - - -def get_cleanup_config_path() -> Path: - """Get path to cleanup settings JSON file.""" - data_dir = Path(os.getenv("DATA_DIR", "/app/data")) - data_dir.mkdir(parents=True, exist_ok=True) - return data_dir / "cleanup_config.json" - -def load_cleanup_settings_from_file() -> CleanupSettings: +def get_cleanup_settings() -> dict: """ - Load cleanup settings from JSON file or return defaults. + Get cleanup settings using OmegaConf. - Returns cached settings if available, otherwise loads from file. - If file doesn't exist, returns default settings. + Returns: + Dict with auto_cleanup_enabled and retention_days """ - global _cleanup_settings + cfg = get_backend_config('cleanup') + return OmegaConf.to_container(cfg, resolve=True) - # Return cached settings if available - if _cleanup_settings is not None: - return _cleanup_settings - config_path = get_cleanup_config_path() - - # Try to load from file - if config_path.exists(): - try: - with open(config_path, "r") as f: - data = json.load(f) - _cleanup_settings = CleanupSettings(**data) - logger.info(f"✅ Loaded cleanup settings: auto_cleanup={_cleanup_settings.auto_cleanup_enabled}, retention={_cleanup_settings.retention_days}d") - return _cleanup_settings - except Exception as e: - logger.error(f"❌ Failed to load cleanup settings from {config_path}: {e}") - - # Return defaults if file doesn't exist or failed to load - _cleanup_settings = CleanupSettings() - logger.info("Using default cleanup settings (auto_cleanup_enabled=False, retention_days=30)") - return _cleanup_settings - - -def save_cleanup_settings_to_file(settings: CleanupSettings) -> None: +def save_cleanup_settings(settings: CleanupSettings) -> bool: """ - Save cleanup settings to JSON file and update in-memory cache. + Save cleanup settings to config.yml using OmegaConf. 
Args: - settings: CleanupSettings to persist + settings: CleanupSettings dataclass instance - Raises: - Exception: If file write fails + Returns: + True if saved successfully, False otherwise """ - global _cleanup_settings - - config_path = get_cleanup_config_path() - - try: - # Save to JSON file - with open(config_path, "w") as f: - json.dump(asdict(settings), f, indent=2) + from dataclasses import asdict + return save_config_section('backend.cleanup', asdict(settings)) - # Update in-memory cache - _cleanup_settings = settings - - logger.info(f"✅ Saved cleanup settings: auto_cleanup={settings.auto_cleanup_enabled}, retention={settings.retention_days}d") - except Exception as e: - logger.error(f"❌ Failed to save cleanup settings to {config_path}: {e}") - raise +# ============================================================================ +# Speech Detection Settings (OmegaConf-based) +# ============================================================================ -def get_cleanup_settings() -> dict: +def get_speech_detection_settings() -> dict: """ - Get current cleanup settings as dict (for API responses). + Get speech detection settings using OmegaConf. Returns: - Dict with auto_cleanup_enabled and retention_days + Dict with min_words, min_confidence, min_duration """ - settings = load_cleanup_settings_from_file() - return { - "auto_cleanup_enabled": settings.auto_cleanup_enabled, - "retention_days": settings.retention_days, - } + cfg = get_backend_config('speech_detection') + return OmegaConf.to_container(cfg, resolve=True) -def get_speech_detection_settings(): - """Get speech detection settings from environment or defaults.""" +# ============================================================================ +# Conversation Stop Settings (OmegaConf-based) +# ============================================================================ - return { - "min_words": int(os.getenv("SPEECH_DETECTION_MIN_WORDS", DEFAULT_SPEECH_DETECTION_SETTINGS["min_words"])), - "min_confidence": float(os.getenv("SPEECH_DETECTION_MIN_CONFIDENCE", DEFAULT_SPEECH_DETECTION_SETTINGS["min_confidence"])), - "min_duration": float(os.getenv("SPEECH_DETECTION_MIN_DURATION", DEFAULT_SPEECH_DETECTION_SETTINGS["min_duration"])), - } +def get_conversation_stop_settings() -> dict: + """ + Get conversation stop settings using OmegaConf. 
+ Returns: + Dict with transcription_buffer_seconds, speech_inactivity_threshold + """ + cfg = get_backend_config('conversation_stop') + settings = OmegaConf.to_container(cfg, resolve=True) -def get_conversation_stop_settings(): - """Get conversation stop settings from environment or defaults.""" + # Add min_word_confidence from speech_detection for backward compatibility + speech_cfg = get_backend_config('speech_detection') + settings['min_word_confidence'] = OmegaConf.to_container(speech_cfg, resolve=True).get('min_confidence', 0.7) - return { - "transcription_buffer_seconds": float(os.getenv("TRANSCRIPTION_BUFFER_SECONDS", DEFAULT_CONVERSATION_STOP_SETTINGS["transcription_buffer_seconds"])), - "speech_inactivity_threshold": float(os.getenv("SPEECH_INACTIVITY_THRESHOLD_SECONDS", DEFAULT_CONVERSATION_STOP_SETTINGS["speech_inactivity_threshold"])), - "min_word_confidence": float(os.getenv("SPEECH_DETECTION_MIN_CONFIDENCE", DEFAULT_SPEECH_DETECTION_SETTINGS["min_confidence"])), - } + return settings -def get_audio_storage_settings(): - """Get audio storage settings from environment or defaults.""" - - # Get base path and derive chunks path - audio_base_path = os.getenv("AUDIO_BASE_PATH", DEFAULT_AUDIO_STORAGE_SETTINGS["audio_base_path"]) - audio_chunks_path = os.getenv("AUDIO_CHUNKS_PATH", f"{audio_base_path}/audio_chunks") - - return { - "audio_base_path": audio_base_path, - "audio_chunks_path": audio_chunks_path, - } +# ============================================================================ +# Audio Storage Settings (OmegaConf-based) +# ============================================================================ +def get_audio_storage_settings() -> dict: + """ + Get audio storage settings using OmegaConf. -# Initialize settings on module load -_diarization_settings = load_diarization_settings_from_file() \ No newline at end of file + Returns: + Dict with audio_base_path, audio_chunks_path + """ + cfg = get_backend_config('audio_storage') + return OmegaConf.to_container(cfg, resolve=True) \ No newline at end of file diff --git a/backends/advanced/src/advanced_omi_backend/config_loader.py b/backends/advanced/src/advanced_omi_backend/config_loader.py new file mode 100644 index 00000000..6b504c79 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/config_loader.py @@ -0,0 +1,162 @@ +""" +OmegaConf-based configuration management for Chronicle. + +Provides unified config loading with environment variable interpolation. +""" + +import logging +import os +from pathlib import Path +from typing import Optional + +from omegaconf import DictConfig, OmegaConf + +logger = logging.getLogger(__name__) + +# Global config cache +_config_cache: Optional[DictConfig] = None + + +def get_config_dir() -> Path: + """Get config directory path (single source of truth).""" + config_dir = os.getenv("CONFIG_DIR", "/app/config") + return Path(config_dir) + + +def get_plugins_yml_path() -> Path: + """ + Get path to plugins.yml file (single source of truth). + + Returns: + Path to plugins.yml + """ + return get_config_dir() / "plugins.yml" + + +def load_config(force_reload: bool = False) -> DictConfig: + """ + Load and merge configuration using OmegaConf. + + Merge priority (later overrides earlier): + 1. config/defaults.yml (shipped defaults) + 2. config/config.yml (user overrides) + 3. 
Environment variables (via ${oc.env:VAR,default} syntax) + + Args: + force_reload: If True, reload from disk even if cached + + Returns: + Merged DictConfig with all settings + """ + global _config_cache + + if _config_cache is not None and not force_reload: + return _config_cache + + config_dir = get_config_dir() + defaults_path = config_dir / "defaults.yml" + config_path = config_dir / "config.yml" + + # Load defaults + defaults = {} + if defaults_path.exists(): + try: + defaults = OmegaConf.load(defaults_path) + logger.info(f"Loaded defaults from {defaults_path}") + except Exception as e: + logger.warning(f"Could not load defaults from {defaults_path}: {e}") + + # Load user config + user_config = {} + if config_path.exists(): + try: + user_config = OmegaConf.load(config_path) + logger.info(f"Loaded config from {config_path}") + except Exception as e: + logger.error(f"Error loading config from {config_path}: {e}") + + # Merge configurations (user config overrides defaults) + merged = OmegaConf.merge(defaults, user_config) + + # Cache result + _config_cache = merged + + logger.info("Configuration loaded successfully with OmegaConf") + return merged + + +def reload_config() -> DictConfig: + """Reload configuration from disk (invalidate cache).""" + global _config_cache + _config_cache = None + return load_config(force_reload=True) + + +def get_backend_config(section: Optional[str] = None) -> DictConfig: + """ + Get backend configuration section. + + Args: + section: Optional subsection (e.g., 'diarization', 'cleanup') + + Returns: + DictConfig for backend section or subsection + """ + cfg = load_config() + if 'backend' not in cfg: + return OmegaConf.create({}) + + backend_cfg = cfg.backend + if section: + return backend_cfg.get(section, OmegaConf.create({})) + return backend_cfg + + +def get_service_config(service_name: str) -> DictConfig: + """ + Get service configuration section. + + Args: + service_name: Service name (e.g., 'speaker_recognition', 'asr_services') + + Returns: + DictConfig for service section + """ + cfg = load_config() + return cfg.get(service_name, OmegaConf.create({})) + + +def save_config_section(section_path: str, values: dict) -> bool: + """ + Update a config section and save to config.yml. 
+ + Args: + section_path: Dot-separated path (e.g., 'backend.diarization') + values: Dict with new values + + Returns: + True if saved successfully + """ + try: + config_path = get_config_dir() / "config.yml" + + # Load existing config + existing_config = {} + if config_path.exists(): + existing_config = OmegaConf.load(config_path) + + # Update section using dot notation + OmegaConf.update(existing_config, section_path, values, merge=True) + + # Save back to file + OmegaConf.save(existing_config, config_path) + + # Invalidate cache + reload_config() + + logger.info(f"Saved config section '{section_path}' to {config_path}") + return True + + except Exception as e: + logger.error(f"Error saving config section '{section_path}': {e}") + return False diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 3a2a8af8..29d303b6 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -200,6 +200,7 @@ async def upload_and_process_audio_files( conversation_id=conversation_id, audio_uuid=audio_uuid, user_id=user.user_id, + transcript_version_id=version_id, # Pass the version_id from transcription job depends_on_job=transcription_job, # Wait for transcription to complete client_id=client_id # Pass client_id for UI tracking ) diff --git a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py index de7e046e..c815bae3 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py @@ -4,6 +4,7 @@ import logging import time +import uuid from datetime import datetime from pathlib import Path @@ -13,9 +14,22 @@ ClientManager, client_belongs_to_user, ) -from advanced_omi_backend.models.conversation import Conversation +from advanced_omi_backend.config_loader import get_service_config +from advanced_omi_backend.controllers.queue_controller import ( + JOB_RESULT_TTL, + default_queue, + memory_queue, + transcription_queue, +) from advanced_omi_backend.models.audio_chunk import AudioChunkDocument +from advanced_omi_backend.models.conversation import Conversation +from advanced_omi_backend.models.job import JobPriority from advanced_omi_backend.users import User +from advanced_omi_backend.workers.memory_jobs import ( + enqueue_memory_processing, + process_memory_job, +) +from advanced_omi_backend.workers.speaker_jobs import recognise_speakers_job logger = logging.getLogger(__name__) audio_logger = logging.getLogger("audio_processing") @@ -137,10 +151,15 @@ async def get_conversations(user: User, include_deleted: bool = False): if not user.is_superuser: # Regular users can only see their own conversations # Filter by deleted status - query = Conversation.user_id == str(user.user_id) if not include_deleted: - query = query & (Conversation.deleted == False) - user_conversations = await Conversation.find(query).sort(-Conversation.created_at).to_list() + user_conversations = await Conversation.find( + Conversation.user_id == str(user.user_id), + Conversation.deleted == False + ).sort(-Conversation.created_at).to_list() + else: + user_conversations = await Conversation.find( + Conversation.user_id == str(user.user_id) + ).sort(-Conversation.created_at).to_list() else: 
# Admins see all conversations # Filter by deleted status @@ -409,18 +428,9 @@ async def reprocess_transcript(conversation_id: str, user: User): ) # Create new transcript version ID - import uuid version_id = str(uuid.uuid4()) # Enqueue job chain with RQ (transcription -> speaker recognition -> memory) - from advanced_omi_backend.controllers.queue_controller import ( - JOB_RESULT_TTL, - default_queue, - memory_queue, - transcription_queue, - ) - from advanced_omi_backend.workers.memory_jobs import process_memory_job - from advanced_omi_backend.workers.speaker_jobs import recognise_speakers_job from advanced_omi_backend.workers.transcription_jobs import ( transcribe_full_audio_job, ) @@ -440,33 +450,48 @@ async def reprocess_transcript(conversation_id: str, user: User): ) logger.info(f"📥 RQ: Enqueued transcription job {transcript_job.id}") - # Job 2: Recognize speakers (depends on transcription, reads data from DB) - speaker_job = transcription_queue.enqueue( - recognise_speakers_job, - conversation_id, - version_id, - depends_on=transcript_job, - job_timeout=600, - result_ttl=JOB_RESULT_TTL, - job_id=f"speaker_{conversation_id[:8]}", - description=f"Recognize speakers for {conversation_id[:8]}", - meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} - ) - logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id} (depends on {transcript_job.id})") + # Check if speaker recognition is enabled + speaker_config = get_service_config('speaker_recognition') + speaker_enabled = speaker_config.get('enabled', True) # Default to True for backward compatibility + + # Job 2: Recognize speakers (conditional - only if enabled) + speaker_dependency = transcript_job # Start with transcription job + speaker_job = None + + if speaker_enabled: + speaker_job = transcription_queue.enqueue( + recognise_speakers_job, + conversation_id, + version_id, + depends_on=transcript_job, + job_timeout=600, + result_ttl=JOB_RESULT_TTL, + job_id=f"speaker_{conversation_id[:8]}", + description=f"Recognize speakers for {conversation_id[:8]}", + meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} + ) + speaker_dependency = speaker_job # Chain for next job + logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id} (depends on {transcript_job.id})") + else: + logger.info(f"⏭️ Speaker recognition disabled, skipping speaker job for conversation {conversation_id[:8]}") - # Job 3: Extract memories (depends on speaker recognition) + # Job 3: Extract memories + # Depends on speaker job if it was created, otherwise depends on transcription # Note: redis_client is injected by @async_job decorator, don't pass it directly memory_job = memory_queue.enqueue( process_memory_job, conversation_id, - depends_on=speaker_job, + depends_on=speaker_dependency, # Either speaker_job or transcript_job job_timeout=1800, result_ttl=JOB_RESULT_TTL, job_id=f"memory_{conversation_id[:8]}", description=f"Extract memories for {conversation_id[:8]}", meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id} ) - logger.info(f"📥 RQ: Enqueued memory job {memory_job.id} (depends on {speaker_job.id})") + if speaker_job: + logger.info(f"📥 RQ: Enqueued memory job {memory_job.id} (depends on speaker job {speaker_job.id})") + else: + logger.info(f"📥 RQ: Enqueued memory job {memory_job.id} (depends on transcript job {transcript_job.id})") job = transcript_job # For backward compatibility with return value logger.info(f"Created transcript reprocessing job {job.id} (version: {version_id}) for conversation 
{conversation_id}") @@ -518,12 +543,9 @@ async def reprocess_memory(conversation_id: str, transcript_version_id: str, use ) # Create new memory version ID - import uuid version_id = str(uuid.uuid4()) # Enqueue memory processing job with RQ (RQ handles job tracking) - from advanced_omi_backend.models.job import JobPriority - from advanced_omi_backend.workers.memory_jobs import enqueue_memory_processing job = enqueue_memory_processing( client_id=conversation_model.client_id, diff --git a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py index d83c7075..f8710718 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py @@ -22,6 +22,7 @@ from advanced_omi_backend.models.job import JobPriority from advanced_omi_backend.models.conversation import Conversation +from advanced_omi_backend.config_loader import get_service_config logger = logging.getLogger(__name__) @@ -405,27 +406,39 @@ def start_post_conversation_jobs( if client_id: job_meta['client_id'] = client_id - # Step 1: Speaker recognition job (uses streaming transcript from conversation document) - speaker_job_id = f"speaker_{conversation_id[:12]}" - logger.info(f"🔍 DEBUG: Creating speaker job with job_id={speaker_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") - - speaker_job = transcription_queue.enqueue( - recognise_speakers_job, - conversation_id, - version_id, - job_timeout=1200, # 20 minutes - result_ttl=JOB_RESULT_TTL, - depends_on=depends_on_job, # Direct dependency (no transcription job) - job_id=speaker_job_id, - description=f"Speaker recognition for conversation {conversation_id[:8]}", - meta=job_meta - ) - if depends_on_job: - logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id}, meta={speaker_job.meta} (depends on {depends_on_job.id})") + # Check if speaker recognition is enabled + speaker_config = get_service_config('speaker_recognition') + speaker_enabled = speaker_config.get('enabled', True) # Default to True for backward compatibility + + # Step 1: Speaker recognition job (conditional - only if enabled) + speaker_dependency = depends_on_job # Start with upstream dependency (transcription if file upload) + speaker_job = None + + if speaker_enabled: + speaker_job_id = f"speaker_{conversation_id[:12]}" + logger.info(f"🔍 DEBUG: Creating speaker job with job_id={speaker_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") + + speaker_job = transcription_queue.enqueue( + recognise_speakers_job, + conversation_id, + version_id, + job_timeout=1200, # 20 minutes + result_ttl=JOB_RESULT_TTL, + depends_on=speaker_dependency, + job_id=speaker_job_id, + description=f"Speaker recognition for conversation {conversation_id[:8]}", + meta=job_meta + ) + speaker_dependency = speaker_job # Chain for next jobs + if depends_on_job: + logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id}, meta={speaker_job.meta} (depends on {depends_on_job.id})") + else: + logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id}, meta={speaker_job.meta} (no dependencies, starts immediately)") else: - logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id}, meta={speaker_job.meta} (no dependencies, starts immediately)") + logger.info(f"⏭️ Speaker recognition disabled, skipping speaker job for conversation {conversation_id[:8]}") - # Step 
3: Memory extraction job (parallel with title/summary) + # Step 2: Memory extraction job + # Depends on speaker job if it was created, otherwise depends on upstream (transcription or nothing) memory_job_id = f"memory_{conversation_id[:12]}" logger.info(f"🔍 DEBUG: Creating memory job with job_id={memory_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") @@ -434,15 +447,20 @@ def start_post_conversation_jobs( conversation_id, job_timeout=900, # 15 minutes result_ttl=JOB_RESULT_TTL, - depends_on=speaker_job, + depends_on=speaker_dependency, # Either speaker_job or upstream dependency job_id=memory_job_id, description=f"Memory extraction for conversation {conversation_id[:8]}", meta=job_meta ) - logger.info(f"📥 RQ: Enqueued memory extraction job {memory_job.id}, meta={memory_job.meta} (depends on {speaker_job.id})") + if speaker_job: + logger.info(f"📥 RQ: Enqueued memory extraction job {memory_job.id}, meta={memory_job.meta} (depends on speaker job {speaker_job.id})") + elif depends_on_job: + logger.info(f"📥 RQ: Enqueued memory extraction job {memory_job.id}, meta={memory_job.meta} (depends on {depends_on_job.id})") + else: + logger.info(f"📥 RQ: Enqueued memory extraction job {memory_job.id}, meta={memory_job.meta} (no dependencies, starts immediately)") - # Step 4: Title/summary generation job (parallel with memory, independent) - # This ensures conversations always get titles/summaries even if memory job fails + # Step 3: Title/summary generation job + # Depends on speaker job if enabled, otherwise on upstream dependency title_job_id = f"title_summary_{conversation_id[:12]}" logger.info(f"🔍 DEBUG: Creating title/summary job with job_id={title_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") @@ -451,12 +469,17 @@ def start_post_conversation_jobs( conversation_id, job_timeout=300, # 5 minutes result_ttl=JOB_RESULT_TTL, - depends_on=speaker_job, # Depends on speaker job, NOT memory job + depends_on=speaker_dependency, # Depends on speaker job if enabled, NOT memory job job_id=title_job_id, description=f"Generate title and summary for conversation {conversation_id[:8]}", meta=job_meta ) - logger.info(f"📥 RQ: Enqueued title/summary job {title_summary_job.id}, meta={title_summary_job.meta} (depends on {speaker_job.id})") + if speaker_job: + logger.info(f"📥 RQ: Enqueued title/summary job {title_summary_job.id}, meta={title_summary_job.meta} (depends on speaker job {speaker_job.id})") + elif depends_on_job: + logger.info(f"📥 RQ: Enqueued title/summary job {title_summary_job.id}, meta={title_summary_job.meta} (depends on {depends_on_job.id})") + else: + logger.info(f"📥 RQ: Enqueued title/summary job {title_summary_job.id}, meta={title_summary_job.meta} (no dependencies, starts immediately)") # Step 5: Dispatch conversation.complete event (runs after both memory and title/summary complete) # This ensures plugins receive the event after all processing is done diff --git a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py index 46812a8a..575e4dcb 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/system_controller.py @@ -6,6 +6,7 @@ import os import shutil import time +import warnings from datetime import UTC, datetime from pathlib import Path @@ -13,9 +14,12 @@ from fastapi import HTTPException from advanced_omi_backend.config import ( - 
load_diarization_settings_from_file, - save_diarization_settings_to_file, + get_diarization_settings as load_diarization_settings, ) +from advanced_omi_backend.config import ( + save_diarization_settings, +) +from advanced_omi_backend.config_loader import get_plugins_yml_path from advanced_omi_backend.model_registry import _find_config_path, load_models_config from advanced_omi_backend.models.user import User @@ -23,6 +27,201 @@ audio_logger = logging.getLogger("audio_processing") +async def get_config_diagnostics(): + """ + Get comprehensive configuration diagnostics. + + Returns warnings, errors, and status for all configuration components. + """ + diagnostics = { + "timestamp": datetime.now(UTC).isoformat(), + "overall_status": "healthy", + "issues": [], + "warnings": [], + "info": [], + "components": {} + } + + # Test OmegaConf configuration loading + try: + from advanced_omi_backend.config_loader import load_config + + # Capture warnings during config load + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + config = load_config(force_reload=True) + + # Check for OmegaConf warnings + for warning in w: + warning_msg = str(warning.message) + if "some elements are missing" in warning_msg.lower(): + # Extract the variable name from warning + if "variable '" in warning_msg.lower(): + var_name = warning_msg.split("'")[1] + diagnostics["warnings"].append({ + "component": "OmegaConf", + "severity": "warning", + "message": f"Environment variable '{var_name}' not set (using empty default)", + "resolution": f"Set {var_name} in .env file if needed" + }) + + diagnostics["components"]["omegaconf"] = { + "status": "healthy", + "message": "Configuration loaded successfully" + } + except Exception as e: + diagnostics["overall_status"] = "unhealthy" + diagnostics["issues"].append({ + "component": "OmegaConf", + "severity": "error", + "message": f"Failed to load configuration: {str(e)}", + "resolution": "Check config/defaults.yml and config/config.yml syntax" + }) + diagnostics["components"]["omegaconf"] = { + "status": "unhealthy", + "message": str(e) + } + + # Test model registry + try: + from advanced_omi_backend.model_registry import get_models_registry + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + registry = get_models_registry() + + # Capture model loading warnings + for warning in w: + warning_msg = str(warning.message) + diagnostics["warnings"].append({ + "component": "Model Registry", + "severity": "warning", + "message": warning_msg, + "resolution": "Check model definitions in config/defaults.yml" + }) + + if registry: + diagnostics["components"]["model_registry"] = { + "status": "healthy", + "message": f"Loaded {len(registry.models)} models", + "details": { + "total_models": len(registry.models), + "defaults": dict(registry.defaults) if registry.defaults else {} + } + } + + # Check critical models + stt = registry.get_default("stt") + stt_stream = registry.get_default("stt_stream") + llm = registry.get_default("llm") + + # STT check + if stt: + if stt.api_key: + diagnostics["info"].append({ + "component": "STT (Batch)", + "message": f"Configured: {stt.name} ({stt.model_provider}) - API key present" + }) + else: + diagnostics["warnings"].append({ + "component": "STT (Batch)", + "severity": "warning", + "message": f"{stt.name} ({stt.model_provider}) - No API key configured", + "resolution": "Transcription will fail without API key" + }) + else: + diagnostics["issues"].append({ + "component": "STT (Batch)", + "severity": 
"error", + "message": "No batch STT model configured", + "resolution": "Set defaults.stt in config.yml" + }) + diagnostics["overall_status"] = "partial" + + # Streaming STT check + if stt_stream: + if stt_stream.api_key: + diagnostics["info"].append({ + "component": "STT (Streaming)", + "message": f"Configured: {stt_stream.name} ({stt_stream.model_provider}) - API key present" + }) + else: + diagnostics["warnings"].append({ + "component": "STT (Streaming)", + "severity": "warning", + "message": f"{stt_stream.name} ({stt_stream.model_provider}) - No API key configured", + "resolution": "Real-time transcription will fail without API key" + }) + else: + diagnostics["warnings"].append({ + "component": "STT (Streaming)", + "severity": "warning", + "message": "No streaming STT model configured - streaming worker disabled", + "resolution": "Set defaults.stt_stream in config.yml for WebSocket transcription" + }) + + # LLM check + if llm: + if llm.api_key: + diagnostics["info"].append({ + "component": "LLM", + "message": f"Configured: {llm.name} ({llm.model_provider}) - API key present" + }) + else: + diagnostics["warnings"].append({ + "component": "LLM", + "severity": "warning", + "message": f"{llm.name} ({llm.model_provider}) - No API key configured", + "resolution": "Memory extraction will fail without API key" + }) + + else: + diagnostics["overall_status"] = "unhealthy" + diagnostics["issues"].append({ + "component": "Model Registry", + "severity": "error", + "message": "Failed to load model registry", + "resolution": "Check config/defaults.yml for syntax errors" + }) + diagnostics["components"]["model_registry"] = { + "status": "unhealthy", + "message": "Registry failed to load" + } + except Exception as e: + diagnostics["overall_status"] = "partial" + diagnostics["issues"].append({ + "component": "Model Registry", + "severity": "error", + "message": f"Error loading registry: {str(e)}", + "resolution": "Check logs for detailed error information" + }) + diagnostics["components"]["model_registry"] = { + "status": "unhealthy", + "message": str(e) + } + + # Check environment variables + env_checks = [ + ("DEEPGRAM_API_KEY", "Required for Deepgram transcription"), + ("OPENAI_API_KEY", "Required for OpenAI LLM and embeddings"), + ("AUTH_SECRET_KEY", "Required for authentication"), + ("ADMIN_EMAIL", "Required for admin user login"), + ("ADMIN_PASSWORD", "Required for admin user login"), + ] + + for env_var, description in env_checks: + value = os.getenv(env_var) + if not value or value == "": + diagnostics["warnings"].append({ + "component": "Environment Variables", + "severity": "warning", + "message": f"{env_var} not set - {description}", + "resolution": f"Set {env_var} in .env file" + }) + + return diagnostics + + async def get_current_metrics(): """Get current system metrics.""" try: @@ -65,8 +264,8 @@ async def get_auth_config(): async def get_diarization_settings(): """Get current diarization settings.""" try: - # Reload from file to get latest settings - settings = load_diarization_settings_from_file() + # Get settings using OmegaConf + settings = load_diarization_settings() return { "settings": settings, "status": "success" @@ -76,7 +275,7 @@ async def get_diarization_settings(): raise e -async def save_diarization_settings(settings: dict): +async def save_diarization_settings_controller(settings: dict): """Save diarization settings.""" try: # Validate settings @@ -99,29 +298,28 @@ async def save_diarization_settings(settings: dict): else: if not isinstance(value, (int, float)) or value < 0: 
raise HTTPException(status_code=400, detail=f"Invalid value for {key}: must be positive number") - + # Get current settings and merge with new values - current_settings = load_diarization_settings_from_file() + current_settings = load_diarization_settings() current_settings.update(settings) - - # Save to file - if save_diarization_settings_to_file(current_settings): + + # Save using OmegaConf + if save_diarization_settings(current_settings): logger.info(f"Updated and saved diarization settings: {settings}") - + return { "message": "Diarization settings saved successfully", "settings": current_settings, "status": "success" } else: - # Even if file save fails, we've updated the in-memory settings - logger.warning("Settings updated in memory but file save failed") + logger.warning("Settings save failed") return { - "message": "Settings updated (file save failed)", + "message": "Settings save failed", "settings": current_settings, - "status": "partial" + "status": "error" } - + except Exception as e: logger.exception("Error saving diarization settings") raise e @@ -161,7 +359,7 @@ async def save_cleanup_settings_controller( Raises: ValueError: If validation fails """ - from advanced_omi_backend.config import CleanupSettings, save_cleanup_settings_to_file + from advanced_omi_backend.config import CleanupSettings, save_cleanup_settings # Validation if not isinstance(auto_cleanup_enabled, bool): @@ -179,8 +377,8 @@ async def save_cleanup_settings_controller( retention_days=retention_days ) - # Save to file (also updates in-memory cache) - save_cleanup_settings_to_file(settings) + # Save using OmegaConf + save_cleanup_settings(settings) logger.info(f"Admin {user.email} updated cleanup settings: auto_cleanup={auto_cleanup_enabled}, retention={retention_days}d") @@ -627,7 +825,7 @@ async def validate_chat_config_yaml(prompt_text: str) -> dict: async def get_plugins_config_yaml() -> str: """Get plugins configuration as YAML text.""" try: - plugins_yml_path = Path("/app/plugins.yml") + plugins_yml_path = get_plugins_yml_path() # Default empty plugins config default_config = """plugins: @@ -659,7 +857,7 @@ async def get_plugins_config_yaml() -> str: async def save_plugins_config_yaml(yaml_content: str) -> dict: """Save plugins configuration from YAML text.""" try: - plugins_yml_path = Path("/app/plugins.yml") + plugins_yml_path = get_plugins_yml_path() # Validate YAML can be parsed try: diff --git a/backends/advanced/src/advanced_omi_backend/cron.py b/backends/advanced/src/advanced_omi_backend/cron.py new file mode 100644 index 00000000..161ceb31 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/cron.py @@ -0,0 +1,121 @@ +""" +Annotation cron scheduler for AI-powered suggestion surfacing. + +This scheduler runs background jobs to: +1. Surface AI suggestions for potential transcript/memory errors (daily) +2. 
Fine-tune error detection models using user feedback (weekly) + +Configuration via environment variables: +- MONGODB_URI: MongoDB connection string +- DEV_MODE: When true, uses 1-minute intervals for testing + +Usage: + uv run python -m advanced_omi_backend.cron +""" + +import asyncio +import logging +import os +from datetime import datetime, timezone + +from beanie import init_beanie +from motor.motor_asyncio import AsyncIOMotorClient + +from advanced_omi_backend.models.annotation import Annotation +from advanced_omi_backend.models.conversation import Conversation +from advanced_omi_backend.models.user import User +from advanced_omi_backend.workers.annotation_jobs import ( + finetune_hallucination_model, + surface_error_suggestions, +) + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + +# Configuration +MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://mongo:27017") +DEV_MODE = os.getenv("DEV_MODE", "false").lower() == "true" + +# Intervals (1 minute in dev, normal in production) +if DEV_MODE: + SUGGESTION_INTERVAL = 60 # 1 minute for dev testing + TRAINING_INTERVAL = 60 # 1 minute for dev testing + logger.info("🔧 DEV_MODE enabled - using 1-minute intervals for testing") +else: + SUGGESTION_INTERVAL = 24 * 60 * 60 # Daily + TRAINING_INTERVAL = 7 * 24 * 60 * 60 # Weekly + logger.info("📅 Production mode - using daily/weekly intervals") + + +async def init_db(): + """Initialize database connection""" + try: + client = AsyncIOMotorClient(MONGODB_URI) + await init_beanie( + database=client.chronicle, + document_models=[Annotation, Conversation, User], + ) + logger.info("✅ Database connection initialized") + except Exception as e: + logger.error(f"❌ Failed to initialize database: {e}") + raise + + +async def run_scheduler(): + """Main scheduler loop""" + await init_db() + logger.info("🕐 Annotation cron scheduler started") + logger.info(f" - Suggestion interval: {SUGGESTION_INTERVAL}s") + logger.info(f" - Training interval: {TRAINING_INTERVAL}s") + + last_suggestion_run = datetime.now(timezone.utc) + last_training_run = datetime.now(timezone.utc) + + while True: + try: + now = datetime.now(timezone.utc) + + # Daily: Surface AI suggestions + if (now - last_suggestion_run).total_seconds() >= SUGGESTION_INTERVAL: + logger.info(f"🤖 Running suggestion surfacing at {now}") + try: + await surface_error_suggestions() + last_suggestion_run = now + logger.info("✅ Suggestion surfacing completed") + except Exception as e: + logger.error(f"❌ Suggestion job failed: {e}", exc_info=True) + + # Weekly: Fine-tune model + if (now - last_training_run).total_seconds() >= TRAINING_INTERVAL: + logger.info(f"🎓 Running model fine-tuning at {now}") + try: + await finetune_hallucination_model() + last_training_run = now + logger.info("✅ Model fine-tuning completed") + except Exception as e: + logger.error(f"❌ Training job failed: {e}", exc_info=True) + + # Sleep for check interval + await asyncio.sleep(60) # Check every minute + + except KeyboardInterrupt: + logger.info("⛔ Scheduler stopped by user") + break + except Exception as e: + logger.error(f"❌ Unexpected error in scheduler loop: {e}", exc_info=True) + # Continue running despite errors + await asyncio.sleep(60) + + +if __name__ == "__main__": + logger.info("🚀 Starting annotation cron scheduler...") + try: + asyncio.run(run_scheduler()) + except KeyboardInterrupt: + logger.info("👋 Annotation cron scheduler stopped") + except Exception as e: + 
logger.error(f"💥 Fatal error: {e}", exc_info=True) + exit(1) diff --git a/backends/advanced/src/advanced_omi_backend/model_registry.py b/backends/advanced/src/advanced_omi_backend/model_registry.py index d0a46ab6..382674da 100644 --- a/backends/advanced/src/advanced_omi_backend/model_registry.py +++ b/backends/advanced/src/advanced_omi_backend/model_registry.py @@ -4,12 +4,11 @@ definitions (LLM, embeddings, etc.) in a provider-agnostic way. Now using Pydantic for robust validation and type safety. +Environment variable resolution is handled by OmegaConf in the config module. """ from __future__ import annotations -import os -import re import yaml from pathlib import Path from typing import Any, Dict, List, Optional @@ -18,77 +17,9 @@ from pydantic import BaseModel, Field, field_validator, model_validator, ConfigDict, ValidationError # Import config merging for defaults.yml + config.yml integration +# OmegaConf handles environment variable resolution (${VAR:-default} syntax) from advanced_omi_backend.config import get_config -def _resolve_env(value: Any) -> Any: - """Resolve ``${VAR:-default}`` patterns inside a single value. - - This helper is intentionally minimal: it only operates on strings and leaves - all other types unchanged. Patterns of the form ``${VAR}`` or - ``${VAR:-default}`` are expanded using ``os.getenv``: - - - If the environment variable **VAR** is set, its value is used. - - Otherwise the optional ``default`` is used (or ``\"\"`` if omitted). - - Examples: - >>> os.environ.get("OLLAMA_MODEL") - >>> _resolve_env("${OLLAMA_MODEL:-llama3.1:latest}") - 'llama3.1:latest' - - >>> os.environ["OLLAMA_MODEL"] = "llama3.2:latest" - >>> _resolve_env("${OLLAMA_MODEL:-llama3.1:latest}") - 'llama3.2:latest' - - >>> _resolve_env("Bearer ${OPENAI_API_KEY:-}") - 'Bearer ' # when OPENAI_API_KEY is not set - - Note: - Use :func:`_deep_resolve_env` to apply this logic to an entire - nested config structure (dicts/lists) loaded from YAML. - """ - if not isinstance(value, str): - return value - - pattern = re.compile(r"\$\{([^}:]+)(?::-(.*?))?\}") - - def repl(match: re.Match[str]) -> str: - var, default = match.group(1), match.group(2) - return os.getenv(var, default or "") - - return pattern.sub(repl, value) - - -def _deep_resolve_env(data: Any) -> Any: - """Recursively resolve environment variables in nested structures. - - This walks arbitrary Python structures produced by ``yaml.safe_load`` and - applies :func:`_resolve_env` to every string it finds. Dictionaries and - lists are traversed deeply; scalars are passed through unchanged. - - Examples: - >>> os.environ["OPENAI_MODEL"] = "gpt-4o-mini" - >>> cfg = { - ... "models": [ - ... {"model_name": "${OPENAI_MODEL:-gpt-4o-mini}"}, - ... {"model_url": "${OPENAI_BASE_URL:-https://api.openai.com/v1}"} - ... ] - ... } - >>> resolved = _deep_resolve_env(cfg) - >>> resolved["models"][0]["model_name"] - 'gpt-4o-mini' - >>> resolved["models"][1]["model_url"] - 'https://api.openai.com/v1' - - This is what :func:`load_models_config` uses immediately after loading - ``config.yml`` so that all ``${VAR:-default}`` placeholders are resolved - before Pydantic validation and model registry construction. - """ - if isinstance(data, dict): - return {k: _deep_resolve_env(v) for k, v in data.items()} - if isinstance(data, list): - return [_deep_resolve_env(v) for v in data] - return _resolve_env(data) - class ModelDef(BaseModel): """Model definition with validation. 
@@ -270,7 +201,8 @@ def load_models_config(force_reload: bool = False) -> Optional[AppModels]: """Load model configuration from merged defaults.yml + config.yml. This function loads defaults.yml and config.yml, merges them with user overrides, - resolves environment variables, validates model definitions using Pydantic, and caches the result. + validates model definitions using Pydantic, and caches the result. + Environment variables are resolved by OmegaConf during config loading. Args: force_reload: If True, reload from disk even if already cached @@ -280,24 +212,18 @@ def load_models_config(force_reload: bool = False) -> Optional[AppModels]: Raises: ValidationError: If config.yml has invalid model definitions - yaml.YAMLError: If config.yml has invalid YAML syntax """ global _REGISTRY if _REGISTRY is not None and not force_reload: return _REGISTRY - # Try to get merged configuration (defaults + user config) + # Get merged configuration (defaults + user config) + # OmegaConf resolves environment variables automatically try: raw = get_config(force_reload=force_reload) except Exception as e: logging.error(f"Failed to load merged configuration: {e}") - # Fallback to direct config.yml loading - cfg_path = _find_config_path() - if not cfg_path.exists(): - return None - with cfg_path.open("r") as f: - raw = yaml.safe_load(f) or {} - raw = _deep_resolve_env(raw) + return None # Extract sections defaults = raw.get("defaults", {}) or {} diff --git a/backends/advanced/src/advanced_omi_backend/models/annotation.py b/backends/advanced/src/advanced_omi_backend/models/annotation.py new file mode 100644 index 00000000..3e3dbdb1 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/models/annotation.py @@ -0,0 +1,139 @@ +""" +Unified annotation system for Chronicle. + +Supports annotations for memories, transcripts, and future content types. +Enables both user edits and AI-powered suggestions. +""" + +from enum import Enum +from typing import Optional +from datetime import datetime, timezone +import uuid + +from beanie import Document, Indexed +from pydantic import BaseModel, Field + + +class AnnotationType(str, Enum): + """Type of content being annotated.""" + MEMORY = "memory" + TRANSCRIPT = "transcript" + + +class AnnotationSource(str, Enum): + """Origin of the annotation.""" + USER = "user" # User-created edit + MODEL_SUGGESTION = "model_suggestion" # AI-generated suggestion + + +class AnnotationStatus(str, Enum): + """Lifecycle status of annotation.""" + PENDING = "pending" # Waiting for user review (suggestions) + ACCEPTED = "accepted" # Applied to content + REJECTED = "rejected" # User dismissed suggestion + + +class Annotation(Document): + """ + Unified annotation model for all content types. + + Supports both user edits and AI-powered suggestions across + memories, transcripts, and future content types (chat, action items, etc.). + + Design: Polymorphic model with type-specific fields based on annotation_type. 
+ """ + + # Identity + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + + # Classification + annotation_type: AnnotationType + user_id: Indexed(str) + source: AnnotationSource = Field(default=AnnotationSource.USER) + status: AnnotationStatus = Field(default=AnnotationStatus.ACCEPTED) + + # Content + original_text: str # Text before correction + corrected_text: str # Text after correction + + # Polymorphic References (based on annotation_type) + # For MEMORY annotations: + memory_id: Optional[str] = None + + # For TRANSCRIPT annotations: + conversation_id: Optional[str] = None + segment_index: Optional[int] = None + + # Timestamps (Python 3.12+ compatible) + created_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc) + ) + updated_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc) + ) + + class Settings: + name = "annotations" + # Create indexes on commonly queried fields + # Note: Enum fields and Optional fields don't use Indexed() wrapper + indexes = [ + "annotation_type", # Query by type (memory vs transcript) + "user_id", # User-scoped queries + "status", # Filter by status (pending/accepted/rejected) + "memory_id", # Lookup annotations for specific memory + "conversation_id", # Lookup annotations for specific conversation + ] + + def is_memory_annotation(self) -> bool: + """Check if this is a memory annotation.""" + return self.annotation_type == AnnotationType.MEMORY + + def is_transcript_annotation(self) -> bool: + """Check if this is a transcript annotation.""" + return self.annotation_type == AnnotationType.TRANSCRIPT + + def is_pending_suggestion(self) -> bool: + """Check if this is a pending AI suggestion.""" + return ( + self.source == AnnotationSource.MODEL_SUGGESTION + and self.status == AnnotationStatus.PENDING + ) + + +# Pydantic Request/Response Models + + +class AnnotationCreateBase(BaseModel): + """Base model for annotation creation.""" + original_text: str + corrected_text: str + status: AnnotationStatus = AnnotationStatus.ACCEPTED + + +class MemoryAnnotationCreate(AnnotationCreateBase): + """Create memory annotation request.""" + memory_id: str + + +class TranscriptAnnotationCreate(AnnotationCreateBase): + """Create transcript annotation request.""" + conversation_id: str + segment_index: int + + +class AnnotationResponse(BaseModel): + """Annotation response for API.""" + id: str + annotation_type: AnnotationType + user_id: str + memory_id: Optional[str] = None + conversation_id: Optional[str] = None + segment_index: Optional[int] = None + original_text: str + corrected_text: str + status: AnnotationStatus + source: AnnotationSource + created_at: datetime + + class Config: + from_attributes = True # Pydantic v2 compatibility diff --git a/backends/advanced/src/advanced_omi_backend/plugins/base.py b/backends/advanced/src/advanced_omi_backend/plugins/base.py index e5dfcc36..dbd13301 100644 --- a/backends/advanced/src/advanced_omi_backend/plugins/base.py +++ b/backends/advanced/src/advanced_omi_backend/plugins/base.py @@ -54,12 +54,26 @@ def __init__(self, config: Dict[str, Any]): Args: config: Plugin configuration from config/plugins.yml - Contains: enabled, subscriptions, trigger, and plugin-specific config + Contains: enabled, events, condition, and plugin-specific config """ + import logging + logger = logging.getLogger(__name__) + self.config = config self.enabled = config.get('enabled', False) - self.subscriptions = config.get('subscriptions', []) - self.trigger = config.get('trigger', {'type': 'always'}) + + 
# NEW terminology with backward compatibility + self.events = config.get('events') or config.get('subscriptions', []) + self.condition = config.get('condition') or config.get('trigger', {'type': 'always'}) + + # Deprecation warnings + plugin_name = config.get('name', 'unknown') + if 'subscriptions' in config: + logger.warning(f"Plugin '{plugin_name}': 'subscriptions' is deprecated, use 'events' instead") + if 'trigger' in config: + logger.warning(f"Plugin '{plugin_name}': 'trigger' is deprecated, use 'condition' instead") + if 'access_level' in config: + logger.warning(f"Plugin '{plugin_name}': 'access_level' is deprecated and ignored") @abstractmethod async def initialize(self): @@ -91,7 +105,7 @@ async def on_transcript(self, context: PluginContext) -> Optional[PluginResult]: - segment_id: str - Unique segment identifier - conversation_id: str - Current conversation ID - For wake_word triggers, router adds: + For wake_word conditions, router adds: - command: str - Command with wake word stripped - original_transcript: str - Full transcript diff --git a/backends/advanced/src/advanced_omi_backend/plugins/router.py b/backends/advanced/src/advanced_omi_backend/plugins/router.py index 21b82eb8..f046520c 100644 --- a/backends/advanced/src/advanced_omi_backend/plugins/router.py +++ b/backends/advanced/src/advanced_omi_backend/plugins/router.py @@ -88,20 +88,20 @@ class PluginRouter: def __init__(self): self.plugins: Dict[str, BasePlugin] = {} - # Index plugins by event subscription for fast lookup + # Index plugins by event for fast lookup self._plugins_by_event: Dict[str, List[str]] = {} def register_plugin(self, plugin_id: str, plugin: BasePlugin): """Register a plugin with the router""" self.plugins[plugin_id] = plugin - # Index by each event subscription - for event in plugin.subscriptions: + # Index by each event + for event in plugin.events: if event not in self._plugins_by_event: self._plugins_by_event[event] = [] self._plugins_by_event[event].append(plugin_id) - logger.info(f"Registered plugin '{plugin_id}' for events: {plugin.subscriptions}") + logger.info(f"Registered plugin '{plugin_id}' for events: {plugin.events}") async def dispatch_event( self, @@ -133,8 +133,8 @@ async def dispatch_event( if not plugin.enabled: continue - # Check trigger condition (wake_word, etc.) - if not await self._should_trigger(plugin, data): + # Check execution condition (wake_word, etc.) 
+ if not await self._should_execute(plugin, data): continue # Execute plugin @@ -161,23 +161,23 @@ async def dispatch_event( return results - async def _should_trigger(self, plugin: BasePlugin, data: Dict) -> bool: - """Check if plugin should be triggered based on trigger configuration""" - trigger_type = plugin.trigger.get('type', 'always') + async def _should_execute(self, plugin: BasePlugin, data: Dict) -> bool: + """Check if plugin should be executed based on condition configuration""" + condition_type = plugin.condition.get('type', 'always') - if trigger_type == 'always': + if condition_type == 'always': return True - elif trigger_type == 'wake_word': + elif condition_type == 'wake_word': # Normalize transcript for matching (handles punctuation and spacing) transcript = data.get('transcript', '') normalized_transcript = normalize_text_for_wake_word(transcript) # Support both singular 'wake_word' and plural 'wake_words' (list) - wake_words = plugin.trigger.get('wake_words', []) + wake_words = plugin.condition.get('wake_words', []) if not wake_words: # Fallback to singular wake_word for backward compatibility - wake_word = plugin.trigger.get('wake_word', '') + wake_word = plugin.condition.get('wake_word', '') if wake_word: wake_words = [wake_word] @@ -194,7 +194,7 @@ async def _should_trigger(self, plugin: BasePlugin, data: Dict) -> bool: return False - elif trigger_type == 'conditional': + elif condition_type == 'conditional': # Future: Custom condition checking return True diff --git a/backends/advanced/src/advanced_omi_backend/routers/api_router.py b/backends/advanced/src/advanced_omi_backend/routers/api_router.py index 791bd5ca..e974ff99 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/api_router.py +++ b/backends/advanced/src/advanced_omi_backend/routers/api_router.py @@ -12,6 +12,7 @@ from .modules import ( admin_router, + annotation_router, audio_router, chat_router, client_router, @@ -32,6 +33,7 @@ # Include all sub-routers router.include_router(admin_router) +router.include_router(annotation_router) router.include_router(audio_router) router.include_router(user_router) router.include_router(chat_router) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py b/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py index 3c8e4ceb..17fe5c1a 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py @@ -7,6 +7,7 @@ - client_routes: Active client monitoring and management - conversation_routes: Conversation CRUD and audio processing - memory_routes: Memory management, search, and debug +- annotation_routes: Annotation CRUD for memories and transcripts - system_routes: System utilities and metrics - queue_routes: Job queue management and monitoring - audio_routes: Audio file uploads and processing @@ -16,6 +17,7 @@ """ from .admin_routes import router as admin_router +from .annotation_routes import router as annotation_router from .audio_routes import router as audio_router from .chat_routes import router as chat_router from .client_routes import router as client_router @@ -30,6 +32,7 @@ __all__ = [ "admin_router", + "annotation_router", "audio_router", "chat_router", "client_router", diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/admin_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/admin_routes.py index 6fbbfc56..a2ef4398 100644 --- 
a/backends/advanced/src/advanced_omi_backend/routers/modules/admin_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/admin_routes.py @@ -90,14 +90,14 @@ async def preview_cleanup( ): """Preview what would be deleted by cleanup (admin only).""" try: - from advanced_omi_backend.config import load_cleanup_settings_from_file + from advanced_omi_backend.config import get_cleanup_settings from advanced_omi_backend.models.conversation import Conversation from datetime import datetime, timedelta # Use provided retention or default from config if retention_days is None: - settings = load_cleanup_settings_from_file() - retention_days = settings.retention_days + settings_dict = get_cleanup_settings() + retention_days = settings_dict['retention_days'] cutoff_date = datetime.utcnow() - timedelta(days=retention_days) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py new file mode 100644 index 00000000..e1e99644 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py @@ -0,0 +1,318 @@ +""" +Annotation routes for Chronicle API. + +Handles annotation CRUD operations for memories and transcripts. +Supports both user edits and AI-powered suggestions. +""" + +import logging +from typing import List +from datetime import datetime, timezone + +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import JSONResponse + +from advanced_omi_backend.auth import current_active_user +from advanced_omi_backend.users import User +from advanced_omi_backend.models.annotation import ( + Annotation, + AnnotationType, + AnnotationStatus, + MemoryAnnotationCreate, + TranscriptAnnotationCreate, + AnnotationResponse, +) +from advanced_omi_backend.models.conversation import Conversation +from advanced_omi_backend.services.memory import get_memory_service + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/annotations", tags=["annotations"]) + + +@router.post("/memory", response_model=AnnotationResponse) +async def create_memory_annotation( + annotation_data: MemoryAnnotationCreate, + current_user: User = Depends(current_active_user), +): + """ + Create annotation for memory edit. 
+ + - Validates user owns memory + - Creates annotation record + - Updates memory content in vector store + - Re-embeds if content changed + """ + try: + memory_service = get_memory_service() + + # Verify memory ownership + try: + memory = await memory_service.get_memory( + annotation_data.memory_id, current_user.user_id + ) + if not memory: + raise HTTPException(status_code=404, detail="Memory not found") + except Exception as e: + logger.error(f"Error fetching memory: {e}") + raise HTTPException(status_code=404, detail="Memory not found") + + # Create annotation + annotation = Annotation( + annotation_type=AnnotationType.MEMORY, + user_id=current_user.user_id, + memory_id=annotation_data.memory_id, + original_text=annotation_data.original_text, + corrected_text=annotation_data.corrected_text, + status=annotation_data.status, + ) + await annotation.save() + logger.info( + f"Created memory annotation {annotation.id} for memory {annotation_data.memory_id}" + ) + + # Update memory content if accepted + if annotation.status == AnnotationStatus.ACCEPTED: + try: + await memory_service.update_memory( + memory_id=annotation_data.memory_id, + content=annotation_data.corrected_text, + user_id=current_user.user_id, + ) + logger.info( + f"Updated memory {annotation_data.memory_id} with corrected text" + ) + except Exception as e: + logger.error(f"Error updating memory: {e}") + # Annotation is saved, but memory update failed - log but don't fail the request + logger.warning( + f"Memory annotation {annotation.id} saved but memory update failed" + ) + + return AnnotationResponse.model_validate(annotation) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error creating memory annotation: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to create memory annotation: {str(e)}", + ) + + +@router.post("/transcript", response_model=AnnotationResponse) +async def create_transcript_annotation( + annotation_data: TranscriptAnnotationCreate, + current_user: User = Depends(current_active_user), +): + """ + Create annotation for transcript segment edit. 
+ + - Validates user owns conversation + - Creates annotation record + - Updates transcript segment + - Triggers memory reprocessing + """ + try: + # Verify conversation ownership + conversation = await Conversation.find_one( + Conversation.conversation_id == annotation_data.conversation_id, + Conversation.user_id == current_user.user_id, + ) + if not conversation: + raise HTTPException(status_code=404, detail="Conversation not found") + + # Validate segment index + active_transcript = conversation.get_active_transcript() + if ( + not active_transcript + or annotation_data.segment_index >= len(active_transcript.segments) + ): + raise HTTPException(status_code=400, detail="Invalid segment index") + + # Create annotation + annotation = Annotation( + annotation_type=AnnotationType.TRANSCRIPT, + user_id=current_user.user_id, + conversation_id=annotation_data.conversation_id, + segment_index=annotation_data.segment_index, + original_text=annotation_data.original_text, + corrected_text=annotation_data.corrected_text, + status=annotation_data.status, + ) + await annotation.save() + logger.info( + f"Created transcript annotation {annotation.id} for conversation {annotation_data.conversation_id} segment {annotation_data.segment_index}" + ) + + # Update transcript segment if accepted + if annotation.status == AnnotationStatus.ACCEPTED: + segment = active_transcript.segments[annotation_data.segment_index] + segment.text = annotation_data.corrected_text + await conversation.save() + logger.info( + f"Updated transcript segment {annotation_data.segment_index} in conversation {annotation_data.conversation_id}" + ) + + # Trigger memory reprocessing + try: + from advanced_omi_backend.workers.queue_manager import ( + enqueue_memory_processing, + ) + from advanced_omi_backend.models.job import JobPriority + + await enqueue_memory_processing( + client_id=conversation.client_id, + user_id=current_user.user_id, + user_email=current_user.email, + conversation_id=conversation.conversation_id, + priority=JobPriority.NORMAL, + ) + logger.info( + f"Enqueued memory reprocessing for conversation {annotation_data.conversation_id}" + ) + except Exception as e: + logger.error(f"Error enqueuing memory reprocessing: {e}") + # Annotation and segment update succeeded, but reprocessing failed + logger.warning( + f"Transcript annotation {annotation.id} saved but memory reprocessing failed" + ) + + return AnnotationResponse.model_validate(annotation) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error creating transcript annotation: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to create transcript annotation: {str(e)}", + ) + + +@router.get("/memory/{memory_id}", response_model=List[AnnotationResponse]) +async def get_memory_annotations( + memory_id: str, + current_user: User = Depends(current_active_user), +): + """Get all annotations for a memory.""" + try: + annotations = await Annotation.find( + Annotation.annotation_type == AnnotationType.MEMORY, + Annotation.memory_id == memory_id, + Annotation.user_id == current_user.user_id, + ).to_list() + + return [AnnotationResponse.model_validate(a) for a in annotations] + + except Exception as e: + logger.error(f"Error fetching memory annotations: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to fetch memory annotations: {str(e)}", + ) + + +@router.get("/transcript/{conversation_id}", response_model=List[AnnotationResponse]) +async def get_transcript_annotations( + conversation_id: 
str, + current_user: User = Depends(current_active_user), +): + """Get all annotations for a conversation's transcript.""" + try: + annotations = await Annotation.find( + Annotation.annotation_type == AnnotationType.TRANSCRIPT, + Annotation.conversation_id == conversation_id, + Annotation.user_id == current_user.user_id, + ).to_list() + + return [AnnotationResponse.model_validate(a) for a in annotations] + + except Exception as e: + logger.error(f"Error fetching transcript annotations: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to fetch transcript annotations: {str(e)}", + ) + + +@router.patch("/{annotation_id}/status") +async def update_annotation_status( + annotation_id: str, + status: AnnotationStatus, + current_user: User = Depends(current_active_user), +): + """ + Accept or reject AI-generated suggestions. + + Used for pending model suggestions in the UI. + """ + try: + annotation = await Annotation.find_one( + Annotation.id == annotation_id, + Annotation.user_id == current_user.user_id, + ) + if not annotation: + raise HTTPException(status_code=404, detail="Annotation not found") + + old_status = annotation.status + annotation.status = status + annotation.updated_at = datetime.now(timezone.utc) + + # If accepting a pending suggestion, apply the correction + if ( + status == AnnotationStatus.ACCEPTED + and old_status == AnnotationStatus.PENDING + ): + if annotation.is_memory_annotation(): + # Update memory + try: + memory_service = get_memory_service() + await memory_service.update_memory( + memory_id=annotation.memory_id, + content=annotation.corrected_text, + user_id=current_user.user_id, + ) + logger.info( + f"Applied suggestion to memory {annotation.memory_id}" + ) + except Exception as e: + logger.error(f"Error applying memory suggestion: {e}") + # Don't fail the status update if memory update fails + elif annotation.is_transcript_annotation(): + # Update transcript segment + try: + conversation = await Conversation.find_one( + Conversation.conversation_id == annotation.conversation_id + ) + if conversation: + transcript = conversation.get_active_transcript() + if ( + transcript + and annotation.segment_index < len(transcript.segments) + ): + transcript.segments[ + annotation.segment_index + ].text = annotation.corrected_text + await conversation.save() + logger.info( + f"Applied suggestion to transcript segment {annotation.segment_index}" + ) + except Exception as e: + logger.error(f"Error applying transcript suggestion: {e}") + # Don't fail the status update if segment update fails + + await annotation.save() + logger.info(f"Updated annotation {annotation_id} status to {status}") + + return {"status": "updated", "annotation_id": annotation_id, "new_status": status} + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating annotation status: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to update annotation status: {str(e)}", + ) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py index 3f48fe1c..be2b3bc4 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py @@ -7,11 +7,12 @@ import logging from typing import Optional -from fastapi import APIRouter, Depends, Query +from fastapi import APIRouter, Depends, Query, 
HTTPException, Response from advanced_omi_backend.auth import current_active_user from advanced_omi_backend.controllers import conversation_controller, audio_controller from advanced_omi_backend.users import User +from advanced_omi_backend.models.conversation import Conversation logger = logging.getLogger(__name__) @@ -165,6 +166,118 @@ async def get_conversation_waveform( } +@router.get("/{conversation_id}/metadata") +async def get_conversation_metadata( + conversation_id: str, + current_user: User = Depends(current_active_user) +) -> dict: + """ + Get conversation metadata (duration, etc.) without loading audio. + + This endpoint provides lightweight access to conversation metadata, + useful for the speaker service to check duration before deciding + whether to chunk audio processing. + + Returns: + { + "conversation_id": str, + "duration": float, # Total duration in seconds + "created_at": datetime, + "has_audio": bool + } + """ + conversation = await Conversation.find_one( + Conversation.conversation_id == conversation_id + ) + + if not conversation: + raise HTTPException(status_code=404, detail="Conversation not found") + + # Check ownership (admins can access all) + if not current_user.is_superuser and conversation.user_id != str(current_user.id): + raise HTTPException(status_code=403, detail="Access denied") + + return { + "conversation_id": conversation_id, + "duration": conversation.audio_total_duration or 0.0, + "created_at": conversation.created_at, + "has_audio": (conversation.audio_total_duration or 0.0) > 0 + } + + +@router.get("/{conversation_id}/audio-segments") +async def get_audio_segment( + conversation_id: str, + start: float = Query(0.0, description="Start time in seconds"), + duration: Optional[float] = Query(None, description="Duration in seconds (omit for full audio)"), + current_user: User = Depends(current_active_user) +) -> Response: + """ + Get audio segment from a conversation. + + This endpoint enables the speaker service to fetch audio in time-bounded + segments without loading the entire file into memory. The speaker service + controls chunk size based on its own memory constraints. 
+ + Args: + conversation_id: Conversation identifier + start: Start time in seconds (default: 0.0) + duration: Duration in seconds (if None, returns all audio from start) + + Returns: + WAV audio bytes (16kHz, mono) for the requested time range + """ + from advanced_omi_backend.utils.audio_chunk_utils import reconstruct_audio_segment + + # Verify conversation exists and user has access + conversation = await Conversation.find_one( + Conversation.conversation_id == conversation_id + ) + + if not conversation: + raise HTTPException(status_code=404, detail="Conversation not found") + + # Check ownership (admins can access all) + if not current_user.is_superuser and conversation.user_id != str(current_user.id): + raise HTTPException(status_code=403, detail="Access denied") + + # Calculate end time + total_duration = conversation.audio_total_duration or 0.0 + if total_duration == 0: + raise HTTPException(status_code=404, detail="No audio available for this conversation") + + if duration is None: + end = total_duration + else: + end = min(start + duration, total_duration) + + # Validate time range + if start < 0 or start >= total_duration: + raise HTTPException(status_code=400, detail=f"Invalid start time: {start}s (max: {total_duration}s)") + + # Get audio chunks for time range + try: + wav_bytes = await reconstruct_audio_segment( + conversation_id=conversation_id, + start_time=start, + end_time=end + ) + except Exception as e: + logger.error(f"Failed to reconstruct audio segment for {conversation_id[:12]}: {e}") + raise HTTPException(status_code=500, detail=f"Failed to reconstruct audio: {str(e)}") + + return Response( + content=wav_bytes, + media_type="audio/wav", + headers={ + "Content-Disposition": f"attachment; filename=segment_{start}_{end}.wav", + "X-Audio-Start": str(start), + "X-Audio-End": str(end), + "X-Audio-Duration": str(end - start) + } + ) + + @router.delete("/{conversation_id}") async def delete_conversation( conversation_id: str, diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py index 9d1d2378..d68843ae 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/system_routes.py @@ -30,6 +30,12 @@ class MemoryConfigRequest(BaseModel): config_yaml: str +@router.get("/config/diagnostics") +async def get_config_diagnostics(current_user: User = Depends(current_superuser)): + """Get configuration diagnostics including errors, warnings, and status. Admin only.""" + return await system_controller.get_config_diagnostics() + + @router.get("/metrics") async def get_current_metrics(current_user: User = Depends(current_superuser)): """Get current system metrics. Admin only.""" @@ -54,7 +60,7 @@ async def save_diarization_settings( current_user: User = Depends(current_superuser) ): """Save diarization settings. 
Admin only.""" - return await system_controller.save_diarization_settings(settings) + return await system_controller.save_diarization_settings_controller(settings) @router.get("/cleanup-settings") diff --git a/backends/advanced/src/advanced_omi_backend/services/memory/providers/chronicle.py b/backends/advanced/src/advanced_omi_backend/services/memory/providers/chronicle.py index fa73c526..7712ac2c 100644 --- a/backends/advanced/src/advanced_omi_backend/services/memory/providers/chronicle.py +++ b/backends/advanced/src/advanced_omi_backend/services/memory/providers/chronicle.py @@ -321,6 +321,77 @@ async def get_memory(self, memory_id: str, user_id: Optional[str] = None) -> Opt memory_logger.error(f"Get memory failed: {e}") return None + async def update_memory( + self, + memory_id: str, + content: Optional[str] = None, + metadata: Optional[dict[str, Any]] = None, + user_id: Optional[str] = None, + user_email: Optional[str] = None + ) -> bool: + """Update a specific memory's content and/or metadata. + + Regenerates embeddings when content is updated. + + Args: + memory_id: Unique identifier of the memory to update + content: New content for the memory (if None, content is not updated) + metadata: New metadata to merge with existing (if None, metadata is not updated) + user_id: Optional user ID for authentication + user_email: Optional user email for authentication + + Returns: + True if update succeeded, False otherwise + """ + if not self._initialized: + await self.initialize() + + try: + # Get existing memory + existing_memory = await self.vector_store.get_memory(memory_id, user_id) + if not existing_memory: + memory_logger.warning(f"Memory {memory_id} not found for update") + return False + + # Determine new content and metadata + new_content = content if content is not None else existing_memory.content + new_metadata = {**existing_memory.metadata} + if metadata: + new_metadata.update(metadata) + + # Update timestamps + new_metadata["updated_at"] = str(int(time.time())) + + # Generate new embedding if content changed + if content is not None: + new_embedding = await self.llm_provider.generate_embedding(new_content) + else: + # If content didn't change, reuse existing embedding + if existing_memory.embedding: + new_embedding = existing_memory.embedding + else: + # No existing embedding, generate one + new_embedding = await self.llm_provider.generate_embedding(new_content) + + # Update in vector store + success = await self.vector_store.update_memory( + memory_id=memory_id, + new_content=new_content, + new_embedding=new_embedding, + new_metadata=new_metadata + ) + + if success: + memory_logger.info(f"✅ Updated memory {memory_id}") + else: + memory_logger.error(f"Failed to update memory {memory_id}") + + return success + + except Exception as e: + memory_logger.error(f"Error updating memory {memory_id}: {e}", exc_info=True) + return False + async def delete_memory(self, memory_id: str, user_id: Optional[str] = None, user_email: Optional[str] = None) -> bool: """Delete a specific memory by ID. diff --git a/backends/advanced/src/advanced_omi_backend/services/plugin_service.py b/backends/advanced/src/advanced_omi_backend/services/plugin_service.py index 0dc693d6..b2078364 100644 --- a/backends/advanced/src/advanced_omi_backend/services/plugin_service.py +++ b/backends/advanced/src/advanced_omi_backend/services/plugin_service.py @@ -4,14 +4,19 @@ worker jobs to trigger plugins without accessing FastAPI app state directly. 
""" +import importlib +import importlib.util +import inspect import logging import os import re -from typing import Optional, Any from pathlib import Path +from typing import Any, Dict, Optional, Type + import yaml -from advanced_omi_backend.plugins import PluginRouter +from advanced_omi_backend.config_loader import get_plugins_yml_path +from advanced_omi_backend.plugins import BasePlugin, PluginRouter logger = logging.getLogger(__name__) @@ -111,7 +116,7 @@ def init_plugin_router() -> Optional[PluginRouter]: _plugin_router = PluginRouter() # Load plugin configuration - plugins_yml = Path("/app/plugins.yml") + plugins_yml = get_plugins_yml_path() logger.info(f"🔍 Looking for plugins config at: {plugins_yml}") logger.info(f"🔍 File exists: {plugins_yml.exists()}") @@ -132,13 +137,17 @@ def init_plugin_router() -> Optional[PluginRouter]: try: if plugin_id == 'homeassistant': - from advanced_omi_backend.plugins.homeassistant import HomeAssistantPlugin + from advanced_omi_backend.plugins.homeassistant import ( + HomeAssistantPlugin, + ) plugin = HomeAssistantPlugin(plugin_config) # Note: async initialization happens in app_factory lifespan _plugin_router.register_plugin(plugin_id, plugin) logger.info(f"✅ Plugin '{plugin_id}' registered") elif plugin_id == 'test_event': - from advanced_omi_backend.plugins.test_event import TestEventPlugin + from advanced_omi_backend.plugins.test_event import ( + TestEventPlugin, + ) plugin = TestEventPlugin(plugin_config) # Note: async initialization happens in app_factory lifespan _plugin_router.register_plugin(plugin_id, plugin) diff --git a/backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py b/backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py index fb146c08..c22e55cd 100644 --- a/backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py +++ b/backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py @@ -67,14 +67,21 @@ def __init__(self, service_url: Optional[str] = None): logger.info("Speaker recognition client disabled (no service URL configured)") async def diarize_identify_match( - self, audio_data: bytes, transcript_data: Dict, user_id: Optional[str] = None + self, + conversation_id: str, + backend_token: str, + transcript_data: Dict, + user_id: Optional[str] = None ) -> Dict: """ Perform diarization, speaker identification, and word-to-speaker matching. - Routes to appropriate endpoint based on diarization source configuration. + + Speaker service fetches audio from backend and handles chunking based on its + own memory constraints. 
Args: - audio_data: WAV audio data as bytes (in-memory) + conversation_id: Conversation ID for speaker service to fetch audio + backend_token: JWT token for speaker service to authenticate with backend transcript_data: Dict containing words array and text from transcription user_id: Optional user ID for speaker identification @@ -86,19 +93,18 @@ async def diarize_identify_match( return {} try: - logger.info(f"🎤 Identifying speakers from in-memory audio ({len(audio_data) / 1024 / 1024:.2f} MB)") + logger.info(f"🎤 Calling speaker service with conversation_id: {conversation_id[:12]}...") - # Read diarization source from existing config system - from advanced_omi_backend.config import load_diarization_settings_from_file - config = load_diarization_settings_from_file() + # Read diarization source from config system + from advanced_omi_backend.config import get_diarization_settings + config = get_diarization_settings() diarization_source = config.get("diarization_source", "pyannote") async with aiohttp.ClientSession() as session: - # Prepare the audio data for upload (no disk I/O!) + # Prepare form data with conversation_id + backend_token form_data = aiohttp.FormData() - form_data.add_field( - "file", audio_data, filename="audio.wav", content_type="audio/wav" - ) + form_data.add_field("conversation_id", conversation_id) + form_data.add_field("backend_token", backend_token) if diarization_source == "deepgram": # DEEPGRAM DIARIZATION PATH: We EXPECT transcript has speaker info from Deepgram @@ -213,9 +219,9 @@ async def diarize_and_identify( ) # Get current diarization settings from config - from advanced_omi_backend.config import load_diarization_settings_from_file + from advanced_omi_backend.config import get_diarization_settings - diarization_settings = load_diarization_settings_from_file() + diarization_settings = get_diarization_settings() # Add all diarization parameters for the diarize-and-identify endpoint min_duration = diarization_settings.get("min_duration", 0.5) diff --git a/backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py b/backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py index 2c5e06ed..fd3cda79 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py @@ -495,6 +495,108 @@ async def reconstruct_audio_segments( start_time += segment_duration +async def reconstruct_audio_segment( + conversation_id: str, + start_time: float, + end_time: float +) -> bytes: + """ + Reconstruct audio for a specific time range from MongoDB chunks. + + This function returns a single audio segment for the specified time range, + enabling on-demand access to conversation audio without loading the entire + file into memory. Used by the audio segment API endpoint. + + Args: + conversation_id: Conversation ID + start_time: Start time in seconds + end_time: End time in seconds + + Returns: + WAV audio bytes (16kHz mono or original format) + + Raises: + ValueError: If conversation not found or has no audio + Exception: If audio reconstruction fails + + Example: + >>> # Get first 60 seconds of audio + >>> wav_bytes = await reconstruct_audio_segment(conv_id, 0.0, 60.0) + >>> # Save to file + >>> with open("segment.wav", "wb") as f: + ... 
f.write(wav_bytes) + """ + from advanced_omi_backend.models.conversation import Conversation + + # Validate inputs + if start_time < 0: + raise ValueError(f"start_time must be >= 0, got {start_time}") + if end_time <= start_time: + raise ValueError(f"end_time ({end_time}) must be > start_time ({start_time})") + + # Get conversation metadata + conversation = await Conversation.get(conversation_id) + + if not conversation: + raise ValueError(f"Conversation {conversation_id} not found") + + total_duration = conversation.audio_total_duration or 0.0 + + if total_duration == 0: + raise ValueError(f"Conversation {conversation_id} has no audio") + + # Clamp end_time to conversation duration + end_time = min(end_time, total_duration) + + # Get audio format from first chunk + first_chunk = await AudioChunkDocument.find_one( + AudioChunkDocument.conversation_id == conversation_id + ) + + if not first_chunk: + raise ValueError(f"No audio chunks found for conversation {conversation_id}") + + sample_rate = first_chunk.sample_rate + channels = first_chunk.channels + + # Get chunks that overlap with this time range + chunks = await AudioChunkDocument.find( + AudioChunkDocument.conversation_id == conversation_id, + AudioChunkDocument.start_time < end_time, # Chunk starts before segment ends + AudioChunkDocument.end_time > start_time, # Chunk ends after segment starts + ).sort(+AudioChunkDocument.chunk_index).to_list() + + if not chunks: + logger.warning( + f"No chunks found for time range {start_time:.1f}s - {end_time:.1f}s " + f"in conversation {conversation_id[:8]}..." + ) + # Return silence for empty range + return await build_wav_from_pcm( + pcm_data=b"", + sample_rate=sample_rate, + channels=channels, + ) + + # Decode and concatenate chunks + pcm_data = await concatenate_chunks_to_pcm(chunks) + + # Build WAV file for this segment + wav_bytes = await build_wav_from_pcm( + pcm_data=pcm_data, + sample_rate=sample_rate, + channels=channels, + ) + + logger.info( + f"Reconstructed audio segment for {conversation_id[:8]}...: " + f"{start_time:.1f}s - {end_time:.1f}s " + f"({len(chunks)} chunks, {len(wav_bytes)} bytes)" + ) + + return wav_bytes + + def filter_transcript_by_time( transcript_data: dict, start_time: float, @@ -569,7 +671,7 @@ async def convert_audio_to_chunks( Number of chunks created Raises: - ValueError: If audio duration exceeds 30 minutes + ValueError: If audio duration exceeds 2 hours Example: >>> # Convert from memory without disk write @@ -590,12 +692,12 @@ async def convert_audio_to_chunks( # Calculate audio duration and validate maximum limit bytes_per_second = sample_rate * sample_width * channels total_duration_seconds = len(audio_data) / bytes_per_second - MAX_DURATION_SECONDS = 1800 # 30 minutes (180 chunks @ 10s each) + MAX_DURATION_SECONDS = 7200 # 2 hours (720 chunks @ 10s each) if total_duration_seconds > MAX_DURATION_SECONDS: raise ValueError( f"Audio duration ({total_duration_seconds:.1f}s) exceeds maximum allowed " - f"({MAX_DURATION_SECONDS}s / 30 minutes). Please split the file into smaller segments." + f"({MAX_DURATION_SECONDS}s / 2 hours). Please split the file into smaller segments." 
) # Calculate chunk size in bytes @@ -719,7 +821,7 @@ async def convert_wav_to_chunks( Raises: FileNotFoundError: If WAV file doesn't exist - ValueError: If WAV file is invalid or exceeds 30 minutes + ValueError: If WAV file is invalid or exceeds 2 hours Example: >>> # Convert uploaded file to chunks @@ -756,12 +858,12 @@ async def convert_wav_to_chunks( # Calculate audio duration and validate maximum limit bytes_per_second = sample_rate * sample_width * channels total_duration_seconds = len(pcm_data) / bytes_per_second - MAX_DURATION_SECONDS = 1800 # 30 minutes (180 chunks @ 10s each) + MAX_DURATION_SECONDS = 7200 # 2 hours (720 chunks @ 10s each) if total_duration_seconds > MAX_DURATION_SECONDS: raise ValueError( f"Audio duration ({total_duration_seconds:.1f}s) exceeds maximum allowed " - f"({MAX_DURATION_SECONDS}s / 30 minutes). Please split the file into smaller segments." + f"({MAX_DURATION_SECONDS}s / 2 hours). Please split the file into smaller segments." ) # Calculate chunk size in bytes diff --git a/backends/advanced/src/advanced_omi_backend/workers/annotation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/annotation_jobs.py new file mode 100644 index 00000000..87ed6cdc --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/workers/annotation_jobs.py @@ -0,0 +1,249 @@ +""" +Background jobs for annotation-based AI suggestions. + +These jobs run periodically via the cron scheduler to: +1. Surface potential errors in transcripts and memories for user review +2. Fine-tune error detection models using accepted/rejected annotations + +TODO: Implement actual LLM-based error detection and model training logic. +""" + +import logging +from datetime import datetime, timezone, timedelta +from typing import List + +from advanced_omi_backend.models.annotation import ( + Annotation, + AnnotationSource, + AnnotationStatus, + AnnotationType, +) +from advanced_omi_backend.models.conversation import Conversation +from advanced_omi_backend.models.user import User + +logger = logging.getLogger(__name__) + + +async def surface_error_suggestions(): + """ + Generate AI suggestions for potential transcript/memory errors. + Runs daily, creates PENDING annotations for user review. + + This is a PLACEHOLDER implementation. To fully implement: + 1. Query recent transcripts and memories (last N days) + 2. Use LLM to analyze content for potential errors: + - Hallucinations (made-up facts) + - Misheard words (audio transcription errors) + - Grammar/spelling issues + - Inconsistencies with other memories + 3. For each potential error: + - Create PENDING annotation with MODEL_SUGGESTION source + - Store original_text and suggested corrected_text + 4. Users can review suggestions in UI (accept/reject) + 5. Accepted suggestions improve future model accuracy + + TODO: Implement LLM-based error detection logic. 
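+
+    Illustrative sketch (not part of the current implementation): once PENDING
+    suggestions exist, the review UI can fetch them per user with the same
+    Beanie query style used elsewhere in this module:
+
+        >>> pending = await Annotation.find(
+        ...     Annotation.user_id == str(user.id),
+        ...     Annotation.source == AnnotationSource.MODEL_SUGGESTION,
+        ...     Annotation.status == AnnotationStatus.PENDING,
+        ... ).to_list()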
+ """ + logger.info("📝 Checking for annotation suggestions (placeholder)...") + + try: + # Get all users + users = await User.find_all().to_list() + logger.info(f" Found {len(users)} users to analyze") + + for user in users: + # TODO: Query recent conversations for this user (last 7 days) + # recent_conversations = await Conversation.find( + # Conversation.user_id == str(user.id), + # Conversation.created_at >= datetime.now(timezone.utc) - timedelta(days=7) + # ).to_list() + + # TODO: For each conversation, analyze transcripts + # for conversation in recent_conversations: + # active_transcript = conversation.get_active_transcript() + # if not active_transcript: + # continue + # + # # TODO: Use LLM to identify potential errors + # # suggestions = await llm_provider.analyze_transcript_for_errors( + # # segments=active_transcript.segments, + # # context=conversation.summary + # # ) + # + # # TODO: Create PENDING annotations for each suggestion + # # for suggestion in suggestions: + # # annotation = Annotation( + # # annotation_type=AnnotationType.TRANSCRIPT, + # # user_id=str(user.id), + # # conversation_id=conversation.conversation_id, + # # segment_index=suggestion.segment_index, + # # original_text=suggestion.original_text, + # # corrected_text=suggestion.suggested_text, + # # source=AnnotationSource.MODEL_SUGGESTION, + # # status=AnnotationStatus.PENDING + # # ) + # # await annotation.save() + + # TODO: Query recent memories for this user + # recent_memories = await memory_service.get_recent_memories( + # user_id=str(user.id), + # days=7 + # ) + + # TODO: Use LLM to identify potential errors in memories + # for memory in recent_memories: + # # TODO: Analyze memory content for hallucinations/errors + # # suggestions = await llm_provider.analyze_memory_for_errors( + # # content=memory.content, + # # metadata=memory.metadata + # # ) + # + # # TODO: Create PENDING annotations + # # ... + + # Placeholder logging + logger.debug(f" Analyzed user {user.email} (placeholder)") + + logger.info("✅ Suggestion check complete (placeholder implementation)") + logger.info( + " ℹ️ TODO: Implement LLM-based error detection to create actual suggestions" + ) + + except Exception as e: + logger.error(f"❌ Error in surface_error_suggestions: {e}", exc_info=True) + raise + + +async def finetune_hallucination_model(): + """ + Fine-tune error detection model using accepted/rejected annotations. + Runs weekly, improves suggestion accuracy over time. + + This is a PLACEHOLDER implementation. To fully implement: + 1. Fetch all accepted annotations (ground truth corrections) + - These show real errors that users confirmed + 2. Fetch all rejected annotations (false positives) + - These show suggestions users disagreed with + 3. Build training dataset: + - Positive examples: accepted annotations (real errors) + - Negative examples: rejected annotations (false alarms) + 4. Fine-tune LLM or update prompt engineering: + - Use accepted examples as few-shot learning + - Adjust model to reduce false positives + 5. Log metrics: + - Acceptance rate, rejection rate + - Most common error types + - Model accuracy improvement + + TODO: Implement model training logic. 
+ """ + logger.info("🎓 Checking for model training opportunities (placeholder)...") + + try: + # Fetch annotation statistics + total_annotations = await Annotation.find().count() + accepted_count = await Annotation.find( + Annotation.status == AnnotationStatus.ACCEPTED, + Annotation.source == AnnotationSource.MODEL_SUGGESTION, + ).count() + rejected_count = await Annotation.find( + Annotation.status == AnnotationStatus.REJECTED, + Annotation.source == AnnotationSource.MODEL_SUGGESTION, + ).count() + + logger.info(f" Total annotations: {total_annotations}") + logger.info(f" Accepted suggestions: {accepted_count}") + logger.info(f" Rejected suggestions: {rejected_count}") + + if accepted_count + rejected_count == 0: + logger.info(" ℹ️ No user feedback yet, skipping training") + return + + # TODO: Fetch accepted annotations (ground truth) + # accepted_annotations = await Annotation.find( + # Annotation.status == AnnotationStatus.ACCEPTED, + # Annotation.source == AnnotationSource.MODEL_SUGGESTION + # ).to_list() + + # TODO: Fetch rejected annotations (false positives) + # rejected_annotations = await Annotation.find( + # Annotation.status == AnnotationStatus.REJECTED, + # Annotation.source == AnnotationSource.MODEL_SUGGESTION + # ).to_list() + + # TODO: Build training dataset + # training_data = [] + # for annotation in accepted_annotations: + # training_data.append({ + # "input": annotation.original_text, + # "output": annotation.corrected_text, + # "label": "error" + # }) + # + # for annotation in rejected_annotations: + # training_data.append({ + # "input": annotation.original_text, + # "output": annotation.original_text, # No change needed + # "label": "correct" + # }) + + # TODO: Fine-tune model or update prompt examples + # if len(training_data) >= MIN_TRAINING_SAMPLES: + # await llm_provider.fine_tune_error_detection( + # training_data=training_data, + # validation_split=0.2 + # ) + # logger.info("✅ Model fine-tuning complete") + # else: + # logger.info(f" ℹ️ Not enough samples for training (need {MIN_TRAINING_SAMPLES})") + + # Calculate acceptance rate + if accepted_count + rejected_count > 0: + acceptance_rate = ( + accepted_count / (accepted_count + rejected_count) + ) * 100 + logger.info(f" Suggestion acceptance rate: {acceptance_rate:.1f}%") + + logger.info("✅ Training check complete (placeholder implementation)") + logger.info( + " ℹ️ TODO: Implement model fine-tuning using user feedback data" + ) + + except Exception as e: + logger.error(f"❌ Error in finetune_hallucination_model: {e}", exc_info=True) + raise + + +# Additional helper functions for future implementation + +async def analyze_common_error_patterns() -> List[dict]: + """ + Analyze accepted annotations to identify common error patterns. + Returns list of patterns for prompt engineering or model training. + + TODO: Implement pattern analysis. + """ + # TODO: Group annotations by error type + # TODO: Find frequent patterns (e.g., "their" → "there") + # TODO: Return structured patterns for model improvement + return [] + + +async def calculate_suggestion_metrics() -> dict: + """ + Calculate metrics about suggestion quality and user engagement. + + Returns: + dict: Metrics including acceptance rate, response time, etc. + + TODO: Implement metrics calculation. 
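+
+    Illustrative sketch only (mirrors the MODEL_SUGGESTION counting already done
+    in ``finetune_hallucination_model``; nothing here is wired up yet):
+
+        >>> accepted = await Annotation.find(
+        ...     Annotation.status == AnnotationStatus.ACCEPTED,
+        ...     Annotation.source == AnnotationSource.MODEL_SUGGESTION,
+        ... ).count()
+        >>> rejected = await Annotation.find(
+        ...     Annotation.status == AnnotationStatus.REJECTED,
+        ...     Annotation.source == AnnotationSource.MODEL_SUGGESTION,
+        ... ).count()
+        >>> reviewed = accepted + rejected
+        >>> acceptance_rate = (accepted / reviewed) * 100 if reviewed else 0.0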
+ """ + # TODO: Calculate acceptance/rejection rates + # TODO: Measure time to user response + # TODO: Identify high-confidence vs low-confidence suggestions + # TODO: Track improvement over time + return { + "total_suggestions": 0, + "acceptance_rate": 0.0, + "avg_response_time_hours": 0.0, + } diff --git a/backends/advanced/src/advanced_omi_backend/workers/cleanup_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/cleanup_jobs.py index e470550d..de3b52db 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/cleanup_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/cleanup_jobs.py @@ -12,7 +12,7 @@ from advanced_omi_backend.models.audio_chunk import AudioChunkDocument from advanced_omi_backend.models.waveform import WaveformData from advanced_omi_backend.models.job import async_job -from advanced_omi_backend.config import load_cleanup_settings_from_file +from advanced_omi_backend.config import get_cleanup_settings, CleanupSettings logger = logging.getLogger(__name__) @@ -34,8 +34,8 @@ async def purge_old_deleted_conversations( """ # Get retention period from config if not specified if retention_days is None: - settings = load_cleanup_settings_from_file() - retention_days = settings.retention_days + settings_dict = get_cleanup_settings() + retention_days = settings_dict['retention_days'] cutoff_date = datetime.utcnow() - timedelta(days=retention_days) @@ -128,8 +128,8 @@ def schedule_cleanup_job(retention_days: Optional[int] = None) -> Optional[str]: Job ID if scheduled successfully, None otherwise """ # Check if auto-cleanup is enabled - settings = load_cleanup_settings_from_file() - if not settings.auto_cleanup_enabled: + settings_dict = get_cleanup_settings() + if not settings_dict['auto_cleanup_enabled']: logger.info("Auto-cleanup is disabled (auto_cleanup_enabled=false)") return None diff --git a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py index 086b3ae1..cabfc66f 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py @@ -7,96 +7,20 @@ import asyncio import logging import time -from typing import Dict, Any +from typing import Any, Dict +from advanced_omi_backend.auth import generate_jwt_for_user +from advanced_omi_backend.models.conversation import Conversation from advanced_omi_backend.models.job import async_job -from advanced_omi_backend.controllers.queue_controller import transcription_queue +from advanced_omi_backend.services.audio_stream import ( + TranscriptionResultsAggregator, +) +from advanced_omi_backend.speaker_recognition_client import SpeakerRecognitionClient +from advanced_omi_backend.users import get_user_by_id logger = logging.getLogger(__name__) -def _merge_overlapping_speaker_segments( - segments: list[dict], - overlap: float -) -> list[dict]: - """ - Merge speaker segments from overlapping audio chunks. - - This function handles segments that may overlap due to chunked processing, - merging segments from the same speaker and resolving conflicts using confidence scores. - - Args: - segments: List of speaker segment dicts with start, end, text, speaker, confidence - overlap: Overlap duration in seconds used during chunking - - Returns: - Merged list of speaker segments - - Example: - >>> segments = [ - ... {"start": 0, "end": 930, "speaker": "Alice", "text": "...", "confidence": 0.9}, - ... 
{"start": 900, "end": 1830, "speaker": "Alice", "text": "...", "confidence": 0.8}, - ... ] - >>> merged = _merge_overlapping_speaker_segments(segments, overlap=30.0) - >>> # Returns single merged segment from Alice - """ - if not segments: - return [] - - # Sort by start time - segments.sort(key=lambda s: s.get("start", 0)) - - merged = [] - current = segments[0].copy() # Copy to avoid mutating input - - for next_seg in segments[1:]: - # Check if segments overlap - if next_seg["start"] < current["end"]: - # Overlapping - decide how to merge - if current.get("speaker") == next_seg.get("speaker"): - # Same speaker - merge by extending end time - current["end"] = max(current["end"], next_seg["end"]) - - # Combine text (avoid duplication in overlap region) - current_text = current.get("text", "") - next_text = next_seg.get("text", "") - - # Simple text merging - just append if different - if next_text and next_text not in current_text: - current["text"] = f"{current_text} {next_text}".strip() - - # Use higher confidence - current["confidence"] = max( - current.get("confidence", 0), - next_seg.get("confidence", 0) - ) - else: - # Different speakers - use confidence to decide boundary - current_conf = current.get("confidence", 0) - next_conf = next_seg.get("confidence", 0) - - if next_conf > current_conf: - # Next segment more confident, close current and start new - merged.append(current) - current = next_seg.copy() - else: - # Current more confident, adjust next segment start - # Save current, update next to start after current - merged.append(current) - next_seg_copy = next_seg.copy() - next_seg_copy["start"] = current["end"] - current = next_seg_copy - else: - # No overlap, save current and move to next - merged.append(current) - current = next_seg.copy() - - # Don't forget last segment - merged.append(current) - - return merged - - @async_job(redis=True, beanie=True) async def check_enrolled_speakers_job( session_id: str, @@ -119,8 +43,6 @@ async def check_enrolled_speakers_job( Returns: Dict with enrolled_present, identified_speakers, and speaker_result """ - from advanced_omi_backend.services.audio_stream import TranscriptionResultsAggregator - from advanced_omi_backend.speaker_recognition_client import SpeakerRecognitionClient logger.info(f"🎤 Starting enrolled speaker check for session {session_id[:12]}") @@ -227,8 +149,6 @@ async def recognise_speakers_job( Returns: Dict with processing results """ - from advanced_omi_backend.models.conversation import Conversation - from advanced_omi_backend.speaker_recognition_client import SpeakerRecognitionClient logger.info(f"🎤 RQ: Starting speaker recognition for conversation {conversation_id}") @@ -266,14 +186,6 @@ async def recognise_speakers_job( "processing_time_seconds": 0 } - # Reconstruct audio from MongoDB chunks - from advanced_omi_backend.utils.audio_chunk_utils import ( - reconstruct_wav_from_conversation, - reconstruct_audio_segments, - filter_transcript_by_time - ) - import os - # Read transcript text and words from the transcript version # (Parameters may be empty if called via job dependency) actual_transcript_text = transcript_text or transcript_version.transcript or "" @@ -298,122 +210,51 @@ async def recognise_speakers_job( "words": actual_words } - # Check if we need to use chunked processing - total_duration = conversation.audio_total_duration or 0.0 - chunk_threshold = float(os.getenv("SPEAKER_CHUNK_THRESHOLD", "1500")) # 25 minutes default + # Generate backend token for speaker service to fetch audio + # Speaker service will 
check conversation duration and decide + # whether to chunk based on its own memory constraints - logger.info(f"📦 Reconstructing audio from MongoDB chunks for conversation {conversation_id}") - logger.info(f"📊 Total duration: {total_duration:.1f}s, Threshold: {chunk_threshold:.1f}s") - - # Call speaker recognition service + # Get user details for token generation try: - speaker_segments = [] - - if total_duration > chunk_threshold: - # Chunked processing for large files - logger.info(f"🎤 Using chunked processing for large file ({total_duration:.1f}s > {chunk_threshold:.1f}s)") - - segment_duration = float(os.getenv("SPEAKER_CHUNK_SIZE", "900")) # 15 minutes default - overlap = float(os.getenv("SPEAKER_CHUNK_OVERLAP", "30")) # 30 seconds default - - async for wav_data, start_time, end_time in reconstruct_audio_segments( - conversation_id=conversation_id, - segment_duration=segment_duration, - overlap=overlap - ): - logger.info( - f"📦 Processing segment {start_time:.1f}s - {end_time:.1f}s: " - f"{len(wav_data) / 1024 / 1024:.2f} MB" - ) - - # Filter transcript for this time range - segment_transcript = filter_transcript_by_time( - transcript_data, - start_time, - end_time - ) - - # Call speaker service for this segment - speaker_result = await speaker_client.diarize_identify_match( - audio_data=wav_data, - transcript_data=segment_transcript, - user_id=user_id - ) - - # Check for errors from speaker service - if speaker_result.get("error"): - error_type = speaker_result.get("error") - error_message = speaker_result.get("message", "Unknown error") - logger.error(f"🎤 Speaker service error on segment {start_time:.1f}s: {error_type}") - - # Raise exception for connection failures - if error_type in ("connection_failed", "timeout", "client_error"): - raise RuntimeError(f"Speaker recognition service unavailable: {error_type} - {error_message}") - - # For processing errors, continue with other segments - continue - - # Adjust timestamps to global time - if speaker_result and "segments" in speaker_result: - for seg in speaker_result["segments"]: - seg["start"] += start_time - seg["end"] += start_time - - speaker_segments.extend(speaker_result["segments"]) - - logger.info(f"🎤 Collected {len(speaker_segments)} segments from chunked processing") - - # Merge overlapping segments - if speaker_segments: - speaker_segments = _merge_overlapping_speaker_segments(speaker_segments, overlap) - logger.info(f"🎤 After merging overlaps: {len(speaker_segments)} segments") - - # Package as result dict for consistent handling below - speaker_result = {"segments": speaker_segments} - - else: - # Normal processing for files <= threshold - logger.info(f"🎤 Using normal processing for small file ({total_duration:.1f}s <= {chunk_threshold:.1f}s)") - - # Reconstruct WAV from MongoDB chunks (already in memory as bytes) - wav_data = await reconstruct_wav_from_conversation(conversation_id) - - logger.info( - f"📦 Reconstructed audio from MongoDB chunks: " - f"{len(wav_data) / 1024 / 1024:.2f} MB" - ) - - logger.info(f"🎤 Calling speaker recognition service...") + user = await get_user_by_id(user_id) + if not user: + logger.error(f"User {user_id} not found for token generation") + return { + "success": False, + "conversation_id": conversation_id, + "version_id": version_id, + "error": "User not found", + "processing_time_seconds": time.time() - start_time + } - # Call speaker service with in-memory audio data (no temp file needed!) 
- speaker_result = await speaker_client.diarize_identify_match( - audio_data=wav_data, # Pass bytes directly, no disk I/O - transcript_data=transcript_data, - user_id=user_id - ) + backend_token = generate_jwt_for_user(user_id, user.email) + logger.info(f"🔐 Generated backend token for speaker service") - except ValueError as e: - # No chunks found for conversation - logger.error(f"No audio chunks found for conversation {conversation_id}: {e}") + except Exception as token_error: + logger.error(f"Failed to generate backend token: {token_error}", exc_info=True) return { "success": False, "conversation_id": conversation_id, "version_id": version_id, - "error": f"No audio chunks found: {e}", - "processing_time_seconds": time.time() - start_time - } - except Exception as audio_error: - logger.error(f"Failed to reconstruct audio from MongoDB: {audio_error}", exc_info=True) - return { - "success": False, - "conversation_id": conversation_id, - "version_id": version_id, - "error": f"Audio reconstruction failed: {audio_error}", + "error": f"Token generation failed: {token_error}", "processing_time_seconds": time.time() - start_time } - # Continue with speaker recognition result processing + # Call speaker recognition service with conversation_id + # Speaker service will: + # 1. Fetch conversation metadata to check duration + # 2. Decide whether to chunk based on its MAX_DIARIZE_DURATION setting + # 3. Request audio segments via backend API as needed + # 4. Return merged speaker segments + logger.info(f"🎤 Calling speaker recognition service with conversation_id...") + try: + speaker_result = await speaker_client.diarize_identify_match( + conversation_id=conversation_id, + backend_token=backend_token, + transcript_data=transcript_data, + user_id=user_id + ) # Check for errors from speaker service if speaker_result.get("error"): diff --git a/backends/advanced/webui/src/pages/Conversations.tsx b/backends/advanced/webui/src/pages/Conversations.tsx index bedde106..aa8fc030 100644 --- a/backends/advanced/webui/src/pages/Conversations.tsx +++ b/backends/advanced/webui/src/pages/Conversations.tsx @@ -1,6 +1,6 @@ import { useState, useEffect, useRef, useCallback } from 'react' -import { MessageSquare, RefreshCw, Calendar, User, Play, Pause, MoreVertical, RotateCcw, Zap, ChevronDown, ChevronUp, Trash2 } from 'lucide-react' -import { conversationsApi, BACKEND_URL } from '../services/api' +import { MessageSquare, RefreshCw, Calendar, User, Play, Pause, MoreVertical, RotateCcw, Zap, ChevronDown, ChevronUp, Trash2, Save, X } from 'lucide-react' +import { conversationsApi, annotationsApi, BACKEND_URL } from '../services/api' import ConversationVersionHeader from '../components/ConversationVersionHeader' import { getStorageKey } from '../utils/storage' import { WaveformDisplay } from '../components/audio/WaveformDisplay' @@ -73,6 +73,12 @@ export default function Conversations() { const [reprocessingMemory, setReprocessingMemory] = useState>(new Set()) const [deletingConversation, setDeletingConversation] = useState>(new Set()) + // Transcript segment editing state + const [editingSegment, setEditingSegment] = useState(null) // Format: "conversationId-segmentIndex" + const [editedSegmentText, setEditedSegmentText] = useState('') + const [savingSegment, setSavingSegment] = useState(false) + const [segmentEditError, setSegmentEditError] = useState(null) + // Stable seek handler for waveform click-to-seek const handleSeek = useCallback((conversationId: string, time: number) => { console.log(`🎯 handleSeek called: 
conversationId=${conversationId}, time=${time.toFixed(2)}s`); @@ -270,6 +276,78 @@ export default function Conversations() { } } + // Transcript segment editing handlers + const handleStartSegmentEdit = (conversationId: string, segmentIndex: number, originalText: string) => { + const segmentKey = `${conversationId}-${segmentIndex}` + setEditingSegment(segmentKey) + setEditedSegmentText(originalText) + setSegmentEditError(null) + } + + const handleSaveSegmentEdit = async (conversationId: string, segmentIndex: number, originalText: string) => { + if (!editedSegmentText.trim()) { + setSegmentEditError('Segment text cannot be empty') + return + } + + if (editedSegmentText === originalText) { + // No changes, just cancel + handleCancelSegmentEdit() + return + } + + try { + setSavingSegment(true) + setSegmentEditError(null) + + // Create transcript annotation + await annotationsApi.createTranscriptAnnotation({ + conversation_id: conversationId, + segment_index: segmentIndex, + original_text: originalText, + corrected_text: editedSegmentText + }) + + // Update local state - find the conversation and update the segment + setConversations(prev => prev.map(conv => { + if (conv.conversation_id === conversationId && conv.segments) { + const updatedSegments = [...conv.segments] + updatedSegments[segmentIndex] = { + ...updatedSegments[segmentIndex], + text: editedSegmentText + } + return { ...conv, segments: updatedSegments } + } + return conv + })) + + // Clear editing state + setEditingSegment(null) + setEditedSegmentText('') + } catch (err: any) { + console.error('Error saving segment edit:', err) + setSegmentEditError(err.response?.data?.detail || err.message || 'Failed to save segment edit') + } finally { + setSavingSegment(false) + } + } + + const handleCancelSegmentEdit = () => { + setEditingSegment(null) + setEditedSegmentText('') + setSegmentEditError(null) + } + + const handleSegmentKeyDown = (e: React.KeyboardEvent, conversationId: string, segmentIndex: number, originalText: string) => { + if (e.key === 'Enter' && (e.ctrlKey || e.metaKey)) { + e.preventDefault() + handleSaveSegmentEdit(conversationId, segmentIndex, originalText) + } else if (e.key === 'Escape') { + e.preventDefault() + handleCancelSegmentEdit() + } + } + const toggleDetailedSummary = async (conversationId: string) => { // If already expanded, just collapse if (expandedDetailedSummaries.has(conversationId)) { @@ -776,16 +854,17 @@ export default function Conversations() { const segmentId = `${conversationKey}-${index}` const isPlaying = playingSegment === segmentId const hasAudio = !!conversation.audio_path + const isEditing = editingSegment === segmentId return (
{/* Play/Pause Button */} - {hasAudio && ( + {hasAudio && !isEditing && (