Optimize exception raising and error process #4236
Pull request overview
This PR implements a new exception handling and task lifecycle management system across the PyTorch engine stack. The changes introduce three key APIs (start, wait_tasks, and stop) that provide structured exception propagation from ModelAgent → Executor → EngineLoop → Engine, with proper task cancellation when any component fails.
Key changes include:
- Added a centralized `wait_for_async_tasks` utility function for consistent task waiting and exception handling
- Introduced `wait_tasks` methods throughout the component hierarchy to propagate exceptions upward
- Refactored task lifecycle management to use a set-based tracking system with automatic cleanup callbacks
- Enhanced signal handling in the ZMQ multiprocessing engine with async shutdown support
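The set-based tracking with automatic cleanup callbacks mentioned above could look roughly like the sketch below. `TaskTracker` and its methods are hypothetical names chosen for illustration, not identifiers from the PR; only the standard `asyncio` calls are real.

```python
import asyncio


class TaskTracker:
    """Hypothetical helper (not from the PR): holds references to live tasks."""

    def __init__(self):
        self._tasks = set()

    def create_task(self, coro, name=None):
        task = asyncio.get_running_loop().create_task(coro, name=name)
        self._tasks.add(task)
        # Remove the task from the set once it finishes, whatever the outcome.
        task.add_done_callback(self._tasks.discard)
        return task

    def cancel_all(self):
        # Iterate over a copy: done callbacks mutate the set.
        for task in list(self._tasks):
            task.cancel()

    def __len__(self):
        return len(self._tasks)


async def demo():
    tracker = TaskTracker()
    fast = tracker.create_task(asyncio.sleep(0), name='fast')
    slow = tracker.create_task(asyncio.sleep(3600), name='slow')
    await fast
    await asyncio.sleep(0)  # give the done callback a chance to run
    remaining_after_fast = len(tracker)
    tracker.cancel_all()
    await asyncio.gather(slow, return_exceptions=True)
    await asyncio.sleep(0)
    return remaining_after_fast, len(tracker)
```

Keeping the tasks in a set holds a strong reference to each one while it runs, and the done callback means no separate bookkeeping is needed when a task ends or is cancelled.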
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 13 comments.
| File | Description |
|---|---|
| lmdeploy/pytorch/utils.py | Adds wait_for_async_tasks utility for centralized async task exception handling |
| lmdeploy/pytorch/engine/request.py | Sets ENGINE_STOP_ERROR response type when engine loop fails |
| lmdeploy/pytorch/engine/mp_engine/zmq_rpc.py | Adds task tracking and graceful error handling for ZMQ RPC send operations |
| lmdeploy/pytorch/engine/mp_engine/zmq_engine.py | Replaces signal handler with async-compatible version and adds proper cleanup in finally block |
| lmdeploy/pytorch/engine/model_agent.py | Refactors task management with set-based tracking and implements wait_tasks for exception propagation |
| lmdeploy/pytorch/engine/executor/base.py | Adds abstract wait_tasks method to executor interface |
| lmdeploy/pytorch/engine/executor/uni_executor.py | Implements wait_tasks by delegating to model_agent |
| lmdeploy/pytorch/engine/executor/ray_executor.py | Implements wait_tasks with Ray-specific worker monitoring and error handling |
| lmdeploy/pytorch/engine/executor/mp_executor.py | Implements wait_tasks using collective RPC calls to workers |
| lmdeploy/pytorch/engine/executor/base_worker.py | Implements wait_tasks to coordinate model_agent and output loop tasks |
| lmdeploy/pytorch/engine/executor/__init__.py | Adds deprecation warning for MPExecutor |
| lmdeploy/pytorch/engine/engine_loop.py | Refactors task creation and waiting with new exception propagation pattern |
| lmdeploy/pytorch/engine/engine.py | Adds stop and wait_tasks methods for top-level engine lifecycle management |
```python
for task in done:
    if task.exception():
        exc = task.exception()
        if isinstance(exc, asyncio.CancelledError) and ignore_cancellederror:
            logger.debug(f'Task <{task.get_name()}> cancelled.')
            continue
        raise exc from None
```
Copilot AI · Dec 26, 2025
The function checks if task.exception() to detect exceptions, but this call itself raises an exception if the task was cancelled. This should be wrapped in a try-except block or checked with task.cancelled() first. The current code may raise an unexpected asyncio.CancelledError when trying to check for exceptions on a cancelled task, bypassing the intended exception handling logic.
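A minimal sketch of the fix this comment suggests, testing `task.cancelled()` before calling `task.exception()`. The helper name `first_exception` and its `ignore_cancelled` parameter are assumptions made for illustration; the PR's actual utility may be shaped differently.

```python
import asyncio


def first_exception(done, ignore_cancelled=True):
    """Return the first real exception among finished tasks, or None.

    Hypothetical helper, not the PR's wait_for_async_tasks.
    """
    for task in done:
        # task.exception() raises CancelledError on a cancelled task,
        # so cancellation has to be tested first.
        if task.cancelled():
            if ignore_cancelled:
                continue
            return asyncio.CancelledError(f'Task <{task.get_name()}> cancelled.')
        exc = task.exception()
        if exc is not None:
            return exc
    return None


async def demo():
    async def boom():
        raise ValueError('boom')

    cancelled = asyncio.create_task(asyncio.sleep(3600))
    failed = asyncio.create_task(boom())
    await asyncio.sleep(0)  # let both tasks start
    cancelled.cancel()
    await asyncio.wait([cancelled, failed])
    return first_exception([cancelled, failed])
```

With this ordering the cancelled task is skipped quietly and the `ValueError` from the failed task is the one reported.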
```python
except SystemExit:
    logger.error('Ray worker exited.')
```
Copilot AI · Dec 26, 2025
Catching SystemExit and logging it as an error without re-raising may hide critical failures. SystemExit typically indicates the process should terminate, and this exception should normally be allowed to propagate. If there's a specific reason to catch it here, that should be documented. Otherwise, consider removing SystemExit from the exception handling or re-raising it after cleanup.
```diff
- except SystemExit:
-     logger.error('Ray worker exited.')
+ except SystemExit as e:
+     logger.error('Ray worker exited.')
+     raise
```
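The difference between the two variants can be seen in a small standalone sketch. `worker_body`, `run_swallowing`, and `run_reraising` are hypothetical stand-ins and do not involve Ray; they only illustrate how a bare `raise` preserves the exit request.

```python
import logging

logger = logging.getLogger('sketch')


def worker_body():
    # Stand-in for code that asks the process to exit with status 3.
    raise SystemExit(3)


def run_swallowing():
    try:
        worker_body()
    except SystemExit:
        logger.error('worker exited.')
    # Execution continues as if nothing happened.
    return 'still running'


def run_reraising():
    try:
        worker_body()
    except SystemExit:
        logger.error('worker exited.')
        raise  # the original exit code propagates to the caller
```

In the first variant the caller never learns that an exit was requested; in the second the log line is still written and the `SystemExit` keeps its exit code.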
CUHKSZzxy
left a comment
LGTM
Three new APIs have been added to Engine / EngineLoop / Executor / ModelAgent.

- `start` starts all the background tasks in the module and its submodules.
- `wait_tasks` awaits all background tasks until one of them raises an exception. (The `wait_tasks` of each submodule is also treated as a background task.) All other tasks are then cancelled.
- `stop` stops all the background tasks.

If an exception is raised in ModelAgent, `wait_tasks` propagates it to the parent module (ModelAgent -> Executor), and the other tasks in the ModelAgent are cancelled. The Executor does the same when it receives the exception from ModelAgent, then the EngineLoop, and so on all the way up to the Engine.

Users can manually stop the background tasks by calling `stop`, which raises a `CancelledError` in the background tasks.

Tip

Another solution is merging them all into an `async def run` function, which would start the tasks / wait for an exception / clean up the tasks. The `run` function itself should be packed as an async task.
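The `start` / `wait_tasks` / `stop` pattern from the description can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: the `Module` class, its `fail` flag, and the `wait_for_tasks` helper are all hypothetical, and a single generic class stands in for Engine, Executor, and ModelAgent.

```python
import asyncio


async def wait_for_tasks(tasks):
    """Hypothetical stand-in for the PR's wait_for_async_tasks utility."""
    tasks = list(tasks)
    if not tasks:
        return
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # One task finished (normally or with an error): cancel the others.
    for task in pending:
        task.cancel()
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        if task.cancelled():
            continue
        exc = task.exception()
        if exc is not None:
            raise exc


class Module:
    """Generic stand-in for Engine / Executor / ModelAgent."""

    def __init__(self, name, child=None, fail=False):
        self.name = name
        self.child = child
        self.fail = fail
        self._tasks = set()

    async def _loop(self):
        if self.fail:
            await asyncio.sleep(0)
            raise RuntimeError(f'{self.name} failed')
        await asyncio.sleep(3600)

    def _add(self, coro):
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def start(self):
        self._add(self._loop())
        if self.child is not None:
            self.child.start()
            # The child's wait_tasks is tracked like any other background task.
            self._add(self.child.wait_tasks())

    async def wait_tasks(self):
        await wait_for_tasks(self._tasks)

    def stop(self):
        for task in list(self._tasks):
            task.cancel()
        if self.child is not None:
            self.child.stop()


async def demo():
    agent = Module('ModelAgent', fail=True)
    executor = Module('Executor', child=agent)
    engine = Module('Engine', child=executor)
    engine.start()
    try:
        await engine.wait_tasks()
    except RuntimeError as exc:
        return str(exc), len(engine._tasks), len(executor._tasks)
```

Here a failure in the innermost module surfaces from `engine.wait_tasks()`, and each level cancels its own remaining tasks on the way up, leaving every tracking set empty.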