From f76a47d15c15fab45b5efb30f95883285e418f2b Mon Sep 17 00:00:00 2001 From: Ankush Malaker <43288948+AnkushMalaker@users.noreply.github.com> Date: Wed, 14 Jan 2026 14:14:24 +0000 Subject: [PATCH 1/9] Implement annotation system and enhance audio processing capabilities - Introduced a new annotation model to support user edits and AI-powered suggestions for memories and transcripts. - Added annotation routes for CRUD operations, enabling the creation and management of annotations via the API. - Enhanced the audio processing workflow to support fetching audio segments from the backend, improving speaker recognition accuracy. - Updated the speaker recognition client to handle conversation-based audio fetching, allowing for better management of large audio files. - Implemented a cron job for generating AI suggestions on potential errors in transcripts and memories, improving user experience and content accuracy. - Enhanced the web UI to support inline editing of transcript segments and memory content, providing a more interactive user experience. - Updated configuration files to support new features and improve overall system flexibility. --- backends/advanced/.env.template | 7 - backends/advanced/docker-compose.yml | 27 ++ .../src/advanced_omi_backend/app_factory.py | 3 +- .../controllers/audio_controller.py | 1 + .../advanced/src/advanced_omi_backend/cron.py | 121 +++++++ .../advanced_omi_backend/models/annotation.py | 139 ++++++++ .../routers/api_router.py | 2 + .../routers/modules/__init__.py | 3 + .../routers/modules/annotation_routes.py | 318 ++++++++++++++++++ .../routers/modules/conversation_routes.py | 115 ++++++- .../services/memory/providers/chronicle.py | 71 ++++ .../speaker_recognition_client.py | 22 +- .../utils/audio_chunk_utils.py | 114 ++++++- .../workers/annotation_jobs.py | 249 ++++++++++++++ .../workers/speaker_jobs.py | 229 ++----------- .../webui/src/pages/Conversations.tsx | 133 +++++++- .../advanced/webui/src/pages/MemoryDetail.tsx | 135 +++++++- backends/advanced/webui/src/services/api.ts | 30 ++ config/config.yml.template | 4 + config/defaults.yml | 9 + extras/speaker-recognition/.env.template | 9 + extras/speaker-recognition/docker-compose.yml | 1 + extras/speaker-recognition/pyproject.toml | 2 + .../api/routers/identification.py | 113 ++++++- .../simple_speaker_recognition/api/service.py | 58 ++++ .../core/audio_backend.py | 116 ++++++- .../core/backend_client.py | 103 ++++++ 27 files changed, 1879 insertions(+), 255 deletions(-) create mode 100644 backends/advanced/src/advanced_omi_backend/cron.py create mode 100644 backends/advanced/src/advanced_omi_backend/models/annotation.py create mode 100644 backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py create mode 100644 backends/advanced/src/advanced_omi_backend/workers/annotation_jobs.py create mode 100644 extras/speaker-recognition/src/simple_speaker_recognition/core/backend_client.py diff --git a/backends/advanced/.env.template b/backends/advanced/.env.template index 752f140c..9c11af67 100644 --- a/backends/advanced/.env.template +++ b/backends/advanced/.env.template @@ -165,13 +165,6 @@ DEBUG_DIR=./data/debug_dir # HF_TOKEN= # SPEAKER_SERVICE_URL=http://speaker-recognition:8001 -# Speaker recognition chunking configuration (for large files) -# Files longer than SPEAKER_CHUNK_THRESHOLD will be split into smaller segments -# for processing to avoid memory issues -SPEAKER_CHUNK_THRESHOLD=1500 # 25 minutes - chunk files larger than this (seconds) -SPEAKER_CHUNK_SIZE=900 # 15 minutes - size 
of each chunk (seconds) -SPEAKER_CHUNK_OVERLAP=30 # 30 seconds - overlap between chunks for continuity - # Audio processing settings # NEW_CONVERSATION_TIMEOUT_MINUTES=1.5 # AUDIO_CROPPING_ENABLED=true diff --git a/backends/advanced/docker-compose.yml b/backends/advanced/docker-compose.yml index ed9e8356..c5d718a3 100644 --- a/backends/advanced/docker-compose.yml +++ b/backends/advanced/docker-compose.yml @@ -115,6 +115,33 @@ services: condition: service_started restart: unless-stopped + # Annotation Cron Scheduler + # Runs periodic jobs for AI-powered annotation suggestions: + # - Daily: Surface potential errors in transcripts/memories + # - Weekly: Fine-tune error detection models using user feedback + # Set DEV_MODE=true in .env for 1-minute intervals (testing) + annotation-cron: + build: + context: . + dockerfile: Dockerfile + target: prod + command: ["uv", "run", "python", "-m", "advanced_omi_backend.cron"] + container_name: chronicle-annotation-cron + env_file: + - .env + environment: + - MONGODB_URI=mongodb://mongo:27017 + - DEV_MODE=${DEV_MODE:-false} + - OPENAI_API_KEY=${OPENAI_API_KEY} + - LLM_PROVIDER=${LLM_PROVIDER:-openai} + - OLLAMA_BASE_URL=${OLLAMA_BASE_URL} + depends_on: + mongo: + condition: service_healthy + restart: unless-stopped + profiles: + - annotation # Optional profile - enable with: docker compose --profile annotation up + webui: build: context: ./webui diff --git a/backends/advanced/src/advanced_omi_backend/app_factory.py b/backends/advanced/src/advanced_omi_backend/app_factory.py index cf44ba34..79f893c0 100644 --- a/backends/advanced/src/advanced_omi_backend/app_factory.py +++ b/backends/advanced/src/advanced_omi_backend/app_factory.py @@ -57,10 +57,11 @@ async def lifespan(app: FastAPI): from advanced_omi_backend.models.audio_chunk import AudioChunkDocument from advanced_omi_backend.models.user import User from advanced_omi_backend.models.waveform import WaveformData + from advanced_omi_backend.models.annotation import Annotation await init_beanie( database=config.db, - document_models=[User, Conversation, AudioChunkDocument, WaveformData], + document_models=[User, Conversation, AudioChunkDocument, WaveformData, Annotation], ) application_logger.info("Beanie initialized for all document models") except Exception as e: diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 3a2a8af8..29d303b6 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -200,6 +200,7 @@ async def upload_and_process_audio_files( conversation_id=conversation_id, audio_uuid=audio_uuid, user_id=user.user_id, + transcript_version_id=version_id, # Pass the version_id from transcription job depends_on_job=transcription_job, # Wait for transcription to complete client_id=client_id # Pass client_id for UI tracking ) diff --git a/backends/advanced/src/advanced_omi_backend/cron.py b/backends/advanced/src/advanced_omi_backend/cron.py new file mode 100644 index 00000000..161ceb31 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/cron.py @@ -0,0 +1,121 @@ +""" +Annotation cron scheduler for AI-powered suggestion surfacing. + +This scheduler runs background jobs to: +1. Surface AI suggestions for potential transcript/memory errors (daily) +2. 
Fine-tune error detection models using user feedback (weekly) + +Configuration via environment variables: +- MONGODB_URI: MongoDB connection string +- DEV_MODE: When true, uses 1-minute intervals for testing + +Usage: + uv run python -m advanced_omi_backend.cron +""" + +import asyncio +import logging +import os +from datetime import datetime, timezone + +from beanie import init_beanie +from motor.motor_asyncio import AsyncIOMotorClient + +from advanced_omi_backend.models.annotation import Annotation +from advanced_omi_backend.models.conversation import Conversation +from advanced_omi_backend.models.user import User +from advanced_omi_backend.workers.annotation_jobs import ( + finetune_hallucination_model, + surface_error_suggestions, +) + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + +# Configuration +MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://mongo:27017") +DEV_MODE = os.getenv("DEV_MODE", "false").lower() == "true" + +# Intervals (1 minute in dev, normal in production) +if DEV_MODE: + SUGGESTION_INTERVAL = 60 # 1 minute for dev testing + TRAINING_INTERVAL = 60 # 1 minute for dev testing + logger.info("๐Ÿ”ง DEV_MODE enabled - using 1-minute intervals for testing") +else: + SUGGESTION_INTERVAL = 24 * 60 * 60 # Daily + TRAINING_INTERVAL = 7 * 24 * 60 * 60 # Weekly + logger.info("๐Ÿ“… Production mode - using daily/weekly intervals") + + +async def init_db(): + """Initialize database connection""" + try: + client = AsyncIOMotorClient(MONGODB_URI) + await init_beanie( + database=client.chronicle, + document_models=[Annotation, Conversation, User], + ) + logger.info("โœ… Database connection initialized") + except Exception as e: + logger.error(f"โŒ Failed to initialize database: {e}") + raise + + +async def run_scheduler(): + """Main scheduler loop""" + await init_db() + logger.info("๐Ÿ• Annotation cron scheduler started") + logger.info(f" - Suggestion interval: {SUGGESTION_INTERVAL}s") + logger.info(f" - Training interval: {TRAINING_INTERVAL}s") + + last_suggestion_run = datetime.now(timezone.utc) + last_training_run = datetime.now(timezone.utc) + + while True: + try: + now = datetime.now(timezone.utc) + + # Daily: Surface AI suggestions + if (now - last_suggestion_run).total_seconds() >= SUGGESTION_INTERVAL: + logger.info(f"๐Ÿค– Running suggestion surfacing at {now}") + try: + await surface_error_suggestions() + last_suggestion_run = now + logger.info("โœ… Suggestion surfacing completed") + except Exception as e: + logger.error(f"โŒ Suggestion job failed: {e}", exc_info=True) + + # Weekly: Fine-tune model + if (now - last_training_run).total_seconds() >= TRAINING_INTERVAL: + logger.info(f"๐ŸŽ“ Running model fine-tuning at {now}") + try: + await finetune_hallucination_model() + last_training_run = now + logger.info("โœ… Model fine-tuning completed") + except Exception as e: + logger.error(f"โŒ Training job failed: {e}", exc_info=True) + + # Sleep for check interval + await asyncio.sleep(60) # Check every minute + + except KeyboardInterrupt: + logger.info("โ›” Scheduler stopped by user") + break + except Exception as e: + logger.error(f"โŒ Unexpected error in scheduler loop: {e}", exc_info=True) + # Continue running despite errors + await asyncio.sleep(60) + + +if __name__ == "__main__": + logger.info("๐Ÿš€ Starting annotation cron scheduler...") + try: + asyncio.run(run_scheduler()) + except KeyboardInterrupt: + logger.info("๐Ÿ‘‹ Annotation cron scheduler 
stopped") + except Exception as e: + logger.error(f"๐Ÿ’ฅ Fatal error: {e}", exc_info=True) + exit(1) diff --git a/backends/advanced/src/advanced_omi_backend/models/annotation.py b/backends/advanced/src/advanced_omi_backend/models/annotation.py new file mode 100644 index 00000000..3e3dbdb1 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/models/annotation.py @@ -0,0 +1,139 @@ +""" +Unified annotation system for Chronicle. + +Supports annotations for memories, transcripts, and future content types. +Enables both user edits and AI-powered suggestions. +""" + +from enum import Enum +from typing import Optional +from datetime import datetime, timezone +import uuid + +from beanie import Document, Indexed +from pydantic import BaseModel, Field + + +class AnnotationType(str, Enum): + """Type of content being annotated.""" + MEMORY = "memory" + TRANSCRIPT = "transcript" + + +class AnnotationSource(str, Enum): + """Origin of the annotation.""" + USER = "user" # User-created edit + MODEL_SUGGESTION = "model_suggestion" # AI-generated suggestion + + +class AnnotationStatus(str, Enum): + """Lifecycle status of annotation.""" + PENDING = "pending" # Waiting for user review (suggestions) + ACCEPTED = "accepted" # Applied to content + REJECTED = "rejected" # User dismissed suggestion + + +class Annotation(Document): + """ + Unified annotation model for all content types. + + Supports both user edits and AI-powered suggestions across + memories, transcripts, and future content types (chat, action items, etc.). + + Design: Polymorphic model with type-specific fields based on annotation_type. + """ + + # Identity + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + + # Classification + annotation_type: AnnotationType + user_id: Indexed(str) + source: AnnotationSource = Field(default=AnnotationSource.USER) + status: AnnotationStatus = Field(default=AnnotationStatus.ACCEPTED) + + # Content + original_text: str # Text before correction + corrected_text: str # Text after correction + + # Polymorphic References (based on annotation_type) + # For MEMORY annotations: + memory_id: Optional[str] = None + + # For TRANSCRIPT annotations: + conversation_id: Optional[str] = None + segment_index: Optional[int] = None + + # Timestamps (Python 3.12+ compatible) + created_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc) + ) + updated_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc) + ) + + class Settings: + name = "annotations" + # Create indexes on commonly queried fields + # Note: Enum fields and Optional fields don't use Indexed() wrapper + indexes = [ + "annotation_type", # Query by type (memory vs transcript) + "user_id", # User-scoped queries + "status", # Filter by status (pending/accepted/rejected) + "memory_id", # Lookup annotations for specific memory + "conversation_id", # Lookup annotations for specific conversation + ] + + def is_memory_annotation(self) -> bool: + """Check if this is a memory annotation.""" + return self.annotation_type == AnnotationType.MEMORY + + def is_transcript_annotation(self) -> bool: + """Check if this is a transcript annotation.""" + return self.annotation_type == AnnotationType.TRANSCRIPT + + def is_pending_suggestion(self) -> bool: + """Check if this is a pending AI suggestion.""" + return ( + self.source == AnnotationSource.MODEL_SUGGESTION + and self.status == AnnotationStatus.PENDING + ) + + +# Pydantic Request/Response Models + + +class AnnotationCreateBase(BaseModel): + """Base model for annotation 
creation.""" + original_text: str + corrected_text: str + status: AnnotationStatus = AnnotationStatus.ACCEPTED + + +class MemoryAnnotationCreate(AnnotationCreateBase): + """Create memory annotation request.""" + memory_id: str + + +class TranscriptAnnotationCreate(AnnotationCreateBase): + """Create transcript annotation request.""" + conversation_id: str + segment_index: int + + +class AnnotationResponse(BaseModel): + """Annotation response for API.""" + id: str + annotation_type: AnnotationType + user_id: str + memory_id: Optional[str] = None + conversation_id: Optional[str] = None + segment_index: Optional[int] = None + original_text: str + corrected_text: str + status: AnnotationStatus + source: AnnotationSource + created_at: datetime + + class Config: + from_attributes = True # Pydantic v2 compatibility diff --git a/backends/advanced/src/advanced_omi_backend/routers/api_router.py b/backends/advanced/src/advanced_omi_backend/routers/api_router.py index 791bd5ca..e974ff99 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/api_router.py +++ b/backends/advanced/src/advanced_omi_backend/routers/api_router.py @@ -12,6 +12,7 @@ from .modules import ( admin_router, + annotation_router, audio_router, chat_router, client_router, @@ -32,6 +33,7 @@ # Include all sub-routers router.include_router(admin_router) +router.include_router(annotation_router) router.include_router(audio_router) router.include_router(user_router) router.include_router(chat_router) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py b/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py index 3c8e4ceb..17fe5c1a 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/__init__.py @@ -7,6 +7,7 @@ - client_routes: Active client monitoring and management - conversation_routes: Conversation CRUD and audio processing - memory_routes: Memory management, search, and debug +- annotation_routes: Annotation CRUD for memories and transcripts - system_routes: System utilities and metrics - queue_routes: Job queue management and monitoring - audio_routes: Audio file uploads and processing @@ -16,6 +17,7 @@ """ from .admin_routes import router as admin_router +from .annotation_routes import router as annotation_router from .audio_routes import router as audio_router from .chat_routes import router as chat_router from .client_routes import router as client_router @@ -30,6 +32,7 @@ __all__ = [ "admin_router", + "annotation_router", "audio_router", "chat_router", "client_router", diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py new file mode 100644 index 00000000..e1e99644 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/annotation_routes.py @@ -0,0 +1,318 @@ +""" +Annotation routes for Chronicle API. + +Handles annotation CRUD operations for memories and transcripts. +Supports both user edits and AI-powered suggestions. 
+""" + +import logging +from typing import List +from datetime import datetime, timezone + +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import JSONResponse + +from advanced_omi_backend.auth import current_active_user +from advanced_omi_backend.users import User +from advanced_omi_backend.models.annotation import ( + Annotation, + AnnotationType, + AnnotationStatus, + MemoryAnnotationCreate, + TranscriptAnnotationCreate, + AnnotationResponse, +) +from advanced_omi_backend.models.conversation import Conversation +from advanced_omi_backend.services.memory import get_memory_service + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/annotations", tags=["annotations"]) + + +@router.post("/memory", response_model=AnnotationResponse) +async def create_memory_annotation( + annotation_data: MemoryAnnotationCreate, + current_user: User = Depends(current_active_user), +): + """ + Create annotation for memory edit. + + - Validates user owns memory + - Creates annotation record + - Updates memory content in vector store + - Re-embeds if content changed + """ + try: + memory_service = get_memory_service() + + # Verify memory ownership + try: + memory = await memory_service.get_memory( + annotation_data.memory_id, current_user.user_id + ) + if not memory: + raise HTTPException(status_code=404, detail="Memory not found") + except Exception as e: + logger.error(f"Error fetching memory: {e}") + raise HTTPException(status_code=404, detail="Memory not found") + + # Create annotation + annotation = Annotation( + annotation_type=AnnotationType.MEMORY, + user_id=current_user.user_id, + memory_id=annotation_data.memory_id, + original_text=annotation_data.original_text, + corrected_text=annotation_data.corrected_text, + status=annotation_data.status, + ) + await annotation.save() + logger.info( + f"Created memory annotation {annotation.id} for memory {annotation_data.memory_id}" + ) + + # Update memory content if accepted + if annotation.status == AnnotationStatus.ACCEPTED: + try: + await memory_service.update_memory( + memory_id=annotation_data.memory_id, + content=annotation_data.corrected_text, + user_id=current_user.user_id, + ) + logger.info( + f"Updated memory {annotation_data.memory_id} with corrected text" + ) + except Exception as e: + logger.error(f"Error updating memory: {e}") + # Annotation is saved, but memory update failed - log but don't fail the request + logger.warning( + f"Memory annotation {annotation.id} saved but memory update failed" + ) + + return AnnotationResponse.model_validate(annotation) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error creating memory annotation: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to create memory annotation: {str(e)}", + ) + + +@router.post("/transcript", response_model=AnnotationResponse) +async def create_transcript_annotation( + annotation_data: TranscriptAnnotationCreate, + current_user: User = Depends(current_active_user), +): + """ + Create annotation for transcript segment edit. 
+ + - Validates user owns conversation + - Creates annotation record + - Updates transcript segment + - Triggers memory reprocessing + """ + try: + # Verify conversation ownership + conversation = await Conversation.find_one( + Conversation.conversation_id == annotation_data.conversation_id, + Conversation.user_id == current_user.user_id, + ) + if not conversation: + raise HTTPException(status_code=404, detail="Conversation not found") + + # Validate segment index + active_transcript = conversation.get_active_transcript() + if ( + not active_transcript + or annotation_data.segment_index >= len(active_transcript.segments) + ): + raise HTTPException(status_code=400, detail="Invalid segment index") + + # Create annotation + annotation = Annotation( + annotation_type=AnnotationType.TRANSCRIPT, + user_id=current_user.user_id, + conversation_id=annotation_data.conversation_id, + segment_index=annotation_data.segment_index, + original_text=annotation_data.original_text, + corrected_text=annotation_data.corrected_text, + status=annotation_data.status, + ) + await annotation.save() + logger.info( + f"Created transcript annotation {annotation.id} for conversation {annotation_data.conversation_id} segment {annotation_data.segment_index}" + ) + + # Update transcript segment if accepted + if annotation.status == AnnotationStatus.ACCEPTED: + segment = active_transcript.segments[annotation_data.segment_index] + segment.text = annotation_data.corrected_text + await conversation.save() + logger.info( + f"Updated transcript segment {annotation_data.segment_index} in conversation {annotation_data.conversation_id}" + ) + + # Trigger memory reprocessing + try: + from advanced_omi_backend.workers.queue_manager import ( + enqueue_memory_processing, + ) + from advanced_omi_backend.models.job import JobPriority + + await enqueue_memory_processing( + client_id=conversation.client_id, + user_id=current_user.user_id, + user_email=current_user.email, + conversation_id=conversation.conversation_id, + priority=JobPriority.NORMAL, + ) + logger.info( + f"Enqueued memory reprocessing for conversation {annotation_data.conversation_id}" + ) + except Exception as e: + logger.error(f"Error enqueuing memory reprocessing: {e}") + # Annotation and segment update succeeded, but reprocessing failed + logger.warning( + f"Transcript annotation {annotation.id} saved but memory reprocessing failed" + ) + + return AnnotationResponse.model_validate(annotation) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error creating transcript annotation: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to create transcript annotation: {str(e)}", + ) + + +@router.get("/memory/{memory_id}", response_model=List[AnnotationResponse]) +async def get_memory_annotations( + memory_id: str, + current_user: User = Depends(current_active_user), +): + """Get all annotations for a memory.""" + try: + annotations = await Annotation.find( + Annotation.annotation_type == AnnotationType.MEMORY, + Annotation.memory_id == memory_id, + Annotation.user_id == current_user.user_id, + ).to_list() + + return [AnnotationResponse.model_validate(a) for a in annotations] + + except Exception as e: + logger.error(f"Error fetching memory annotations: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to fetch memory annotations: {str(e)}", + ) + + +@router.get("/transcript/{conversation_id}", response_model=List[AnnotationResponse]) +async def get_transcript_annotations( + conversation_id: 
str, + current_user: User = Depends(current_active_user), +): + """Get all annotations for a conversation's transcript.""" + try: + annotations = await Annotation.find( + Annotation.annotation_type == AnnotationType.TRANSCRIPT, + Annotation.conversation_id == conversation_id, + Annotation.user_id == current_user.user_id, + ).to_list() + + return [AnnotationResponse.model_validate(a) for a in annotations] + + except Exception as e: + logger.error(f"Error fetching transcript annotations: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to fetch transcript annotations: {str(e)}", + ) + + +@router.patch("/{annotation_id}/status") +async def update_annotation_status( + annotation_id: str, + status: AnnotationStatus, + current_user: User = Depends(current_active_user), +): + """ + Accept or reject AI-generated suggestions. + + Used for pending model suggestions in the UI. + """ + try: + annotation = await Annotation.find_one( + Annotation.id == annotation_id, + Annotation.user_id == current_user.user_id, + ) + if not annotation: + raise HTTPException(status_code=404, detail="Annotation not found") + + old_status = annotation.status + annotation.status = status + annotation.updated_at = datetime.now(timezone.utc) + + # If accepting a pending suggestion, apply the correction + if ( + status == AnnotationStatus.ACCEPTED + and old_status == AnnotationStatus.PENDING + ): + if annotation.is_memory_annotation(): + # Update memory + try: + memory_service = get_memory_service() + await memory_service.update_memory( + memory_id=annotation.memory_id, + content=annotation.corrected_text, + user_id=current_user.user_id, + ) + logger.info( + f"Applied suggestion to memory {annotation.memory_id}" + ) + except Exception as e: + logger.error(f"Error applying memory suggestion: {e}") + # Don't fail the status update if memory update fails + elif annotation.is_transcript_annotation(): + # Update transcript segment + try: + conversation = await Conversation.find_one( + Conversation.conversation_id == annotation.conversation_id + ) + if conversation: + transcript = conversation.get_active_transcript() + if ( + transcript + and annotation.segment_index < len(transcript.segments) + ): + transcript.segments[ + annotation.segment_index + ].text = annotation.corrected_text + await conversation.save() + logger.info( + f"Applied suggestion to transcript segment {annotation.segment_index}" + ) + except Exception as e: + logger.error(f"Error applying transcript suggestion: {e}") + # Don't fail the status update if segment update fails + + await annotation.save() + logger.info(f"Updated annotation {annotation_id} status to {status}") + + return {"status": "updated", "annotation_id": annotation_id, "new_status": status} + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating annotation status: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to update annotation status: {str(e)}", + ) diff --git a/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py b/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py index 3f48fe1c..be2b3bc4 100644 --- a/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py +++ b/backends/advanced/src/advanced_omi_backend/routers/modules/conversation_routes.py @@ -7,11 +7,12 @@ import logging from typing import Optional -from fastapi import APIRouter, Depends, Query +from fastapi import APIRouter, Depends, Query, 
HTTPException, Response from advanced_omi_backend.auth import current_active_user from advanced_omi_backend.controllers import conversation_controller, audio_controller from advanced_omi_backend.users import User +from advanced_omi_backend.models.conversation import Conversation logger = logging.getLogger(__name__) @@ -165,6 +166,118 @@ async def get_conversation_waveform( } +@router.get("/{conversation_id}/metadata") +async def get_conversation_metadata( + conversation_id: str, + current_user: User = Depends(current_active_user) +) -> dict: + """ + Get conversation metadata (duration, etc.) without loading audio. + + This endpoint provides lightweight access to conversation metadata, + useful for the speaker service to check duration before deciding + whether to chunk audio processing. + + Returns: + { + "conversation_id": str, + "duration": float, # Total duration in seconds + "created_at": datetime, + "has_audio": bool + } + """ + conversation = await Conversation.find_one( + Conversation.conversation_id == conversation_id + ) + + if not conversation: + raise HTTPException(status_code=404, detail="Conversation not found") + + # Check ownership (admins can access all) + if not current_user.is_superuser and conversation.user_id != str(current_user.id): + raise HTTPException(status_code=403, detail="Access denied") + + return { + "conversation_id": conversation_id, + "duration": conversation.audio_total_duration or 0.0, + "created_at": conversation.created_at, + "has_audio": (conversation.audio_total_duration or 0.0) > 0 + } + + +@router.get("/{conversation_id}/audio-segments") +async def get_audio_segment( + conversation_id: str, + start: float = Query(0.0, description="Start time in seconds"), + duration: Optional[float] = Query(None, description="Duration in seconds (omit for full audio)"), + current_user: User = Depends(current_active_user) +) -> Response: + """ + Get audio segment from a conversation. + + This endpoint enables the speaker service to fetch audio in time-bounded + segments without loading the entire file into memory. The speaker service + controls chunk size based on its own memory constraints. 
+ + Args: + conversation_id: Conversation identifier + start: Start time in seconds (default: 0.0) + duration: Duration in seconds (if None, returns all audio from start) + + Returns: + WAV audio bytes (16kHz, mono) for the requested time range + """ + from advanced_omi_backend.utils.audio_chunk_utils import reconstruct_audio_segment + + # Verify conversation exists and user has access + conversation = await Conversation.find_one( + Conversation.conversation_id == conversation_id + ) + + if not conversation: + raise HTTPException(status_code=404, detail="Conversation not found") + + # Check ownership (admins can access all) + if not current_user.is_superuser and conversation.user_id != str(current_user.id): + raise HTTPException(status_code=403, detail="Access denied") + + # Calculate end time + total_duration = conversation.audio_total_duration or 0.0 + if total_duration == 0: + raise HTTPException(status_code=404, detail="No audio available for this conversation") + + if duration is None: + end = total_duration + else: + end = min(start + duration, total_duration) + + # Validate time range + if start < 0 or start >= total_duration: + raise HTTPException(status_code=400, detail=f"Invalid start time: {start}s (max: {total_duration}s)") + + # Get audio chunks for time range + try: + wav_bytes = await reconstruct_audio_segment( + conversation_id=conversation_id, + start_time=start, + end_time=end + ) + except Exception as e: + logger.error(f"Failed to reconstruct audio segment for {conversation_id[:12]}: {e}") + raise HTTPException(status_code=500, detail=f"Failed to reconstruct audio: {str(e)}") + + return Response( + content=wav_bytes, + media_type="audio/wav", + headers={ + "Content-Disposition": f"attachment; filename=segment_{start}_{end}.wav", + "X-Audio-Start": str(start), + "X-Audio-End": str(end), + "X-Audio-Duration": str(end - start) + } + ) + + @router.delete("/{conversation_id}") async def delete_conversation( conversation_id: str, diff --git a/backends/advanced/src/advanced_omi_backend/services/memory/providers/chronicle.py b/backends/advanced/src/advanced_omi_backend/services/memory/providers/chronicle.py index fa73c526..7712ac2c 100644 --- a/backends/advanced/src/advanced_omi_backend/services/memory/providers/chronicle.py +++ b/backends/advanced/src/advanced_omi_backend/services/memory/providers/chronicle.py @@ -321,6 +321,77 @@ async def get_memory(self, memory_id: str, user_id: Optional[str] = None) -> Opt memory_logger.error(f"Get memory failed: {e}") return None + async def update_memory( + self, + memory_id: str, + content: Optional[str] = None, + metadata: Optional[dict[str, Any]] = None, + user_id: Optional[str] = None, + user_email: Optional[str] = None + ) -> bool: + """Update a specific memory's content and/or metadata. + + Regenerates embeddings when content is updated. 
+ + Args: + memory_id: Unique identifier of the memory to update + content: New content for the memory (if None, content is not updated) + metadata: New metadata to merge with existing (if None, metadata is not updated) + user_id: Optional user ID for authentication + user_email: Optional user email for authentication + + Returns: + True if update succeeded, False otherwise + """ + if not self._initialized: + await self.initialize() + + try: + # Get existing memory + existing_memory = await self.vector_store.get_memory(memory_id, user_id) + if not existing_memory: + memory_logger.warning(f"Memory {memory_id} not found for update") + return False + + # Determine new content and metadata + new_content = content if content is not None else existing_memory.content + new_metadata = {**existing_memory.metadata} + if metadata: + new_metadata.update(metadata) + + # Update timestamps + new_metadata["updated_at"] = str(int(time.time())) + + # Generate new embedding if content changed + if content is not None: + new_embedding = await self.llm_provider.generate_embedding(new_content) + else: + # If content didn't change, reuse existing embedding + if existing_memory.embedding: + new_embedding = existing_memory.embedding + else: + # No existing embedding, generate one + new_embedding = await self.llm_provider.generate_embedding(new_content) + + # Update in vector store + success = await self.vector_store.update_memory( + memory_id=memory_id, + new_content=new_content, + new_embedding=new_embedding, + new_metadata=new_metadata + ) + + if success: + memory_logger.info(f"โœ… Updated memory {memory_id}") + else: + memory_logger.error(f"Failed to update memory {memory_id}") + + return success + + except Exception as e: + memory_logger.error(f"Error updating memory {memory_id}: {e}", exc_info=True) + return False + async def delete_memory(self, memory_id: str, user_id: Optional[str] = None, user_email: Optional[str] = None) -> bool: """Delete a specific memory by ID. diff --git a/backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py b/backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py index fb146c08..cc47169e 100644 --- a/backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py +++ b/backends/advanced/src/advanced_omi_backend/speaker_recognition_client.py @@ -67,14 +67,21 @@ def __init__(self, service_url: Optional[str] = None): logger.info("Speaker recognition client disabled (no service URL configured)") async def diarize_identify_match( - self, audio_data: bytes, transcript_data: Dict, user_id: Optional[str] = None + self, + conversation_id: str, + backend_token: str, + transcript_data: Dict, + user_id: Optional[str] = None ) -> Dict: """ Perform diarization, speaker identification, and word-to-speaker matching. - Routes to appropriate endpoint based on diarization source configuration. + + Speaker service fetches audio from backend and handles chunking based on its + own memory constraints. 
Args: - audio_data: WAV audio data as bytes (in-memory) + conversation_id: Conversation ID for speaker service to fetch audio + backend_token: JWT token for speaker service to authenticate with backend transcript_data: Dict containing words array and text from transcription user_id: Optional user ID for speaker identification @@ -86,7 +93,7 @@ async def diarize_identify_match( return {} try: - logger.info(f"๐ŸŽค Identifying speakers from in-memory audio ({len(audio_data) / 1024 / 1024:.2f} MB)") + logger.info(f"๐ŸŽค Calling speaker service with conversation_id: {conversation_id[:12]}...") # Read diarization source from existing config system from advanced_omi_backend.config import load_diarization_settings_from_file @@ -94,11 +101,10 @@ async def diarize_identify_match( diarization_source = config.get("diarization_source", "pyannote") async with aiohttp.ClientSession() as session: - # Prepare the audio data for upload (no disk I/O!) + # Prepare form data with conversation_id + backend_token form_data = aiohttp.FormData() - form_data.add_field( - "file", audio_data, filename="audio.wav", content_type="audio/wav" - ) + form_data.add_field("conversation_id", conversation_id) + form_data.add_field("backend_token", backend_token) if diarization_source == "deepgram": # DEEPGRAM DIARIZATION PATH: We EXPECT transcript has speaker info from Deepgram diff --git a/backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py b/backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py index 2c5e06ed..fd3cda79 100644 --- a/backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py +++ b/backends/advanced/src/advanced_omi_backend/utils/audio_chunk_utils.py @@ -495,6 +495,108 @@ async def reconstruct_audio_segments( start_time += segment_duration +async def reconstruct_audio_segment( + conversation_id: str, + start_time: float, + end_time: float +) -> bytes: + """ + Reconstruct audio for a specific time range from MongoDB chunks. + + This function returns a single audio segment for the specified time range, + enabling on-demand access to conversation audio without loading the entire + file into memory. Used by the audio segment API endpoint. + + Args: + conversation_id: Conversation ID + start_time: Start time in seconds + end_time: End time in seconds + + Returns: + WAV audio bytes (16kHz mono or original format) + + Raises: + ValueError: If conversation not found or has no audio + Exception: If audio reconstruction fails + + Example: + >>> # Get first 60 seconds of audio + >>> wav_bytes = await reconstruct_audio_segment(conv_id, 0.0, 60.0) + >>> # Save to file + >>> with open("segment.wav", "wb") as f: + ... 
f.write(wav_bytes) + """ + from advanced_omi_backend.models.conversation import Conversation + + # Validate inputs + if start_time < 0: + raise ValueError(f"start_time must be >= 0, got {start_time}") + if end_time <= start_time: + raise ValueError(f"end_time ({end_time}) must be > start_time ({start_time})") + + # Get conversation metadata + conversation = await Conversation.get(conversation_id) + + if not conversation: + raise ValueError(f"Conversation {conversation_id} not found") + + total_duration = conversation.audio_total_duration or 0.0 + + if total_duration == 0: + raise ValueError(f"Conversation {conversation_id} has no audio") + + # Clamp end_time to conversation duration + end_time = min(end_time, total_duration) + + # Get audio format from first chunk + first_chunk = await AudioChunkDocument.find_one( + AudioChunkDocument.conversation_id == conversation_id + ) + + if not first_chunk: + raise ValueError(f"No audio chunks found for conversation {conversation_id}") + + sample_rate = first_chunk.sample_rate + channels = first_chunk.channels + + # Get chunks that overlap with this time range + chunks = await AudioChunkDocument.find( + AudioChunkDocument.conversation_id == conversation_id, + AudioChunkDocument.start_time < end_time, # Chunk starts before segment ends + AudioChunkDocument.end_time > start_time, # Chunk ends after segment starts + ).sort(+AudioChunkDocument.chunk_index).to_list() + + if not chunks: + logger.warning( + f"No chunks found for time range {start_time:.1f}s - {end_time:.1f}s " + f"in conversation {conversation_id[:8]}..." + ) + # Return silence for empty range + return await build_wav_from_pcm( + pcm_data=b"", + sample_rate=sample_rate, + channels=channels, + ) + + # Decode and concatenate chunks + pcm_data = await concatenate_chunks_to_pcm(chunks) + + # Build WAV file for this segment + wav_bytes = await build_wav_from_pcm( + pcm_data=pcm_data, + sample_rate=sample_rate, + channels=channels, + ) + + logger.info( + f"Reconstructed audio segment for {conversation_id[:8]}...: " + f"{start_time:.1f}s - {end_time:.1f}s " + f"({len(chunks)} chunks, {len(wav_bytes)} bytes)" + ) + + return wav_bytes + + def filter_transcript_by_time( transcript_data: dict, start_time: float, @@ -569,7 +671,7 @@ async def convert_audio_to_chunks( Number of chunks created Raises: - ValueError: If audio duration exceeds 30 minutes + ValueError: If audio duration exceeds 2 hours Example: >>> # Convert from memory without disk write @@ -590,12 +692,12 @@ async def convert_audio_to_chunks( # Calculate audio duration and validate maximum limit bytes_per_second = sample_rate * sample_width * channels total_duration_seconds = len(audio_data) / bytes_per_second - MAX_DURATION_SECONDS = 1800 # 30 minutes (180 chunks @ 10s each) + MAX_DURATION_SECONDS = 7200 # 2 hours (720 chunks @ 10s each) if total_duration_seconds > MAX_DURATION_SECONDS: raise ValueError( f"Audio duration ({total_duration_seconds:.1f}s) exceeds maximum allowed " - f"({MAX_DURATION_SECONDS}s / 30 minutes). Please split the file into smaller segments." + f"({MAX_DURATION_SECONDS}s / 2 hours). Please split the file into smaller segments." 
) # Calculate chunk size in bytes @@ -719,7 +821,7 @@ async def convert_wav_to_chunks( Raises: FileNotFoundError: If WAV file doesn't exist - ValueError: If WAV file is invalid or exceeds 30 minutes + ValueError: If WAV file is invalid or exceeds 2 hours Example: >>> # Convert uploaded file to chunks @@ -756,12 +858,12 @@ async def convert_wav_to_chunks( # Calculate audio duration and validate maximum limit bytes_per_second = sample_rate * sample_width * channels total_duration_seconds = len(pcm_data) / bytes_per_second - MAX_DURATION_SECONDS = 1800 # 30 minutes (180 chunks @ 10s each) + MAX_DURATION_SECONDS = 7200 # 2 hours (720 chunks @ 10s each) if total_duration_seconds > MAX_DURATION_SECONDS: raise ValueError( f"Audio duration ({total_duration_seconds:.1f}s) exceeds maximum allowed " - f"({MAX_DURATION_SECONDS}s / 30 minutes). Please split the file into smaller segments." + f"({MAX_DURATION_SECONDS}s / 2 hours). Please split the file into smaller segments." ) # Calculate chunk size in bytes diff --git a/backends/advanced/src/advanced_omi_backend/workers/annotation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/annotation_jobs.py new file mode 100644 index 00000000..87ed6cdc --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/workers/annotation_jobs.py @@ -0,0 +1,249 @@ +""" +Background jobs for annotation-based AI suggestions. + +These jobs run periodically via the cron scheduler to: +1. Surface potential errors in transcripts and memories for user review +2. Fine-tune error detection models using accepted/rejected annotations + +TODO: Implement actual LLM-based error detection and model training logic. +""" + +import logging +from datetime import datetime, timezone, timedelta +from typing import List + +from advanced_omi_backend.models.annotation import ( + Annotation, + AnnotationSource, + AnnotationStatus, + AnnotationType, +) +from advanced_omi_backend.models.conversation import Conversation +from advanced_omi_backend.models.user import User + +logger = logging.getLogger(__name__) + + +async def surface_error_suggestions(): + """ + Generate AI suggestions for potential transcript/memory errors. + Runs daily, creates PENDING annotations for user review. + + This is a PLACEHOLDER implementation. To fully implement: + 1. Query recent transcripts and memories (last N days) + 2. Use LLM to analyze content for potential errors: + - Hallucinations (made-up facts) + - Misheard words (audio transcription errors) + - Grammar/spelling issues + - Inconsistencies with other memories + 3. For each potential error: + - Create PENDING annotation with MODEL_SUGGESTION source + - Store original_text and suggested corrected_text + 4. Users can review suggestions in UI (accept/reject) + 5. Accepted suggestions improve future model accuracy + + TODO: Implement LLM-based error detection logic. 
+ """ + logger.info("๐Ÿ“ Checking for annotation suggestions (placeholder)...") + + try: + # Get all users + users = await User.find_all().to_list() + logger.info(f" Found {len(users)} users to analyze") + + for user in users: + # TODO: Query recent conversations for this user (last 7 days) + # recent_conversations = await Conversation.find( + # Conversation.user_id == str(user.id), + # Conversation.created_at >= datetime.now(timezone.utc) - timedelta(days=7) + # ).to_list() + + # TODO: For each conversation, analyze transcripts + # for conversation in recent_conversations: + # active_transcript = conversation.get_active_transcript() + # if not active_transcript: + # continue + # + # # TODO: Use LLM to identify potential errors + # # suggestions = await llm_provider.analyze_transcript_for_errors( + # # segments=active_transcript.segments, + # # context=conversation.summary + # # ) + # + # # TODO: Create PENDING annotations for each suggestion + # # for suggestion in suggestions: + # # annotation = Annotation( + # # annotation_type=AnnotationType.TRANSCRIPT, + # # user_id=str(user.id), + # # conversation_id=conversation.conversation_id, + # # segment_index=suggestion.segment_index, + # # original_text=suggestion.original_text, + # # corrected_text=suggestion.suggested_text, + # # source=AnnotationSource.MODEL_SUGGESTION, + # # status=AnnotationStatus.PENDING + # # ) + # # await annotation.save() + + # TODO: Query recent memories for this user + # recent_memories = await memory_service.get_recent_memories( + # user_id=str(user.id), + # days=7 + # ) + + # TODO: Use LLM to identify potential errors in memories + # for memory in recent_memories: + # # TODO: Analyze memory content for hallucinations/errors + # # suggestions = await llm_provider.analyze_memory_for_errors( + # # content=memory.content, + # # metadata=memory.metadata + # # ) + # + # # TODO: Create PENDING annotations + # # ... + + # Placeholder logging + logger.debug(f" Analyzed user {user.email} (placeholder)") + + logger.info("โœ… Suggestion check complete (placeholder implementation)") + logger.info( + " โ„น๏ธ TODO: Implement LLM-based error detection to create actual suggestions" + ) + + except Exception as e: + logger.error(f"โŒ Error in surface_error_suggestions: {e}", exc_info=True) + raise + + +async def finetune_hallucination_model(): + """ + Fine-tune error detection model using accepted/rejected annotations. + Runs weekly, improves suggestion accuracy over time. + + This is a PLACEHOLDER implementation. To fully implement: + 1. Fetch all accepted annotations (ground truth corrections) + - These show real errors that users confirmed + 2. Fetch all rejected annotations (false positives) + - These show suggestions users disagreed with + 3. Build training dataset: + - Positive examples: accepted annotations (real errors) + - Negative examples: rejected annotations (false alarms) + 4. Fine-tune LLM or update prompt engineering: + - Use accepted examples as few-shot learning + - Adjust model to reduce false positives + 5. Log metrics: + - Acceptance rate, rejection rate + - Most common error types + - Model accuracy improvement + + TODO: Implement model training logic. 
+ """ + logger.info("๐ŸŽ“ Checking for model training opportunities (placeholder)...") + + try: + # Fetch annotation statistics + total_annotations = await Annotation.find().count() + accepted_count = await Annotation.find( + Annotation.status == AnnotationStatus.ACCEPTED, + Annotation.source == AnnotationSource.MODEL_SUGGESTION, + ).count() + rejected_count = await Annotation.find( + Annotation.status == AnnotationStatus.REJECTED, + Annotation.source == AnnotationSource.MODEL_SUGGESTION, + ).count() + + logger.info(f" Total annotations: {total_annotations}") + logger.info(f" Accepted suggestions: {accepted_count}") + logger.info(f" Rejected suggestions: {rejected_count}") + + if accepted_count + rejected_count == 0: + logger.info(" โ„น๏ธ No user feedback yet, skipping training") + return + + # TODO: Fetch accepted annotations (ground truth) + # accepted_annotations = await Annotation.find( + # Annotation.status == AnnotationStatus.ACCEPTED, + # Annotation.source == AnnotationSource.MODEL_SUGGESTION + # ).to_list() + + # TODO: Fetch rejected annotations (false positives) + # rejected_annotations = await Annotation.find( + # Annotation.status == AnnotationStatus.REJECTED, + # Annotation.source == AnnotationSource.MODEL_SUGGESTION + # ).to_list() + + # TODO: Build training dataset + # training_data = [] + # for annotation in accepted_annotations: + # training_data.append({ + # "input": annotation.original_text, + # "output": annotation.corrected_text, + # "label": "error" + # }) + # + # for annotation in rejected_annotations: + # training_data.append({ + # "input": annotation.original_text, + # "output": annotation.original_text, # No change needed + # "label": "correct" + # }) + + # TODO: Fine-tune model or update prompt examples + # if len(training_data) >= MIN_TRAINING_SAMPLES: + # await llm_provider.fine_tune_error_detection( + # training_data=training_data, + # validation_split=0.2 + # ) + # logger.info("โœ… Model fine-tuning complete") + # else: + # logger.info(f" โ„น๏ธ Not enough samples for training (need {MIN_TRAINING_SAMPLES})") + + # Calculate acceptance rate + if accepted_count + rejected_count > 0: + acceptance_rate = ( + accepted_count / (accepted_count + rejected_count) + ) * 100 + logger.info(f" Suggestion acceptance rate: {acceptance_rate:.1f}%") + + logger.info("โœ… Training check complete (placeholder implementation)") + logger.info( + " โ„น๏ธ TODO: Implement model fine-tuning using user feedback data" + ) + + except Exception as e: + logger.error(f"โŒ Error in finetune_hallucination_model: {e}", exc_info=True) + raise + + +# Additional helper functions for future implementation + +async def analyze_common_error_patterns() -> List[dict]: + """ + Analyze accepted annotations to identify common error patterns. + Returns list of patterns for prompt engineering or model training. + + TODO: Implement pattern analysis. + """ + # TODO: Group annotations by error type + # TODO: Find frequent patterns (e.g., "their" โ†’ "there") + # TODO: Return structured patterns for model improvement + return [] + + +async def calculate_suggestion_metrics() -> dict: + """ + Calculate metrics about suggestion quality and user engagement. + + Returns: + dict: Metrics including acceptance rate, response time, etc. + + TODO: Implement metrics calculation. 
+ """ + # TODO: Calculate acceptance/rejection rates + # TODO: Measure time to user response + # TODO: Identify high-confidence vs low-confidence suggestions + # TODO: Track improvement over time + return { + "total_suggestions": 0, + "acceptance_rate": 0.0, + "avg_response_time_hours": 0.0, + } diff --git a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py index 086b3ae1..acdc82c5 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/speaker_jobs.py @@ -15,88 +15,6 @@ logger = logging.getLogger(__name__) -def _merge_overlapping_speaker_segments( - segments: list[dict], - overlap: float -) -> list[dict]: - """ - Merge speaker segments from overlapping audio chunks. - - This function handles segments that may overlap due to chunked processing, - merging segments from the same speaker and resolving conflicts using confidence scores. - - Args: - segments: List of speaker segment dicts with start, end, text, speaker, confidence - overlap: Overlap duration in seconds used during chunking - - Returns: - Merged list of speaker segments - - Example: - >>> segments = [ - ... {"start": 0, "end": 930, "speaker": "Alice", "text": "...", "confidence": 0.9}, - ... {"start": 900, "end": 1830, "speaker": "Alice", "text": "...", "confidence": 0.8}, - ... ] - >>> merged = _merge_overlapping_speaker_segments(segments, overlap=30.0) - >>> # Returns single merged segment from Alice - """ - if not segments: - return [] - - # Sort by start time - segments.sort(key=lambda s: s.get("start", 0)) - - merged = [] - current = segments[0].copy() # Copy to avoid mutating input - - for next_seg in segments[1:]: - # Check if segments overlap - if next_seg["start"] < current["end"]: - # Overlapping - decide how to merge - if current.get("speaker") == next_seg.get("speaker"): - # Same speaker - merge by extending end time - current["end"] = max(current["end"], next_seg["end"]) - - # Combine text (avoid duplication in overlap region) - current_text = current.get("text", "") - next_text = next_seg.get("text", "") - - # Simple text merging - just append if different - if next_text and next_text not in current_text: - current["text"] = f"{current_text} {next_text}".strip() - - # Use higher confidence - current["confidence"] = max( - current.get("confidence", 0), - next_seg.get("confidence", 0) - ) - else: - # Different speakers - use confidence to decide boundary - current_conf = current.get("confidence", 0) - next_conf = next_seg.get("confidence", 0) - - if next_conf > current_conf: - # Next segment more confident, close current and start new - merged.append(current) - current = next_seg.copy() - else: - # Current more confident, adjust next segment start - # Save current, update next to start after current - merged.append(current) - next_seg_copy = next_seg.copy() - next_seg_copy["start"] = current["end"] - current = next_seg_copy - else: - # No overlap, save current and move to next - merged.append(current) - current = next_seg.copy() - - # Don't forget last segment - merged.append(current) - - return merged - - @async_job(redis=True, beanie=True) async def check_enrolled_speakers_job( session_id: str, @@ -266,14 +184,6 @@ async def recognise_speakers_job( "processing_time_seconds": 0 } - # Reconstruct audio from MongoDB chunks - from advanced_omi_backend.utils.audio_chunk_utils import ( - reconstruct_wav_from_conversation, - reconstruct_audio_segments, - 
filter_transcript_by_time - ) - import os - # Read transcript text and words from the transcript version # (Parameters may be empty if called via job dependency) actual_transcript_text = transcript_text or transcript_version.transcript or "" @@ -298,122 +208,53 @@ async def recognise_speakers_job( "words": actual_words } - # Check if we need to use chunked processing - total_duration = conversation.audio_total_duration or 0.0 - chunk_threshold = float(os.getenv("SPEAKER_CHUNK_THRESHOLD", "1500")) # 25 minutes default + # Generate backend token for speaker service to fetch audio + # Speaker service will check conversation duration and decide + # whether to chunk based on its own memory constraints + from advanced_omi_backend.auth import generate_jwt_for_user + from advanced_omi_backend.users import UserManager - logger.info(f"๐Ÿ“ฆ Reconstructing audio from MongoDB chunks for conversation {conversation_id}") - logger.info(f"๐Ÿ“Š Total duration: {total_duration:.1f}s, Threshold: {chunk_threshold:.1f}s") - - # Call speaker recognition service + # Get user details for token generation try: - speaker_segments = [] - - if total_duration > chunk_threshold: - # Chunked processing for large files - logger.info(f"๐ŸŽค Using chunked processing for large file ({total_duration:.1f}s > {chunk_threshold:.1f}s)") - - segment_duration = float(os.getenv("SPEAKER_CHUNK_SIZE", "900")) # 15 minutes default - overlap = float(os.getenv("SPEAKER_CHUNK_OVERLAP", "30")) # 30 seconds default - - async for wav_data, start_time, end_time in reconstruct_audio_segments( - conversation_id=conversation_id, - segment_duration=segment_duration, - overlap=overlap - ): - logger.info( - f"๐Ÿ“ฆ Processing segment {start_time:.1f}s - {end_time:.1f}s: " - f"{len(wav_data) / 1024 / 1024:.2f} MB" - ) - - # Filter transcript for this time range - segment_transcript = filter_transcript_by_time( - transcript_data, - start_time, - end_time - ) - - # Call speaker service for this segment - speaker_result = await speaker_client.diarize_identify_match( - audio_data=wav_data, - transcript_data=segment_transcript, - user_id=user_id - ) - - # Check for errors from speaker service - if speaker_result.get("error"): - error_type = speaker_result.get("error") - error_message = speaker_result.get("message", "Unknown error") - logger.error(f"๐ŸŽค Speaker service error on segment {start_time:.1f}s: {error_type}") - - # Raise exception for connection failures - if error_type in ("connection_failed", "timeout", "client_error"): - raise RuntimeError(f"Speaker recognition service unavailable: {error_type} - {error_message}") - - # For processing errors, continue with other segments - continue - - # Adjust timestamps to global time - if speaker_result and "segments" in speaker_result: - for seg in speaker_result["segments"]: - seg["start"] += start_time - seg["end"] += start_time - - speaker_segments.extend(speaker_result["segments"]) - - logger.info(f"๐ŸŽค Collected {len(speaker_segments)} segments from chunked processing") - - # Merge overlapping segments - if speaker_segments: - speaker_segments = _merge_overlapping_speaker_segments(speaker_segments, overlap) - logger.info(f"๐ŸŽค After merging overlaps: {len(speaker_segments)} segments") - - # Package as result dict for consistent handling below - speaker_result = {"segments": speaker_segments} - - else: - # Normal processing for files <= threshold - logger.info(f"๐ŸŽค Using normal processing for small file ({total_duration:.1f}s <= {chunk_threshold:.1f}s)") - - # Reconstruct WAV from MongoDB chunks 
(already in memory as bytes)
-            wav_data = await reconstruct_wav_from_conversation(conversation_id)
-
-            logger.info(
-                f"📦 Reconstructed audio from MongoDB chunks: "
-                f"{len(wav_data) / 1024 / 1024:.2f} MB"
-            )
-
-            logger.info(f"🎤 Calling speaker recognition service...")
+        user = await UserManager.get(user_id)
+        if not user:
+            logger.error(f"User {user_id} not found for token generation")
+            return {
+                "success": False,
+                "conversation_id": conversation_id,
+                "version_id": version_id,
+                "error": "User not found",
+                "processing_time_seconds": time.time() - start_time
+            }
 
