Production-ready orchestration for OpenAI Agents SDK with Redis-backed task queues, SQLAlchemy activity tracking, and multiprocessing worker pools.
You've built an AI agent. It works great in development. Now you need to run it in production:
Customer Support Automation - A user submits a ticket. Your agent needs to research their account, check previous interactions, and draft a response. This takes 2-3 minutes. You can't block the HTTP request.
Document Processing Pipeline - Users upload contracts for analysis. Each document needs OCR, entity extraction, clause identification, and risk scoring. You need to process dozens concurrently while tracking progress.
Research & Reporting - Your agent researches companies, gathers data from multiple sources, and generates reports. Users need to see "Gathering financials... 40%" not just a spinning loader.
Multi-Agent Workflows - One agent discovers leads, fans out to research each one, then a final agent aggregates results. You need coordination, not chaos.
Running AI agents in production requires:
- Background execution - Agents take minutes; users shouldn't wait
- Progress tracking - Know what your agents are doing in real-time
- Fault tolerance - Handle failures gracefully with full error traces
- Scalability - Process multiple tasks across worker processes
- Observability - Complete audit trail of agent activities
- User interfaces - Components to build status dashboards and CLI monitors
agentexec provides all of this out of the box.
```shell
uv add agentexec
```

Requirements:
- Python 3.12+
- Redis
- SQLAlchemy-compatible database (PostgreSQL, MySQL, SQLite)
A typical agentexec application has a few files working together. Here's a complete working example showing each part:
```python
# db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

import agentexec as ax

ax.CONF.redis_url = "redis://localhost:6379/0"

engine = create_engine("sqlite:///agents.db")
ax.Base.metadata.create_all(engine)  # Creates activity tracking tables

def get_db():
    with Session(engine) as db:
        yield db
```

```python
# worker.py
from uuid import UUID

from pydantic import BaseModel
from agents import Agent

import agentexec as ax
from .db import engine

class ResearchContext(BaseModel):
    company: str

pool = ax.Pool(engine=engine)

@pool.task("research_company")
async def research_company(agent_id: UUID, context: ResearchContext) -> str:
    runner = ax.OpenAIRunner(agent_id)
    agent = Agent(
        name="Research Agent",
        instructions=f"Research {context.company}. {runner.prompts.report_status}",
        tools=[runner.tools.report_status],  # Agent can report its own progress
        model="gpt-4o",
    )
    result = await runner.run(agent, input="Begin research")
    return result.final_output

if __name__ == "__main__":
    pool.run()
```

```python
# views.py
from uuid import UUID

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session

import agentexec as ax
from .worker import ResearchContext
from .db import get_db

router = APIRouter()

@router.post("/research")
async def start_research(company: str) -> dict:
    task = await ax.enqueue("research_company", ResearchContext(company=company))
    return {"agent_id": str(task.agent_id), "status": "queued"}  # Return agent_id for status polling

@router.get("/research/{agent_id}")
def get_status(agent_id: UUID, db: Session = Depends(get_db)) -> ax.activity.ActivityDetailSchema:
    return ax.activity.detail(db, agent_id=agent_id)  # Query by agent_id
```

Run the worker:

```shell
python worker.py
```

That's it. Tasks are queued to Redis, workers process them in parallel, progress is tracked in your database, and your API stays responsive.
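If the moving parts are hard to picture, here is the enqueue → worker → status cycle as a mental model only. This is not agentexec code — just a plain in-process sketch of the flow it automates, with a `queue.Queue` standing in for Redis and a dict standing in for the activity table:

```python
import queue
import threading
import uuid

# Hypothetical stand-ins: the task queue (Redis in agentexec) and
# the status store (the activity tables in agentexec).
tasks: queue.Queue = queue.Queue()
statuses: dict[str, str] = {}

def enqueue(task_name: str, payload: dict) -> str:
    """Like ax.enqueue: record a QUEUED activity and push the task."""
    agent_id = str(uuid.uuid4())
    statuses[agent_id] = "queued"
    tasks.put((agent_id, task_name, payload))
    return agent_id

def worker() -> None:
    """Like a pool worker: pull tasks, run them, record the outcome."""
    while True:
        item = tasks.get()
        if item is None:  # Sentinel: shut down
            break
        agent_id, task_name, payload = item
        statuses[agent_id] = "running"
        # ... the actual agent run happens here ...
        statuses[agent_id] = "complete"

agent_id = enqueue("research_company", {"company": "Acme"})
t = threading.Thread(target=worker)
t.start()
tasks.put(None)  # Tell the worker to stop after draining the queue
t.join()
print(statuses[agent_id])  # → complete
```

agentexec adds what this sketch omits: multiple worker processes, fault tolerance, priorities, and a durable audit trail.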
Every task gets full lifecycle tracking without manual updates:

```python
runner = ax.OpenAIRunner(agent_id=agent_id)
result = await runner.run(agent, input="...")
# Activity automatically transitions:
# QUEUED → RUNNING → COMPLETE (or ERROR on failure)
```

Agents can report their own progress:

```python
agent = Agent(
    instructions=f"Do research. {runner.prompts.report_status}",
    tools=[runner.tools.report_status],
)
# Agent calls: report_status("Analyzing financials", 60)
```

Update progress explicitly from your task:

```python
ax.activity.update(agent_id, "Processing batch 3 of 10", percentage=30)
```

Control task execution order:

```python
await ax.enqueue("urgent_task", context, priority=ax.Priority.HIGH)  # Front of queue
await ax.enqueue("batch_job", context, priority=ax.Priority.LOW)     # Back of queue
```

Gracefully handle conversation limits:

```python
runner = ax.OpenAIRunner(
    agent_id=agent_id,
    max_turns_recovery=True,
    wrap_up_prompt="Please summarize your findings.",
)
result = await runner.run(agent, max_turns=15)
# If the agent hits max turns, it automatically continues with the wrap-up prompt
```

Orchestrate complex workflows with parallel execution:
```python
import asyncio

pipeline = ax.Pipeline(pool)

class ResearchPipeline(pipeline.Base):
    @pipeline.step(0, "parallel research")
    async def gather_data(self, context: InputContext) -> tuple[BrandResult, MarketResult]:
        brand, market = await asyncio.gather(
            research_brand(context),
            research_market(context),
        )
        return brand, market

    @pipeline.step(1, "analysis")
    async def analyze(self, brand: BrandResult, market: MarketResult) -> FinalReport:
        return await analyze_results(brand, market)

# Queue the pipeline
task = await pipeline.enqueue(context=InputContext(company="Anthropic"))
```

Coordinate dynamically-queued tasks:
```python
from agents import function_tool

import agentexec as ax

tracker = ax.Tracker("research", batch_id)

@function_tool
async def queue_research(company: str) -> None:
    """Discovery agent calls this for each company found."""
    tracker.incr()
    await ax.enqueue("research", ResearchContext(company=company, batch_id=batch_id))

@function_tool
async def save_result(result: ResearchResult) -> None:
    """Research agent calls this when done."""
    save_to_database(result)
    tracker.decr()
    if tracker.complete:
        await ax.enqueue("aggregate", AggregateContext(batch_id=batch_id))
```

If you have an existing FastAPI/Flask/Django backend, run the worker pool in a separate process:
```python
# main.py - Your API server
from fastapi import FastAPI

import agentexec as ax

app = FastAPI()

@app.post("/process")
async def process(data: str) -> dict:
    task = await ax.enqueue("process_data", ProcessContext(data=data))
    return {"agent_id": str(task.agent_id)}
```

```python
# worker.py - Run separately
from .tasks import pool

if __name__ == "__main__":
    pool.run()
```

Terminal 1: Start your API server

```shell
uvicorn main:app
```

Terminal 2: Start the workers

```shell
python worker.py
```

Handle Ctrl+C by canceling pending activities, so nothing is left stuck in QUEUED:

```python
# worker.py
import os
from uuid import UUID

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

import agentexec as ax

engine = create_engine(os.environ["DATABASE_URL"])
pool = ax.Pool(engine=engine)

@pool.task("my_task")
async def my_task(agent_id: UUID, context: MyContext) -> None:
    # Your task implementation
    pass

if __name__ == "__main__":
    try:
        pool.run()
    except KeyboardInterrupt:
        with Session(engine) as db:
            ax.activity.cancel_pending(db)
```

1. Create your worker Dockerfile:
```dockerfile
# Dockerfile.worker
FROM ghcr.io/agent-ci/agentexec-worker:latest
COPY ./src /app/src
ENV AGENTEXEC_WORKER_MODULE=src.worker
```

2. Create your worker module:

```python
# src/worker.py
import atexit
import os
from uuid import UUID

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

import agentexec as ax

engine = create_engine(os.environ["DATABASE_URL"])
pool = ax.Pool(engine=engine)

def cleanup() -> None:
    with Session(engine) as db:
        ax.activity.cancel_pending(db)

atexit.register(cleanup)

@pool.task("my_task")
async def my_task(agent_id: UUID, context: MyContext) -> None:
    pass
```

3. Build and run:
```shell
docker build -f Dockerfile.worker -t my-worker .
docker run -e DATABASE_URL=... -e REDIS_URL=... -e OPENAI_API_KEY=... my-worker
```

agentexec uses Redis for task queuing, result storage, real-time log streaming, and coordination between workers. We chose Redis because it provides exactly the primitives we need (lists, pub/sub, atomic counters) with minimal operational overhead.

AWS Compatible: Since we use standard Redis features, AWS ElastiCache works out of the box.

```shell
AGENTEXEC_REDIS_URL=redis://localhost:6379/0
# or
AGENTEXEC_REDIS_URL=redis://my-cluster.abc123.use1.cache.amazonaws.com:6379
```

The state backend is pluggable. We're adding support for additional backends (DynamoDB, PostgreSQL, in-memory for testing). You can also implement your own:
```shell
AGENTEXEC_STATE_BACKEND=agentexec.state.redis_backend  # Default
AGENTEXEC_STATE_BACKEND=myapp.state.dynamodb_backend   # Custom
```

Activity tracking uses SQLAlchemy with two tables:

agentexec_activity - Main activity records
- agent_id - Unique identifier (UUID)
- agent_type - Task name
- created_at, updated_at - Timestamps

agentexec_activity_log - Status and progress
- activity_id - Foreign key
- message - Log message
- status - QUEUED, RUNNING, COMPLETE, ERROR, CANCELED
- percentage - Progress (0-100)
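As a rough sketch of the shape these tables take, here is an equivalent sqlite3 layout. The column types are assumptions for illustration — agentexec creates the real tables for you via ax.Base.metadata.create_all, and AGENTEXEC_TABLE_PREFIX controls the name prefix:

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE agentexec_activity (
    id INTEGER PRIMARY KEY,
    agent_id TEXT UNIQUE NOT NULL,   -- UUID
    agent_type TEXT NOT NULL,        -- Task name
    created_at TEXT,
    updated_at TEXT
);
CREATE TABLE agentexec_activity_log (
    id INTEGER PRIMARY KEY,
    activity_id INTEGER REFERENCES agentexec_activity(id),
    message TEXT,
    status TEXT,                     -- QUEUED, RUNNING, COMPLETE, ERROR, CANCELED
    percentage INTEGER               -- 0-100
);
""")

# One activity with its log history, newest entry last
agent_id = str(uuid.uuid4())
conn.execute(
    "INSERT INTO agentexec_activity (id, agent_id, agent_type) VALUES (1, ?, 'research_company')",
    (agent_id,),
)
for message, status, pct in [
    ("Waiting to start", "queued", 0),
    ("Gathering data", "running", 30),
    ("Done", "complete", 100),
]:
    conn.execute(
        "INSERT INTO agentexec_activity_log (activity_id, message, status, percentage) "
        "VALUES (1, ?, ?, ?)",
        (message, status, pct),
    )

# Current status = the latest log entry for the activity
latest = conn.execute(
    "SELECT status, percentage FROM agentexec_activity_log "
    "WHERE activity_id = 1 ORDER BY id DESC LIMIT 1"
).fetchone()
print(latest)  # → ('complete', 100)
```

The split into an activity row plus append-only log rows is what gives you the full status history rather than a single overwritten state.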
The activity tracking module exposes Pydantic schemas for building APIs:
```python
from agentexec.activity.schemas import (
    ActivityListSchema,      # Paginated list response
    ActivityListItemSchema,  # Single item in list (lightweight)
    ActivityDetailSchema,    # Full activity with log history
    ActivityLogSchema,       # Single log entry
)
```

List activities:
```python
with Session(engine) as db:
    result = ax.activity.list(db, page=1, page_size=20)
# Returns ActivityListSchema:
# {
#   "items": [...],  # List of ActivityListItemSchema
#   "total": 150,
#   "page": 1,
#   "page_size": 20,
#   "total_pages": 8
# }
```

Get activity detail:
```python
activity = ax.activity.detail(db, agent_id=agent_id)
# Returns ActivityDetailSchema:
# {
#   "id": "...",
#   "agent_id": "...",
#   "agent_type": "research_company",
#   "created_at": "2024-01-15T10:30:00Z",
#   "updated_at": "2024-01-15T10:32:45Z",
#   "logs": [
#     {"status": "queued", "message": "Waiting to start", "percentage": 0, ...},
#     {"status": "running", "message": "Gathering data", "percentage": 30, ...},
#     {"status": "complete", "message": "Done", "percentage": 100, ...}
#   ]
# }
```

Count active agents:

```python
count = ax.activity.active_count(db)
# Returns the number of agents with status QUEUED or RUNNING
```

Build a live terminal dashboard with Rich:

```python
# cli_monitor.py
import time

from rich.live import Live
from rich.table import Table
from sqlalchemy import Engine
from sqlalchemy.orm import Session

import agentexec as ax

def build_table(db: Session) -> Table:
    table = Table(title=f"Active Agents: {ax.activity.active_count(db)}")
    table.add_column("Status")
    table.add_column("Task")
    table.add_column("Message")
    table.add_column("Progress")
    for item in ax.activity.list(db, page=1, page_size=10).items:
        table.add_row(
            item.status,
            item.agent_type,
            item.latest_log_message or "",
            f"{item.percentage}%",
        )
    return table

def monitor(engine: Engine) -> None:
    with Live(refresh_per_second=1) as live:
        while True:
            with Session(engine) as db:
                live.update(build_table(db))
            time.sleep(1)  # Avoid hammering the database between refreshes

if __name__ == "__main__":
    from .db import engine
    monitor(engine)
```

The agentexec-ui package provides React components for building monitoring interfaces:
```shell
npm install agentexec-ui
```

```tsx
import {
  TaskList,
  TaskDetail,
  ActiveAgentsBadge,
  StatusBadge,
  ProgressBar,
} from 'agentexec-ui';

// Display paginated task list
<TaskList
  items={activities.items}
  loading={isLoading}
  onTaskClick={(agentId) => setSelected(agentId)}
  selectedAgentId={selectedId}
/>

// Full activity detail view
<TaskDetail
  activity={activityDetail}
  loading={isDetailLoading}
  error={error}
  onClose={() => setSelected(null)}
/>

// Active count badge
<ActiveAgentsBadge count={activeCount} loading={isLoading} />

// Individual status indicators
<StatusBadge status="running" />
<ProgressBar percentage={65} status="running" />
```

```tsx
import type {
  Status,            // 'queued' | 'running' | 'complete' | 'error' | 'canceled'
  ActivityLog,
  ActivityDetail,
  ActivityListItem,
  ActivityList,
} from 'agentexec-ui';
```

These types mirror the Python API schemas (ActivityDetailSchema, ActivityListSchema, etc.), so your API responses integrate directly with the components.
The components are headless (no built-in styling) and work with any CSS framework. See examples/openai-agents-fastapi/ui/ for a complete React app with TanStack Query integration.
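Concretely, the JSON your status endpoint returns in the ActivityDetailSchema shape is what a component like TaskDetail consumes as its activity prop. A quick sanity check of that payload shape — all field values below are illustrative, not real output:

```python
import json

# Hypothetical response body from GET /research/{agent_id},
# shaped like ActivityDetailSchema (values are made up)
payload = {
    "id": "0f1e2d3c",
    "agent_id": "9a8b7c6d",
    "agent_type": "research_company",
    "created_at": "2024-01-15T10:30:00Z",
    "updated_at": "2024-01-15T10:32:45Z",
    "logs": [
        {"status": "queued", "message": "Waiting to start", "percentage": 0},
        {"status": "running", "message": "Gathering data", "percentage": 30},
        {"status": "complete", "message": "Done", "percentage": 100},
    ],
}

# Every status must be a member of the TS `Status` union the components accept
allowed = {"queued", "running", "complete", "error", "canceled"}
assert all(log["status"] in allowed for log in payload["logs"])

# This serialized body is what the frontend would pass straight to the components
body = json.dumps(payload)
print(len(payload["logs"]))  # → 3
```

Because the Python schemas and the TS types agree field-for-field, there is no translation layer to maintain between the API and the UI.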
```python
import agentexec as ax

task = await ax.enqueue(task_name, context, priority=ax.Priority.LOW)
result = await ax.get_result(task, timeout=300)
results = await ax.gather(task1, task2, task3)
```

```python
import agentexec as ax

pool = ax.Pool(engine=engine)
pool = ax.Pool(database_url="postgresql://...")

@pool.task("name")
async def handler(agent_id: UUID, context: MyContext) -> None: ...

pool.run()       # Blocking - runs workers
pool.start()     # Non-blocking - starts workers in background
pool.shutdown()  # Graceful shutdown
```

```python
import agentexec as ax

# Create activity (returns agent_id for tracking)
agent_id = ax.activity.create(task_name, message="Starting...")

# Update progress
ax.activity.update(agent_id, message, percentage=50)
ax.activity.complete(agent_id, message="Done")
ax.activity.error(agent_id, error="Failed: ...")

# Query activities
activities = ax.activity.list(db, page=1, page_size=20)
activity = ax.activity.detail(db, agent_id=agent_id)
count = ax.activity.active_count(db)

# Cleanup
canceled = ax.activity.cancel_pending(db)
```

```python
import agentexec as ax

runner = ax.OpenAIRunner(
    agent_id=agent_id,
    max_turns_recovery=True,
    wrap_up_prompt="Summarize...",
)

runner.prompts.report_status  # Instruction text for agents
runner.tools.report_status    # Pre-bound function tool

result = await runner.run(agent, input="...", max_turns=15)
result = await runner.run_streamed(agent, input="...", max_turns=15)

# Base class for custom runners
class MyRunner(ax.BaseAgentRunner):
    async def run(self, agent: Agent, input: str) -> RunResult: ...
```

```python
import agentexec as ax

pipeline = ax.Pipeline(pool)

class MyPipeline(pipeline.Base):
    @pipeline.step(0, "description")
    async def step_one(self, context): ...
```

```python
import agentexec as ax

tracker = ax.Tracker("name", batch_id)
tracker.incr()
if tracker.complete: ...  # All tasks done
```

```python
import agentexec as ax

ax.Base  # SQLAlchemy declarative base for activity tables
```

All settings via environment variables:
```shell
# Redis (required)
AGENTEXEC_REDIS_URL=redis://localhost:6379/0

# Workers
AGENTEXEC_NUM_WORKERS=4
AGENTEXEC_QUEUE_NAME=agentexec_tasks
AGENTEXEC_GRACEFUL_SHUTDOWN_TIMEOUT=300

# Database
AGENTEXEC_TABLE_PREFIX=agentexec_

# Results
AGENTEXEC_RESULT_TTL=3600

# State backend
AGENTEXEC_STATE_BACKEND=agentexec.state.redis_backend
AGENTEXEC_KEY_PREFIX=agentexec

# Activity messages (customizable)
AGENTEXEC_ACTIVITY_MESSAGE_CREATE="Waiting to start."
AGENTEXEC_ACTIVITY_MESSAGE_STARTED="Task started."
AGENTEXEC_ACTIVITY_MESSAGE_COMPLETE="Task completed successfully."
AGENTEXEC_ACTIVITY_MESSAGE_ERROR="Task failed with error: {error}"
```

```shell
# Clone repository
git clone https://github.com/Agent-CI/agentexec
cd agentexec

# Install dependencies
uv sync

# Run tests
uv run pytest

# Type checking
uv run ty check

# Linting
uv run ruff check src/

# Formatting
uv run ruff format src/
```

- Fork the repository
- Create a feature branch
- Make your changes with tests
- Run `uv run pytest` and `uv run ty check`
- Submit a pull request
MIT License - see LICENSE for details.
- PyPI: agentexec
- npm: agentexec-ui
- Documentation: docs/
- Example App: examples/openai-agents-fastapi/
- Issues: GitHub Issues