Skip to content

Conversation

@grimoire
Copy link
Collaborator

@grimoire grimoire commented Dec 25, 2025

3 new apis has been added to Engine / EngineLoop / Executor / ModelAgent.

  • start would start all the backgound tasks in the module and it's submodules.
  • wait_tasks would await all backendground tasks until one of them raised any exceptions. (wait_tasks of the submodules would also be treated as background task). All other tasks would be cancelled.
  • stop would stop all the background tasks.

If an exception raised in ModelAgent, wait_tasks would prapogate it to the parent module (ModelAgent -> Executor), other tasks in the ModelAgent would be cancelled. Executor would do the same when it received the exception from ModelAgent, then EngineLoop .... All the way to the Engine.

Users can manually stop the background tasks by calling stop, which would raise and CancelledError in background tasks.

Tip

Another solution is merging them all into an async def run function, which would start the tasks / wait for exception / clean up tasks. run function itself should be packed as an async task.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements a new exception handling and task lifecycle management system across the PyTorch engine stack. The changes introduce three key APIs (start, wait_tasks, and stop) that provide structured exception propagation from ModelAgent → Executor → EngineLoop → Engine, with proper task cancellation when any component fails.

Key changes include:

  • Added a centralized wait_for_async_tasks utility function for consistent task waiting and exception handling
  • Introduced wait_tasks methods throughout the component hierarchy to propagate exceptions upward
  • Refactored task lifecycle management to use a set-based tracking system with automatic cleanup callbacks
  • Enhanced signal handling in the ZMQ multiprocessing engine with async shutdown support

Reviewed changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 13 comments.

Show a summary per file
File Description
lmdeploy/pytorch/utils.py Adds wait_for_async_tasks utility for centralized async task exception handling
lmdeploy/pytorch/engine/request.py Sets ENGINE_STOP_ERROR response type when engine loop fails
lmdeploy/pytorch/engine/mp_engine/zmq_rpc.py Adds task tracking and graceful error handling for ZMQ RPC send operations
lmdeploy/pytorch/engine/mp_engine/zmq_engine.py Replaces signal handler with async-compatible version and adds proper cleanup in finally block
lmdeploy/pytorch/engine/model_agent.py Refactors task management with set-based tracking and implements wait_tasks for exception propagation
lmdeploy/pytorch/engine/executor/base.py Adds abstract wait_tasks method to executor interface
lmdeploy/pytorch/engine/executor/uni_executor.py Implements wait_tasks by delegating to model_agent
lmdeploy/pytorch/engine/executor/ray_executor.py Implements wait_tasks with Ray-specific worker monitoring and error handling
lmdeploy/pytorch/engine/executor/mp_executor.py Implements wait_tasks using collective RPC calls to workers
lmdeploy/pytorch/engine/executor/base_worker.py Implements wait_tasks to coordinate model_agent and output loop tasks
lmdeploy/pytorch/engine/executor/init.py Adds deprecation warning for MPExecutor
lmdeploy/pytorch/engine/engine_loop.py Refactors task creation and waiting with new exception propagation pattern
lmdeploy/pytorch/engine/engine.py Adds stop and wait_tasks methods for top-level engine lifecycle management

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 205 to 211
for task in done:
if task.exception():
exc = task.exception()
if isinstance(exc, asyncio.CancelledError) and ignore_cancellederror:
logger.debug(f'Task <{task.get_name()}> cancelled.')
continue
raise exc from None
Copy link

Copilot AI Dec 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function checks if task.exception() to detect exceptions, but this call itself raises an exception if the task was cancelled. This should be wrapped in a try-except block or checked with task.cancelled() first. The current code may raise an unexpected asyncio.CancelledError when trying to check for exceptions on a cancelled task, bypassing the intended exception handling logic.

Copilot uses AI. Check for mistakes.
Comment on lines +477 to +478
except SystemExit:
logger.error('Ray worker exited.')
Copy link

Copilot AI Dec 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching SystemExit and logging it as an error without re-raising may hide critical failures. SystemExit typically indicates the process should terminate, and this exception should normally be allowed to propagate. If there's a specific reason to catch it here, that should be documented. Otherwise, consider removing SystemExit from the exception handling or re-raising it after cleanup.

Suggested change
except SystemExit:
logger.error('Ray worker exited.')
except SystemExit as e:
logger.error('Ray worker exited.')
raise

Copilot uses AI. Check for mistakes.
@lvhan028 lvhan028 requested a review from CUHKSZzxy December 31, 2025 06:17
Copy link
Collaborator

@CUHKSZzxy CUHKSZzxy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants