diff --git a/.github/workflows/README.md b/.github/workflows/README.md index 5e98cd18..0b8987c5 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -1,91 +1,408 @@ -# GitHub Actions CI/CD Setup for Friend Lite +# Chronicle GitHub Workflows -This sets up **automatic GitHub releases** with APK/IPA files whenever you push code. +Documentation for CI/CD workflows and test automation. -## 🚀 How This Works +## Test Workflows Overview -1. You push code to GitHub -2. GitHub automatically builds **both Android APK and iOS IPA** -3. **Creates GitHub Releases** with both files attached -4. You download directly from the **Releases** tab! +Chronicle uses **three separate test workflows** to balance fast PR feedback with comprehensive testing: -## 🎯 Quick Setup (2 Steps) +| Workflow | Trigger | Test Coverage | API Keys | Purpose | +|----------|---------|---------------|----------|---------| +| `robot-tests.yml` | All PRs | ~70% (no-API tests) | ❌ Not required | Fast PR validation | +| `full-tests-with-api.yml` | Push to dev/main | 100% (full suite) | ✅ Required | Comprehensive validation | +| `pr-tests-with-api.yml` | PR label trigger | 100% (full suite) | ✅ Required | Pre-merge API testing | -### Step 1: Get Expo Token -1. Go to [expo.dev](https://expo.dev) and sign in/create account -2. Go to [Access Tokens](https://expo.dev/accounts/[account]/settings/access-tokens) -3. Create a new token and copy it +## Workflow Details -### Step 2: Add GitHub Secret -1. In your GitHub repo: **Settings** → **Secrets and variables** → **Actions** -2. Click **New repository secret** -3. Name: `EXPO_TOKEN` -4. Value: Paste your token from Step 1 -5. Click **Add secret** +### 1. `robot-tests.yml` - PR Tests (No API Keys) -## ⚡ That's It! -# GitHub Actions Workflows +**File**: `.github/workflows/robot-tests.yml` -## Integration Tests +**Trigger**: +```yaml +on: + pull_request: + paths: + - 'tests/**/*.robot' + - 'tests/**/*.py' + - 'backends/advanced/src/**' +``` + +**Characteristics**: +- **No secrets required** - Works for external contributors +- **Excludes**: Tests tagged with `requires-api-keys` +- **Config**: `tests/configs/mock-services.yml` +- **Test Script**: `./run-no-api-tests.sh` +- **Results**: `results-no-api/` +- **Time**: ~10-15 minutes +- **Coverage**: ~70% of test suite + +**Benefits**: +- Fast feedback on PRs +- No API costs for every PR +- External contributors can run full CI +- Most development workflows covered + +**What's Tested**: +- API endpoints (auth, CRUD, permissions) +- Infrastructure (workers, queues, health) +- Basic integration (non-transcription) + +**What's Skipped**: +- Audio upload with transcription +- Memory operations requiring LLM +- Audio streaming with STT +- Full E2E pipeline tests + +### 2. 
`full-tests-with-api.yml` - Dev/Main Tests (Full Suite) + +**File**: `.github/workflows/full-tests-with-api.yml` + +**Trigger**: +```yaml +on: + push: + branches: [dev, main] + paths: + - 'tests/**' + - 'backends/advanced/src/**' + workflow_dispatch: # Manual trigger available +``` + +**Characteristics**: +- **Requires secrets**: `DEEPGRAM_API_KEY`, `OPENAI_API_KEY`, `HF_TOKEN` +- **Includes**: All tests (including `requires-api-keys`) +- **Config**: `tests/configs/deepgram-openai.yml` +- **Test Script**: `./run-robot-tests.sh` +- **Results**: `results/` +- **Time**: ~20-30 minutes +- **Coverage**: 100% of test suite + +**Benefits**: +- Full validation before deployment +- Catches API integration issues +- Validates real transcription and memory processing +- Comprehensive E2E coverage + +**What's Tested**: +- Everything from `robot-tests.yml` PLUS: +- Audio upload with real transcription +- Memory extraction with LLM +- Audio streaming with STT +- Full E2E pipeline validation + +### 3. `pr-tests-with-api.yml` - Label-Triggered PR Tests + +**File**: `.github/workflows/pr-tests-with-api.yml` + +**Trigger**: +```yaml +on: + pull_request: + types: [labeled, synchronize] +``` + +**Condition**: +```yaml +if: contains(github.event.pull_request.labels.*.name, 'test-with-api-keys') +``` + +**Characteristics**: +- **Requires**: PR labeled with `test-with-api-keys` +- **Requires secrets**: `DEEPGRAM_API_KEY`, `OPENAI_API_KEY`, `HF_TOKEN` +- **Includes**: All tests (same as full-tests-with-api.yml) +- **Config**: `tests/configs/deepgram-openai.yml` +- **Time**: ~20-30 minutes +- **Re-runs**: On new commits while label present -### Automatic Integration Tests (`integration-tests.yml`) -- **Triggers**: Push/PR to `main` or `develop` branches affecting backend code -- **Timeout**: 15 minutes -- **Mode**: Cached mode (better for CI environment) -- **Dependencies**: Requires `DEEPGRAM_API_KEY` and `OPENAI_API_KEY` secrets +**Benefits**: +- Test API integrations before merging +- Useful for PRs modifying transcription/LLM code +- Maintainers can trigger on trusted PRs +- Catches issues before they reach dev/main + +**Use Cases**: +- PRs that modify transcription logic +- PRs that change memory extraction +- PRs that affect audio processing pipeline +- Before merging large feature branches + +## Usage Guide + +### For Contributors + +**Normal PR Workflow**: +1. Push your branch +2. Create PR +3. `robot-tests.yml` runs automatically (~70% coverage) +4. Fix any failures +5. Merge when tests pass + +**Testing API Integrations**: +1. Push your branch +2. Create PR +3. Ask maintainer to add `test-with-api-keys` label +4. `pr-tests-with-api.yml` runs (100% coverage) +5. Fix any failures +6. Merge when tests pass + +### For Maintainers + +**Adding the Label**: +```bash +# Via GitHub UI +1. Go to PR +2. Click "Labels" on right sidebar +3. Select "test-with-api-keys" + +# Via GitHub CLI +gh pr edit --add-label "test-with-api-keys" +``` + +**When to Use Label**: +- PR modifies audio processing or transcription +- PR changes memory extraction logic +- PR affects LLM integration +- Before merging large features +- When in doubt about API changes + +**Removing the Label**: +- Label is automatically retained on new commits +- Remove manually if no longer needed +- Saves API costs if changes don't affect APIs + +## Test Results + +### PR Comments + +All workflows post results as PR comments: + +```markdown +## 🎉 Robot Framework Test Results (No API Keys) + +**Status**: ✅ All tests passed! 
+ +| Metric | Count | +|--------|-------| +| ✅ Passed | 76 | +| ❌ Failed | 0 | +| 📊 Total | 76 | + +### 📊 View Reports +- [Test Report](https://pages-url/report.html) +- [Detailed Log](https://pages-url/log.html) +``` + +### GitHub Pages + +Test reports are automatically deployed to GitHub Pages: +- **Live Reports**: Clickable links in PR comments +- **Persistence**: 30 days retention +- **Format**: HTML reports from Robot Framework + +### Artifacts + +Downloadable artifacts for deeper analysis: +- **HTML Reports**: `robot-test-reports-html-*` +- **XML Results**: `robot-test-results-xml-*` +- **Logs**: `robot-test-logs-*` (on failure only) +- **Retention**: 30 days for reports, 7 days for logs ## Required Secrets -Add these secrets in your GitHub repository settings: +### Repository Secrets +Must be configured in GitHub repository settings: + +```bash +DEEPGRAM_API_KEY # Required for full-tests-with-api.yml +OPENAI_API_KEY # Required for full-tests-with-api.yml +HF_TOKEN # Optional (speaker recognition) ``` -DEEPGRAM_API_KEY=your-deepgram-api-key -OPENAI_API_KEY=your-openai-api-key + +**Setting Secrets**: +1. Go to repository Settings +2. Navigate to Secrets and variables → Actions +3. Click "New repository secret" +4. Add each secret + +### Secret Validation + +Workflows validate secrets before running tests: +```yaml +- name: Verify required secrets + env: + DEEPGRAM_API_KEY: ${{ secrets.DEEPGRAM_API_KEY }} + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + run: | + if [ -z "$DEEPGRAM_API_KEY" ]; then + echo "❌ ERROR: DEEPGRAM_API_KEY secret is not set" + exit 1 + fi ``` -## Test Environment +## Cost Management + +### API Cost Breakdown + +**No-API Tests** (`robot-tests.yml`): +- **Cost**: $0 per run +- **Frequency**: Every PR commit +- **Monthly**: Potentially hundreds of runs +- **Savings**: Significant with external contributors + +**Full Tests** (`full-tests-with-api.yml`, `pr-tests-with-api.yml`): +- **Transcription**: ~$0.10-0.30 per run (Deepgram) +- **LLM**: ~$0.05-0.15 per run (OpenAI) +- **Total**: ~$0.15-0.45 per run +- **Frequency**: dev/main pushes + labeled PRs +- **Monthly**: Typically 10-50 runs + +### Cost Optimization + +**Strategies**: +1. Most PRs use no-API tests (free) +2. Full tests only on protected branches +3. Label-triggered for selective full testing +4. No redundant API calls on every commit + +**Before This System**: +- Every PR: ~$0.45 cost +- 100 PRs/month: ~$45 + +**After This System**: +- Most PRs: $0 cost +- 10 dev/main pushes: ~$4.50 +- 5 labeled PRs: ~$2.25 +- Total: ~$6.75/month (85% savings) + +## Workflow Configuration + +### Common Settings -- **Runtime**: Ubuntu latest with Docker support -- **Python**: 3.12 with uv package manager -- **Services**: MongoDB (port 27018), Qdrant (ports 6335/6336), Backend (port 8001) -- **Test Data**: Isolated test directories and databases -- **Audio**: 4-minute glass blowing tutorial for end-to-end validation +All test workflows share: -## Modes +```yaml +# Performance +timeout-minutes: 30 +runs-on: ubuntu-latest -### Cached Mode (Recommended for CI) -- Reuses containers and data between test runs -- Faster startup time -- Better for containerized CI environments -- Used by default in automatic workflows +# Caching +- uses: actions/cache@v4 + with: + path: /tmp/.buildx-cache + key: ${{ runner.os }}-buildx-${{ hashFiles(...) 
}} -### Fresh Mode (Recommended for Local Development) -- Completely clean environment each run -- Removes all test data and containers -- Slower but more reliable for debugging -- Can be selected in manual workflow +# Python setup +- uses: actions/setup-python@v5 + with: + python-version: "3.12" + +# UV package manager +- uses: astral-sh/setup-uv@v4 + with: + version: "latest" +``` + +### Test Execution Pattern + +```yaml +- name: Run tests + env: + CLEANUP_CONTAINERS: "false" # Handled by workflow + # API keys if needed + run: | + ./run-{no-api|robot}-tests.sh + TEST_EXIT_CODE=$? + echo "test_exit_code=$TEST_EXIT_CODE" >> $GITHUB_ENV + exit 0 # Don't fail yet + +- name: Fail workflow if tests failed + if: always() + run: | + if [ "${{ env.test_exit_code }}" != "0" ]; then + echo "❌ Tests failed" + exit 1 + fi +``` + +**Benefits**: +- Artifacts uploaded even on test failure +- Clean container teardown guaranteed +- Clear separation of test execution and reporting ## Troubleshooting -1. **Test Timeout**: Increase `timeout_minutes` in manual workflow -2. **Memory Issues**: Check container logs in failed run artifacts -3. **API Key Issues**: Verify secrets are set correctly in repository settings -4. **Fresh Mode Fails**: Try cached mode for comparison +### Workflow Not Triggering -## Local Testing +**Problem**: Workflow doesn't run on PR +**Solutions**: +- Check file paths in workflow trigger +- Verify workflow file syntax (YAML) +- Check repository permissions +- Look for disabled workflows in Settings -To run the same tests locally: +### Secret Errors -```bash -cd backends/advanced-backend +**Problem**: "ERROR: DEEPGRAM_API_KEY secret is not set" +**Solutions**: +- Verify secret is set in repository settings +- Check secret name matches exactly (case-sensitive) +- Ensure workflow has access to secrets +- Fork PRs cannot access secrets (expected) + +### Test Failures + +**Problem**: Tests fail in CI but pass locally +**Solutions**: +- Check environment differences (.env.test) +- Verify test isolation (database cleanup) +- Look for timing issues (increase timeouts) +- Check Docker resource limits in CI + +### Label Workflow Not Running + +**Problem**: Added label but workflow doesn't trigger +**Solutions**: +- Verify label name is exactly `test-with-api-keys` +- Check workflow trigger includes `types: [labeled]` +- Try removing and re-adding label +- Push new commit to trigger synchronize event + +## Maintenance + +### Updating Workflows + +**When to Update**: +- Adding new test categories +- Changing test execution scripts +- Modifying timeout values +- Updating artifact retention + +**Testing Changes**: +1. Create test branch +2. Modify workflow file +3. Push to trigger workflow +4. Verify execution +5. 
Merge if successful + +### Monitoring + +**Key Metrics**: +- Test pass rate (target: >95%) +- Workflow execution time (target: <30min) +- API costs (target: <$10/month) +- Artifact storage usage -# Install dependencies -uv sync --dev +**Tools**: +- GitHub Actions dashboard +- Workflow run history +- Cost tracking (GitHub billing) +- Test result trends -# Set up environment (copy from .env.template) -cp .env.template .env.test -# Add your API keys to .env.test +## Reference Links -# Run Robot Framework integration tests -uv run robot --outputdir test-results --loglevel INFO tests/integration/integration_test.robot -``` \ No newline at end of file +- **Test Suite README**: `tests/README.md` +- **Testing Guidelines**: `tests/TESTING_GUIDELINES.md` +- **Tag Documentation**: `tests/tags.md` +- **GitHub Actions Docs**: https://docs.github.com/en/actions diff --git a/Docs/audio-pipeline-architecture.md b/Docs/audio-pipeline-architecture.md index f36f6e40..afba52db 100644 --- a/Docs/audio-pipeline-architecture.md +++ b/Docs/audio-pipeline-architecture.md @@ -497,9 +497,8 @@ Session Starts └─────────────┬───────────────────┘ ↓ (when conversation ends) ┌─────────────────────────────────┐ -│ Post-Conversation Pipeline │ ← Parallel batch jobs +│ Post-Conversation Pipeline │ ├─────────────────────────────────┤ -│ • transcribe_full_audio_job │ │ • recognize_speakers_job │ │ • memory_extraction_job │ │ • generate_title_summary_job │ @@ -597,32 +596,16 @@ Session Starts ### Post-Conversation Pipeline -All jobs run **in parallel** after conversation completes: +**Streaming conversations**: Use streaming transcript saved during conversation. No batch re-transcription. -#### 1. Transcribe Full Audio Job +**File uploads**: Batch transcription job runs first, then post-conversation jobs depend on it. -**File**: `backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py` - -**Function**: `transcribe_full_audio_job()` - -**Input**: Audio file from disk (`data/chunks/*.wav`) - -**Process**: -- Batch transcribes entire conversation audio -- Validates meaningful speech -- Marks conversation `deleted` if no speech detected -- Stores transcript, segments, words in MongoDB - -**Container**: `rq-worker` - -#### 2. Recognize Speakers Job +#### 1. Recognize Speakers Job **File**: `backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py` **Function**: `recognize_speakers_job()` -**Prerequisite**: `transcribe_full_audio_job` completes - **Process**: - Sends audio + segments to speaker recognition service - Identifies speakers using voice embeddings @@ -634,13 +617,13 @@ All jobs run **in parallel** after conversation completes: **External Service**: `speaker-recognition` container (if enabled) -#### 3. Memory Extraction Job +#### 2. Memory Extraction Job **File**: `backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py` **Function**: `memory_extraction_job()` -**Prerequisite**: `transcribe_full_audio_job` completes +**Prerequisite**: Speaker recognition job **Process**: - Uses LLM (OpenAI/Ollama) to extract semantic facts @@ -654,24 +637,21 @@ All jobs run **in parallel** after conversation completes: - `ollama` or OpenAI API (LLM) - `qdrant` or OpenMemory MCP (vector storage) -#### 4. Generate Title Summary Job +#### 3. 
Generate Title Summary Job **File**: `backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py` **Function**: `generate_title_summary_job()` -**Prerequisite**: `transcribe_full_audio_job` completes +**Prerequisite**: Speaker recognition job **Process**: -- Uses LLM to generate: - - Title (short summary) - - Summary (1-2 sentences) - - Detailed summary (paragraph) +- Uses LLM to generate title, summary, detailed summary - Updates conversation document in MongoDB **Container**: `rq-worker` -#### 5. Dispatch Conversation Complete Event +#### 4. Dispatch Conversation Complete Event **File**: `backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py` @@ -679,7 +659,24 @@ All jobs run **in parallel** after conversation completes: **Process**: - Triggers `conversation.complete` plugin event -- Only runs for **file uploads** (not streaming sessions) + +**Container**: `rq-worker` + +#### Batch Transcription Job + +**File**: `backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py` + +**Function**: `transcribe_full_audio_job()` + +**When used**: +- File uploads via `/api/process-audio-files` +- Manual reprocessing via `/api/conversations/{id}/reprocess-transcript` +- NOT used for streaming conversations + +**Process**: +- Reconstructs audio from MongoDB chunks +- Batch transcribes entire audio +- Stores transcript with word-level timestamps **Container**: `rq-worker` diff --git a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py index 7ba3b900..c0640841 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/audio_controller.py @@ -150,14 +150,38 @@ async def upload_and_process_audio_files( exc_info=True ) - # Enqueue post-conversation processing job chain - from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs + # Enqueue batch transcription job first (file uploads need transcription) + from advanced_omi_backend.controllers.queue_controller import ( + start_post_conversation_jobs, + transcription_queue, + JOB_RESULT_TTL + ) + from advanced_omi_backend.workers.transcription_jobs import transcribe_full_audio_job + + version_id = str(uuid.uuid4()) + transcribe_job_id = f"transcribe_{conversation_id[:12]}" + + transcription_job = transcription_queue.enqueue( + transcribe_full_audio_job, + conversation_id, + audio_uuid, + version_id, + "batch", # trigger + job_timeout=1800, # 30 minutes + result_ttl=JOB_RESULT_TTL, + job_id=transcribe_job_id, + description=f"Transcribe uploaded file {conversation_id[:8]}", + meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'client_id': client_id} + ) + + audio_logger.info(f"📥 Enqueued transcription job {transcription_job.id} for uploaded file") + # Enqueue post-conversation processing job chain (depends on transcription) job_ids = start_post_conversation_jobs( conversation_id=conversation_id, audio_uuid=audio_uuid, user_id=user.user_id, - post_transcription=True, # Run batch transcription for uploads + depends_on_job=transcription_job, # Wait for transcription to complete client_id=client_id # Pass client_id for UI tracking ) @@ -166,7 +190,7 @@ async def upload_and_process_audio_files( "status": "processing", "audio_uuid": audio_uuid, "conversation_id": conversation_id, - "transcript_job_id": job_ids['transcription'], + "transcript_job_id": transcription_job.id, 
"speaker_job_id": job_ids['speaker_recognition'], "memory_job_id": job_ids['memory'], "duration_seconds": round(duration, 2), @@ -174,7 +198,7 @@ async def upload_and_process_audio_files( audio_logger.info( f"✅ Processed {file.filename} → conversation {conversation_id}, " - f"jobs: {job_ids['transcription']} → {job_ids['speaker_recognition']} → {job_ids['memory']}" + f"jobs: {transcription_job.id} → {job_ids['speaker_recognition']} → {job_ids['memory']}" ) except (OSError, IOError) as e: diff --git a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py index b3875861..d83c7075 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/queue_controller.py @@ -367,7 +367,6 @@ def start_post_conversation_jobs( conversation_id: str, audio_uuid: str, user_id: str, - post_transcription: bool = True, transcript_version_id: Optional[str] = None, depends_on_job = None, client_id: Optional[str] = None @@ -376,27 +375,25 @@ def start_post_conversation_jobs( Start post-conversation processing jobs after conversation is created. This creates the standard processing chain after a conversation is created: - 1. [Optional] Transcription job - Batch transcription (if post_transcription=True) - 2. Speaker recognition job - Identifies speakers in audio - 3. Memory extraction job - Extracts memories from conversation (parallel) - 4. Title/summary generation job - Generates title and summary (parallel) + 1. Speaker recognition job - Identifies speakers in audio segments + 2. Memory extraction job - Extracts memories from conversation + 3. Title/summary generation job - Generates title and summary + 4. Event dispatch job - Triggers conversation.complete plugins - Note: Audio is reconstructed from MongoDB chunks, not files. + Note: Batch transcription removed - streaming conversations use streaming transcript. + For file uploads, batch transcription must be enqueued separately before calling this function. 
Args: conversation_id: Conversation identifier audio_uuid: Audio UUID for job tracking user_id: User identifier - post_transcription: If True, run batch transcription step (for uploads) - If False, skip transcription (streaming already has it) transcript_version_id: Transcript version ID (auto-generated if None) - depends_on_job: Optional job dependency for first job + depends_on_job: Optional job dependency for first job (e.g., transcription for file uploads) client_id: Client ID for UI tracking Returns: - Dict with job IDs (transcription will be None if post_transcription=False) + Dict with job IDs for speaker_recognition, memory, title_summary, event_dispatch """ - from advanced_omi_backend.workers.transcription_jobs import transcribe_full_audio_job from advanced_omi_backend.workers.speaker_jobs import recognise_speakers_job from advanced_omi_backend.workers.memory_jobs import process_memory_job from advanced_omi_backend.workers.conversation_jobs import generate_title_summary_job, dispatch_conversation_complete_event_job @@ -408,30 +405,7 @@ def start_post_conversation_jobs( if client_id: job_meta['client_id'] = client_id - # Step 1: Batch transcription job (ALWAYS run to get correct conversation-relative timestamps) - # Even for streaming, we need batch transcription before cropping to fix cumulative timestamps - transcribe_job_id = f"transcribe_{conversation_id[:12]}" - logger.info(f"🔍 DEBUG: Creating transcribe job with job_id={transcribe_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") - - transcription_job = transcription_queue.enqueue( - transcribe_full_audio_job, - conversation_id, - audio_uuid, - version_id, - "batch", # trigger - job_timeout=1800, # 30 minutes - result_ttl=JOB_RESULT_TTL, - depends_on=depends_on_job, - job_id=transcribe_job_id, - description=f"Transcribe conversation {conversation_id[:8]}", - meta=job_meta - ) - logger.info(f"📥 RQ: Enqueued transcription job {transcription_job.id}, meta={transcription_job.meta}") - - # Speaker recognition depends on transcription (no cropping step) - speaker_depends_on = transcription_job - - # Step 2: Speaker recognition job + # Step 1: Speaker recognition job (uses streaming transcript from conversation document) speaker_job_id = f"speaker_{conversation_id[:12]}" logger.info(f"🔍 DEBUG: Creating speaker job with job_id={speaker_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}") @@ -441,12 +415,15 @@ def start_post_conversation_jobs( version_id, job_timeout=1200, # 20 minutes result_ttl=JOB_RESULT_TTL, - depends_on=speaker_depends_on, + depends_on=depends_on_job, # Direct dependency (no transcription job) job_id=speaker_job_id, description=f"Speaker recognition for conversation {conversation_id[:8]}", meta=job_meta ) - logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id}, meta={speaker_job.meta} (depends on {speaker_depends_on.id})") + if depends_on_job: + logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id}, meta={speaker_job.meta} (depends on {depends_on_job.id})") + else: + logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id}, meta={speaker_job.meta} (no dependencies, starts immediately)") # Step 3: Memory extraction job (parallel with title/summary) memory_job_id = f"memory_{conversation_id[:12]}" @@ -504,7 +481,6 @@ def start_post_conversation_jobs( logger.info(f"📥 RQ: Enqueued conversation complete event job {event_dispatch_job.id}, meta={event_dispatch_job.meta} (depends on {memory_job.id} and 
{title_summary_job.id})") return { - 'transcription': transcription_job.id if transcription_job else None, 'speaker_recognition': speaker_job.id, 'memory': memory_job.id, 'title_summary': title_summary_job.id, diff --git a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py index d27c2131..1f05e497 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py @@ -890,20 +890,44 @@ async def _process_batch_audio_complete( ) # Continue anyway - transcription job will handle it - # Enqueue post-conversation processing job chain - from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs + # Enqueue batch transcription job first (file uploads need transcription) + from advanced_omi_backend.controllers.queue_controller import ( + start_post_conversation_jobs, + transcription_queue, + JOB_RESULT_TTL + ) + from advanced_omi_backend.workers.transcription_jobs import transcribe_full_audio_job + + version_id = str(uuid.uuid4()) + transcribe_job_id = f"transcribe_{conversation_id[:12]}" + + transcription_job = transcription_queue.enqueue( + transcribe_full_audio_job, + conversation_id, + audio_uuid, + version_id, + "batch", # trigger + job_timeout=1800, # 30 minutes + result_ttl=JOB_RESULT_TTL, + job_id=transcribe_job_id, + description=f"Transcribe batch audio {conversation_id[:8]}", + meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'client_id': client_id} + ) + + application_logger.info(f"📥 Batch mode: Enqueued transcription job {transcription_job.id}") + # Enqueue post-conversation processing job chain (depends on transcription) job_ids = start_post_conversation_jobs( conversation_id=conversation_id, audio_uuid=audio_uuid, user_id=None, # Will be read from conversation in DB by jobs - post_transcription=True, # Run batch transcription for uploads + depends_on_job=transcription_job, # Wait for transcription to complete client_id=client_id # Pass client_id for UI tracking ) application_logger.info( f"✅ Batch mode: Enqueued job chain for {conversation_id} - " - f"transcription ({job_ids['transcription']}) → " + f"transcription ({transcription_job.id}) → " f"speaker ({job_ids['speaker_recognition']}) → " f"memory ({job_ids['memory']})" ) diff --git a/backends/advanced/src/advanced_omi_backend/models/conversation.py b/backends/advanced/src/advanced_omi_backend/models/conversation.py index 24091ef8..3d053536 100644 --- a/backends/advanced/src/advanced_omi_backend/models/conversation.py +++ b/backends/advanced/src/advanced_omi_backend/models/conversation.py @@ -7,7 +7,7 @@ from datetime import datetime from typing import Dict, List, Optional, Any, Union -from pydantic import BaseModel, Field, model_validator, computed_field +from pydantic import BaseModel, Field, model_validator, computed_field, field_validator from enum import Enum import uuid @@ -17,17 +17,7 @@ class Conversation(Document): """Complete conversation model with versioned processing.""" - # Nested Enums - class TranscriptProvider(str, Enum): - """ - Transcription provider identifiers. - - Note: Actual providers are configured in config.yml. - Any provider name from config.yml is valid - this enum is for common values only. 
- """ - DEEPGRAM = "deepgram" - SPEECH_DETECTION = "speech_detection" - UNKNOWN = "unknown" + # Nested Enums - Note: TranscriptProvider accepts any string value for flexibility class MemoryProvider(str, Enum): """Supported memory providers.""" @@ -52,6 +42,13 @@ class EndReason(str, Enum): UNKNOWN = "unknown" # Unknown or legacy reason # Nested Models + class Word(BaseModel): + """Individual word with timestamp in a transcript.""" + word: str = Field(description="Word text") + start: float = Field(description="Start time in seconds") + end: float = Field(description="End time in seconds") + confidence: Optional[float] = Field(None, description="Confidence score (0-1)") + class SpeakerSegment(BaseModel): """Individual speaker segment in a transcript.""" start: float = Field(description="Start time in seconds") @@ -59,13 +56,14 @@ class SpeakerSegment(BaseModel): text: str = Field(description="Transcript text for this segment") speaker: str = Field(description="Speaker identifier") confidence: Optional[float] = Field(None, description="Confidence score (0-1)") + words: List["Conversation.Word"] = Field(default_factory=list, description="Word-level timestamps for this segment") class TranscriptVersion(BaseModel): """Version of a transcript with processing metadata.""" version_id: str = Field(description="Unique version identifier") transcript: Optional[str] = Field(None, description="Full transcript text") segments: List["Conversation.SpeakerSegment"] = Field(default_factory=list, description="Speaker segments") - provider: Optional["Conversation.TranscriptProvider"] = Field(None, description="Transcription provider used") + provider: Optional[str] = Field(None, description="Transcription provider used (deepgram, parakeet, etc.)") model: Optional[str] = Field(None, description="Model used (e.g., nova-3, parakeet)") created_at: datetime = Field(description="When this version was created") processing_time_seconds: Optional[float] = Field(None, description="Time taken to process") @@ -249,7 +247,7 @@ def add_transcript_version( version_id: str, transcript: str, segments: List["Conversation.SpeakerSegment"], - provider: "Conversation.TranscriptProvider", + provider: str, # Provider name from config.yml (deepgram, parakeet, etc.) 
model: Optional[str] = None, processing_time_seconds: Optional[float] = None, metadata: Optional[Dict[str, Any]] = None, diff --git a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py index 9430018e..e9e754bd 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py @@ -521,21 +521,83 @@ async def open_conversation_job( logger.info(f"📦 MongoDB audio chunks ready for conversation {conversation_id[:12]}") - # Enqueue post-conversation processing pipeline + # Get final streaming transcript and save to conversation + logger.info(f"📝 Retrieving final streaming transcript for conversation {conversation_id[:12]}") + final_transcript = await aggregator.get_combined_results(session_id) + + # Fetch conversation from database to ensure we have latest state + conversation = await Conversation.find_one(Conversation.conversation_id == conversation_id) + if not conversation: + logger.error(f"❌ Conversation {conversation_id} not found in database") + raise ValueError(f"Conversation {conversation_id} not found") + + # Create transcript version from streaming results + version_id = f"streaming_{session_id[:12]}" + transcript_text = final_transcript.get("text", "") + segments_data = final_transcript.get("segments", []) + + # Convert segments to SpeakerSegment format with word-level timestamps + segments = [ + Conversation.SpeakerSegment( + start=seg.get("start", 0.0), + end=seg.get("end", 0.0), + text=seg.get("text", ""), + speaker=seg.get("speaker", "SPEAKER_00"), + confidence=seg.get("confidence"), + words=[ + Conversation.Word( + word=w.get("word", ""), + start=w.get("start", 0.0), + end=w.get("end", 0.0), + confidence=w.get("confidence") + ) + for w in seg.get("words", []) + ] + ) + for seg in segments_data + ] + + # Determine provider from streaming results + provider = final_transcript.get("provider", "deepgram") + + # Add streaming transcript as the initial version + conversation.add_transcript_version( + version_id=version_id, + transcript=transcript_text, + segments=segments, + provider=provider, + model=provider_str, # Provider name as model + processing_time_seconds=None, # Not applicable for streaming + metadata={ + "source": "streaming", + "chunk_count": final_transcript.get("chunk_count", 0), + "word_count": len(final_transcript.get("words", [])) + }, + set_as_active=True + ) + + # Save conversation with streaming transcript + await conversation.save() + logger.info( + f"✅ Saved streaming transcript: {len(transcript_text)} chars, " + f"{len(segments)} segments, {len(final_transcript.get('words', []))} words " + f"for conversation {conversation_id[:12]}" + ) + + # Enqueue post-conversation processing pipeline (no batch transcription needed - using streaming transcript) client_id = conversation.client_id if conversation else None job_ids = start_post_conversation_jobs( conversation_id=conversation_id, audio_uuid=session_id, user_id=user_id, - post_transcription=True, # Run batch transcription for streaming audio client_id=client_id # Pass client_id for UI tracking ) logger.info( - f"📥 Pipeline: transcribe({job_ids['transcription']}) → " - f"speaker({job_ids['speaker_recognition']}) → " - f"[memory({job_ids['memory']}) + title({job_ids['title_summary']})]" + f"📥 Pipeline: speaker({job_ids['speaker_recognition']}) → " + f"[memory({job_ids['memory']}) + title({job_ids['title_summary']})] → " + 
f"event({job_ids['event_dispatch']})" ) # Wait a moment to ensure jobs are registered in RQ diff --git a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py index 1d0da874..023426df 100644 --- a/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py +++ b/backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py @@ -424,7 +424,7 @@ async def transcribe_full_audio_job( version_id=version_id, transcript=transcript_text, segments=speaker_segments, - provider=Conversation.TranscriptProvider(provider_normalized), + provider=provider_normalized, # Now just a string, no enum constructor needed model=provider.name, processing_time_seconds=processing_time, metadata=metadata, diff --git a/extras/speaker-recognition/docker-compose.yml b/extras/speaker-recognition/docker-compose.yml index ea41de04..716ecc27 100644 --- a/extras/speaker-recognition/docker-compose.yml +++ b/extras/speaker-recognition/docker-compose.yml @@ -33,15 +33,15 @@ services: interval: 30s timeout: 10s retries: 3 + networks: + default: + aliases: + - speaker-service # GPU Profile Configuration speaker-service-gpu: <<: *base-speaker-service profiles: ["gpu"] - networks: - default: - aliases: - - speaker-service build: context: . dockerfile: Dockerfile @@ -54,6 +54,10 @@ services: - driver: nvidia count: all capabilities: [gpu] + networks: + default: + aliases: + - speaker-service # React Web UI web-ui: diff --git a/status.py b/status.py index 82e3f041..27318a60 100644 --- a/status.py +++ b/status.py @@ -43,8 +43,8 @@ def get_container_status(service_name: str) -> Dict[str, Any]: try: # Get container status using docker compose ps - # Use 'ps -a' to get all containers regardless of profile - cmd = ['docker', 'compose', 'ps', '-a', '--format', 'json'] + # Only check containers from active profiles (excludes inactive profile services) + cmd = ['docker', 'compose', 'ps', '--format', 'json'] result = subprocess.run( cmd, diff --git a/tests/endpoints/plugin_tests.robot b/tests/endpoints/plugin_tests.robot index 893cd9fb..39cd858d 100644 --- a/tests/endpoints/plugin_tests.robot +++ b/tests/endpoints/plugin_tests.robot @@ -95,7 +95,7 @@ Event Subscription Matching Batch Transcription Should Trigger Batch Event [Documentation] Verify batch transcription triggers transcript.batch event - [Tags] audio-upload + [Tags] audio-upload requires-api-keys # Upload audio file for batch processing ${result}= Upload Single Audio File @@ -113,7 +113,7 @@ Batch Transcription Should Trigger Batch Event Streaming Transcription Should Trigger Streaming Event [Documentation] Verify streaming transcription triggers transcript.streaming event - [Tags] audio-streaming + [Tags] audio-streaming requires-api-keys # Note: This would require WebSocket streaming test infrastructure # The event dispatch happens in deepgram_stream_consumer.py:309 diff --git a/tests/integration/websocket_streaming_tests.robot b/tests/integration/websocket_streaming_tests.robot index 2edec480..9f910b7a 100644 --- a/tests/integration/websocket_streaming_tests.robot +++ b/tests/integration/websocket_streaming_tests.robot @@ -102,7 +102,8 @@ Conversation Closes On Inactivity Timeout And Restarts Speech Detection [Documentation] Verify that after SPEECH_INACTIVITY_THRESHOLD_SECONDS of silence, ... the open_conversation job closes with timeout_triggered=True, ... a new speech_detection job is created for the next conversation, - ... 
and post-conversation jobs are enqueued (transcription, speaker, memory, title). + ... and post-conversation jobs are enqueued (speaker, memory, title). + ... Note: Streaming conversations use streaming transcript (no batch transcription). ... ... Test environment sets SPEECH_INACTIVITY_THRESHOLD_SECONDS=5 in docker-compose-test.yml. [Tags] audio-streaming queue conversation @@ -147,12 +148,11 @@ Conversation Closes On Inactivity Timeout And Restarts Speech Detection Log To Console New speech detection job created for next conversation # Verify post-conversation jobs were enqueued (linked by conversation_id, not client_id) - # These jobs process the completed conversation: transcription, speaker recognition, memory, title - ${transcription_jobs}= Wait Until Keyword Succeeds 30s 2s - ... Job Type Exists For Conversation transcribe_full_audio_job ${conversation_id} - Log To Console Post-conversation transcription job enqueued + # These jobs process the completed conversation: speaker recognition, memory, title + # Note: Streaming conversations no longer have batch transcription - transcript comes from streaming + Log To Console Verifying post-conversation jobs (speaker, memory, title)... - # Speaker recognition job should also be created + # Speaker recognition job should be created ${speaker_jobs}= Get Jobs By Type And Conversation recognise_speakers_job ${conversation_id} Log To Console Speaker recognition jobs found: ${speaker_jobs.__len__()} diff --git a/tests/resources/queue_keywords.robot b/tests/resources/queue_keywords.robot index 734d7857..98012d9a 100644 --- a/tests/resources/queue_keywords.robot +++ b/tests/resources/queue_keywords.robot @@ -72,7 +72,7 @@ Check job status # Fail fast if job is in failed state when we're expecting completed IF '${actual_status}' == 'failed' and '${expected_status}' == 'completed' - ${error_msg}= Evaluate $job.get('exc_info') or $job.get('error', 'Unknown error') + ${error_msg}= Evaluate $job.get('error_message') or $job.get('exc_info') or $job.get('error', 'Unknown error') Fail Job ${job_id} failed: ${error_msg} END diff --git a/wizard.py b/wizard.py index 7e5e400c..10a53d6f 100755 --- a/wizard.py +++ b/wizard.py @@ -249,6 +249,9 @@ def run_service_setup(service_name, selected_services, https_enabled=False, serv # For speaker-recognition, pass HF_TOKEN from centralized configuration if service_name == 'speaker-recognition': + # Define the speaker env path + speaker_env_path = 'extras/speaker-recognition/.env' + # HF Token should have been provided via setup_hf_token_if_needed() if hf_token: cmd.extend(['--hf-token', hf_token])
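
For reference, the word-level transcript structures added in `models/conversation.py` are what `open_conversation_job` now saves as the initial streaming version. Below is a minimal sketch of building one segment with word timestamps and registering it as a transcript version; the sample values are invented, `conversation` is assumed to be an existing `Conversation` document fetched earlier, and a Beanie/MongoDB connection is assumed to be initialized elsewhere.

```python
from advanced_omi_backend.models.conversation import Conversation

# One speaker segment with per-word timestamps (new Conversation.Word model).
segment = Conversation.SpeakerSegment(
    start=0.0,
    end=2.4,
    text="hello there",
    speaker="SPEAKER_00",
    confidence=0.93,  # sample value
    words=[
        Conversation.Word(word="hello", start=0.0, end=0.8, confidence=0.95),
        Conversation.Word(word="there", start=0.9, end=2.4, confidence=0.91),
    ],
)

# add_transcript_version() now takes the provider as a plain string
# (any provider name from config.yml), not a TranscriptProvider enum.
# `conversation` is an existing Conversation document fetched earlier.
conversation.add_transcript_version(
    version_id="streaming_abc123def456",  # placeholder version id
    transcript="hello there",
    segments=[segment],
    provider="deepgram",
    metadata={"source": "streaming"},
    set_as_active=True,
)
```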