Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 37 additions & 14 deletions backends/advanced/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
FROM python:3.12-slim-bookworm AS builder
# ============================================
# Base stage - common setup
# ============================================
FROM python:3.12-slim-bookworm AS base

# Install system dependencies for building
# Install system dependencies
RUN apt-get update && \
apt-get install -y --no-install-recommends \
build-essential \
Expand All @@ -9,39 +12,59 @@ RUN apt-get update && \
curl \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
# portaudio19-dev \

# Install uv
COPY --from=ghcr.io/astral-sh/uv:0.6.10 /uv /uvx /bin/

# Set up the working directory
# Set up working directory
WORKDIR /app

# Copy package structure and dependency files first
# Copy package structure and dependency files
COPY pyproject.toml README.md ./
COPY uv.lock .
RUN mkdir -p src/advanced_omi_backend
COPY src/advanced_omi_backend/__init__.py src/advanced_omi_backend/

# Install dependencies using uv with deepgram extra
# Use cache mount for BuildKit, fallback for legacy builds
# RUN --mount=type=cache,target=/root/.cache/uv \
# uv sync --extra deepgram
# Fallback for legacy Docker builds (CI compatibility)

# ============================================
# Production stage - production dependencies only
# ============================================
FROM base AS prod

# Install production dependencies only
RUN uv sync --extra deepgram

# Copy all application code
COPY . .

# Copy configuration files if they exist, otherwise they will be created from templates at runtime
# The files are expected to exist, but we handle the case where they don't gracefully

# Copy configuration files if they exist
COPY diarization_config.json* ./

# Copy and make startup script executable
COPY start.sh ./
RUN chmod +x start.sh

# Run the application
CMD ["./start.sh"]


# ============================================
# Dev/Test stage - includes test dependencies
# ============================================
FROM base AS dev

# Install production + test dependencies
RUN uv sync --extra deepgram --group test

# Copy all application code
COPY . .

# Copy configuration files if they exist
COPY diarization_config.json* ./

# Copy and make startup script executable
COPY start.sh ./
RUN chmod +x start.sh

