Skip to content

Commit

Permalink
save pipeline driver
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <drfeng08@gmail.com>
  • Loading branch information
stdpain committed Dec 24, 2024
1 parent ba1ebda commit edab262
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 4 deletions.
12 changes: 12 additions & 0 deletions be/src/exec/pipeline/pipeline_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/scan/olap_scan_operator.h"
#include "exec/pipeline/scan/scan_operator.h"
#include "exec/pipeline/schedule/timeout_tasks.h"
#include "exec/pipeline/source_operator.h"
#include "exec/query_cache/cache_operator.h"
#include "exec/query_cache/lane_arbiter.h"
Expand Down Expand Up @@ -170,6 +171,7 @@ Status PipelineDriver::prepare(RuntimeState* runtime_state) {
size_t subscribe_filter_sequence = source_op->get_driver_sequence();
_local_rf_holders =
fragment_ctx()->runtime_filter_hub()->gather_holders(all_local_rf_set, subscribe_filter_sequence);

if (use_cache) {
ssize_t cache_op_idx = -1;
query_cache::CacheOperatorPtr cache_op = nullptr;
Expand Down Expand Up @@ -688,6 +690,10 @@ void PipelineDriver::finalize(RuntimeState* runtime_state, DriverState state, in

_update_driver_level_timer();

if (_global_rf_timer != nullptr) {
_fragment_ctx->pipeline_timer()->unschedule(_global_rf_timer.get());
}

// Acquire the pointer to avoid be released when removing query
auto query_trace = _query_ctx->shared_query_trace();
const std::string driver_name = _driver_name;
Expand Down Expand Up @@ -904,4 +910,10 @@ void PipelineDriver::increment_schedule_times() {
driver_acct().increment_schedule_times();
}

void PipelineDriver::assign_observer() {
for (const auto& op : _operators) {
op->set_observer(&_observer);
}
}

} // namespace starrocks::pipeline
62 changes: 58 additions & 4 deletions be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "exec/pipeline/runtime_filter_types.h"
#include "exec/pipeline/scan/morsel.h"
#include "exec/pipeline/scan/scan_operator.h"
#include "exec/pipeline/schedule/common.h"
#include "exec/pipeline/schedule/observer.h"
#include "exec/pipeline/source_operator.h"
#include "exec/workgroup/work_group_fwd.h"
#include "exprs/runtime_filter_bank.h"
Expand Down Expand Up @@ -195,6 +197,26 @@ enum OperatorStage {
class PipelineDriver {
friend class PipelineDriverPoller;

public:
class ScheduleToken {
public:
ScheduleToken(DriverRawPtr driver, bool acquired) : _driver(driver), _acquired(acquired) {}
~ScheduleToken() {
if (_acquired) {
_driver->_schedule_token = true;
}
}

ScheduleToken(const ScheduleToken&) = delete;
void operator=(const ScheduleToken&) = delete;

bool acquired() const { return _acquired; }

private:
DriverRawPtr _driver;
bool _acquired;
};

public:
PipelineDriver(const Operators& operators, QueryContext* query_ctx, FragmentContext* fragment_ctx,
Pipeline* pipeline, int32_t driver_id)
Expand All @@ -203,11 +225,13 @@ class PipelineDriver {
_fragment_ctx(fragment_ctx),
_pipeline(pipeline),
_source_node_id(operators[0]->get_plan_node_id()),
_driver_id(driver_id) {
_driver_id(driver_id),
_observer(this) {
_runtime_profile = std::make_shared<RuntimeProfile>(strings::Substitute("PipelineDriver (id=$0)", _driver_id));
for (auto& op : _operators) {
_operator_stages[op->get_id()] = OperatorStage::INIT;
}

_driver_name = fmt::sprintf("driver_%d_%d", _source_node_id, _driver_id);
}

Expand Down Expand Up @@ -336,7 +360,6 @@ class PipelineDriver {
if (_all_global_rf_ready_or_timeout) {
return false;
}

_all_global_rf_ready_or_timeout =
_precondition_block_timer_sw->elapsed_time() >= _global_rf_wait_timeout_ns || // Timeout,
std::all_of(_global_rf_descriptors.begin(), _global_rf_descriptors.end(), [](auto* rf_desc) {
Expand Down Expand Up @@ -440,7 +463,27 @@ class PipelineDriver {
void set_driver_queue_level(size_t driver_queue_level) { _driver_queue_level = driver_queue_level; }

inline bool is_in_ready_queue() const { return _in_ready_queue.load(std::memory_order_acquire); }
void set_in_ready_queue(bool v) { _in_ready_queue.store(v, std::memory_order_release); }
void set_in_ready_queue(bool v) {
SCHEDULE_CHECK(!v || !is_in_ready_queue());
_in_ready_queue.store(v, std::memory_order_release);
}

bool is_in_block_queue() const { return _in_block_queue.load(std::memory_order_acquire); }
void set_in_block_queue(bool v) {
SCHEDULE_CHECK(!v || !is_in_block_queue());
SCHEDULE_CHECK(!is_in_ready_queue());
_in_block_queue.store(v, std::memory_order_release);
}

ScheduleToken acquire_schedule_token() {
bool val = false;
return {this, _schedule_token.compare_exchange_strong(val, true)};
}

DECLARE_RACE_DETECTOR(schedule)

bool need_check_reschedule() const { return _need_check_reschedule; }
void set_need_check_reschedule(bool need_reschedule) { _need_check_reschedule = need_reschedule; }

inline std::string get_name() const { return strings::Substitute("PipelineDriver (id=$0)", _driver_id); }

Expand All @@ -456,14 +499,18 @@ class PipelineDriver {
return source_operator()->is_epoch_finishing() || sink_operator()->is_epoch_finishing();
}

PipelineObserver* observer() { return &_observer; }
void assign_observer();

protected:
PipelineDriver()
: _operators(),
_query_ctx(nullptr),
_fragment_ctx(nullptr),
_pipeline(nullptr),
_source_node_id(0),
_driver_id(0) {}
_driver_id(0),
_observer(this) {}

// Yield PipelineDriver when maximum time in nano-seconds has spent in current execution round.
static constexpr int64_t YIELD_MAX_TIME_SPENT_NS = 100'000'000L;
Expand Down Expand Up @@ -526,9 +573,16 @@ class PipelineDriver {
// The index of QuerySharedDriverQueue._queues which this driver belongs to.
size_t _driver_queue_level = 0;
std::atomic<bool> _in_ready_queue{false};
// Indicates whether it is in a block queue. Only used in EventScheduler mode.
std::atomic<bool> _in_block_queue{false};

std::atomic<bool> _schedule_token{true};
// Indicates if the block queue needs to be checked when it is added to the block queue. See EventScheduler for details.
std::atomic<bool> _need_check_reschedule{false};

std::atomic<bool> _has_log_cancelled{false};

PipelineObserver _observer;
// metrics
RuntimeProfile::Counter* _total_timer = nullptr;
RuntimeProfile::Counter* _active_timer = nullptr;
Expand Down

0 comments on commit edab262

Please # to comment.