-            # Call speaker service with in-memory audio data (no temp file needed!)
-            speaker_result = await speaker_client.diarize_identify_match(
-                audio_data=wav_data,  # Pass bytes directly, no disk I/O
-                transcript_data=transcript_data,
-                user_id=user_id
-            )
+        backend_token = generate_jwt_for_user(user_id, user.email)
+        logger.info(f"🔐 Generated backend token for speaker service")
 
-    except ValueError as e:
-        # No chunks found for conversation
-        logger.error(f"No audio chunks found for conversation {conversation_id}: {e}")
+    except Exception as token_error:
+        logger.error(f"Failed to generate backend token: {token_error}", exc_info=True)
         return {
             "success": False,
             "conversation_id": conversation_id,
             "version_id": version_id,
-            "error": f"No audio chunks found: {e}",
-            "processing_time_seconds": time.time() - start_time
-        }
-    except Exception as audio_error:
-        logger.error(f"Failed to reconstruct audio from MongoDB: {audio_error}", exc_info=True)
-        return {
-            "success": False,
-            "conversation_id": conversation_id,
-            "version_id": version_id,
-            "error": f"Audio reconstruction failed: {audio_error}",
+            "error": f"Token generation failed: {token_error}",
             "processing_time_seconds": time.time() - start_time
         }
 