# Run the application with workers
# Run the application
CMD ["./start.sh"]
12 changes: 9 additions & 3 deletions backends/advanced/docker-compose-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ services:
build:
context: .
dockerfile: Dockerfile
target: dev # Use dev stage with test dependencies
command: ["./start.sh", "--test"]
ports:
- "8001:8000" # Avoid conflict with dev on 8000
volumes:
Expand All @@ -15,6 +17,7 @@ services:
- ./data/test_debug_dir:/app/debug_dir
- ./data/test_data:/app/data
- ${CONFIG_FILE:-../../config/config.yml}:/app/config.yml # Mount config.yml for model registry and memory settings (writable for admin config updates)
- ${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/plugins.yml # Mount test plugins config
environment:
# Override with test-specific settings
- MONGODB_URI=mongodb://mongo-test:27017/test_db
Expand Down Expand Up @@ -154,14 +157,16 @@ services:
build:
context: .
dockerfile: Dockerfile
command: ["uv", "run", "python", "worker_orchestrator.py"]
target: dev # Use dev stage with test dependencies
command: ["uv", "run", "--group", "test", "python", "worker_orchestrator.py"]
volumes:
- ./src:/app/src
- ./worker_orchestrator.py:/app/worker_orchestrator.py
- ./data/test_audio_chunks:/app/audio_chunks
- ./data/test_debug_dir:/app/debug_dir
- ./data/test_data:/app/data
- ${CONFIG_FILE:-../../config/config.yml}:/app/config.yml # Mount config.yml for model registry and memory settings (writable for admin config updates)
- ${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/plugins.yml # Mount test plugins config
environment:
# Same environment as backend
- MONGODB_URI=mongodb://mongo-test:27017/test_db
Expand Down Expand Up @@ -205,13 +210,14 @@ services:
build:
context: .
dockerfile: Dockerfile
target: dev # Use dev stage with test dependencies
command: >
uv run python -m advanced_omi_backend.workers.audio_stream_deepgram_streaming_worker
uv run --group test python -m advanced_omi_backend.workers.audio_stream_deepgram_streaming_worker
volumes:
- ./src:/app/src
- ./data/test_data:/app/data
- ${CONFIG_FILE:-../../config/config.yml}:/app/config.yml
- ${PLUGINS_CONFIG:-../../config/plugins.yml}:/app/plugins.yml
- ${PLUGINS_CONFIG:-../../tests/config/plugins.test.yml}:/app/plugins.yml # Mount test plugins config
environment:
- DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY}
- REDIS_URL=redis://redis-test:6379/0
Expand Down
3 changes: 3 additions & 0 deletions backends/advanced/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ services:
build:
context: .
dockerfile: Dockerfile
target: prod # Use prod stage without test dependencies
ports:
- "8000:8000"
env_file:
Expand Down Expand Up @@ -84,6 +85,7 @@ services:
build:
context: .
dockerfile: Dockerfile
target: prod # Use prod stage without test dependencies
command: ["uv", "run", "python", "worker_orchestrator.py"]
env_file:
- .env
Expand Down Expand Up @@ -124,6 +126,7 @@ services:
build:
context: .
dockerfile: Dockerfile
target: prod # Use prod stage without test dependencies
command: >
uv run python -m advanced_omi_backend.workers.audio_stream_deepgram_streaming_worker
env_file:
Expand Down
1 change: 1 addition & 0 deletions backends/advanced/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,5 @@ test = [
"requests-mock>=1.12.1",
"pytest-json-report>=1.5.0",
"pytest-html>=4.0.0",
"aiosqlite>=0.20.0", # For test plugin event storage
]
8 changes: 4 additions & 4 deletions backends/advanced/src/advanced_omi_backend/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
class PluginContext:
"""Context passed to plugin execution"""
user_id: str
access_level: str
data: Dict[str, Any] # Access-level specific data
event: str # Event name (e.g., "transcript.streaming", "conversation.complete")
data: Dict[str, Any] # Event-specific data
metadata: Dict[str, Any] = field(default_factory=dict)


Expand Down Expand Up @@ -54,11 +54,11 @@ def __init__(self, config: Dict[str, Any]):

Args:
config: Plugin configuration from config/plugins.yml
Contains: enabled, access_level, trigger, and plugin-specific config
Contains: enabled, subscriptions, trigger, and plugin-specific config
"""
self.config = config
self.enabled = config.get('enabled', False)
self.access_level = config.get('access_level')
self.subscriptions = config.get('subscriptions', [])
self.trigger = config.get('trigger', {'type': 'always'})

@abstractmethod
Expand Down
67 changes: 26 additions & 41 deletions backends/advanced/src/advanced_omi_backend/plugins/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,84 +84,69 @@ def extract_command_after_wake_word(transcript: str, wake_word: str) -> str:


class PluginRouter:
"""Routes pipeline events to appropriate plugins based on access level and triggers"""
"""Routes pipeline events to appropriate plugins based on event subscriptions"""

def __init__(self):
self.plugins: Dict[str, BasePlugin] = {}
# Index plugins by access level for fast lookup
self._plugins_by_level: Dict[str, List[str]] = {
'transcript': [],
'streaming_transcript': [],
'conversation': [],
'memory': []
}
# Index plugins by event subscription for fast lookup
self._plugins_by_event: Dict[str, List[str]] = {}

def register_plugin(self, plugin_id: str, plugin: BasePlugin):
"""Register a plugin with the router"""
self.plugins[plugin_id] = plugin

# Index by access level
access_level = plugin.access_level
if access_level in self._plugins_by_level:
self._plugins_by_level[access_level].append(plugin_id)
# Index by each event subscription
for event in plugin.subscriptions:
if event not in self._plugins_by_event:
self._plugins_by_event[event] = []
self._plugins_by_event[event].append(plugin_id)

logger.info(f"Registered plugin '{plugin_id}' for access level '{access_level}'")
logger.info(f"Registered plugin '{plugin_id}' for events: {plugin.subscriptions}")

async def trigger_plugins(
async def dispatch_event(
self,
access_level: str,
event: str,
user_id: str,
data: Dict,
metadata: Optional[Dict] = None
) -> List[PluginResult]:
"""
Trigger all plugins registered for this access level.
Dispatch event to all subscribed plugins.

Args:
access_level: 'transcript', 'streaming_transcript', 'conversation', or 'memory'
event: Event name (e.g., 'transcript.streaming', 'conversation.complete')
user_id: User ID for context
data: Access-level specific data
data: Event-specific data
metadata: Optional metadata

Returns:
List of plugin results
"""
results = []

# Hierarchical triggering logic:
# - 'streaming_transcript': trigger both 'streaming_transcript' AND 'transcript' plugins
# - 'transcript': trigger ONLY 'transcript' plugins (not 'streaming_transcript')
# - Other levels: exact match only
if access_level == 'streaming_transcript':
# Streaming mode: trigger both streaming_transcript AND transcript plugins
plugin_ids = (
self._plugins_by_level.get('streaming_transcript', []) +
self._plugins_by_level.get('transcript', [])
)
else:
# Batch mode or other modes: exact match only
plugin_ids = self._plugins_by_level.get(access_level, [])
# Get plugins subscribed to this event
plugin_ids = self._plugins_by_event.get(event, [])

for plugin_id in plugin_ids:
plugin = self.plugins[plugin_id]

if not plugin.enabled:
continue

# Check trigger condition
# Check trigger condition (wake_word, etc.)
if not await self._should_trigger(plugin, data):
continue

# Execute plugin at appropriate access level
# Execute plugin
try:
context = PluginContext(
user_id=user_id,
access_level=access_level,
event=event,
data=data,
metadata=metadata or {}
)

result = await self._execute_plugin(plugin, access_level, context)
result = await self._execute_plugin(plugin, event, context)

if result:
results.append(result)
Expand Down Expand Up @@ -218,16 +203,16 @@ async def _should_trigger(self, plugin: BasePlugin, data: Dict) -> bool:
async def _execute_plugin(
self,
plugin: BasePlugin,
access_level: str,
event: str,
context: PluginContext
) -> Optional[PluginResult]:
"""Execute plugin method for specified access level"""
# Both 'transcript' and 'streaming_transcript' call on_transcript()
if access_level in ('transcript', 'streaming_transcript'):
"""Execute plugin method for specified event"""
# Map events to plugin callback methods
if event.startswith('transcript.'):
return await plugin.on_transcript(context)
elif access_level == 'conversation':
elif event.startswith('conversation.'):
return await plugin.on_conversation_complete(context)
elif access_level == 'memory':
elif event.startswith('memory.'):
return await plugin.on_memory_processed(context)

return None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Test Event Plugin for integration testing"""

from .plugin import TestEventPlugin

__all__ = ['TestEventPlugin']
Loading
Loading