Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ def __init__(self, name: str, file_path: str, variable_name: str):
self.file_path = file_path
self.variable_name = variable_name
self._context_manager: Any = None
self._loaded_object: Any = (
None # Store original loaded object for type inference
)

@classmethod
def from_path_string(cls, name: str, file_path: str) -> Self:
Expand Down Expand Up @@ -102,9 +99,6 @@ async def load(self) -> Agent:
category=UiPathErrorCategory.USER,
)

# Store the original loaded object for type inference
self._loaded_object = agent_object

agent = await self._resolve_agent(agent_object)
if not isinstance(agent, Agent):
raise UiPathOpenAIAgentsRuntimeError(
Expand Down Expand Up @@ -179,17 +173,6 @@ async def _resolve_agent(self, agent_object: Any) -> Agent:

return agent_instance

def get_loaded_object(self) -> Any:
"""
Get the original loaded object before agent resolution.

This is useful for extracting type annotations from wrapper functions.

Returns:
The original loaded object (could be an Agent, function, or callable)
"""
return self._loaded_object

async def cleanup(self) -> None:
"""Clean up resources (e.g., exit async context managers)."""
if self._context_manager:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Factory for creating OpenAI Agents runtimes from openai_agents.json configuration."""

import asyncio
import os
from typing import Any

from agents import Agent
Expand All @@ -21,7 +20,6 @@
UiPathOpenAIAgentsRuntimeError,
)
from uipath_openai_agents.runtime.runtime import UiPathOpenAIAgentRuntime
from uipath_openai_agents.runtime.storage import SqliteAgentStorage


class UiPathOpenAIAgentRuntimeFactory:
Expand All @@ -44,111 +42,13 @@ def __init__(
self._agent_loaders: dict[str, OpenAiAgentLoader] = {}
self._agent_lock = asyncio.Lock()

self._storage: SqliteAgentStorage | None = None
self._storage_lock = asyncio.Lock()

self._setup_instrumentation()

def _setup_instrumentation(self) -> None:
"""Setup tracing and instrumentation."""
OpenAIAgentsInstrumentor().instrument()
UiPathSpanUtils.register_current_span_provider(get_current_span_wrapper)

async def _get_or_create_storage(self) -> SqliteAgentStorage | None:
"""Get or create the shared storage instance.

Returns:
Shared storage instance, or None if storage is disabled
"""
async with self._storage_lock:
if self._storage is None:
storage_path = self._get_storage_path()
if storage_path:
self._storage = SqliteAgentStorage(storage_path)
await self._storage.setup()
return self._storage

def _remove_file_with_retry(self, path: str, max_attempts: int = 5) -> None:
"""Remove file with retry logic for Windows file locking.

OpenAI SDK uses sync sqlite3 which doesn't immediately release file locks
on Windows. This retry mechanism gives the OS time to release the lock.

Args:
path: Path to file to remove
max_attempts: Maximum number of retry attempts (default: 5)

Raises:
OSError: If file cannot be removed after all retries
"""
import time

for attempt in range(max_attempts):
try:
os.remove(path)
return # Success
except PermissionError:
if attempt == max_attempts - 1:
# Last attempt failed, re-raise
raise
# Exponential backoff: 0.1s, 0.2s, 0.4s, 0.8s
time.sleep(0.1 * (2**attempt))

def _get_storage_path(self) -> str | None:
"""Get the storage path for agent state.

Returns:
Path to SQLite database for storage, or None if storage is disabled
"""
if self.context.state_file_path is not None:
return self.context.state_file_path

if self.context.runtime_dir and self.context.state_file:
path = os.path.join(self.context.runtime_dir, self.context.state_file)
if (
not self.context.resume
and self.context.job_id is None
and not self.context.keep_state_file
):
# If not resuming and no job id, delete the previous state file
if os.path.exists(path):
self._remove_file_with_retry(path)
os.makedirs(self.context.runtime_dir, exist_ok=True)
return path

default_path = os.path.join("__uipath", "state.db")
os.makedirs(os.path.dirname(default_path), exist_ok=True)
return default_path

def _get_storage_path_legacy(self, runtime_id: str) -> str | None:
"""
Get the storage path for agent session state.

Args:
runtime_id: Unique identifier for the runtime instance

Returns:
Path to SQLite database for session storage, or None if storage is disabled
"""
if self.context.runtime_dir and self.context.state_file:
# Use state file name pattern but with runtime_id
base_name = os.path.splitext(self.context.state_file)[0]
file_name = f"{base_name}_{runtime_id}.db"
path = os.path.join(self.context.runtime_dir, file_name)

if not self.context.resume and self.context.job_id is None:
# If not resuming and no job id, delete the previous state file
if os.path.exists(path):
self._remove_file_with_retry(path)

os.makedirs(self.context.runtime_dir, exist_ok=True)
return path

# Default storage path
default_dir = os.path.join("__uipath", "sessions")
os.makedirs(default_dir, exist_ok=True)
return os.path.join(default_dir, f"{runtime_id}.db")

def _load_config(self) -> OpenAiAgentsConfig:
"""Load openai_agents.json configuration."""
if self._config is None:
Expand Down Expand Up @@ -299,22 +199,10 @@ async def _create_runtime_instance(
Returns:
Configured runtime instance
"""
# Get shared storage instance
storage = await self._get_or_create_storage()
storage_path = storage.storage_path if storage else None

# Get the loaded object from the agent loader for schema inference
loaded_object = None
if entrypoint in self._agent_loaders:
loaded_object = self._agent_loaders[entrypoint].get_loaded_object()

return UiPathOpenAIAgentRuntime(
agent=agent,
runtime_id=runtime_id,
entrypoint=entrypoint,
storage_path=storage_path,
loaded_object=loaded_object,
storage=storage,
)

async def new_runtime(
Expand Down Expand Up @@ -346,8 +234,3 @@ async def dispose(self) -> None:

self._agent_loaders.clear()
self._agent_cache.clear()

# Dispose shared storage
if self._storage:
await self._storage.dispose()
self._storage = None
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from agents import (
Agent,
Runner,
SQLiteSession,
)
from uipath.runtime import (
UiPathExecuteOptions,
Expand All @@ -27,7 +26,6 @@
from ._serialize import serialize_output
from .errors import UiPathOpenAIAgentsErrorCode, UiPathOpenAIAgentsRuntimeError
from .schema import get_agent_schema, get_entrypoints_schema
from .storage import SqliteAgentStorage


class UiPathOpenAIAgentRuntime:
Expand All @@ -40,9 +38,6 @@ def __init__(
agent: Agent,
runtime_id: str | None = None,
entrypoint: str | None = None,
storage_path: str | None = None,
loaded_object: Any | None = None,
storage: SqliteAgentStorage | None = None,
):
"""
Initialize the runtime.
Expand All @@ -51,16 +46,10 @@ def __init__(
agent: The OpenAI Agent to execute
runtime_id: Unique identifier for this runtime instance
entrypoint: Optional entrypoint name (for schema generation)
storage_path: Path to SQLite database for session persistence
loaded_object: Original loaded object (for schema inference)
storage: Optional storage instance for state persistence
"""
self.agent: Agent = agent
self.runtime_id: str = runtime_id or "default"
self.entrypoint: str | None = entrypoint
self.storage_path: str | None = storage_path
self.loaded_object: Any | None = loaded_object
self.storage: SqliteAgentStorage | None = storage

# Configure OpenAI Agents SDK to use Responses API
# UiPath supports both APIs via X-UiPath-LlmGateway-ApiFlavor header
Expand Down Expand Up @@ -207,54 +196,27 @@ async def _run_agent(
Runtime events if stream_events=True, then final result
"""
agent_input = self._prepare_agent_input(input)
is_resuming = bool(options and options.resume)

# Create session for state persistence (local to this run)
# SQLiteSession automatically loads existing data from the database when created
session: SQLiteSession | None = None
if self.storage_path:
session = SQLiteSession(self.runtime_id, self.storage_path)

# Run the agent with streaming if events requested
try:
if stream_events:
# Use streaming for events
async for event_or_result in self._run_agent_streamed(
agent_input, options, stream_events, session
):
yield event_or_result
else:
# Use non-streaming for simple execution
result = await Runner.run(
starting_agent=self.agent,
input=agent_input,
session=session,
)
yield self._create_success_result(result.final_output)

except Exception:
# Clean up session on error
if session and self.storage_path and not is_resuming:
# Delete incomplete session
try:
import os

if os.path.exists(self.storage_path):
os.remove(self.storage_path)
except Exception:
pass # Best effort cleanup
raise
finally:
# Always close session after run completes with proper WAL checkpoint
if session:
self._close_session_with_checkpoint(session)
if stream_events:
# Use streaming for events
async for event_or_result in self._run_agent_streamed(
agent_input, options, stream_events
):
yield event_or_result
else:
# Use non-streaming for simple execution
result = await Runner.run(
starting_agent=self.agent,
input=agent_input,
)
yield self._create_success_result(result.final_output)

async def _run_agent_streamed(
self,
agent_input: str | list[Any],
options: UiPathExecuteOptions | UiPathStreamOptions | None,
stream_events: bool,
session: SQLiteSession | None,
) -> AsyncGenerator[UiPathRuntimeEvent | UiPathRuntimeResult, None]:
"""
Run agent using streaming API to enable event streaming.
Expand All @@ -272,7 +234,6 @@ async def _run_agent_streamed(
result = Runner.run_streamed(
starting_agent=self.agent,
input=agent_input,
session=session,
)

# Stream events from the agent
Expand Down Expand Up @@ -477,7 +438,7 @@ async def get_schema(self) -> UiPathRuntimeSchema:
Returns:
UiPathRuntimeSchema with input/output schemas and graph structure
"""
entrypoints_schema = get_entrypoints_schema(self.agent, self.loaded_object)
entrypoints_schema = get_entrypoints_schema(self.agent)

return UiPathRuntimeSchema(
filePath=self.entrypoint,
Expand All @@ -488,45 +449,6 @@ async def get_schema(self) -> UiPathRuntimeSchema:
graph=get_agent_schema(self.agent),
)

def _close_session_with_checkpoint(self, session: SQLiteSession) -> None:
"""Close SQLite session with WAL checkpoint to release file locks.

OpenAI SDK uses sync sqlite3 which doesn't release file locks on Windows
without explicit WAL checkpoint. This is especially important for cleanup.

Args:
session: The SQLiteSession to close
"""
try:
# Get the underlying connection
conn = session._get_connection()

# Commit any pending transactions
try:
conn.commit()
except Exception:
pass # Best effort

# Force WAL checkpoint to release shared memory files
# This is especially important on Windows
try:
conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
conn.commit()
except Exception:
pass # Best effort

except Exception:
pass # Best effort cleanup

finally:
# Always call the session's close method
try:
session.close()
except Exception:
pass # Best effort

async def dispose(self) -> None:
"""Cleanup runtime resources."""
# Sessions are closed immediately after each run in _run_agent()
# Storage is shared across runtimes and managed by the factory
pass
Loading