441 changes: 379 additions & 62 deletions .github/workflows/README.md

Large diffs are not rendered by default.

59 changes: 28 additions & 31 deletions Docs/audio-pipeline-architecture.md
@@ -497,9 +497,8 @@ Session Starts
└─────────────┬───────────────────┘
↓ (when conversation ends)
┌─────────────────────────────────┐
│ Post-Conversation Pipeline │ ← Parallel batch jobs
│ Post-Conversation Pipeline │
├─────────────────────────────────┤
│ • transcribe_full_audio_job │
│ • recognize_speakers_job │
│ • memory_extraction_job │
│ • generate_title_summary_job │
@@ -597,32 +596,16 @@ Session Starts

### Post-Conversation Pipeline

All jobs run **in parallel** after conversation completes:
**Streaming conversations**: Use the streaming transcript saved during the conversation; no batch re-transcription is performed.

#### 1. Transcribe Full Audio Job
**File uploads**: A batch transcription job runs first, and the post-conversation jobs depend on it (see the Batch Transcription Job sketch below).

**File**: `backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py`

**Function**: `transcribe_full_audio_job()`

**Input**: Audio file from disk (`data/chunks/*.wav`)

**Process**:
- Batch transcribes entire conversation audio
- Validates meaningful speech
- Marks conversation `deleted` if no speech detected
- Stores transcript, segments, words in MongoDB

**Container**: `rq-worker`

#### 2. Recognize Speakers Job
#### 1. Recognize Speakers Job

**File**: `backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py`

**Function**: `recognize_speakers_job()`

**Prerequisite**: `transcribe_full_audio_job` completes

**Process**:
- Sends audio + segments to speaker recognition service
- Identifies speakers using voice embeddings
@@ -634,13 +617,13 @@ All jobs run **in parallel** after conversation completes:

**External Service**: `speaker-recognition` container (if enabled)
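
A minimal sketch of the hand-off this job performs: conversation audio plus transcript segments are sent to the speaker-recognition container, which returns the segments annotated with resolved speaker identities. The endpoint path, port, payload shape, and response fields below are illustrative assumptions, not the service's actual API.

```python
import json

import httpx

async def identify_speakers(audio_path: str, segments: list[dict]) -> list[dict]:
    """POST conversation audio plus transcript segments to the speaker-recognition service."""
    async with httpx.AsyncClient(timeout=600.0) as client:
        with open(audio_path, "rb") as audio_file:
            response = await client.post(
                "http://speaker-recognition:8001/identify",  # hypothetical endpoint/port
                files={"audio": audio_file},
                data={"segments": json.dumps(segments)},
            )
    response.raise_for_status()
    # Assumed response shape: the same segments, each with a resolved "speaker" field.
    return response.json()["segments"]
```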

#### 3. Memory Extraction Job
#### 2. Memory Extraction Job

**File**: `backends/advanced/src/advanced_omi_backend/workers/memory_jobs.py`

**Function**: `memory_extraction_job()`

**Prerequisite**: `transcribe_full_audio_job` completes
**Prerequisite**: Speaker recognition job completes

**Process**:
- Uses LLM (OpenAI/Ollama) to extract semantic facts
@@ -654,32 +637,46 @@ All jobs run **in parallel** after conversation completes:
- `ollama` or OpenAI API (LLM)
- `qdrant` or OpenMemory MCP (vector storage)
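
A hedged sketch of the extract-and-store flow described above: the LLM turns the transcript into discrete facts, each fact is embedded, and the vectors are upserted into Qdrant. The prompt wording, model names, and collection name are illustrative assumptions, not the backend's actual implementation.

```python
import uuid

from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

llm = OpenAI()
qdrant = QdrantClient(url="http://qdrant:6333")

def extract_and_store_memories(conversation_id: str, transcript: str) -> None:
    # Ask the LLM for standalone semantic facts, one per line.
    chat = llm.chat.completions.create(
        model="gpt-4o-mini",  # assumed model
        messages=[
            {"role": "system", "content": "Extract standalone facts about the user, one per line."},
            {"role": "user", "content": transcript},
        ],
    )
    facts = [line.strip("- ").strip() for line in chat.choices[0].message.content.splitlines() if line.strip()]

    # Embed each fact and upsert it into the vector store for later semantic retrieval.
    for fact in facts:
        vector = llm.embeddings.create(model="text-embedding-3-small", input=fact).data[0].embedding
        qdrant.upsert(
            collection_name="memories",  # assumed collection name
            points=[PointStruct(
                id=str(uuid.uuid4()),
                vector=vector,
                payload={"conversation_id": conversation_id, "text": fact},
            )],
        )
```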

#### 4. Generate Title Summary Job
#### 3. Generate Title Summary Job

**File**: `backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py`

**Function**: `generate_title_summary_job()`

**Prerequisite**: `transcribe_full_audio_job` completes
**Prerequisite**: Speaker recognition job completes

**Process**:
- Uses LLM to generate:
- Title (short summary)
- Summary (1-2 sentences)
- Detailed summary (paragraph)
- Uses LLM to generate the title, summary, and detailed summary
- Updates conversation document in MongoDB

**Container**: `rq-worker`
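
A compact sketch of this job's shape: ask the LLM for a JSON object with the three fields, then write them onto the conversation document. The prompt, model, and database/collection names are assumptions.

```python
import json

from openai import OpenAI
from pymongo import MongoClient

llm = OpenAI()
conversations = MongoClient("mongodb://mongo:27017")["omi"]["conversations"]  # assumed names

def generate_title_summary(conversation_id: str, transcript: str) -> None:
    chat = llm.chat.completions.create(
        model="gpt-4o-mini",  # assumed model
        response_format={"type": "json_object"},
        messages=[
            {"role": "system",
             "content": 'Return JSON: {"title": ..., "summary": ..., "detailed_summary": ...}'},
            {"role": "user", "content": transcript},
        ],
    )
    fields = json.loads(chat.choices[0].message.content)
    conversations.update_one({"conversation_id": conversation_id}, {"$set": fields})
```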

#### 5. Dispatch Conversation Complete Event
#### 4. Dispatch Conversation Complete Event

**File**: `backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py`

**Function**: `dispatch_conversation_complete_event_job()`

**Process**:
- Triggers `conversation.complete` plugin event
- Only runs for **file uploads** (not streaming sessions)

**Container**: `rq-worker`
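
The event dispatch is the fan-in point of the chain: it is enqueued with an RQ dependency list so it runs only after both the memory and title/summary jobs finish (see the corresponding log line in `queue_controller` below). A sketch of that pattern, with the queue object, argument list, and job-ID format as assumptions:

```python
from rq import Queue
from rq.job import Job

def enqueue_event_dispatch(events_queue: Queue, conversation_id: str,
                           memory_job: Job, title_summary_job: Job) -> Job:
    from advanced_omi_backend.workers.conversation_jobs import (
        dispatch_conversation_complete_event_job,
    )
    # RQ accepts a list of upstream jobs; this job stays deferred until both succeed.
    # The positional argument list for the worker function is assumed.
    return events_queue.enqueue(
        dispatch_conversation_complete_event_job,
        conversation_id,
        depends_on=[memory_job, title_summary_job],
        job_id=f"event_{conversation_id[:12]}",
        description=f"Dispatch conversation.complete for {conversation_id[:8]}",
    )
```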

#### Batch Transcription Job

**File**: `backends/advanced/src/advanced_omi_backend/workers/transcription_jobs.py`

**Function**: `transcribe_full_audio_job()`

**When used**:
- File uploads via `/api/process-audio-files`
- Manual reprocessing via `/api/conversations/{id}/reprocess-transcript`
- NOT used for streaming conversations

**Process**:
- Reconstructs audio from MongoDB chunks
- Batch transcribes entire audio
- Stores transcript with word-level timestamps

**Container**: `rq-worker`
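
A condensed sketch of how the file-upload path wires this job in front of the standard chain (it mirrors the controller and batch-audio changes shown below): batch transcription is enqueued first, and its job handle is passed as `depends_on_job` so speaker recognition waits for it. Streaming sessions simply skip the first step.

```python
import uuid

from advanced_omi_backend.controllers.queue_controller import (
    JOB_RESULT_TTL,
    start_post_conversation_jobs,
    transcription_queue,
)
from advanced_omi_backend.workers.transcription_jobs import transcribe_full_audio_job

def enqueue_upload_pipeline(conversation_id: str, audio_uuid: str, user_id: str, client_id: str) -> dict:
    version_id = str(uuid.uuid4())
    transcription_job = transcription_queue.enqueue(
        transcribe_full_audio_job,
        conversation_id, audio_uuid, version_id, "batch",  # "batch" is the trigger label
        job_timeout=1800,
        result_ttl=JOB_RESULT_TTL,
        job_id=f"transcribe_{conversation_id[:12]}",
        meta={"audio_uuid": audio_uuid, "conversation_id": conversation_id, "client_id": client_id},
    )
    # Streaming conversations call this directly with depends_on_job=None.
    return start_post_conversation_jobs(
        conversation_id=conversation_id,
        audio_uuid=audio_uuid,
        user_id=user_id,
        depends_on_job=transcription_job,
        client_id=client_id,
    )
```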

@@ -150,14 +150,38 @@ async def upload_and_process_audio_files(
exc_info=True
)

# Enqueue post-conversation processing job chain
from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs
# Enqueue batch transcription job first (file uploads need transcription)
from advanced_omi_backend.controllers.queue_controller import (
start_post_conversation_jobs,
transcription_queue,
JOB_RESULT_TTL
)
from advanced_omi_backend.workers.transcription_jobs import transcribe_full_audio_job

version_id = str(uuid.uuid4())
transcribe_job_id = f"transcribe_{conversation_id[:12]}"

transcription_job = transcription_queue.enqueue(
transcribe_full_audio_job,
conversation_id,
audio_uuid,
version_id,
"batch", # trigger
job_timeout=1800, # 30 minutes
result_ttl=JOB_RESULT_TTL,
job_id=transcribe_job_id,
description=f"Transcribe uploaded file {conversation_id[:8]}",
meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'client_id': client_id}
)

audio_logger.info(f"📥 Enqueued transcription job {transcription_job.id} for uploaded file")

# Enqueue post-conversation processing job chain (depends on transcription)
job_ids = start_post_conversation_jobs(
conversation_id=conversation_id,
audio_uuid=audio_uuid,
user_id=user.user_id,
post_transcription=True, # Run batch transcription for uploads
depends_on_job=transcription_job, # Wait for transcription to complete
client_id=client_id # Pass client_id for UI tracking
)

@@ -166,15 +190,15 @@ async def upload_and_process_audio_files(
"status": "processing",
"audio_uuid": audio_uuid,
"conversation_id": conversation_id,
"transcript_job_id": job_ids['transcription'],
"transcript_job_id": transcription_job.id,
"speaker_job_id": job_ids['speaker_recognition'],
"memory_job_id": job_ids['memory'],
"duration_seconds": round(duration, 2),
})

audio_logger.info(
f"✅ Processed {file.filename} → conversation {conversation_id}, "
f"jobs: {job_ids['transcription']} → {job_ids['speaker_recognition']} → {job_ids['memory']}"
f"jobs: {transcription_job.id} → {job_ids['speaker_recognition']} → {job_ids['memory']}"
)

except (OSError, IOError) as e:
@@ -367,7 +367,6 @@ def start_post_conversation_jobs(
conversation_id: str,
audio_uuid: str,
user_id: str,
post_transcription: bool = True,
transcript_version_id: Optional[str] = None,
depends_on_job = None,
client_id: Optional[str] = None
@@ -376,27 +375,25 @@
Start post-conversation processing jobs after conversation is created.

This creates the standard processing chain after a conversation is created:
1. [Optional] Transcription job - Batch transcription (if post_transcription=True)
2. Speaker recognition job - Identifies speakers in audio
3. Memory extraction job - Extracts memories from conversation (parallel)
4. Title/summary generation job - Generates title and summary (parallel)
1. Speaker recognition job - Identifies speakers in audio segments
2. Memory extraction job - Extracts memories from conversation
3. Title/summary generation job - Generates title and summary
4. Event dispatch job - Triggers conversation.complete plugins

Note: Audio is reconstructed from MongoDB chunks, not files.
Note: Batch transcription removed - streaming conversations use streaming transcript.
For file uploads, batch transcription must be enqueued separately before calling this function.

Args:
conversation_id: Conversation identifier
audio_uuid: Audio UUID for job tracking
user_id: User identifier
post_transcription: If True, run batch transcription step (for uploads)
If False, skip transcription (streaming already has it)
transcript_version_id: Transcript version ID (auto-generated if None)
depends_on_job: Optional job dependency for first job
depends_on_job: Optional job dependency for first job (e.g., transcription for file uploads)
client_id: Client ID for UI tracking

Returns:
Dict with job IDs (transcription will be None if post_transcription=False)
Dict with job IDs for speaker_recognition, memory, title_summary, event_dispatch
"""
from advanced_omi_backend.workers.transcription_jobs import transcribe_full_audio_job
from advanced_omi_backend.workers.speaker_jobs import recognise_speakers_job
from advanced_omi_backend.workers.memory_jobs import process_memory_job
from advanced_omi_backend.workers.conversation_jobs import generate_title_summary_job, dispatch_conversation_complete_event_job
@@ -408,30 +405,7 @@ def start_post_conversation_jobs(
if client_id:
job_meta['client_id'] = client_id

# Step 1: Batch transcription job (ALWAYS run to get correct conversation-relative timestamps)
# Even for streaming, we need batch transcription before cropping to fix cumulative timestamps
transcribe_job_id = f"transcribe_{conversation_id[:12]}"
logger.info(f"🔍 DEBUG: Creating transcribe job with job_id={transcribe_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}")

transcription_job = transcription_queue.enqueue(
transcribe_full_audio_job,
conversation_id,
audio_uuid,
version_id,
"batch", # trigger
job_timeout=1800, # 30 minutes
result_ttl=JOB_RESULT_TTL,
depends_on=depends_on_job,
job_id=transcribe_job_id,
description=f"Transcribe conversation {conversation_id[:8]}",
meta=job_meta
)
logger.info(f"📥 RQ: Enqueued transcription job {transcription_job.id}, meta={transcription_job.meta}")

# Speaker recognition depends on transcription (no cropping step)
speaker_depends_on = transcription_job

# Step 2: Speaker recognition job
# Step 1: Speaker recognition job (uses streaming transcript from conversation document)
speaker_job_id = f"speaker_{conversation_id[:12]}"
logger.info(f"🔍 DEBUG: Creating speaker job with job_id={speaker_job_id}, conversation_id={conversation_id[:12]}, audio_uuid={audio_uuid[:12]}")

@@ -441,12 +415,15 @@
version_id,
job_timeout=1200, # 20 minutes
result_ttl=JOB_RESULT_TTL,
depends_on=speaker_depends_on,
depends_on=depends_on_job, # Direct dependency (no transcription job)
job_id=speaker_job_id,
description=f"Speaker recognition for conversation {conversation_id[:8]}",
meta=job_meta
)
logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id}, meta={speaker_job.meta} (depends on {speaker_depends_on.id})")
if depends_on_job:
logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id}, meta={speaker_job.meta} (depends on {depends_on_job.id})")
else:
logger.info(f"📥 RQ: Enqueued speaker recognition job {speaker_job.id}, meta={speaker_job.meta} (no dependencies, starts immediately)")

# Step 3: Memory extraction job (parallel with title/summary)
memory_job_id = f"memory_{conversation_id[:12]}"
@@ -504,7 +481,6 @@ def start_post_conversation_jobs(
logger.info(f"📥 RQ: Enqueued conversation complete event job {event_dispatch_job.id}, meta={event_dispatch_job.meta} (depends on {memory_job.id} and {title_summary_job.id})")

return {
'transcription': transcription_job.id if transcription_job else None,
'speaker_recognition': speaker_job.id,
'memory': memory_job.id,
'title_summary': title_summary_job.id,
@@ -890,20 +890,44 @@ async def _process_batch_audio_complete(
)
# Continue anyway - transcription job will handle it

# Enqueue post-conversation processing job chain
from advanced_omi_backend.controllers.queue_controller import start_post_conversation_jobs
# Enqueue batch transcription job first (file uploads need transcription)
from advanced_omi_backend.controllers.queue_controller import (
start_post_conversation_jobs,
transcription_queue,
JOB_RESULT_TTL
)
from advanced_omi_backend.workers.transcription_jobs import transcribe_full_audio_job

version_id = str(uuid.uuid4())
transcribe_job_id = f"transcribe_{conversation_id[:12]}"

transcription_job = transcription_queue.enqueue(
transcribe_full_audio_job,
conversation_id,
audio_uuid,
version_id,
"batch", # trigger
job_timeout=1800, # 30 minutes
result_ttl=JOB_RESULT_TTL,
job_id=transcribe_job_id,
description=f"Transcribe batch audio {conversation_id[:8]}",
meta={'audio_uuid': audio_uuid, 'conversation_id': conversation_id, 'client_id': client_id}
)

application_logger.info(f"📥 Batch mode: Enqueued transcription job {transcription_job.id}")

# Enqueue post-conversation processing job chain (depends on transcription)
job_ids = start_post_conversation_jobs(
conversation_id=conversation_id,
audio_uuid=audio_uuid,
user_id=None, # Will be read from conversation in DB by jobs
post_transcription=True, # Run batch transcription for uploads
depends_on_job=transcription_job, # Wait for transcription to complete
client_id=client_id # Pass client_id for UI tracking
)

application_logger.info(
f"✅ Batch mode: Enqueued job chain for {conversation_id} - "
f"transcription ({job_ids['transcription']}) → "
f"transcription ({transcription_job.id}) → "
f"speaker ({job_ids['speaker_recognition']}) → "
f"memory ({job_ids['memory']})"
)
@@ -52,13 +52,21 @@ class EndReason(str, Enum):
UNKNOWN = "unknown" # Unknown or legacy reason

# Nested Models
class Word(BaseModel):
"""Individual word with timestamp in a transcript."""
word: str = Field(description="Word text")
start: float = Field(description="Start time in seconds")
end: float = Field(description="End time in seconds")
confidence: Optional[float] = Field(None, description="Confidence score (0-1)")

class SpeakerSegment(BaseModel):
"""Individual speaker segment in a transcript."""
start: float = Field(description="Start time in seconds")
end: float = Field(description="End time in seconds")
text: str = Field(description="Transcript text for this segment")
speaker: str = Field(description="Speaker identifier")
confidence: Optional[float] = Field(None, description="Confidence score (0-1)")
words: List["Conversation.Word"] = Field(default_factory=list, description="Word-level timestamps for this segment")

class TranscriptVersion(BaseModel):
"""Version of a transcript with processing metadata."""
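
A quick construction sketch for the new word-level models. The `"Conversation.Word"` forward reference implies both classes are nested on the `Conversation` document; the import path below is an assumption.

```python
from advanced_omi_backend.models import Conversation  # assumed import path

segment = Conversation.SpeakerSegment(
    start=12.4,
    end=15.1,
    text="see you tomorrow",
    speaker="speaker_1",
    confidence=0.92,
    words=[
        Conversation.Word(word="see", start=12.4, end=12.6, confidence=0.95),
        Conversation.Word(word="you", start=12.6, end=12.8, confidence=0.97),
        Conversation.Word(word="tomorrow", start=12.9, end=15.1, confidence=0.90),
    ],
)
print(segment.model_dump())  # Pydantic v2; use .dict() on v1
```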