diff --git a/packages/uipath-openai-agents/samples/agent-as-tools/main.py b/packages/uipath-openai-agents/samples/agent-as-tools/main.py index d595ef3..ebaa8af 100644 --- a/packages/uipath-openai-agents/samples/agent-as-tools/main.py +++ b/packages/uipath-openai-agents/samples/agent-as-tools/main.py @@ -1,9 +1,8 @@ -import dotenv -from agents import Agent, AgentOutputSchema, Runner, trace +from agents import Agent, AgentOutputSchema +from agents.models import _openai_shared from pydantic import BaseModel, Field -from uipath.tracing import traced -dotenv.load_dotenv() +from uipath_openai_agents.chat import UiPathChatOpenAI """ This example shows the agents-as-tools pattern adapted for UiPath coded agents. @@ -39,95 +38,58 @@ class TranslationOutput(BaseModel): ) -spanish_agent = Agent( - name="spanish_agent", - instructions="You translate the user's message to Spanish", - handoff_description="An english to spanish translator", -) - -french_agent = Agent( - name="french_agent", - instructions="You translate the user's message to French", - handoff_description="An english to french translator", -) - -italian_agent = Agent( - name="italian_agent", - instructions="You translate the user's message to Italian", - handoff_description="An english to italian translator", -) - -# Orchestrator agent that uses other agents as tools -# Uses output_type for structured outputs (native OpenAI Agents pattern) -# Note: Using AgentOutputSchema with strict_json_schema=False because -# dict[str, str] is not compatible with OpenAI's strict JSON schema mode -orchestrator_agent = Agent( - name="orchestrator_agent", - instructions=( - "You are a translation agent. You use the tools given to you to translate. " - "If asked for multiple translations, you call the relevant tools in order. " - "You never translate on your own, you always use the provided tools." 
- ), - tools=[ - spanish_agent.as_tool( - tool_name="translate_to_spanish", - tool_description="Translate the user's message to Spanish", - ), - french_agent.as_tool( - tool_name="translate_to_french", - tool_description="Translate the user's message to French", - ), - italian_agent.as_tool( - tool_name="translate_to_italian", - tool_description="Translate the user's message to Italian", +def main() -> Agent: + """Configure UiPath OpenAI client and return the orchestrator agent.""" + # Configure UiPath OpenAI client for agent execution + # This routes all OpenAI API calls through UiPath's LLM Gateway + uipath_openai_client = UiPathChatOpenAI(model_name="gpt-4o-2024-11-20") + _openai_shared.set_default_openai_client(uipath_openai_client.async_client) + + # Define specialized translation agents + spanish_agent = Agent( + name="spanish_agent", + instructions="You translate the user's message to Spanish", + handoff_description="An english to spanish translator", + ) + + french_agent = Agent( + name="french_agent", + instructions="You translate the user's message to French", + handoff_description="An english to french translator", + ) + + italian_agent = Agent( + name="italian_agent", + instructions="You translate the user's message to Italian", + handoff_description="An english to italian translator", + ) + + # Orchestrator agent that uses other agents as tools + # Uses output_type for structured outputs (native OpenAI Agents pattern) + # Note: Using AgentOutputSchema with strict_json_schema=False because + # dict[str, str] is not compatible with OpenAI's strict JSON schema mode + orchestrator_agent = Agent( + name="orchestrator_agent", + instructions=( + "You are a translation agent. You use the tools given to you to translate. " + "If asked for multiple translations, you call the relevant tools in order. " + "You never translate on your own, you always use the provided tools." ), - ], - output_type=AgentOutputSchema(TranslationOutput, strict_json_schema=False), -) - - -@traced(name="Translation Orchestrator Main") -async def main(input_data: TranslationInput) -> TranslationOutput: - """ - Main function to orchestrate translations using agent-as-tools pattern. - - This function demonstrates parameter inference - the Input/Output models - are automatically extracted to generate schemas for UiPath workflows. 
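The refactored sample (completed just below) no longer runs itself; main() only wires up the UiPath client and returns the orchestrator. A minimal local driver, not part of the sample and assuming UiPath credentials are present in the environment and that TranslationInput still carries text and target_languages, might look like:

import asyncio

from agents import Runner
from main import TranslationInput, main


async def run_locally() -> None:
    agent = main()  # configures the UiPath client, then returns the orchestrator
    data = TranslationInput(text="Hello, world!", target_languages=["spanish", "french"])
    prompt = f"Translate this text to {', '.join(data.target_languages)}: {data.text}"
    result = await Runner.run(
        starting_agent=agent,
        input=[{"content": prompt, "role": "user"}],
    )
    # With output_type=AgentOutputSchema(TranslationOutput, ...), final_output
    # should already be a TranslationOutput instance.
    print(result.final_output.translations)


asyncio.run(run_locally())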
- - Args: - input_data: Input containing text and target languages - - Returns: - TranslationOutput: Result containing translations for requested languages - """ - print(f"\nTranslating: '{input_data.text}'") - print(f"Target languages: {', '.join(input_data.target_languages)}\n") - - # Build the prompt based on requested languages - language_list = ", ".join(input_data.target_languages) - prompt = f"Translate this text to {language_list}: {input_data.text}" - - with trace("Translation Orchestrator"): - # Run the orchestrator agent - result = await Runner.run( - starting_agent=orchestrator_agent, - input=[{"content": prompt, "role": "user"}], - ) - - # Extract translations from the response - # In a real implementation, you'd parse the structured response - final_response = result.final_output - print(f"\nAgent response: {final_response}\n") - - # For demonstration, create structured output - # In production, you'd parse the agent's structured response - translations = {} - for lang in input_data.target_languages: - # Placeholder - in real usage, extract from agent response - translations[lang] = f"[Translation to {lang}]" - - return TranslationOutput( - original_text=input_data.text, - translations=translations, - languages_used=input_data.target_languages, + tools=[ + spanish_agent.as_tool( + tool_name="translate_to_spanish", + tool_description="Translate the user's message to Spanish", + ), + french_agent.as_tool( + tool_name="translate_to_french", + tool_description="Translate the user's message to French", + ), + italian_agent.as_tool( + tool_name="translate_to_italian", + tool_description="Translate the user's message to Italian", + ), + ], + output_type=AgentOutputSchema(TranslationOutput, strict_json_schema=False), ) + + return orchestrator_agent diff --git a/packages/uipath-openai-agents/samples/agent-as-tools/openai_agents.json b/packages/uipath-openai-agents/samples/agent-as-tools/openai_agents.json index 1b4dd1a..5675c8b 100644 --- a/packages/uipath-openai-agents/samples/agent-as-tools/openai_agents.json +++ b/packages/uipath-openai-agents/samples/agent-as-tools/openai_agents.json @@ -1,5 +1,5 @@ { "agents": { - "agent": "main.py:orchestrator_agent" + "agent": "main.py:main" } } diff --git a/packages/uipath-openai-agents/samples/rag-assistant/main.py b/packages/uipath-openai-agents/samples/rag-assistant/main.py index 7f26d11..1b94605 100644 --- a/packages/uipath-openai-agents/samples/rag-assistant/main.py +++ b/packages/uipath-openai-agents/samples/rag-assistant/main.py @@ -9,12 +9,11 @@ - Streaming responses support """ -import dotenv -from agents import Agent, Runner +from agents import Agent +from agents.models import _openai_shared from pydantic import BaseModel, Field -from uipath.tracing import traced -dotenv.load_dotenv() +from uipath_openai_agents.chat import UiPathChatOpenAI # Required Input/Output models for UiPath coded agents @@ -31,11 +30,18 @@ class Output(BaseModel): agent_used: str = Field(description="The name of the agent that answered") -# Define the assistant agent -# Model defaults to gpt-4.1 which automatically maps to gpt-4o-2024-11-20 -assistant_agent = Agent( - name="assistant_agent", - instructions="""You are a helpful AI assistant that provides clear, concise answers. 
+def main() -> Agent: + """Configure UiPath OpenAI client and return the assistant agent.""" + # Configure UiPath OpenAI client for agent execution + # This routes all OpenAI API calls through UiPath's LLM Gateway + uipath_openai_client = UiPathChatOpenAI(model_name="gpt-4o-2024-11-20") + _openai_shared.set_default_openai_client(uipath_openai_client.async_client) + + # Define the assistant agent + # Model defaults to gpt-4.1 which automatically maps to gpt-4o-2024-11-20 + assistant_agent = Agent( + name="assistant_agent", + instructions="""You are a helpful AI assistant that provides clear, concise answers. Your capabilities: - Answer questions accurately @@ -43,34 +49,6 @@ class Output(BaseModel): - Be helpful and informative Always aim for clarity and accuracy in your responses.""", -) - - -@traced(name="RAG Assistant Main") -async def main(input_data: Input) -> Output: - """Main function for RAG assistant using OpenAI Agents SDK. - - This function demonstrates the basic OpenAI Agents pattern with UiPath integration. - - Args: - input_data: Input containing the question to ask - - Returns: - Output: Result containing the answer and agent used - """ - print(f"\n🔍 Question: {input_data.question}\n") - - # Run the assistant agent (non-streaming for simplicity) - result = await Runner.run( - starting_agent=assistant_agent, - input=[{"content": input_data.question, "role": "user"}], ) - # Extract the final response - final_response = result.final_output - agent_used = result.current_agent.name - - print(f"\n💬 Answer: {final_response}") - print(f"✅ Agent used: {agent_used}\n") - - return Output(answer=final_response, agent_used=agent_used) + return assistant_agent diff --git a/packages/uipath-openai-agents/samples/rag-assistant/openai_agents.json b/packages/uipath-openai-agents/samples/rag-assistant/openai_agents.json index f451c3a..5675c8b 100644 --- a/packages/uipath-openai-agents/samples/rag-assistant/openai_agents.json +++ b/packages/uipath-openai-agents/samples/rag-assistant/openai_agents.json @@ -1,5 +1,5 @@ { "agents": { - "agent": "main.py:assistant_agent" + "agent": "main.py:main" } } diff --git a/packages/uipath-openai-agents/samples/triage-agent/main.py b/packages/uipath-openai-agents/samples/triage-agent/main.py index f689681..cef91f8 100644 --- a/packages/uipath-openai-agents/samples/triage-agent/main.py +++ b/packages/uipath-openai-agents/samples/triage-agent/main.py @@ -1,12 +1,8 @@ -import asyncio - -import dotenv -from agents import Agent, RawResponsesStreamEvent, Runner, trace -from openai.types.responses import ResponseContentPartDoneEvent, ResponseTextDeltaEvent +from agents import Agent +from agents.models import _openai_shared from pydantic import BaseModel -from uipath.tracing import traced -dotenv.load_dotenv() +from uipath_openai_agents.chat import UiPathChatOpenAI """ This example shows the handoffs/routing pattern adapted for UiPath coded agents. 
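Each sample's openai_agents.json now points at a factory callable rather than a module-level Agent; both shapes appear to be accepted by the loader, but the factory form gives the entrypoint a place to configure the UiPath client before the Agent is built. An illustrative comparison (agent names and instructions are placeholders):

from agents import Agent
from agents.models import _openai_shared

from uipath_openai_agents.chat import UiPathChatOpenAI

# Previous style -- {"agents": {"agent": "main.py:assistant_agent"}}
assistant_agent = Agent(name="assistant_agent", instructions="You are a helpful assistant.")


# New style -- {"agents": {"agent": "main.py:main"}}
def main() -> Agent:
    client = UiPathChatOpenAI(model_name="gpt-4o-2024-11-20")
    _openai_shared.set_default_openai_client(client.async_client)
    return Agent(name="assistant_agent", instructions="You are a helpful assistant.")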
@@ -31,76 +27,34 @@ class Output(BaseModel): agent_used: str -# Define specialized agents for different languages -french_agent = Agent( - name="french_agent", - instructions="You only speak French", -) - -spanish_agent = Agent( - name="spanish_agent", - instructions="You only speak Spanish", -) - -english_agent = Agent( - name="english_agent", - instructions="You only speak English", -) - -# Triage agent routes to appropriate language agent -triage_agent = Agent( - name="triage_agent", - instructions="Handoff to the appropriate agent based on the language of the request.", - handoffs=[french_agent, spanish_agent, english_agent], -) - - -@traced(name="Language Routing Agent Main") -async def main(input_data: Input) -> Output: - """Main function to run the language routing agent. - - Args: - input_data: Input model with a message for the agent. - - Returns: - Output: Result containing the agent's response and which agent was used. - """ - print(f"\nProcessing message: {input_data.message}") - - with trace("Language Routing Agent"): - # Run the agent with streaming - result = Runner.run_streamed( - triage_agent, - input=[{"content": input_data.message, "role": "user"}], - ) - - # Collect the response - response_parts = [] - async for event in result.stream_events(): - if not isinstance(event, RawResponsesStreamEvent): - continue - data = event.data - if isinstance(data, ResponseTextDeltaEvent): - print(data.delta, end="", flush=True) - response_parts.append(data.delta) - elif isinstance(data, ResponseContentPartDoneEvent): - print() - - # Get the final response and agent used - final_response = "".join(response_parts) - agent_used = result.current_agent.name - - print(f"\n\nAgent used: {agent_used}") - return Output(response=final_response, agent_used=agent_used) - - -if __name__ == "__main__": - # Example usage with different languages: - # 1. English message - # asyncio.run(main(Input(message="Hello, how are you?"))) - - # 2. French message - # asyncio.run(main(Input(message="Bonjour, comment allez-vous?"))) - - # 3. 
Spanish message - asyncio.run(main(Input(message="Hola, ¿cómo estás?"))) +def main() -> Agent: + """Configure UiPath OpenAI client and return the triage agent.""" + # Configure UiPath OpenAI client for agent execution + # This routes all OpenAI API calls through UiPath's LLM Gateway + uipath_openai_client = UiPathChatOpenAI(model_name="gpt-4o-2024-11-20") + _openai_shared.set_default_openai_client(uipath_openai_client.async_client) + + # Define specialized agents for different languages + french_agent = Agent( + name="french_agent", + instructions="You only speak French", + ) + + spanish_agent = Agent( + name="spanish_agent", + instructions="You only speak Spanish", + ) + + english_agent = Agent( + name="english_agent", + instructions="You only speak English", + ) + + # Triage agent routes to appropriate language agent + triage_agent = Agent( + name="triage_agent", + instructions="Handoff to the appropriate agent based on the language of the request.", + handoffs=[french_agent, spanish_agent, english_agent], + ) + + return triage_agent diff --git a/packages/uipath-openai-agents/samples/triage-agent/openai_agents.json b/packages/uipath-openai-agents/samples/triage-agent/openai_agents.json index 5d4b2a3..5675c8b 100644 --- a/packages/uipath-openai-agents/samples/triage-agent/openai_agents.json +++ b/packages/uipath-openai-agents/samples/triage-agent/openai_agents.json @@ -1,5 +1,5 @@ { "agents": { - "agent": "main.py:triage_agent" + "agent": "main.py:main" } } diff --git a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/agent.py b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/agent.py index 3f6a200..1e33622 100644 --- a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/agent.py +++ b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/agent.py @@ -28,9 +28,6 @@ def __init__(self, name: str, file_path: str, variable_name: str): self.file_path = file_path self.variable_name = variable_name self._context_manager: Any = None - self._loaded_object: Any = ( - None # Store original loaded object for type inference - ) @classmethod def from_path_string(cls, name: str, file_path: str) -> Self: @@ -102,9 +99,6 @@ async def load(self) -> Agent: category=UiPathErrorCategory.USER, ) - # Store the original loaded object for type inference - self._loaded_object = agent_object - agent = await self._resolve_agent(agent_object) if not isinstance(agent, Agent): raise UiPathOpenAIAgentsRuntimeError( @@ -179,17 +173,6 @@ async def _resolve_agent(self, agent_object: Any) -> Agent: return agent_instance - def get_loaded_object(self) -> Any: - """ - Get the original loaded object before agent resolution. - - This is useful for extracting type annotations from wrapper functions. 
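Because the loader no longer keeps the original callable around, wrapper-function type hints can no longer feed schema generation; an entrypoint that wants a typed output schema has to declare it on the Agent itself. A small sketch using the rag-assistant sample's Output model (the wiring is illustrative):

from agents import Agent
from pydantic import BaseModel


class Output(BaseModel):
    answer: str
    agent_used: str


agent = Agent(
    name="assistant_agent",
    instructions="You are a helpful AI assistant.",
    output_type=Output,
)

# get_entrypoints_schema(agent) would then publish Output's fields (answer, agent_used)
# instead of falling back to the default {"result": <string>} output schema.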
- - Returns: - The original loaded object (could be an Agent, function, or callable) - """ - return self._loaded_object - async def cleanup(self) -> None: """Clean up resources (e.g., exit async context managers).""" if self._context_manager: diff --git a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/factory.py b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/factory.py index 1149bc1..534f9dd 100644 --- a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/factory.py +++ b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/factory.py @@ -1,7 +1,6 @@ """Factory for creating OpenAI Agents runtimes from openai_agents.json configuration.""" import asyncio -import os from typing import Any from agents import Agent @@ -21,7 +20,6 @@ UiPathOpenAIAgentsRuntimeError, ) from uipath_openai_agents.runtime.runtime import UiPathOpenAIAgentRuntime -from uipath_openai_agents.runtime.storage import SqliteAgentStorage class UiPathOpenAIAgentRuntimeFactory: @@ -44,9 +42,6 @@ def __init__( self._agent_loaders: dict[str, OpenAiAgentLoader] = {} self._agent_lock = asyncio.Lock() - self._storage: SqliteAgentStorage | None = None - self._storage_lock = asyncio.Lock() - self._setup_instrumentation() def _setup_instrumentation(self) -> None: @@ -54,101 +49,6 @@ def _setup_instrumentation(self) -> None: OpenAIAgentsInstrumentor().instrument() UiPathSpanUtils.register_current_span_provider(get_current_span_wrapper) - async def _get_or_create_storage(self) -> SqliteAgentStorage | None: - """Get or create the shared storage instance. - - Returns: - Shared storage instance, or None if storage is disabled - """ - async with self._storage_lock: - if self._storage is None: - storage_path = self._get_storage_path() - if storage_path: - self._storage = SqliteAgentStorage(storage_path) - await self._storage.setup() - return self._storage - - def _remove_file_with_retry(self, path: str, max_attempts: int = 5) -> None: - """Remove file with retry logic for Windows file locking. - - OpenAI SDK uses sync sqlite3 which doesn't immediately release file locks - on Windows. This retry mechanism gives the OS time to release the lock. - - Args: - path: Path to file to remove - max_attempts: Maximum number of retry attempts (default: 5) - - Raises: - OSError: If file cannot be removed after all retries - """ - import time - - for attempt in range(max_attempts): - try: - os.remove(path) - return # Success - except PermissionError: - if attempt == max_attempts - 1: - # Last attempt failed, re-raise - raise - # Exponential backoff: 0.1s, 0.2s, 0.4s, 0.8s - time.sleep(0.1 * (2**attempt)) - - def _get_storage_path(self) -> str | None: - """Get the storage path for agent state. 
- - Returns: - Path to SQLite database for storage, or None if storage is disabled - """ - if self.context.state_file_path is not None: - return self.context.state_file_path - - if self.context.runtime_dir and self.context.state_file: - path = os.path.join(self.context.runtime_dir, self.context.state_file) - if ( - not self.context.resume - and self.context.job_id is None - and not self.context.keep_state_file - ): - # If not resuming and no job id, delete the previous state file - if os.path.exists(path): - self._remove_file_with_retry(path) - os.makedirs(self.context.runtime_dir, exist_ok=True) - return path - - default_path = os.path.join("__uipath", "state.db") - os.makedirs(os.path.dirname(default_path), exist_ok=True) - return default_path - - def _get_storage_path_legacy(self, runtime_id: str) -> str | None: - """ - Get the storage path for agent session state. - - Args: - runtime_id: Unique identifier for the runtime instance - - Returns: - Path to SQLite database for session storage, or None if storage is disabled - """ - if self.context.runtime_dir and self.context.state_file: - # Use state file name pattern but with runtime_id - base_name = os.path.splitext(self.context.state_file)[0] - file_name = f"{base_name}_{runtime_id}.db" - path = os.path.join(self.context.runtime_dir, file_name) - - if not self.context.resume and self.context.job_id is None: - # If not resuming and no job id, delete the previous state file - if os.path.exists(path): - self._remove_file_with_retry(path) - - os.makedirs(self.context.runtime_dir, exist_ok=True) - return path - - # Default storage path - default_dir = os.path.join("__uipath", "sessions") - os.makedirs(default_dir, exist_ok=True) - return os.path.join(default_dir, f"{runtime_id}.db") - def _load_config(self) -> OpenAiAgentsConfig: """Load openai_agents.json configuration.""" if self._config is None: @@ -299,22 +199,10 @@ async def _create_runtime_instance( Returns: Configured runtime instance """ - # Get shared storage instance - storage = await self._get_or_create_storage() - storage_path = storage.storage_path if storage else None - - # Get the loaded object from the agent loader for schema inference - loaded_object = None - if entrypoint in self._agent_loaders: - loaded_object = self._agent_loaders[entrypoint].get_loaded_object() - return UiPathOpenAIAgentRuntime( agent=agent, runtime_id=runtime_id, entrypoint=entrypoint, - storage_path=storage_path, - loaded_object=loaded_object, - storage=storage, ) async def new_runtime( @@ -346,8 +234,3 @@ async def dispose(self) -> None: self._agent_loaders.clear() self._agent_cache.clear() - - # Dispose shared storage - if self._storage: - await self._storage.dispose() - self._storage = None diff --git a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py index 5d36882..a8b9d6c 100644 --- a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py +++ b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py @@ -1,15 +1,10 @@ """Runtime class for executing OpenAI Agents within the UiPath framework.""" import json -import os from typing import Any, AsyncGenerator from uuid import uuid4 -from agents import ( - Agent, - Runner, - SQLiteSession, -) +from agents import Agent, Runner from uipath.runtime import ( UiPathExecuteOptions, UiPathRuntimeResult, @@ -27,7 +22,6 @@ from ._serialize import serialize_output from .errors import UiPathOpenAIAgentsErrorCode, 
UiPathOpenAIAgentsRuntimeError from .schema import get_agent_schema, get_entrypoints_schema -from .storage import SqliteAgentStorage class UiPathOpenAIAgentRuntime: @@ -40,9 +34,6 @@ def __init__( agent: Agent, runtime_id: str | None = None, entrypoint: str | None = None, - storage_path: str | None = None, - loaded_object: Any | None = None, - storage: SqliteAgentStorage | None = None, ): """ Initialize the runtime. @@ -51,86 +42,10 @@ def __init__( agent: The OpenAI Agent to execute runtime_id: Unique identifier for this runtime instance entrypoint: Optional entrypoint name (for schema generation) - storage_path: Path to SQLite database for session persistence - loaded_object: Original loaded object (for schema inference) - storage: Optional storage instance for state persistence """ self.agent: Agent = agent self.runtime_id: str = runtime_id or "default" self.entrypoint: str | None = entrypoint - self.storage_path: str | None = storage_path - self.loaded_object: Any | None = loaded_object - self.storage: SqliteAgentStorage | None = storage - - # Configure OpenAI Agents SDK to use Responses API - # UiPath supports both APIs via X-UiPath-LlmGateway-ApiFlavor header - # Using responses API for enhanced agent capabilities (conversation state, reasoning) - from agents import set_default_openai_api - - set_default_openai_api("responses") - - # Inject UiPath OpenAI client if UiPath credentials are available - self._setup_uipath_client() - - def _setup_uipath_client(self) -> None: - """Set up UiPath OpenAI client for agents to use UiPath gateway. - - This injects the UiPath OpenAI client into the OpenAI Agents SDK - so all agents use the UiPath LLM Gateway instead of direct OpenAI. - - The model is automatically extracted from the agent's `model` parameter. - If not specified in Agent(), the SDK uses agents.models.get_default_model(). - - If UiPath credentials are not available, falls back to default OpenAI client. 
- """ - try: - # Import here to avoid circular dependency - from uipath_openai_agents.chat import UiPathChatOpenAI - - # Check if UiPath credentials are available - org_id = os.getenv("UIPATH_ORGANIZATION_ID") - tenant_id = os.getenv("UIPATH_TENANT_ID") - token = os.getenv("UIPATH_ACCESS_TOKEN") - uipath_url = os.getenv("UIPATH_URL") - - if org_id and tenant_id and token and uipath_url: - # Extract model from agent definition - from agents.models import get_default_model - - from uipath_openai_agents.chat.supported_models import OpenAIModels - - if hasattr(self.agent, "model") and self.agent.model: - model_name = str(self.agent.model) - else: - model_name = get_default_model() - - # Normalize generic model names to UiPath-specific versions - model_name = OpenAIModels.normalize_model_name(model_name) - - # Update agent's model to normalized version so SDK sends correct model in body - self.agent.model = model_name - - # Create UiPath OpenAI client - uipath_client = UiPathChatOpenAI( - token=token, - org_id=org_id, - tenant_id=tenant_id, - model_name=model_name, - ) - - # Inject into OpenAI Agents SDK - # This makes all agents use UiPath gateway - from agents.models import _openai_shared - - _openai_shared.set_default_openai_client(uipath_client.async_client) - - except ImportError: - # UiPath chat module not available, skip injection - pass - except Exception: - # If injection fails, fall back to default OpenAI client - # Agents will use OPENAI_API_KEY if set - pass async def execute( self, @@ -207,54 +122,27 @@ async def _run_agent( Runtime events if stream_events=True, then final result """ agent_input = self._prepare_agent_input(input) - is_resuming = bool(options and options.resume) - - # Create session for state persistence (local to this run) - # SQLiteSession automatically loads existing data from the database when created - session: SQLiteSession | None = None - if self.storage_path: - session = SQLiteSession(self.runtime_id, self.storage_path) # Run the agent with streaming if events requested - try: - if stream_events: - # Use streaming for events - async for event_or_result in self._run_agent_streamed( - agent_input, options, stream_events, session - ): - yield event_or_result - else: - # Use non-streaming for simple execution - result = await Runner.run( - starting_agent=self.agent, - input=agent_input, - session=session, - ) - yield self._create_success_result(result.final_output) - - except Exception: - # Clean up session on error - if session and self.storage_path and not is_resuming: - # Delete incomplete session - try: - import os - - if os.path.exists(self.storage_path): - os.remove(self.storage_path) - except Exception: - pass # Best effort cleanup - raise - finally: - # Always close session after run completes with proper WAL checkpoint - if session: - self._close_session_with_checkpoint(session) + if stream_events: + # Use streaming for events + async for event_or_result in self._run_agent_streamed( + agent_input, options, stream_events + ): + yield event_or_result + else: + # Use non-streaming for simple execution + result = await Runner.run( + starting_agent=self.agent, + input=agent_input, + ) + yield self._create_success_result(result.final_output) async def _run_agent_streamed( self, agent_input: str | list[Any], options: UiPathExecuteOptions | UiPathStreamOptions | None, stream_events: bool, - session: SQLiteSession | None, ) -> AsyncGenerator[UiPathRuntimeEvent | UiPathRuntimeResult, None]: """ Run agent using streaming API to enable event streaming. 
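State persistence is no longer wired in by the runtime, so callers that still want conversation memory across runs can pass a session to the SDK themselves, mirroring what the deleted code used to do. A sketch (the session id and database path are illustrative):

from agents import Agent, Runner, SQLiteSession


async def run_with_memory(agent: Agent, message: str) -> str:
    # SQLiteSession loads any prior turns stored under this id and appends the new ones.
    session = SQLiteSession("my-runtime-id", "__uipath/state.db")
    try:
        result = await Runner.run(
            starting_agent=agent,
            input=[{"content": message, "role": "user"}],
            session=session,
        )
        return str(result.final_output)
    finally:
        session.close()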
@@ -272,7 +160,6 @@ async def _run_agent_streamed( result = Runner.run_streamed( starting_agent=self.agent, input=agent_input, - session=session, ) # Stream events from the agent @@ -477,7 +364,7 @@ async def get_schema(self) -> UiPathRuntimeSchema: Returns: UiPathRuntimeSchema with input/output schemas and graph structure """ - entrypoints_schema = get_entrypoints_schema(self.agent, self.loaded_object) + entrypoints_schema = get_entrypoints_schema(self.agent) return UiPathRuntimeSchema( filePath=self.entrypoint, @@ -488,45 +375,6 @@ async def get_schema(self) -> UiPathRuntimeSchema: graph=get_agent_schema(self.agent), ) - def _close_session_with_checkpoint(self, session: SQLiteSession) -> None: - """Close SQLite session with WAL checkpoint to release file locks. - - OpenAI SDK uses sync sqlite3 which doesn't release file locks on Windows - without explicit WAL checkpoint. This is especially important for cleanup. - - Args: - session: The SQLiteSession to close - """ - try: - # Get the underlying connection - conn = session._get_connection() - - # Commit any pending transactions - try: - conn.commit() - except Exception: - pass # Best effort - - # Force WAL checkpoint to release shared memory files - # This is especially important on Windows - try: - conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") - conn.commit() - except Exception: - pass # Best effort - - except Exception: - pass # Best effort cleanup - - finally: - # Always call the session's close method - try: - session.close() - except Exception: - pass # Best effort - async def dispose(self) -> None: """Cleanup runtime resources.""" - # Sessions are closed immediately after each run in _run_agent() - # Storage is shared across runtimes and managed by the factory pass diff --git a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/schema.py b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/schema.py index f881055..03483f6 100644 --- a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/schema.py +++ b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/schema.py @@ -1,7 +1,7 @@ """Schema extraction utilities for OpenAI Agents.""" import inspect -from typing import Any, get_args, get_origin, get_type_hints +from typing import Any, get_args, get_origin from agents import Agent from pydantic import BaseModel, TypeAdapter @@ -41,106 +41,14 @@ def _is_pydantic_model(type_hint: Any) -> bool: return False -def _extract_schema_from_callable(callable_obj: Any) -> dict[str, Any] | None: - """ - Extract input/output schemas from a callable's type annotations. - - Args: - callable_obj: A callable object (function, async function, etc.) 
- - Returns: - Dictionary with input and output schemas if type hints are found, - None otherwise - """ - if not callable(callable_obj): - return None - - try: - # Get type hints from the callable - type_hints = get_type_hints(callable_obj) - - if not type_hints: - return None - - # Get function signature to identify parameters - sig = inspect.signature(callable_obj) - params = list(sig.parameters.values()) - - # Find the first parameter (usually the input) - input_type = None - - for param in params: - if param.name in ("self", "cls"): - continue - if param.name in type_hints: - input_type = type_hints[param.name] - break - - # Get return type - return_type = type_hints.get("return") - - schema: dict[str, Any] = { - "input": {"type": "object", "properties": {}, "required": []}, - "output": {"type": "object", "properties": {}, "required": []}, - } - - # Extract input schema from Pydantic model - if input_type and _is_pydantic_model(input_type): - adapter = TypeAdapter(input_type) - input_schema = adapter.json_schema() - unpacked_input = _resolve_refs(input_schema) - - schema["input"]["properties"] = _process_nullable_types( - unpacked_input.get("properties", {}) - ) - schema["input"]["required"] = unpacked_input.get("required", []) - - # Add title and description if available - if "title" in unpacked_input: - schema["input"]["title"] = unpacked_input["title"] - if "description" in unpacked_input: - schema["input"]["description"] = unpacked_input["description"] - - # Extract output schema from Pydantic model - if return_type and _is_pydantic_model(return_type): - adapter = TypeAdapter(return_type) - output_schema = adapter.json_schema() - unpacked_output = _resolve_refs(output_schema) - - schema["output"]["properties"] = _process_nullable_types( - unpacked_output.get("properties", {}) - ) - schema["output"]["required"] = unpacked_output.get("required", []) - - # Add title and description if available - if "title" in unpacked_output: - schema["output"]["title"] = unpacked_output["title"] - if "description" in unpacked_output: - schema["output"]["description"] = unpacked_output["description"] - - # Only return schema if we found at least one Pydantic model - if schema["input"]["properties"] or schema["output"]["properties"]: - return schema - - except Exception: - # If schema extraction fails, return None to fall back to default - pass - - return None - - -def get_entrypoints_schema( - agent: Agent, loaded_object: Any | None = None -) -> dict[str, Any]: +def get_entrypoints_schema(agent: Agent) -> dict[str, Any]: """ Extract input/output schema from an OpenAI Agent. - Prioritizes the agent's native output_type attribute (OpenAI Agents pattern), - with optional fallback to wrapper function type hints (UiPath pattern). + Uses the agent's native output_type attribute for schema extraction. 
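The extraction below leans on one detail of the SDK wrapper: when an agent is given AgentOutputSchema(Model, strict_json_schema=False) instead of the model class itself, the Pydantic model is reachable on the wrapper's output_type attribute. A small sketch (the model is a stand-in):

from agents import Agent, AgentOutputSchema
from pydantic import BaseModel


class TranslationOutput(BaseModel):
    translations: dict[str, str]


agent = Agent(
    name="orchestrator_agent",
    instructions="Translate using your tools.",
    output_type=AgentOutputSchema(TranslationOutput, strict_json_schema=False),
)

wrapper = agent.output_type
assert not isinstance(wrapper, type)  # a wrapper instance, not the model class
assert wrapper.output_type is TranslationOutput  # the model that schema extraction unwraps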
Args: agent: An OpenAI Agent instance - loaded_object: Optional original loaded object (function/callable) with type annotations Returns: Dictionary with input and output schemas @@ -170,19 +78,19 @@ def get_entrypoints_schema( "required": ["message"], } - # Extract output schema - PRIORITY 1: Agent's output_type (native OpenAI Agents pattern) + # Extract output schema - Agent's output_type (native OpenAI Agents pattern) output_type = getattr(agent, "output_type", None) output_extracted = False # Unwrap AgentOutputSchema if present (OpenAI Agents SDK wrapper) - # Check for AgentOutputSchema by looking for 'schema' attribute on non-type instances + # AgentOutputSchema wraps the actual Pydantic model in an 'output_type' attribute if ( output_type is not None - and hasattr(output_type, "schema") + and hasattr(output_type, "output_type") and not isinstance(output_type, type) ): # This is an AgentOutputSchema wrapper instance, extract the actual model - output_type = output_type.schema + output_type = output_type.output_type if output_type is not None and _is_pydantic_model(output_type): try: @@ -207,15 +115,6 @@ def get_entrypoints_schema( # Continue to fallback if extraction fails pass - # Extract output schema - PRIORITY 2: Wrapper function type hints (UiPath pattern) - # This allows UiPath-specific patterns where agents are wrapped in typed functions - if not output_extracted and loaded_object is not None: - wrapper_schema = _extract_schema_from_callable(loaded_object) - if wrapper_schema is not None: - # Use the wrapper's output schema, but keep the default input (messages) - schema["output"] = wrapper_schema["output"] - output_extracted = True - # Fallback: Default output schema for agents without explicit output_type if not output_extracted: schema["output"] = { diff --git a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/storage.py b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/storage.py deleted file mode 100644 index 9bb8f54..0000000 --- a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/storage.py +++ /dev/null @@ -1,357 +0,0 @@ -"""Storage implementation for OpenAI Agents runtime. - -Provides persistence for agent sessions, resume triggers, and key-value storage. -Based on the UiPath LlamaIndex storage implementation but adapted for OpenAI Agents. -""" - -from __future__ import annotations - -import json -import os -from typing import Any, cast - -from pydantic import BaseModel -from uipath.core.errors import ErrorCategory, UiPathFaultedTriggerError -from uipath.runtime import ( - UiPathApiTrigger, - UiPathResumeTrigger, - UiPathResumeTriggerName, - UiPathResumeTriggerType, -) - -from ._sqlite import AsyncSqlite - - -class SqliteAgentStorage: - """SQLite database storage for agent sessions, resume triggers, and state.""" - - def __init__(self, storage_path: str): - """ - Initialize SQLite storage. 
- - Args: - storage_path: Path to the SQLite database file - """ - self.storage_path = storage_path - self._db: AsyncSqlite | None = None - - async def _get_db(self) -> AsyncSqlite: - """Get or create database connection.""" - if self._db is None: - self._db = AsyncSqlite(self.storage_path, timeout=30.0) - await self._db.connect() - return self._db - - async def dispose(self) -> None: - """Dispose of the storage and close database connection.""" - if self._db: - await self._db.close() - self._db = None - - async def __aenter__(self) -> SqliteAgentStorage: - """Async context manager entry.""" - await self.setup() - return self - - async def __aexit__(self, *args) -> None: - """Async context manager exit.""" - await self.dispose() - - async def setup(self) -> None: - """Ensure storage directory and database tables exist.""" - dir_name = os.path.dirname(self.storage_path) - if dir_name: - os.makedirs(dir_name, exist_ok=True) - - try: - db = await self._get_db() - - # Table for agent state/metadata - await db.execute(""" - CREATE TABLE IF NOT EXISTS agent_state ( - runtime_id TEXT PRIMARY KEY, - state_data TEXT NOT NULL, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """) - - # Table for resume triggers - await db.execute(""" - CREATE TABLE IF NOT EXISTS resume_triggers ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - runtime_id TEXT NOT NULL, - interrupt_id TEXT NOT NULL, - trigger_data TEXT NOT NULL, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """) - - await db.execute( - """ - CREATE INDEX IF NOT EXISTS idx_resume_triggers_runtime_id - ON resume_triggers(runtime_id) - """ - ) - - # Table for key-value storage - await db.execute( - """ - CREATE TABLE IF NOT EXISTS runtime_kv ( - runtime_id TEXT NOT NULL, - namespace TEXT NOT NULL, - key TEXT NOT NULL, - value TEXT, - timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')), - PRIMARY KEY (runtime_id, namespace, key) - ) - """ - ) - - await db.commit() - except Exception as exc: - msg = f"Failed to initialize SQLite storage at {self.storage_path!r}: {exc}" - raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc - - async def save_triggers( - self, runtime_id: str, triggers: list[UiPathResumeTrigger] - ) -> None: - """Save resume triggers to SQLite database.""" - try: - db = await self._get_db() - - # Delete all existing triggers for this runtime_id - await db.execute( - """ - DELETE FROM resume_triggers - WHERE runtime_id = ? - """, - (runtime_id,), - ) - - # Insert new triggers - for trigger in triggers: - trigger_dict = self._serialize_trigger(trigger) - trigger_json = json.dumps(trigger_dict) - await db.execute( - "INSERT INTO resume_triggers (runtime_id, interrupt_id, trigger_data) VALUES (?, ?, ?)", - (runtime_id, trigger.interrupt_id, trigger_json), - ) - - await db.commit() - except Exception as exc: - msg = f"Failed to save resume triggers to database {self.storage_path!r}: {exc}" - raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc - - async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None: - """Get resume triggers from SQLite database.""" - try: - db = await self._get_db() - rows = await db.fetchall( - "SELECT trigger_data FROM resume_triggers WHERE runtime_id = ? 
ORDER BY id ASC", - (runtime_id,), - ) - except Exception as exc: - msg = f"Failed to retrieve resume triggers from database {self.storage_path!r}: {exc}" - raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc - - if not rows: - return None - - triggers = [] - for row in rows: - trigger_dict = json.loads(row[0]) - triggers.append(self._deserialize_trigger(trigger_dict)) - return triggers - - async def delete_trigger( - self, runtime_id: str, trigger: UiPathResumeTrigger - ) -> None: - """Delete resume trigger from storage.""" - try: - db = await self._get_db() - await db.execute( - """ - DELETE FROM resume_triggers - WHERE runtime_id = ? AND interrupt_id = ? - """, - (runtime_id, trigger.interrupt_id), - ) - await db.commit() - except Exception as exc: - msg = f"Failed to delete resume trigger from database {self.storage_path!r}: {exc}" - raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc - - async def save_state(self, runtime_id: str, state_data: dict[str, Any]) -> None: - """ - Save agent state to SQLite database. - - Args: - runtime_id: Unique identifier for the runtime instance - state_data: Serialized agent state dictionary - """ - state_json = json.dumps(state_data) - - try: - db = await self._get_db() - await db.execute( - """ - INSERT INTO agent_state (runtime_id, state_data) - VALUES (?, ?) - ON CONFLICT(runtime_id) DO UPDATE SET - state_data = excluded.state_data, - timestamp = CURRENT_TIMESTAMP - """, - (runtime_id, state_json), - ) - await db.commit() - except Exception as exc: - msg = f"Failed to save agent state to database {self.storage_path!r}: {exc}" - raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc - - async def load_state(self, runtime_id: str) -> dict[str, Any] | None: - """ - Load agent state from SQLite database. - - Args: - runtime_id: Unique identifier for the runtime instance - - Returns: - Serialized agent state dictionary or None if not found - """ - try: - db = await self._get_db() - row = await db.fetchone( - "SELECT state_data FROM agent_state WHERE runtime_id = ?", - (runtime_id,), - ) - except Exception as exc: - msg = ( - f"Failed to load agent state from database {self.storage_path!r}: {exc}" - ) - raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc - - if not row: - return None - - return json.loads(row[0]) - - async def set_value( - self, - runtime_id: str, - namespace: str, - key: str, - value: Any, - ) -> None: - """Save arbitrary key-value pair to database.""" - if not ( - isinstance(value, str) - or isinstance(value, dict) - or isinstance(value, BaseModel) - or value is None - ): - raise TypeError("Value must be str, dict, BaseModel or None.") - - value_text = self._dump_value(value) - - db = await self._get_db() - await db.execute( - """ - INSERT INTO runtime_kv (runtime_id, namespace, key, value) - VALUES (?, ?, ?, ?) - ON CONFLICT(runtime_id, namespace, key) - DO UPDATE SET - value = excluded.value, - timestamp = (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')) - """, - (runtime_id, namespace, key, value_text), - ) - await db.commit() - - async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any: - """Get arbitrary key-value pair from database (scoped by runtime_id + namespace).""" - db = await self._get_db() - row = await db.fetchone( - """ - SELECT value - FROM runtime_kv - WHERE runtime_id = ? AND namespace = ? AND key = ? 
- LIMIT 1 - """, - (runtime_id, namespace, key), - ) - - if not row: - return None - - return self._load_value(cast(str | None, row[0])) - - def _serialize_trigger(self, trigger: UiPathResumeTrigger) -> dict[str, Any]: - """Serialize a resume trigger to a dictionary.""" - trigger_key = ( - trigger.api_resume.inbox_id if trigger.api_resume else trigger.item_key - ) - payload = ( - json.dumps(trigger.payload) - if isinstance(trigger.payload, dict) - else str(trigger.payload) - if trigger.payload - else None - ) - - return { - "type": trigger.trigger_type.value, - "key": trigger_key, - "name": trigger.trigger_name.value, - "payload": payload, - "interrupt_id": trigger.interrupt_id, - "folder_path": trigger.folder_path, - "folder_key": trigger.folder_key, - } - - def _deserialize_trigger(self, trigger_data: dict[str, Any]) -> UiPathResumeTrigger: - """Deserialize a resume trigger from a dictionary.""" - trigger_type = trigger_data["type"] - key = trigger_data["key"] - name = trigger_data["name"] - folder_path = trigger_data.get("folder_path") - folder_key = trigger_data.get("folder_key") - payload = trigger_data.get("payload") - interrupt_id = trigger_data.get("interrupt_id") - - resume_trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType(trigger_type), - trigger_name=UiPathResumeTriggerName(name), - item_key=key, - folder_path=folder_path, - folder_key=folder_key, - payload=payload, - interrupt_id=interrupt_id, - ) - - if resume_trigger.trigger_type == UiPathResumeTriggerType.API: - resume_trigger.api_resume = UiPathApiTrigger( - inbox_id=resume_trigger.item_key, request=resume_trigger.payload - ) - - return resume_trigger - - def _dump_value(self, value: str | dict[str, Any] | BaseModel | None) -> str | None: - if value is None: - return None - if isinstance(value, BaseModel): - return "j:" + json.dumps(value.model_dump()) - if isinstance(value, dict): - return "j:" + json.dumps(value) - return "s:" + value - - def _load_value(self, raw: str | None) -> Any: - if raw is None: - return None - if raw.startswith("s:"): - return raw[2:] - if raw.startswith("j:"): - return json.loads(raw[2:]) - return raw - - -__all__ = ["SqliteAgentStorage"] diff --git a/packages/uipath-openai-agents/testcases/triage-agent/openai_agents.json b/packages/uipath-openai-agents/testcases/triage-agent/openai_agents.json index cbfba4a..1e4f137 100644 --- a/packages/uipath-openai-agents/testcases/triage-agent/openai_agents.json +++ b/packages/uipath-openai-agents/testcases/triage-agent/openai_agents.json @@ -1,5 +1,5 @@ { "agents": { - "agent": "src/main.py:agent" + "agent": "src/main.py:main" } } diff --git a/packages/uipath-openai-agents/testcases/triage-agent/src/main.py b/packages/uipath-openai-agents/testcases/triage-agent/src/main.py index fa8f8c1..e6f9230 100644 --- a/packages/uipath-openai-agents/testcases/triage-agent/src/main.py +++ b/packages/uipath-openai-agents/testcases/triage-agent/src/main.py @@ -5,38 +5,46 @@ """ -import dotenv from agents import Agent - -dotenv.load_dotenv() - -# Define specialized agents for different languages -# Explicitly set model to gpt-4o-2024-11-20 (OpenAI Agents SDK normalizes gpt-4.1 automatically) -MODEL = "gpt-4o-2024-11-20" - -french_agent = Agent( - name="french_agent", - instructions="You only speak French", - model=MODEL, -) - -spanish_agent = Agent( - name="spanish_agent", - instructions="You only speak Spanish", - model=MODEL, -) - -english_agent = Agent( - name="english_agent", - instructions="You only speak English", - model=MODEL, -) - -# Triage 
agent routes to appropriate language agent -# Entry point - messages come in as JSON and are handled directly by the agent -agent = Agent( - name="triage_agent", - instructions="Handoff to the appropriate agent based on the language of the request.", - handoffs=[french_agent, spanish_agent, english_agent], - model=MODEL, -) +from agents.models import _openai_shared + +from uipath_openai_agents.chat import UiPathChatOpenAI + + +def main() -> Agent: + """Configure UiPath OpenAI client and return the triage agent.""" + # Configure UiPath OpenAI client for agent execution + # This routes all OpenAI API calls through UiPath's LLM Gateway + MODEL = "gpt-4o-2024-11-20" + uipath_openai_client = UiPathChatOpenAI(model_name=MODEL) + _openai_shared.set_default_openai_client(uipath_openai_client.async_client) + + # Define specialized agents for different languages + french_agent = Agent( + name="french_agent", + instructions="You only speak French", + model=MODEL, + ) + + spanish_agent = Agent( + name="spanish_agent", + instructions="You only speak Spanish", + model=MODEL, + ) + + english_agent = Agent( + name="english_agent", + instructions="You only speak English", + model=MODEL, + ) + + # Triage agent routes to appropriate language agent + # Entry point - messages come in as JSON and are handled directly by the agent + agent = Agent( + name="triage_agent", + instructions="Handoff to the appropriate agent based on the language of the request.", + handoffs=[french_agent, spanish_agent, english_agent], + model=MODEL, + ) + + return agent diff --git a/packages/uipath-openai-agents/tests/conftest.py b/packages/uipath-openai-agents/tests/conftest.py index 4b94af7..3dc776a 100644 --- a/packages/uipath-openai-agents/tests/conftest.py +++ b/packages/uipath-openai-agents/tests/conftest.py @@ -1,24 +1,10 @@ import tempfile from typing import Generator -from unittest.mock import patch import pytest from click.testing import CliRunner -@pytest.fixture(autouse=True) -def use_in_memory_database(): - """Patch storage to use in-memory SQLite database for all tests. - - This prevents Windows file locking issues during test cleanup. 
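With SQLite-backed storage gone, the autouse in-memory-database fixture has nothing left to patch. What the test modules still need, presumably because the sample factories construct UiPathChatOpenAI directly, is a set of placeholder UiPath settings before the samples are imported; the updated tests below do this with os.environ.setdefault, along the lines of:

import os

for key, value in {
    "UIPATH_URL": "https://mock.uipath.com",
    "UIPATH_ORGANIZATION_ID": "mock-org-id",
    "UIPATH_TENANT_ID": "mock-tenant-id",
    "UIPATH_ACCESS_TOKEN": "mock-token",
}.items():
    os.environ.setdefault(key, value)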
- """ - with patch( - "uipath_openai_agents.runtime.factory.UiPathOpenAIAgentRuntimeFactory._get_storage_path", - return_value=":memory:", - ): - yield - - @pytest.fixture def runner() -> CliRunner: """Provide a Click CLI test runner.""" diff --git a/packages/uipath-openai-agents/tests/demo_schema_inference.py b/packages/uipath-openai-agents/tests/demo_schema_inference.py index de3c6fb..0d88aa1 100644 --- a/packages/uipath-openai-agents/tests/demo_schema_inference.py +++ b/packages/uipath-openai-agents/tests/demo_schema_inference.py @@ -1,4 +1,4 @@ -"""Demonstration of parameter inference from type annotations.""" +"""Demonstration of schema extraction from OpenAI Agents.""" import json @@ -8,18 +8,7 @@ from uipath_openai_agents.runtime.schema import get_entrypoints_schema -# Define input/output models -class CustomerQuery(BaseModel): - """Customer support query input.""" - - customer_id: str = Field(description="Unique customer identifier") - message: str = Field(description="Customer's question or issue") - priority: int = Field(default=1, description="Priority level (1-5)", ge=1, le=5) - category: str | None = Field( - default=None, description="Optional category classification" - ) - - +# Define output model class SupportResponse(BaseModel): """Customer support response output.""" @@ -35,76 +24,72 @@ class SupportResponse(BaseModel): ) -# Create agent -support_agent = Agent( +# Create agent WITH output_type +support_agent_with_schema = Agent( name="support_agent", instructions="You are a helpful customer support agent", + output_type=SupportResponse, ) - -async def handle_customer_query(query: CustomerQuery) -> SupportResponse: - """ - Handle a customer support query. - - Args: - query: The customer's query with context - - Returns: - A structured response from the support agent - """ - # Implementation would go here - return SupportResponse( - response="Thank you for contacting us!", - status="resolved", - follow_up_needed=False, - resolution_time_seconds=1.5, - ) +# Create agent WITHOUT output_type (for comparison) +support_agent_without_schema = Agent( + name="support_agent", + instructions="You are a helpful customer support agent", +) def main(): - """Demonstrate schema inference.""" + """Demonstrate schema extraction.""" print("=" * 80) - print("Parameter Inference for OpenAI Agents") + print("Schema Extraction for OpenAI Agents") print("=" * 80) - # Extract schema with type inference - print("\n1. Schema WITH type annotations (from wrapper function):") + # Extract schema from agent with output_type + print("\n1. Schema WITH output_type (native OpenAI Agents pattern):") print("-" * 80) - schema_with_types = get_entrypoints_schema(support_agent, handle_customer_query) - print(json.dumps(schema_with_types, indent=2)) + schema_with_output_type = get_entrypoints_schema(support_agent_with_schema) + print(json.dumps(schema_with_output_type, indent=2)) - # Extract schema without type inference - print("\n\n2. Schema WITHOUT type annotations (default fallback):") + # Extract schema from agent without output_type + print("\n\n2. 
Schema WITHOUT output_type (default fallback):") print("-" * 80) - schema_without_types = get_entrypoints_schema(support_agent, None) - print(json.dumps(schema_without_types, indent=2)) + schema_without_output_type = get_entrypoints_schema(support_agent_without_schema) + print(json.dumps(schema_without_output_type, indent=2)) # Show the difference print("\n\n" + "=" * 80) print("Key Differences:") print("=" * 80) - print("\nWith type annotations:") + print("\nWith output_type:") + print( + f" - Input properties: {list(schema_with_output_type['input']['properties'].keys())}" + ) + print( + f" - Required inputs: {schema_with_output_type['input'].get('required', [])}" + ) print( - f" - Input properties: {list(schema_with_types['input']['properties'].keys())}" + f" - Output properties: {list(schema_with_output_type['output']['properties'].keys())}" ) - print(f" - Required inputs: {schema_with_types['input'].get('required', [])}") print( - f" - Output properties: {list(schema_with_types['output']['properties'].keys())}" + f" - Required outputs: {schema_with_output_type['output'].get('required', [])}" ) - print(f" - Required outputs: {schema_with_types['output'].get('required', [])}") - print("\nWithout type annotations (default):") + print("\nWithout output_type (default):") + print( + f" - Input properties: {list(schema_without_output_type['input']['properties'].keys())}" + ) + print( + f" - Required inputs: {schema_without_output_type['input'].get('required', [])}" + ) print( - f" - Input properties: {list(schema_without_types['input']['properties'].keys())}" + f" - Output properties: {list(schema_without_output_type['output']['properties'].keys())}" ) - print(f" - Required inputs: {schema_without_types['input'].get('required', [])}") print( - f" - Output properties: {list(schema_without_types['output']['properties'].keys())}" + f" - Required outputs: {schema_without_output_type['output'].get('required', [])}" ) - print(f" - Required outputs: {schema_without_types['output'].get('required', [])}") print("\n" + "=" * 80) - print("✓ Parameter inference extracts rich type information automatically!") + print("✓ Schema extraction uses agent.output_type for structured outputs!") print("=" * 80) diff --git a/packages/uipath-openai-agents/tests/test_agent_as_tools_schema.py b/packages/uipath-openai-agents/tests/test_agent_as_tools_schema.py index 5a6e04f..91c206e 100644 --- a/packages/uipath-openai-agents/tests/test_agent_as_tools_schema.py +++ b/packages/uipath-openai-agents/tests/test_agent_as_tools_schema.py @@ -1,8 +1,15 @@ """Tests for agent-as-tools sample schema extraction.""" +import os import sys from pathlib import Path +# Set up mock environment variables for sample imports +os.environ.setdefault("UIPATH_URL", "https://mock.uipath.com") +os.environ.setdefault("UIPATH_ORGANIZATION_ID", "mock-org-id") +os.environ.setdefault("UIPATH_TENANT_ID", "mock-tenant-id") +os.environ.setdefault("UIPATH_ACCESS_TOKEN", "mock-token") + # Add samples directory to path samples_dir = Path(__file__).parent.parent / "samples" / "agent-as-tools" sys.path.insert(0, str(samples_dir)) @@ -11,7 +18,6 @@ TranslationInput, TranslationOutput, main, - orchestrator_agent, ) from uipath_openai_agents.runtime.schema import get_entrypoints_schema # noqa: E402 @@ -19,7 +25,8 @@ def test_agent_as_tools_input_schema(): """Test that input schema uses default messages format (OpenAI Agents pattern).""" - schema = get_entrypoints_schema(orchestrator_agent, main) + orchestrator_agent = main() + schema = 
get_entrypoints_schema(orchestrator_agent) # Verify input schema structure - should use default messages assert "input" in schema @@ -42,7 +49,8 @@ def test_agent_as_tools_input_schema(): def test_agent_as_tools_output_schema(): """Test that output schema is extracted from agent's output_type.""" - schema = get_entrypoints_schema(orchestrator_agent, main) + orchestrator_agent = main() + schema = get_entrypoints_schema(orchestrator_agent) # Verify output schema structure assert "output" in schema @@ -73,7 +81,8 @@ def test_agent_as_tools_output_schema(): def test_agent_as_tools_schema_metadata(): """Test that schema includes model metadata from agent's output_type.""" - schema = get_entrypoints_schema(orchestrator_agent, main) + orchestrator_agent = main() + schema = get_entrypoints_schema(orchestrator_agent) # Input uses default messages format (no custom title/description) assert "input" in schema diff --git a/packages/uipath-openai-agents/tests/test_integration.py b/packages/uipath-openai-agents/tests/test_integration.py index ee84d7f..87a6fbc 100644 --- a/packages/uipath-openai-agents/tests/test_integration.py +++ b/packages/uipath-openai-agents/tests/test_integration.py @@ -1,10 +1,17 @@ """Integration test demonstrating new runtime features.""" +import os import sys from pathlib import Path import pytest +# Set up mock environment variables for sample imports +os.environ.setdefault("UIPATH_URL", "https://mock.uipath.com") +os.environ.setdefault("UIPATH_ORGANIZATION_ID", "mock-org-id") +os.environ.setdefault("UIPATH_TENANT_ID", "mock-tenant-id") +os.environ.setdefault("UIPATH_ACCESS_TOKEN", "mock-token") + # Add samples directory to path samples_dir = Path(__file__).parent.parent / "samples" / "agent-as-tools" sys.path.insert(0, str(samples_dir)) @@ -13,22 +20,15 @@ TranslationInput, TranslationOutput, main, - orchestrator_agent, ) from uipath_openai_agents.runtime.errors import ( # noqa: E402 UiPathOpenAIAgentsErrorCode, UiPathOpenAIAgentsRuntimeError, ) -from uipath_openai_agents.runtime.runtime import ( # noqa: E402 - UiPathOpenAIAgentRuntime, -) from uipath_openai_agents.runtime.schema import ( # noqa: E402 get_entrypoints_schema, ) -from uipath_openai_agents.runtime.storage import ( # noqa: E402 - SqliteAgentStorage, -) def test_error_handling(): @@ -52,7 +52,8 @@ def test_error_handling(): def test_schema_extraction_with_new_serialization(): """Test that schema extraction works with the serialization improvements.""" - schema = get_entrypoints_schema(orchestrator_agent, main) + orchestrator_agent = main() + schema = get_entrypoints_schema(orchestrator_agent) # Verify input schema (messages format) assert "input" in schema @@ -68,77 +69,6 @@ def test_schema_extraction_with_new_serialization(): assert schema["output"]["title"] == "TranslationOutput" -async def test_runtime_initialization_with_storage(): - """Test that runtime can be initialized with storage.""" - import tempfile - - with tempfile.TemporaryDirectory() as tmpdir: - storage_path = f"{tmpdir}/test.db" - - # Create storage - storage = SqliteAgentStorage(storage_path) - await storage.setup() - - # Create runtime with storage - runtime = UiPathOpenAIAgentRuntime( - agent=orchestrator_agent, - runtime_id="test_runtime", - entrypoint="test", - storage_path=storage_path, - storage=storage, - loaded_object=main, - ) - - # Verify runtime initialized correctly - assert runtime.storage is not None - assert runtime.runtime_id == "test_runtime" - assert runtime.loaded_object == main - - # Test schema generation - schema = await 
runtime.get_schema() - assert schema.type == "agent" - assert "message" in schema.input["properties"] - assert "original_text" in schema.output["properties"] - - await storage.dispose() - - -async def test_storage_operations(): - """Test storage save/load operations.""" - import tempfile - - with tempfile.TemporaryDirectory() as tmpdir: - storage_path = f"{tmpdir}/test_storage.db" - - storage = SqliteAgentStorage(storage_path) - await storage.setup() - - # Test state save/load - runtime_id = "test_runtime_123" - test_state = {"step": "translation", "progress": 50} - - await storage.save_state(runtime_id, test_state) - loaded_state = await storage.load_state(runtime_id) - - assert loaded_state == test_state - - # Test key-value operations - await storage.set_value(runtime_id, "test_namespace", "key1", "value1") - value = await storage.get_value(runtime_id, "test_namespace", "key1") - - assert value == "value1" - - # Test dict value - await storage.set_value( - runtime_id, "test_namespace", "dict_key", {"nested": "value"} - ) - dict_value = await storage.get_value(runtime_id, "test_namespace", "dict_key") - - assert dict_value == {"nested": "value"} - - await storage.dispose() - - def test_pydantic_models(): """Test that Pydantic models work correctly with serialization.""" # Create input model diff --git a/packages/uipath-openai-agents/tests/test_schema_inference.py b/packages/uipath-openai-agents/tests/test_schema_inference.py index 1362e10..17e572e 100644 --- a/packages/uipath-openai-agents/tests/test_schema_inference.py +++ b/packages/uipath-openai-agents/tests/test_schema_inference.py @@ -29,8 +29,8 @@ class OutputModel(BaseModel): def test_schema_inference_from_agent_output_type(): - """Test that output schema is correctly inferred from agent's output_type (PRIMARY).""" - schema = get_entrypoints_schema(agent_with_output_type, None) + """Test that output schema is correctly inferred from agent's output_type.""" + schema = get_entrypoints_schema(agent_with_output_type) # Check input schema - should be default messages format assert "input" in schema @@ -58,7 +58,7 @@ def test_schema_inference_from_agent_output_type(): def test_schema_fallback_without_types(): """Test that schemas fall back to defaults when no types are provided.""" - schema = get_entrypoints_schema(test_agent, None) + schema = get_entrypoints_schema(test_agent) # Should use default message-based input schema assert "input" in schema @@ -70,8 +70,8 @@ def test_schema_fallback_without_types(): def test_schema_with_plain_agent(): - """Test schema extraction with a plain agent (no wrapper function).""" - schema = get_entrypoints_schema(test_agent, test_agent) + """Test schema extraction with a plain agent.""" + schema = get_entrypoints_schema(test_agent) # Should use default message input assert "input" in schema @@ -80,33 +80,3 @@ def test_schema_with_plain_agent(): # Should use default result output assert "output" in schema assert "result" in schema["output"]["properties"] - - -class WrapperOutputModel(BaseModel): - """Output model for wrapper function test.""" - - status: str - data: dict[str, str] - - -async def typed_wrapper_function(message: str) -> WrapperOutputModel: - """Wrapper function with type annotations (UiPath pattern - SECONDARY).""" - return WrapperOutputModel(status="success", data={}) - - -def test_schema_with_wrapper_function(): - """Test that wrapper function output schema is used as fallback (SECONDARY).""" - # Agent without output_type should fallback to wrapper function - schema = 
get_entrypoints_schema(test_agent, typed_wrapper_function) - - # Input should still be default messages (not extracted from wrapper) - assert "input" in schema - assert "message" in schema["input"]["properties"] - assert "required" in schema["input"] - assert "message" in schema["input"]["required"] - - # Output should come from wrapper function (secondary pattern) - assert "output" in schema - assert "properties" in schema["output"] - assert "status" in schema["output"]["properties"] - assert "data" in schema["output"]["properties"] diff --git a/packages/uipath-openai-agents/tests/test_storage.py b/packages/uipath-openai-agents/tests/test_storage.py deleted file mode 100644 index 792a270..0000000 --- a/packages/uipath-openai-agents/tests/test_storage.py +++ /dev/null @@ -1,479 +0,0 @@ -"""Tests for SqliteAgentStorage class.""" - -import json -import os -import tempfile -from pathlib import Path -from typing import Any - -import pytest -from pydantic import BaseModel -from uipath.runtime import ( - UiPathApiTrigger, - UiPathResumeTrigger, - UiPathResumeTriggerName, - UiPathResumeTriggerType, -) - -from uipath_openai_agents.runtime.storage import SqliteAgentStorage - - -class SampleModel(BaseModel): - """Sample Pydantic model for testing.""" - - name: str - value: int - - -class TestSqliteAgentStorageInitialization: - """Test storage initialization and setup.""" - - @pytest.mark.asyncio - async def test_setup_creates_database_file(self, tmp_path: Path): - """Test that setup creates the database file.""" - db_path = tmp_path / "test.db" - async with SqliteAgentStorage(str(db_path)) as storage: - await storage.setup() - assert db_path.exists() - - @pytest.mark.asyncio - async def test_setup_creates_directory_if_missing(self, tmp_path: Path): - """Test that setup creates parent directories if they don't exist.""" - db_path = tmp_path / "subdir" / "another" / "test.db" - async with SqliteAgentStorage(str(db_path)) as storage: - await storage.setup() - assert db_path.exists() - assert db_path.parent.exists() - - @pytest.mark.asyncio - async def test_setup_is_idempotent(self, tmp_path: Path): - """Test that setup can be called multiple times safely.""" - db_path = tmp_path / "test.db" - async with SqliteAgentStorage(str(db_path)) as storage: - await storage.setup() - await storage.setup() # Should not raise - assert db_path.exists() - - -class TestTriggerOperations: - """Test resume trigger save and retrieval operations.""" - - @pytest.fixture - async def storage(self): - """Create a SqliteAgentStorage instance with temporary database file.""" - temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") - temp_db.close() - - try: - async with SqliteAgentStorage(str(temp_db.name)) as storage: - await storage.setup() - yield storage - finally: - if os.path.exists(temp_db.name): - os.remove(temp_db.name) - - @pytest.mark.asyncio - async def test_save_trigger_basic(self, storage: SqliteAgentStorage): - """Test saving a basic resume trigger.""" - trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.QUEUE_ITEM, - trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value, - item_key="queue-123", - folder_path="/test/folder", - folder_key="folder-456", - payload={"data": "test"}, - interrupt_id="interrupt-789", - ) - - await storage.save_triggers("runtime-1", [trigger]) - - # Verify it was saved - triggers = await storage.get_triggers("runtime-1") - assert triggers is not None - assert len(triggers) == 1 - assert triggers[0].trigger_type == UiPathResumeTriggerType.QUEUE_ITEM - assert 
triggers[0].trigger_name == UiPathResumeTriggerName.QUEUE_ITEM - assert triggers[0].item_key == "queue-123" - - @pytest.mark.asyncio - async def test_save_trigger_with_api_type(self, storage: SqliteAgentStorage): - """Test saving an API type trigger.""" - trigger = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.API, - trigger_name=UiPathResumeTriggerName.API.value, - item_key="inbox-789", - folder_path="/api/folder", - folder_key="folder-abc", - payload='{"request": "data"}', - interrupt_id="interrupt-123", - ) - trigger.api_resume = UiPathApiTrigger( - inbox_id="inbox-789", request='{"request": "data"}' - ) - - await storage.save_triggers("runtime-2", [trigger]) - - retrieved = await storage.get_triggers("runtime-2") - assert retrieved is not None - assert len(retrieved) == 1 - assert retrieved[0].trigger_type == UiPathResumeTriggerType.API - assert retrieved[0].api_resume is not None - assert retrieved[0].api_resume.inbox_id == "inbox-789" - - @pytest.mark.asyncio - async def test_save_multiple_triggers(self, storage: SqliteAgentStorage): - """Test saving multiple triggers for the same runtime.""" - trigger1 = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.QUEUE_ITEM, - trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value, - item_key="first", - interrupt_id="interrupt-1", - ) - trigger2 = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.QUEUE_ITEM, - trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value, - item_key="second", - interrupt_id="interrupt-2", - ) - - await storage.save_triggers("runtime-5", [trigger1, trigger2]) - - retrieved = await storage.get_triggers("runtime-5") - assert retrieved is not None - assert len(retrieved) == 2 - assert retrieved[0].item_key == "first" - assert retrieved[1].item_key == "second" - - @pytest.mark.asyncio - async def test_save_triggers_replaces_existing(self, storage: SqliteAgentStorage): - """Test that saving triggers replaces existing ones.""" - trigger1 = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.QUEUE_ITEM, - trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value, - item_key="first", - interrupt_id="interrupt-1", - ) - trigger2 = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.QUEUE_ITEM, - trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value, - item_key="second", - interrupt_id="interrupt-2", - ) - - await storage.save_triggers("runtime-3", [trigger1]) - await storage.save_triggers("runtime-3", [trigger2]) - - retrieved = await storage.get_triggers("runtime-3") - assert retrieved is not None - assert len(retrieved) == 1 - assert retrieved[0].item_key == "second" - - @pytest.mark.asyncio - async def test_get_triggers_nonexistent(self, storage: SqliteAgentStorage): - """Test getting trigger for non-existent runtime_id.""" - result = await storage.get_triggers("nonexistent") - assert result is None - - @pytest.mark.asyncio - async def test_delete_trigger(self, storage: SqliteAgentStorage): - """Test deleting a specific trigger.""" - trigger1 = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.QUEUE_ITEM, - trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value, - item_key="keep", - interrupt_id="interrupt-keep", - ) - trigger2 = UiPathResumeTrigger( - trigger_type=UiPathResumeTriggerType.QUEUE_ITEM, - trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value, - item_key="delete", - interrupt_id="interrupt-delete", - ) - - await storage.save_triggers("runtime-del", [trigger1, trigger2]) - - # Delete one trigger - await storage.delete_trigger("runtime-del", trigger2) - - 
retrieved = await storage.get_triggers("runtime-del") - assert retrieved is not None - assert len(retrieved) == 1 - assert retrieved[0].item_key == "keep" - - -class TestStateOperations: - """Test agent state save and load operations.""" - - @pytest.fixture - async def storage(self): - """Create a SqliteAgentStorage instance with temporary database file.""" - temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") - temp_db.close() - - try: - async with SqliteAgentStorage(str(temp_db.name)) as storage: - await storage.setup() - yield storage - finally: - if os.path.exists(temp_db.name): - os.remove(temp_db.name) - - @pytest.mark.asyncio - async def test_save_and_load_state_basic(self, storage: SqliteAgentStorage): - """Test saving and loading a basic state.""" - state = {"step": 1, "data": "test data", "flags": {"active": True}} - - await storage.save_state("runtime-1", state) - loaded = await storage.load_state("runtime-1") - - assert loaded == state - - @pytest.mark.asyncio - async def test_save_and_load_state_complex(self, storage: SqliteAgentStorage): - """Test saving and loading complex state with nested structures.""" - state = { - "variables": {"counter": 42, "name": "test", "items": [1, 2, 3, 4, 5]}, - "agent_state": { - "current_step": "processing", - "metadata": {"created": "2024-01-01", "tags": ["tag1", "tag2"]}, - }, - } - - await storage.save_state("runtime-2", state) - loaded = await storage.load_state("runtime-2") - - assert loaded == state - - @pytest.mark.asyncio - async def test_save_state_overwrites_existing(self, storage: SqliteAgentStorage): - """Test that saving state overwrites existing state.""" - state1 = {"step": 1} - state2 = {"step": 2, "new_field": "value"} - - await storage.save_state("runtime-3", state1) - await storage.save_state("runtime-3", state2) - - loaded = await storage.load_state("runtime-3") - assert loaded == state2 - assert loaded != state1 - - @pytest.mark.asyncio - async def test_load_state_nonexistent(self, storage: SqliteAgentStorage): - """Test loading state for non-existent runtime_id.""" - result = await storage.load_state("nonexistent") - assert result is None - - @pytest.mark.asyncio - async def test_save_state_empty_dict(self, storage: SqliteAgentStorage): - """Test saving empty dictionary as state.""" - state: dict[str, Any] = {} - - await storage.save_state("runtime-4", state) - loaded = await storage.load_state("runtime-4") - - assert loaded == {} - - -class TestKeyValueOperations: - """Test key-value storage operations.""" - - @pytest.fixture - async def storage(self): - """Create a SqliteAgentStorage instance with temporary database file.""" - temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") - temp_db.close() - - try: - async with SqliteAgentStorage(str(temp_db.name)) as storage: - await storage.setup() - yield storage - finally: - if os.path.exists(temp_db.name): - os.remove(temp_db.name) - - @pytest.mark.asyncio - async def test_set_and_get_string_value(self, storage: SqliteAgentStorage): - """Test setting and getting a string value.""" - await storage.set_value("runtime-1", "namespace1", "key1", "test_value") - - value = await storage.get_value("runtime-1", "namespace1", "key1") - assert value == "test_value" - - @pytest.mark.asyncio - async def test_set_and_get_dict_value(self, storage: SqliteAgentStorage): - """Test setting and getting a dictionary value.""" - test_dict = {"name": "John", "age": 30, "active": True} - - await storage.set_value("runtime-2", "namespace2", "key2", test_dict) - - value = 
await storage.get_value("runtime-2", "namespace2", "key2") - assert value == test_dict - - @pytest.mark.asyncio - async def test_set_and_get_pydantic_model(self, storage: SqliteAgentStorage): - """Test setting and getting a Pydantic model.""" - model = SampleModel(name="test", value=42) - - await storage.set_value("runtime-3", "namespace3", "key3", model) - - value = await storage.get_value("runtime-3", "namespace3", "key3") - assert value == model.model_dump() - - @pytest.mark.asyncio - async def test_set_and_get_none_value(self, storage: SqliteAgentStorage): - """Test setting and getting None value.""" - await storage.set_value("runtime-4", "namespace4", "key4", None) - - value = await storage.get_value("runtime-4", "namespace4", "key4") - assert value is None - - @pytest.mark.asyncio - async def test_set_value_invalid_type(self, storage: SqliteAgentStorage): - """Test that setting invalid type raises TypeError.""" - with pytest.raises( - TypeError, match="Value must be str, dict, BaseModel or None" - ): - await storage.set_value("runtime-5", "namespace5", "key5", 123) - - with pytest.raises( - TypeError, match="Value must be str, dict, BaseModel or None" - ): - await storage.set_value("runtime-5", "namespace5", "key5", [1, 2, 3]) - - @pytest.mark.asyncio - async def test_set_value_overwrites_existing(self, storage: SqliteAgentStorage): - """Test that setting a value overwrites existing value.""" - await storage.set_value("runtime-6", "namespace6", "key6", "first") - await storage.set_value("runtime-6", "namespace6", "key6", "second") - - value = await storage.get_value("runtime-6", "namespace6", "key6") - assert value == "second" - - @pytest.mark.asyncio - async def test_get_value_nonexistent(self, storage: SqliteAgentStorage): - """Test getting non-existent value returns None.""" - value = await storage.get_value("nonexistent", "namespace", "key") - assert value is None - - @pytest.mark.asyncio - async def test_values_isolated_by_runtime_id(self, storage: SqliteAgentStorage): - """Test that values are isolated by runtime_id.""" - await storage.set_value("runtime-a", "ns", "key", "value-a") - await storage.set_value("runtime-b", "ns", "key", "value-b") - - value_a = await storage.get_value("runtime-a", "ns", "key") - value_b = await storage.get_value("runtime-b", "ns", "key") - - assert value_a == "value-a" - assert value_b == "value-b" - - @pytest.mark.asyncio - async def test_values_isolated_by_namespace(self, storage: SqliteAgentStorage): - """Test that values are isolated by namespace.""" - await storage.set_value("runtime-1", "ns-a", "key", "value-a") - await storage.set_value("runtime-1", "ns-b", "key", "value-b") - - value_a = await storage.get_value("runtime-1", "ns-a", "key") - value_b = await storage.get_value("runtime-1", "ns-b", "key") - - assert value_a == "value-a" - assert value_b == "value-b" - - -class TestSerializationMethods: - """Test internal serialization/deserialization methods.""" - - @pytest.fixture - async def storage(self): - """Create a SqliteAgentStorage instance with temporary database file.""" - temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") - temp_db.close() - - try: - async with SqliteAgentStorage(str(temp_db.name)) as storage: - await storage.setup() - yield storage - finally: - if os.path.exists(temp_db.name): - os.remove(temp_db.name) - - def test_serialize_trigger_queue_type(self, storage: SqliteAgentStorage): - """Test serialization of queue type trigger.""" - trigger = UiPathResumeTrigger( - 
trigger_type=UiPathResumeTriggerType.QUEUE_ITEM, - trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value, - item_key="queue-123", - folder_path="/folder", - folder_key="folder-key", - payload={"test": "data"}, - interrupt_id="interrupt-456", - ) - - serialized = storage._serialize_trigger(trigger) - - assert serialized["type"] == UiPathResumeTriggerType.QUEUE_ITEM.value - assert serialized["key"] == "queue-123" - assert serialized["name"] == UiPathResumeTriggerName.QUEUE_ITEM.value - assert serialized["folder_path"] == "/folder" - assert serialized["folder_key"] == "folder-key" - assert serialized["interrupt_id"] == "interrupt-456" - assert json.loads(serialized["payload"]) == {"test": "data"} - - def test_deserialize_trigger_queue_type(self, storage: SqliteAgentStorage): - """Test deserialization of queue type trigger.""" - trigger_data = { - "type": UiPathResumeTriggerType.QUEUE_ITEM.value, - "key": "queue-789", - "name": UiPathResumeTriggerName.QUEUE_ITEM.value, - "folder_path": "/test", - "folder_key": "folder-123", - "payload": '{"key": "value"}', - } - - trigger = storage._deserialize_trigger(trigger_data) - - assert trigger.trigger_type == UiPathResumeTriggerType.QUEUE_ITEM - assert trigger.trigger_name == UiPathResumeTriggerName.QUEUE_ITEM - assert trigger.item_key == "queue-789" - assert trigger.folder_path == "/test" - assert trigger.folder_key == "folder-123" - - def test_dump_value_string(self, storage: SqliteAgentStorage): - """Test _dump_value with string.""" - result = storage._dump_value("test string") - assert result == "s:test string" - - def test_dump_value_dict(self, storage: SqliteAgentStorage): - """Test _dump_value with dictionary.""" - result = storage._dump_value({"key": "value"}) - assert result == 'j:{"key": "value"}' - - def test_dump_value_pydantic_model(self, storage: SqliteAgentStorage): - """Test _dump_value with Pydantic model.""" - model = SampleModel(name="test", value=42) - result = storage._dump_value(model) - assert result == 'j:{"name": "test", "value": 42}' - - def test_dump_value_none(self, storage: SqliteAgentStorage): - """Test _dump_value with None.""" - result = storage._dump_value(None) - assert result is None - - def test_load_value_string(self, storage: SqliteAgentStorage): - """Test _load_value with string.""" - result = storage._load_value("s:test string") - assert result == "test string" - - def test_load_value_json(self, storage: SqliteAgentStorage): - """Test _load_value with JSON.""" - result = storage._load_value('j:{"key": "value"}') - assert result == {"key": "value"} - - def test_load_value_none(self, storage: SqliteAgentStorage): - """Test _load_value with None.""" - result = storage._load_value(None) - assert result is None - - -if __name__ == "__main__": - pytest.main([__file__, "-v"])