From edab262d1c333683cae6c9954732bc3003661f2c Mon Sep 17 00:00:00 2001 From: stdpain Date: Tue, 24 Dec 2024 15:22:37 +0800 Subject: [PATCH] save pipeline driver Signed-off-by: stdpain --- be/src/exec/pipeline/pipeline_driver.cpp | 12 +++++ be/src/exec/pipeline/pipeline_driver.h | 62 ++++++++++++++++++++++-- 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/be/src/exec/pipeline/pipeline_driver.cpp b/be/src/exec/pipeline/pipeline_driver.cpp index d895eda16d9ec1..d8bf902ff67284 100644 --- a/be/src/exec/pipeline/pipeline_driver.cpp +++ b/be/src/exec/pipeline/pipeline_driver.cpp @@ -25,6 +25,7 @@ #include "exec/pipeline/pipeline_driver_executor.h" #include "exec/pipeline/scan/olap_scan_operator.h" #include "exec/pipeline/scan/scan_operator.h" +#include "exec/pipeline/schedule/timeout_tasks.h" #include "exec/pipeline/source_operator.h" #include "exec/query_cache/cache_operator.h" #include "exec/query_cache/lane_arbiter.h" @@ -170,6 +171,7 @@ Status PipelineDriver::prepare(RuntimeState* runtime_state) { size_t subscribe_filter_sequence = source_op->get_driver_sequence(); _local_rf_holders = fragment_ctx()->runtime_filter_hub()->gather_holders(all_local_rf_set, subscribe_filter_sequence); + if (use_cache) { ssize_t cache_op_idx = -1; query_cache::CacheOperatorPtr cache_op = nullptr; @@ -688,6 +690,10 @@ void PipelineDriver::finalize(RuntimeState* runtime_state, DriverState state, in _update_driver_level_timer(); + if (_global_rf_timer != nullptr) { + _fragment_ctx->pipeline_timer()->unschedule(_global_rf_timer.get()); + } + // Acquire the pointer to avoid be released when removing query auto query_trace = _query_ctx->shared_query_trace(); const std::string driver_name = _driver_name; @@ -904,4 +910,10 @@ void PipelineDriver::increment_schedule_times() { driver_acct().increment_schedule_times(); } +void PipelineDriver::assign_observer() { + for (const auto& op : _operators) { + op->set_observer(&_observer); + } +} + } // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/pipeline_driver.h b/be/src/exec/pipeline/pipeline_driver.h index 94870fe87e2937..2c352db9159efe 100644 --- a/be/src/exec/pipeline/pipeline_driver.h +++ b/be/src/exec/pipeline/pipeline_driver.h @@ -29,6 +29,8 @@ #include "exec/pipeline/runtime_filter_types.h" #include "exec/pipeline/scan/morsel.h" #include "exec/pipeline/scan/scan_operator.h" +#include "exec/pipeline/schedule/common.h" +#include "exec/pipeline/schedule/observer.h" #include "exec/pipeline/source_operator.h" #include "exec/workgroup/work_group_fwd.h" #include "exprs/runtime_filter_bank.h" @@ -195,6 +197,26 @@ enum OperatorStage { class PipelineDriver { friend class PipelineDriverPoller; +public: + class ScheduleToken { + public: + ScheduleToken(DriverRawPtr driver, bool acquired) : _driver(driver), _acquired(acquired) {} + ~ScheduleToken() { + if (_acquired) { + _driver->_schedule_token = true; + } + } + + ScheduleToken(const ScheduleToken&) = delete; + void operator=(const ScheduleToken&) = delete; + + bool acquired() const { return _acquired; } + + private: + DriverRawPtr _driver; + bool _acquired; + }; + public: PipelineDriver(const Operators& operators, QueryContext* query_ctx, FragmentContext* fragment_ctx, Pipeline* pipeline, int32_t driver_id) @@ -203,11 +225,13 @@ class PipelineDriver { _fragment_ctx(fragment_ctx), _pipeline(pipeline), _source_node_id(operators[0]->get_plan_node_id()), - _driver_id(driver_id) { + _driver_id(driver_id), + _observer(this) { _runtime_profile = std::make_shared(strings::Substitute("PipelineDriver (id=$0)", _driver_id)); for (auto& op : _operators) { _operator_stages[op->get_id()] = OperatorStage::INIT; } + _driver_name = fmt::sprintf("driver_%d_%d", _source_node_id, _driver_id); } @@ -336,7 +360,6 @@ class PipelineDriver { if (_all_global_rf_ready_or_timeout) { return false; } - _all_global_rf_ready_or_timeout = _precondition_block_timer_sw->elapsed_time() >= _global_rf_wait_timeout_ns || // Timeout, std::all_of(_global_rf_descriptors.begin(), _global_rf_descriptors.end(), [](auto* rf_desc) { @@ -440,7 +463,27 @@ class PipelineDriver { void set_driver_queue_level(size_t driver_queue_level) { _driver_queue_level = driver_queue_level; } inline bool is_in_ready_queue() const { return _in_ready_queue.load(std::memory_order_acquire); } - void set_in_ready_queue(bool v) { _in_ready_queue.store(v, std::memory_order_release); } + void set_in_ready_queue(bool v) { + SCHEDULE_CHECK(!v || !is_in_ready_queue()); + _in_ready_queue.store(v, std::memory_order_release); + } + + bool is_in_block_queue() const { return _in_block_queue.load(std::memory_order_acquire); } + void set_in_block_queue(bool v) { + SCHEDULE_CHECK(!v || !is_in_block_queue()); + SCHEDULE_CHECK(!is_in_ready_queue()); + _in_block_queue.store(v, std::memory_order_release); + } + + ScheduleToken acquire_schedule_token() { + bool val = false; + return {this, _schedule_token.compare_exchange_strong(val, true)}; + } + + DECLARE_RACE_DETECTOR(schedule) + + bool need_check_reschedule() const { return _need_check_reschedule; } + void set_need_check_reschedule(bool need_reschedule) { _need_check_reschedule = need_reschedule; } inline std::string get_name() const { return strings::Substitute("PipelineDriver (id=$0)", _driver_id); } @@ -456,6 +499,9 @@ class PipelineDriver { return source_operator()->is_epoch_finishing() || sink_operator()->is_epoch_finishing(); } + PipelineObserver* observer() { return &_observer; } + void assign_observer(); + protected: PipelineDriver() : _operators(), @@ -463,7 +509,8 @@ class PipelineDriver { _fragment_ctx(nullptr), _pipeline(nullptr), _source_node_id(0), - _driver_id(0) {} + _driver_id(0), + _observer(this) {} // Yield PipelineDriver when maximum time in nano-seconds has spent in current execution round. static constexpr int64_t YIELD_MAX_TIME_SPENT_NS = 100'000'000L; @@ -526,9 +573,16 @@ class PipelineDriver { // The index of QuerySharedDriverQueue._queues which this driver belongs to. size_t _driver_queue_level = 0; std::atomic _in_ready_queue{false}; + // Indicates whether it is in a block queue. Only used in EventScheduler mode. + std::atomic _in_block_queue{false}; + + std::atomic _schedule_token{true}; + // Indicates if the block queue needs to be checked when it is added to the block queue. See EventScheduler for details. + std::atomic _need_check_reschedule{false}; std::atomic _has_log_cancelled{false}; + PipelineObserver _observer; // metrics RuntimeProfile::Counter* _total_timer = nullptr; RuntimeProfile::Counter* _active_timer = nullptr;