Skip to content

Commit

Permalink
refactor: naming in multi_queue_block_merger
Browse files Browse the repository at this point in the history
  • Loading branch information
mhx committed Nov 19, 2023
1 parent 6d81dc1 commit be37673
Showing 1 changed file with 37 additions and 35 deletions.
72 changes: 37 additions & 35 deletions include/dwarfs/multi_queue_block_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,18 @@ class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
using source_type = SourceT;
using block_type = BlockT;

multi_queue_block_merger(size_t num_active_slots, size_t max_queued_blocks,
std::vector<source_type> const& sources,
std::function<void(block_type)> on_block_merged)
multi_queue_block_merger(
size_t num_active_slots, size_t max_queued_blocks,
std::vector<source_type> const& sources,
std::function<void(block_type)> on_block_merged_callback)
: num_queueable_{max_queued_blocks}
, sources_{sources.begin(), sources.end()}
, active_(num_active_slots)
, on_block_merged_{on_block_merged} {
for (size_t i = 0; i < active_.size() && !sources_.empty(); ++i) {
active_[i] = sources_.front();
sources_.pop_front();
, source_queue_{sources.begin(), sources.end()}
, active_slots_(num_active_slots)
, on_block_merged_callback_{on_block_merged_callback} {
for (size_t i = 0; i < active_slots_.size() && !source_queue_.empty();
++i) {
active_slots_[i] = source_queue_.front();
source_queue_.pop_front();
}
}

Expand All @@ -62,7 +64,7 @@ class multi_queue_block_merger : public block_merger<SourceT, BlockT> {

--num_queueable_;

queues_[src].emplace(std::move(blk), is_last);
block_queues_[src].emplace(std::move(blk), is_last);

while (try_merge_block()) {
}
Expand All @@ -72,16 +74,16 @@ class multi_queue_block_merger : public block_merger<SourceT, BlockT> {

private:
size_t source_distance(source_type src) const {
auto ix = active_index_;
auto ix = active_slot_index_;
size_t distance{0};

while (active_[ix] && active_[ix].value() != src) {
while (active_slots_[ix] && active_slots_[ix].value() != src) {
++distance;
ix = (ix + 1) % active_.size();
ix = (ix + 1) % active_slots_.size();

if (ix == active_index_) {
auto it = std::find(begin(sources_), end(sources_), src);
distance += std::distance(begin(sources_), it);
if (ix == active_slot_index_) {
auto it = std::find(begin(source_queue_), end(source_queue_), src);
distance += std::distance(begin(source_queue_), it);
break;
}
}
Expand All @@ -90,54 +92,54 @@ class multi_queue_block_merger : public block_merger<SourceT, BlockT> {
}

bool try_merge_block() {
auto const ix = active_index_;
auto const ix = active_slot_index_;

assert(active_[ix]);
assert(active_slots_[ix]);

auto src = active_[ix].value();
auto it = queues_.find(src);
auto src = active_slots_[ix].value();
auto it = block_queues_.find(src);

if (it == queues_.end() || it->second.empty()) {
if (it == block_queues_.end() || it->second.empty()) {
return false;
}

auto [blk, is_last] = std::move(it->second.front());
it->second.pop();

on_block_merged_(std::move(blk));
on_block_merged_callback_(std::move(blk));

++num_queueable_;

if (is_last) {
queues_.erase(it);
block_queues_.erase(it);
update_active(ix);
}

do {
active_index_ = (active_index_ + 1) % active_.size();
} while (active_index_ != ix && !active_[active_index_]);
active_slot_index_ = (active_slot_index_ + 1) % active_slots_.size();
} while (active_slot_index_ != ix && !active_slots_[active_slot_index_]);

return active_index_ != ix || active_[active_index_];
return active_slot_index_ != ix || active_slots_[active_slot_index_];
}

void update_active(size_t ix) {
if (!sources_.empty()) {
active_[ix] = sources_.front();
sources_.pop_front();
if (!source_queue_.empty()) {
active_slots_[ix] = source_queue_.front();
source_queue_.pop_front();
} else {
active_[ix].reset();
active_slots_[ix].reset();
}
}

std::mutex mx_;
std::condition_variable cv_;
size_t active_index_{0};
size_t active_slot_index_{0};
size_t num_queueable_;
std::unordered_map<source_type, std::queue<std::pair<block_type, bool>>>
queues_;
std::deque<source_type> sources_;
std::vector<std::optional<source_type>> active_;
std::function<void(block_type)> on_block_merged_;
block_queues_;
std::deque<source_type> source_queue_;
std::vector<std::optional<source_type>> active_slots_;
std::function<void(block_type)> on_block_merged_callback_;
};

} // namespace dwarfs

0 comments on commit be37673

Please # to comment.