From add2fb9908f36cb78564109b3c9c029dd170aa09 Mon Sep 17 00:00:00 2001 From: Mryange Date: Tue, 30 Dec 2025 19:13:05 +0800 Subject: [PATCH] upd --- be/src/pipeline/dependency.cpp | 4 ++-- be/src/pipeline/dependency.h | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 9688bcd568b8d4..0812660952ae6f 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -78,7 +78,7 @@ void Dependency::set_ready() { if (_ready) { return; } - _watcher.stop(); + stop_watcher(lc); _ready = true; local_block_task.swap(_blocked_task); } @@ -95,7 +95,7 @@ Dependency* Dependency::is_blocked_by(std::shared_ptr task) { auto ready = _ready.load(); if (!ready && task) { _add_block_task(task); - start_watcher(); + start_watcher(lc); THROW_IF_ERROR(task->blocked(this, lc)); } return ready ? nullptr : this; diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 01f37cf133bb02..979da95c5d3ed4 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -115,7 +115,10 @@ class Dependency : public std::enable_shared_from_this { bool ready() const { return _ready; } // Start the watcher. We use it to count how long this dependency block the current pipeline task. - void start_watcher() { _watcher.start(); } + void start_watcher(const std::unique_lock&) { _watcher.start(); } + + // Stop the watcher. make sure to call it in lock of _task_lock. + void stop_watcher(const std::unique_lock&) { _watcher.stop(); } [[nodiscard]] int64_t watcher_elapse_time() { return _watcher.elapsed_time(); } // Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.