-    # Continue with speaker recognition result processing
+    # Call speaker recognition service with conversation_id
+    # Speaker service will:
+    # 1. Fetch conversation metadata to check duration
+    # 2. Decide whether to chunk based on its MAX_DIARIZE_DURATION setting
+    # 3. Request audio segments via backend API as needed
+    # 4. 
Return merged speaker segments
+    logger.info(f"🎤 Calling speaker recognition service with conversation_id...")
+    try:
+        speaker_result = await speaker_client.diarize_identify_match(
+            conversation_id=conversation_id,
+            backend_token=backend_token,
+            transcript_data=transcript_data,
+            user_id=user_id
+        )
 
         # Check for errors from speaker service
         if speaker_result.get("error"):
diff --git a/backends/advanced/webui/src/pages/Conversations.tsx b/backends/advanced/webui/src/pages/Conversations.tsx
index bedde106..aa8fc030 100644
--- a/backends/advanced/webui/src/pages/Conversations.tsx
+++ b/backends/advanced/webui/src/pages/Conversations.tsx
@@ -1,6 +1,6 @@
 import { useState, useEffect, useRef, useCallback } from 'react'
-import { MessageSquare, RefreshCw, Calendar, User, Play, Pause, MoreVertical, RotateCcw, Zap, ChevronDown, ChevronUp, Trash2 } from 'lucide-react'
-import { conversationsApi, BACKEND_URL } from '../services/api'
+import { MessageSquare, RefreshCw, Calendar, User, Play, Pause, MoreVertical, RotateCcw, Zap, ChevronDown, ChevronUp, Trash2, Save, X } from 'lucide-react'
+import { conversationsApi, annotationsApi, BACKEND_URL } from '../services/api'
 import ConversationVersionHeader from '../components/ConversationVersionHeader'
 import { getStorageKey } from '../utils/storage'
 import { WaveformDisplay } from '../components/audio/WaveformDisplay'
@@ -73,6 +73,12 @@ export default function Conversations() {
   const [reprocessingMemory, setReprocessingMemory] = useState<Set<string>>(new Set())
   const [deletingConversation, setDeletingConversation] = useState<Set<string>>(new Set())
 
+  // Transcript segment editing state
+  const [editingSegment, setEditingSegment] = useState<string | null>(null) // Format: "conversationId-segmentIndex"
+  const [editedSegmentText, setEditedSegmentText] = useState('')
+  const [savingSegment, setSavingSegment] = useState(false)
+  const [segmentEditError, setSegmentEditError] = useState<string | null>(null)
+
   // Stable seek handler for waveform click-to-seek
   const handleSeek = useCallback((conversationId: string, time: number) => {
     console.log(`🎯 handleSeek called: conversationId=${conversationId}, time=${time.toFixed(2)}s`);
@@ -270,6 +276,78 @@ export default function Conversations() {
     }
   }
 
+  // Transcript segment editing handlers
+  const handleStartSegmentEdit = (conversationId: string, segmentIndex: number, originalText: string) => {
+    const segmentKey = `${conversationId}-${segmentIndex}`
+    setEditingSegment(segmentKey)
+    setEditedSegmentText(originalText)
+    setSegmentEditError(null)
+  }
+
+  const handleSaveSegmentEdit = async (conversationId: string, segmentIndex: number, originalText: string) => {
+    if (!editedSegmentText.trim()) {
+      setSegmentEditError('Segment text cannot be empty')
+      return
+    }
+
+    if (editedSegmentText === originalText) {
+      // No changes, just cancel
+      handleCancelSegmentEdit()
+      return
+    }
+
+    try {
+      setSavingSegment(true)
+      setSegmentEditError(null)
+
+      // Create transcript annotation
+      await annotationsApi.createTranscriptAnnotation({
+        conversation_id: conversationId,
+        segment_index: segmentIndex,
+        original_text: originalText,
+        corrected_text: editedSegmentText
+      })
+
+      // Update local state - find the conversation and update the segment
+      setConversations(prev => prev.map(conv => {
+        if (conv.conversation_id === conversationId && conv.segments) {
+          const updatedSegments = [...conv.segments]
+          updatedSegments[segmentIndex] = {
+            ...updatedSegments[segmentIndex],
+            text: editedSegmentText
+          }
+          return { ...conv, segments: updatedSegments }
+        }
+        return conv
+      }))
+
+      // 
Clear editing state + setEditingSegment(null) + setEditedSegmentText('') + } catch (err: any) { + console.error('Error saving segment edit:', err) + setSegmentEditError(err.response?.data?.detail || err.message || 'Failed to save segment edit') + } finally { + setSavingSegment(false) + } + } + + const handleCancelSegmentEdit = () => { + setEditingSegment(null) + setEditedSegmentText('') + setSegmentEditError(null) + } + + const handleSegmentKeyDown = (e: React.KeyboardEvent, conversationId: string, segmentIndex: number, originalText: string) => { + if (e.key === 'Enter' && (e.ctrlKey || e.metaKey)) { + e.preventDefault() + handleSaveSegmentEdit(conversationId, segmentIndex, originalText) + } else if (e.key === 'Escape') { + e.preventDefault() + handleCancelSegmentEdit() + } + } + const toggleDetailedSummary = async (conversationId: string) => { // If already expanded, just collapse if (expandedDetailedSummaries.has(conversationId)) { @@ -776,16 +854,17 @@ export default function Conversations() { const segmentId = `${conversationKey}-${index}` const isPlaying = playingSegment === segmentId const hasAudio = !!conversation.audio_path + const isEditing = editingSegment === segmentId return (
{/* Play/Pause Button */} - {hasAudio && ( + {hasAudio && !isEditing && (