diff --git a/crypto/test/test-db.cpp b/crypto/test/test-db.cpp index 2b77e7118..47d833d56 100644 --- a/crypto/test/test-db.cpp +++ b/crypto/test/test-db.cpp @@ -2991,10 +2991,13 @@ TEST(TonDb, LargeBocSerializer) { td::unlink(path).ignore(); fd = td::FileFd::open(path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write) .move_as_ok(); - std_boc_serialize_to_file_large(dboc->get_cell_db_reader(), root->get_hash(), fd, 31); + boc_serialize_to_file_large(dboc->get_cell_db_reader(), root->get_hash(), fd, 31); fd.close(); auto b = td::read_file_str(path).move_as_ok(); - CHECK(a == b); + + auto a_cell = vm::deserialize_boc(td::BufferSlice(a)); + auto b_cell = vm::deserialize_boc(td::BufferSlice(b)); + ASSERT_EQ(a_cell->get_hash(), b_cell->get_hash()); } TEST(TonDb, DoNotMakeListsPrunned) { diff --git a/crypto/vm/boc.cpp b/crypto/vm/boc.cpp index 6377674d7..3aef7dd10 100644 --- a/crypto/vm/boc.cpp +++ b/crypto/vm/boc.cpp @@ -209,7 +209,7 @@ td::Result BagOfCells::import_cell(td::Ref cell, int depth) { return td::Status::Error("error while importing a cell into a bag of cells: cell is null"); } if (logger_ptr_) { - TRY_STATUS(logger_ptr_->on_cell_processed()); + TRY_STATUS(logger_ptr_->on_cells_processed(1)); } auto it = cells.find(cell->get_hash()); if (it != cells.end()) { @@ -555,7 +555,7 @@ td::Result BagOfCells::serialize_to_impl(WriterT& writer, int mode) } store_offset(fixed_offset); if (logger_ptr_) { - TRY_STATUS(logger_ptr_->on_cell_processed()); + TRY_STATUS(logger_ptr_->on_cells_processed(1)); } } if (logger_ptr_) { @@ -588,7 +588,7 @@ td::Result BagOfCells::serialize_to_impl(WriterT& writer, int mode) } // std::cerr << std::endl; if (logger_ptr_) { - TRY_STATUS(logger_ptr_->on_cell_processed()); + TRY_STATUS(logger_ptr_->on_cells_processed(1)); } } writer.chk(); diff --git a/crypto/vm/boc.h b/crypto/vm/boc.h index 17e7eb69d..4f3fe5fcf 100644 --- a/crypto/vm/boc.h +++ b/crypto/vm/boc.h @@ -210,6 +210,7 @@ class BagOfCellsLogger { 
void start_stage(std::string stage) { log_speed_at_ = td::Timestamp::in(LOG_SPEED_PERIOD); + last_speed_log_ = td::Timestamp::now(); last_token_check_ = 0; processed_cells_ = 0; timer_ = {}; stage_ = std::move(stage); @@ -217,15 +218,19 @@ class BagOfCellsLogger { void finish_stage(td::Slice desc) { LOG(ERROR) << "serializer: " << stage_ << " took " << timer_.elapsed() << "s, " << desc; } - td::Status on_cell_processed() { - ++processed_cells_; - if (processed_cells_ % 1000 == 0) { + td::Status on_cells_processed(size_t count) { + processed_cells_ += count; + if (processed_cells_ / 1000 > last_token_check_) { TRY_STATUS(cancellation_token_.check()); + last_token_check_ = processed_cells_ / 1000; } if (log_speed_at_.is_in_past()) { - log_speed_at_ += LOG_SPEED_PERIOD; - LOG(WARNING) << "serializer: " << stage_ << " " << (double)processed_cells_ / LOG_SPEED_PERIOD << " cells/s"; + double period = td::Timestamp::now().at() - last_speed_log_.at(); + + LOG(WARNING) << "serializer: " << stage_ << " " << (double)processed_cells_ / period << " cells/s"; processed_cells_ = 0; + last_speed_log_ = td::Timestamp::now(); last_token_check_ = 0; + log_speed_at_ = td::Timestamp::in(LOG_SPEED_PERIOD); } return td::Status::OK(); } @@ -236,6 +241,8 @@ class BagOfCellsLogger { td::CancellationToken cancellation_token_; td::Timestamp log_speed_at_; size_t processed_cells_ = 0; + size_t last_token_check_ = 0; + td::Timestamp last_speed_log_; static constexpr double LOG_SPEED_PERIOD = 120.0; }; class BagOfCells { @@ -390,7 +397,7 @@ td::Result std_boc_serialize_multi(std::vector> root, td::Status std_boc_serialize_to_file(Ref root, td::FileFd& fd, int mode = 0, td::CancellationToken cancellation_token = {}); -td::Status std_boc_serialize_to_file_large(std::shared_ptr reader, Cell::Hash root_hash, td::FileFd& fd, +td::Status boc_serialize_to_file_large(std::shared_ptr reader, Cell::Hash root_hash, td::FileFd& fd, int mode = 0, td::CancellationToken cancellation_token = {}); } // namespace vm diff --git a/crypto/vm/db/CellStorage.cpp 
b/crypto/vm/db/CellStorage.cpp index a07d85e87..f93e3fa59 100644 --- a/crypto/vm/db/CellStorage.cpp +++ b/crypto/vm/db/CellStorage.cpp @@ -177,6 +177,28 @@ td::Result CellLoader::load(td::Slice hash, bool need_da return res; } +td::Result> CellLoader::load_bulk(td::Span hashes, bool need_data, + ExtCellCreator &ext_cell_creator) { + std::vector values; + TRY_RESULT(get_statuses, reader_->get_multi(hashes, &values)); + std::vector res; + res.reserve(hashes.size()); + for (size_t i = 0; i < hashes.size(); i++) { + auto get_status = get_statuses[i]; + if (get_status != KeyValue::GetStatus::Ok) { + DCHECK(get_status == KeyValue::GetStatus::NotFound); + res.push_back(LoadResult{}); + continue; + } + TRY_RESULT(load_res, load(hashes[i], values[i], need_data, ext_cell_creator)); + if (on_load_callback_) { + on_load_callback_(load_res); + } + res.push_back(std::move(load_res)); + } + return res; +} + td::Result CellLoader::load(td::Slice hash, td::Slice value, bool need_data, ExtCellCreator &ext_cell_creator) { LoadResult res; diff --git a/crypto/vm/db/CellStorage.h b/crypto/vm/db/CellStorage.h index ca32a8007..7ae586793 100644 --- a/crypto/vm/db/CellStorage.h +++ b/crypto/vm/db/CellStorage.h @@ -49,6 +49,7 @@ class CellLoader { }; CellLoader(std::shared_ptr reader, std::function on_load_callback = {}); td::Result load(td::Slice hash, bool need_data, ExtCellCreator &ext_cell_creator); + td::Result> load_bulk(td::Span hashes, bool need_data, ExtCellCreator &ext_cell_creator); static td::Result load(td::Slice hash, td::Slice value, bool need_data, ExtCellCreator &ext_cell_creator); td::Result load_refcnt(td::Slice hash); // This only loads refcnt_, cell_ == null KeyValueReader &key_value_reader() const { diff --git a/crypto/vm/db/DynamicBagOfCellsDb.cpp b/crypto/vm/db/DynamicBagOfCellsDb.cpp index d6731b039..1a30f76d4 100644 --- a/crypto/vm/db/DynamicBagOfCellsDb.cpp +++ b/crypto/vm/db/DynamicBagOfCellsDb.cpp @@ -138,22 +138,24 @@ class DynamicBagOfCellsDbImpl : public 
DynamicBagOfCellsDb, private ExtCellCreat return td::Status::OK(); } td::Result> load_cell(td::Slice hash) override { - auto info = hash_table_.get_if_exists(hash); - if (info && info->sync_with_db) { - TRY_RESULT(loaded_cell, info->cell->load_cell()); - return std::move(loaded_cell.data_cell); - } - TRY_RESULT(res, loader_->load(hash, true, *this)); - if (res.status != CellLoader::LoadResult::Ok) { - return td::Status::Error("cell not found"); - } - Ref cell = res.cell(); - hash_table_.apply(hash, [&](CellInfo &info) { update_cell_info_loaded(info, hash, std::move(res)); }); - return cell; + TRY_RESULT(loaded_cell, get_cell_info_force(hash).cell->load_cell()); + return std::move(loaded_cell.data_cell); } td::Result> load_root(td::Slice hash) override { return load_cell(hash); } + td::Result>> load_bulk(td::Span hashes) override { + std::vector> result; + result.reserve(hashes.size()); + for (auto &hash : hashes) { + auto cell = load_cell(hash); + if (cell.is_error()) { + return cell.move_as_error(); + } + result.push_back(cell.move_as_ok()); + } + return result; + } td::Result> load_root_thread_safe(td::Slice hash) const override { return td::Status::Error("Not implemented"); } @@ -193,6 +195,9 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat promise->set_result(std::move(cell)); }); } + CellInfo &get_cell_info_force(td::Slice hash) { + return hash_table_.apply(hash, [&](CellInfo &info) { update_cell_info_force(info, hash); }); + } CellInfo &get_cell_info_lazy(Cell::LevelMask level_mask, td::Slice hash, td::Slice depth) { return hash_table_.apply(hash.substr(hash.size() - Cell::hash_bytes), [&](CellInfo &info) { update_cell_info_lazy(info, level_mask, hash, depth); }); @@ -383,6 +388,23 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat return std::move(load_result.cell()); } + td::Result>> load_bulk(td::Span hashes) override { + if (db_) { + return db_->load_bulk(hashes); + } + 
TRY_RESULT(load_result, cell_loader_->load_bulk(hashes, true, *this)); + + std::vector> res; + res.reserve(load_result.size()); + for (auto &load_res : load_result) { + if (load_res.status != CellLoader::LoadResult::Ok) { + return td::Status::Error("cell not found"); + } + res.push_back(std::move(load_res.cell())); + } + return res; + } + private: static td::NamedThreadSafeCounter::CounterRef get_thread_safe_counter() { static auto res = td::NamedThreadSafeCounter::get_default().get_counter("DynamicBagOfCellsDbLoader"); diff --git a/crypto/vm/db/DynamicBagOfCellsDb.h b/crypto/vm/db/DynamicBagOfCellsDb.h index 82028f3fe..ec0aee9c1 100644 --- a/crypto/vm/db/DynamicBagOfCellsDb.h +++ b/crypto/vm/db/DynamicBagOfCellsDb.h @@ -45,6 +45,7 @@ class CellDbReader { public: virtual ~CellDbReader() = default; virtual td::Result> load_cell(td::Slice hash) = 0; + virtual td::Result>> load_bulk(td::Span hashes) = 0; }; class DynamicBagOfCellsDb { @@ -57,6 +58,7 @@ class DynamicBagOfCellsDb { virtual td::Status meta_erase(td::Slice key) = 0; virtual td::Result> load_cell(td::Slice hash) = 0; + virtual td::Result>> load_bulk(td::Span hashes) = 0; virtual td::Result> load_root(td::Slice hash) = 0; virtual td::Result> load_root_thread_safe(td::Slice hash) const = 0; virtual td::Result>> load_known_roots() const { diff --git a/crypto/vm/db/DynamicBagOfCellsDbV2.cpp b/crypto/vm/db/DynamicBagOfCellsDbV2.cpp index eff74e214..ecf3b76cb 100644 --- a/crypto/vm/db/DynamicBagOfCellsDbV2.cpp +++ b/crypto/vm/db/DynamicBagOfCellsDbV2.cpp @@ -811,6 +811,10 @@ class DynamicBagOfCellsDbImplV2 : public DynamicBagOfCellsDb { td::Result> load_root(td::Slice hash) override { return load_cell(hash); } + td::Result>> load_bulk(td::Span hashes) override { + CHECK(cell_db_reader_); + return cell_db_reader_->load_bulk(hashes); + } td::Result> load_root_thread_safe(td::Slice hash) const override { // TODO: it is better to use AtomicRef, or atomic shared pointer // But to use AtomicRef we need a little 
refactoring @@ -1102,6 +1106,20 @@ class DynamicBagOfCellsDbImplV2 : public DynamicBagOfCellsDb { return load_cell_slow_path(hash); } + td::Result>> load_bulk(td::Span hashes) override { + // thread safe function + std::vector> result; + result.reserve(hashes.size()); + for (auto &hash : hashes) { + auto maybe_cell = load_cell(hash); + if (maybe_cell.is_error()) { + return maybe_cell.move_as_error(); + } + result.push_back(maybe_cell.move_as_ok()); + } + return result; + } + td::Result> load_ext_cell(Ref ext_cell) override { // thread safe function. // Called by external cell diff --git a/crypto/vm/db/InMemoryBagOfCellsDb.cpp b/crypto/vm/db/InMemoryBagOfCellsDb.cpp index 44be4ca8b..829ed38d8 100644 --- a/crypto/vm/db/InMemoryBagOfCellsDb.cpp +++ b/crypto/vm/db/InMemoryBagOfCellsDb.cpp @@ -454,6 +454,16 @@ class CellStorage { return td::Status::Error("not found"); } + td::Result>> load_bulk(td::Span hashes) const { + std::vector> res; + res.reserve(hashes.size()); + for (auto &hash : hashes) { + TRY_RESULT(cell, load_cell(hash)); + res.push_back(std::move(cell)); + } + return res; + } + td::Result> load_root_local(const CellHash &hash) const { auto lock = local_access_.lock(); if (auto it = local_roots_.find(hash); it != local_roots_.end()) { @@ -839,6 +849,9 @@ class InMemoryBagOfCellsDb : public DynamicBagOfCellsDb { td::Result> load_root(td::Slice hash) override { return storage_->load_root_local(CellHash::from_slice(hash)); } + td::Result>> load_bulk(td::Span hashes) override { + return storage_->load_bulk(td::transform(hashes, [](auto &hash) { return CellHash::from_slice(hash); })); + } td::Result> load_root_thread_safe(td::Slice hash) const override { return storage_->load_root_shared(CellHash::from_slice(hash)); } diff --git a/crypto/vm/large-boc-serializer.cpp b/crypto/vm/large-boc-serializer.cpp index d209c88ed..839e6235a 100644 --- a/crypto/vm/large-boc-serializer.cpp +++ b/crypto/vm/large-boc-serializer.cpp @@ -32,6 +32,7 @@ namespace { class 
LargeBocSerializer { public: using Hash = Cell::Hash; + constexpr static int load_batch_size = 4'000'000; explicit LargeBocSerializer(std::shared_ptr reader) : reader(std::move(reader)) { } @@ -46,7 +47,6 @@ class LargeBocSerializer { private: std::shared_ptr reader; struct CellInfo { - Cell::Hash hash; std::array ref_idx; int idx; unsigned short serialized_size; @@ -95,6 +95,8 @@ void LargeBocSerializer::add_root(Hash root) { roots.emplace_back(root, -1); } +// Unlike crypto/vm/boc.cpp this implementation does not load all cells into memory +// and traverses them in BFS order to utilize bulk load of cells on the same level. td::Status LargeBocSerializer::import_cells() { if (logger_ptr_) { logger_ptr_->start_stage("import_cells"); @@ -111,46 +113,124 @@ td::Status LargeBocSerializer::import_cells() { return td::Status::OK(); } -td::Result LargeBocSerializer::import_cell(Hash hash, int depth) { - if (depth > Cell::max_depth) { - return td::Status::Error("error while importing a cell into a bag of cells: cell depth too large"); +td::Result LargeBocSerializer::import_cell(Hash root_hash, int root_depth) { + const int start_ind = cell_count; + td::HashMap> current_depth_hashes; + + auto existing_it = cells.find(root_hash); + if (existing_it != cells.end()) { + existing_it->second.should_cache = true; return existing_it->second.idx; + } else { + current_depth_hashes.emplace(root_hash, std::make_pair(cell_count, false)); + } + int current_depth = root_depth; + int next_child_idx = cell_count + 1; + while (!current_depth_hashes.empty()) { + if (current_depth > Cell::max_depth) { + return td::Status::Error("error while importing a cell into a bag of cells: cell depth too large"); + } + + cell_list.resize(cell_list.size() + current_depth_hashes.size()); + td::HashMap> next_depth_hashes; + auto batch_start = current_depth_hashes.begin(); + while (batch_start != current_depth_hashes.end()) { + std::vector batch_hashes; + batch_hashes.reserve(load_batch_size); + std::vector*> batch_idxs_should_cache; + 
batch_idxs_should_cache.reserve(load_batch_size); + + while (batch_hashes.size() < load_batch_size && batch_start != current_depth_hashes.end()) { + batch_hashes.push_back(batch_start->first.as_slice()); + batch_idxs_should_cache.push_back(&batch_start->second); + ++batch_start; + } + + TRY_RESULT_PREFIX(loaded_results, reader->load_bulk(batch_hashes), + "error while importing a cell into a bag of cells: "); + DCHECK(loaded_results.size() == batch_hashes.size()); + + for (size_t i = 0; i < loaded_results.size(); ++i) { + auto& cell = loaded_results[i]; + + if (cell->get_virtualization() != 0) { + return td::Status::Error( + "error while importing a cell into a bag of cells: cell has non-zero virtualization level"); + } + + const auto hash = cell->get_hash(); + CellSlice cs(std::move(cell)); + + DCHECK(cs.size_refs() <= 4); + std::array refs{-1, -1, -1, -1}; + for (unsigned j = 0; j < cs.size_refs(); j++) { + auto child = cs.prefetch_ref(j); + const auto child_hash = child->get_hash(); + + auto existing_global_it = cells.find(child_hash); + if (existing_global_it != cells.end()) { + existing_global_it->second.should_cache = true; + refs[j] = existing_global_it->second.idx; + continue; + } + auto current_depth_it = current_depth_hashes.find(child_hash); + if (current_depth_it != current_depth_hashes.end()) { + current_depth_it->second.second = true; + refs[j] = current_depth_it->second.first; + continue; + } + auto next_depth_it = next_depth_hashes.find(child_hash); + if (next_depth_it != next_depth_hashes.end()) { + next_depth_it->second.second = true; + refs[j] = next_depth_it->second.first; + continue; + } + auto res = next_depth_hashes.emplace(child_hash, std::make_pair(next_child_idx, false)); + refs[j] = next_child_idx++; + } + + auto dc = cs.move_as_loaded_cell().data_cell; + auto idx_should_cache = batch_idxs_should_cache[i]; + auto res = cells.emplace(hash, CellInfo(idx_should_cache->first, std::move(refs))); + DCHECK(res.second); + 
cell_list[idx_should_cache->first] = &*res.first; + CellInfo& dc_info = res.first->second; + dc_info.should_cache = idx_should_cache->second; + dc_info.hcnt = static_cast(dc->get_level_mask().get_hashes_count()); + DCHECK(dc_info.hcnt <= 4); + dc_info.wt = 0; // will be calculated after traversing + TRY_RESULT(serialized_size, td::narrow_cast_safe(dc->get_serialized_size())); + data_bytes += dc_info.serialized_size = serialized_size; + cell_count++; + } + if (logger_ptr_) { + TRY_STATUS(logger_ptr_->on_cells_processed(batch_hashes.size())); + } + } + + current_depth_hashes = std::move(next_depth_hashes); + next_depth_hashes.clear(); + current_depth++; + } + DCHECK(next_child_idx == cell_count); + + for (int idx = cell_count - 1; idx >= start_ind; --idx) { + CellInfo& cell_info = cell_list[idx]->second; + + unsigned sum_child_wt = 1; + for (size_t j = 0; j < cell_info.ref_idx.size(); ++j) { + int child_idx = cell_info.ref_idx[j]; + if (child_idx == -1) { + continue; + } + sum_child_wt += cell_list[child_idx]->second.wt; + ++int_refs; + } + cell_info.wt = static_cast(std::min(0xffU, sum_child_wt)); } - if (logger_ptr_) { - TRY_STATUS(logger_ptr_->on_cell_processed()); - } - auto it = cells.find(hash); - if (it != cells.end()) { - it->second.should_cache = true; - return it->second.idx; - } - TRY_RESULT(cell, reader->load_cell(hash.as_slice())); - if (cell->get_virtualization() != 0) { - return td::Status::Error( - "error while importing a cell into a bag of cells: cell has non-zero virtualization level"); - } - CellSlice cs(std::move(cell)); - std::array refs; - std::fill(refs.begin(), refs.end(), -1); - DCHECK(cs.size_refs() <= 4); - unsigned sum_child_wt = 1; - for (unsigned i = 0; i < cs.size_refs(); i++) { - TRY_RESULT(ref, import_cell(cs.prefetch_ref(i)->get_hash(), depth + 1)); - refs[i] = ref; - sum_child_wt += cell_list[ref]->second.wt; - ++int_refs; - } - auto dc = cs.move_as_loaded_cell().data_cell; - auto res = cells.emplace(hash, CellInfo(cell_count, 
refs)); - DCHECK(res.second); - cell_list.push_back(&*res.first); - CellInfo& dc_info = res.first->second; - dc_info.wt = (unsigned char)std::min(0xffU, sum_child_wt); - unsigned hcnt = dc->get_level_mask().get_hashes_count(); - DCHECK(hcnt <= 4); - dc_info.hcnt = (unsigned char)hcnt; - TRY_RESULT(serialized_size, td::narrow_cast_safe(dc->get_serialized_size())); - data_bytes += dc_info.serialized_size = serialized_size; - return cell_count++; + + auto root_it = cells.find(root_hash); + DCHECK(root_it != cells.end()); + return root_it->second.idx; } void LargeBocSerializer::reorder_cells() { @@ -386,7 +466,7 @@ td::Status LargeBocSerializer::serialize(td::FileFd& fd, int mode) { } store_offset(fixed_offset); if (logger_ptr_) { - TRY_STATUS(logger_ptr_->on_cell_processed()); + TRY_STATUS(logger_ptr_->on_cells_processed(1)); } } DCHECK(offs == info.data_size); @@ -399,26 +479,42 @@ td::Status LargeBocSerializer::serialize(td::FileFd& fd, int mode) { if (logger_ptr_) { logger_ptr_->start_stage("serialize"); } - for (int i = 0; i < cell_count; ++i) { - auto hash = cell_list[cell_count - 1 - i]->first; - const auto& dc_info = cell_list[cell_count - 1 - i]->second; - TRY_RESULT(dc, reader->load_cell(hash.as_slice())); - bool with_hash = (mode & Mode::WithIntHashes) && !dc_info.wt; - if (dc_info.is_root_cell && (mode & Mode::WithTopHash)) { - with_hash = true; + for (int batch_start = 0; batch_start < cell_count; batch_start += load_batch_size) { + int batch_end = std::min(batch_start + static_cast(load_batch_size), cell_count); + + std::vector batch_hashes; + batch_hashes.reserve(batch_end - batch_start); + for (int i = batch_start; i < batch_end; ++i) { + int cell_index = cell_count - 1 - i; + batch_hashes.push_back(cell_list[cell_index]->first.as_slice()); } - unsigned char buf[256]; - int s = dc->serialize(buf, 256, with_hash); - writer.store_bytes(buf, s); - DCHECK(dc->size_refs() == dc_info.get_ref_num()); - unsigned ref_num = dc_info.get_ref_num(); - for (unsigned 
j = 0; j < ref_num; ++j) { - int k = cell_count - 1 - dc_info.ref_idx[j]; - DCHECK(k > i && k < cell_count); - store_ref(k); + + TRY_RESULT(batch_cells, reader->load_bulk(std::move(batch_hashes))); + + for (int i = batch_start; i < batch_end; ++i) { + int idx_in_batch = i - batch_start; + int cell_index = cell_count - 1 - i; + + const auto& dc_info = cell_list[cell_index]->second; + auto& dc = batch_cells[idx_in_batch]; + + bool with_hash = (mode & Mode::WithIntHashes) && !dc_info.wt; + if (dc_info.is_root_cell && (mode & Mode::WithTopHash)) { + with_hash = true; + } + unsigned char buf[256]; + int s = dc->serialize(buf, 256, with_hash); + writer.store_bytes(buf, s); + DCHECK(dc->size_refs() == dc_info.get_ref_num()); + unsigned ref_num = dc_info.get_ref_num(); + for (unsigned j = 0; j < ref_num; ++j) { + int k = cell_count - 1 - dc_info.ref_idx[j]; + DCHECK(k > i && k < cell_count); + store_ref(k); + } } if (logger_ptr_) { - TRY_STATUS(logger_ptr_->on_cell_processed()); + TRY_STATUS(logger_ptr_->on_cells_processed(batch_hashes.size())); } } DCHECK(writer.position() - keep_position == info.data_size); @@ -435,7 +531,7 @@ td::Status LargeBocSerializer::serialize(td::FileFd& fd, int mode) { } } // namespace -td::Status std_boc_serialize_to_file_large(std::shared_ptr reader, Cell::Hash root_hash, td::FileFd& fd, +td::Status boc_serialize_to_file_large(std::shared_ptr reader, Cell::Hash root_hash, td::FileFd& fd, int mode, td::CancellationToken cancellation_token) { td::Timer timer; CHECK(reader != nullptr) diff --git a/tddb/td/db/KeyValue.h b/tddb/td/db/KeyValue.h index c3f83919b..6bbe7df3c 100644 --- a/tddb/td/db/KeyValue.h +++ b/tddb/td/db/KeyValue.h @@ -20,6 +20,7 @@ #include "td/utils/Status.h" #include "td/utils/Time.h" #include "td/utils/logging.h" +#include "td/utils/Span.h" #include "td/utils/ThreadSafeCounter.h" #include namespace td { @@ -59,7 +60,8 @@ class KeyValueReader { virtual ~KeyValueReader() = default; enum class GetStatus : int32 { Ok, NotFound }; 
- virtual Result get(Slice key, std::string& value) = 0; + virtual Result get(Slice key, std::string &value) = 0; + virtual Result> get_multi(td::Span keys, std::vector *values) = 0; virtual Result count(Slice prefix) = 0; virtual Status for_each(std::function f) { return Status::Error("for_each is not supported"); @@ -77,6 +79,14 @@ class PrefixedKeyValueReader : public KeyValueReader { Result get(Slice key, std::string& value) override { return reader_->get(PSLICE() << prefix_ << key, value); } + Result> get_multi(td::Span keys, std::vector *values) override { + std::vector prefixed_keys; + prefixed_keys.reserve(keys.size()); + for (auto &key : keys) { + prefixed_keys.push_back(PSLICE() << prefix_ << key); + } + return reader_->get_multi(prefixed_keys, values); + } Result count(Slice prefix) override { return reader_->count(PSLICE() << prefix_ << prefix); } @@ -125,6 +135,14 @@ class PrefixedKeyValue : public KeyValue { Result get(Slice key, std::string& value) override { return kv_->get(PSLICE() << prefix_ << key, value); } + Result> get_multi(td::Span keys, std::vector *values) override { + std::vector prefixed_keys; + prefixed_keys.reserve(keys.size()); + for (auto &key : keys) { + prefixed_keys.push_back(PSLICE() << prefix_ << key); + } + return kv_->get_multi(prefixed_keys, values); + } Result count(Slice prefix) override { return kv_->count(PSLICE() << prefix_ << prefix); } diff --git a/tddb/td/db/MemoryKeyValue.cpp b/tddb/td/db/MemoryKeyValue.cpp index 7105f72b9..c79daccea 100644 --- a/tddb/td/db/MemoryKeyValue.cpp +++ b/tddb/td/db/MemoryKeyValue.cpp @@ -19,6 +19,7 @@ #include "td/db/MemoryKeyValue.h" #include "td/utils/format.h" +#include "td/utils/Span.h" namespace td { Result MemoryKeyValue::get(Slice key, std::string &value) { @@ -41,6 +42,17 @@ std::unique_ptr MemoryKeyValue:: return lock(buckets_[bucket_id]); } +Result> MemoryKeyValue::get_multi(td::Span keys, + std::vector *values) { + values->resize(keys.size()); + std::vector res; + 
res.reserve(keys.size()); + for (size_t i = 0; i < keys.size(); i++) { + res.push_back(get(keys[i], (*values)[i]).move_as_ok()); + } + return res; +} + Status MemoryKeyValue::for_each(std::function f) { for (auto &unlocked_bucket : buckets_) { auto bucket = lock(unlocked_bucket); diff --git a/tddb/td/db/MemoryKeyValue.h b/tddb/td/db/MemoryKeyValue.h index cf896095d..6a5be7866 100644 --- a/tddb/td/db/MemoryKeyValue.h +++ b/tddb/td/db/MemoryKeyValue.h @@ -34,6 +34,7 @@ class MemoryKeyValue : public KeyValue { MemoryKeyValue(std::shared_ptr merger) : merger_(std::move(merger)) { } Result get(Slice key, std::string& value) override; + Result> get_multi(td::Span keys, std::vector *values) override; Status for_each(std::function f) override; Status for_each_in_range(Slice begin, Slice end, std::function f) override; Status set(Slice key, Slice value) override; diff --git a/tddb/td/db/RocksDb.cpp b/tddb/td/db/RocksDb.cpp index b56f3b145..5c7a05d21 100644 --- a/tddb/td/db/RocksDb.cpp +++ b/tddb/td/db/RocksDb.cpp @@ -179,6 +179,40 @@ Result RocksDb::get(Slice key, std::string &value) { return from_rocksdb(status); } +Result> RocksDb::get_multi(td::Span keys, std::vector *values) { + std::vector statuses(keys.size()); + std::vector keys_rocksdb; + keys_rocksdb.reserve(keys.size()); + for (auto &key : keys) { + keys_rocksdb.push_back(to_rocksdb(key)); + } + std::vector values_rocksdb(keys.size()); + rocksdb::ReadOptions options; + if (snapshot_) { + options.snapshot = snapshot_.get(); + db_->MultiGet(options, db_->DefaultColumnFamily(), keys_rocksdb.size(), keys_rocksdb.data(), values_rocksdb.data(), statuses.data()); + } else if (transaction_) { + transaction_->MultiGet(options, db_->DefaultColumnFamily(), keys_rocksdb.size(), keys_rocksdb.data(), values_rocksdb.data(), statuses.data()); + } else { + db_->MultiGet(options, db_->DefaultColumnFamily(), keys_rocksdb.size(), keys_rocksdb.data(), values_rocksdb.data(), statuses.data()); + } + std::vector res(statuses.size()); + 
values->resize(statuses.size()); + for (size_t i = 0; i < statuses.size(); i++) { + auto &status = statuses[i]; + if (status.ok()) { + res[i] = GetStatus::Ok; + values->at(i) = values_rocksdb[i].ToString(); + } else if (status.code() == rocksdb::Status::kNotFound) { + res[i] = GetStatus::NotFound; + values->at(i) = ""; + } else { + return from_rocksdb(status); + } + } + return res; +} + Status RocksDb::set(Slice key, Slice value) { if (write_batch_) { return from_rocksdb(write_batch_->Put(to_rocksdb(key), to_rocksdb(value))); diff --git a/tddb/td/db/RocksDb.h b/tddb/td/db/RocksDb.h index d24a20dd7..5e6fb556c 100644 --- a/tddb/td/db/RocksDb.h +++ b/tddb/td/db/RocksDb.h @@ -84,6 +84,7 @@ class RocksDb : public KeyValue { static Result open(std::string path, RocksDbOptions options = {}); Result get(Slice key, std::string &value) override; + Result> get_multi(td::Span keys, std::vector *values) override; Status set(Slice key, Slice value) override; Status merge(Slice key, Slice value) override; Status erase(Slice key) override; diff --git a/validator/state-serializer.cpp b/validator/state-serializer.cpp index bc3d7b5e0..b8fd84d17 100644 --- a/validator/state-serializer.cpp +++ b/validator/state-serializer.cpp @@ -282,8 +282,38 @@ class CachedCellDbReader : public vm::CellDbReader { } return parent_->load_cell(hash); } + td::Result>> load_bulk(td::Span hashes) override { + total_reqs_ += hashes.size(); + if (!cache_) { + ++bulk_reqs_; + return parent_->load_bulk(hashes); + } + std::vector missing_hashes; + std::vector missing_indices; + std::vector> res(hashes.size()); + for (size_t i = 0; i < hashes.size(); i++) { + auto it = cache_->find(hashes[i]); + if (it != cache_->end()) { + ++cached_reqs_; + TRY_RESULT(loaded_cell, (*it)->load_cell()); + res[i] = loaded_cell.data_cell; + continue; + } + missing_hashes.push_back(hashes[i]); + missing_indices.push_back(i); + } + if (missing_hashes.empty()) { + return std::move(res); + } + TRY_RESULT(missing_cells, 
parent_->load_bulk(missing_hashes)); + for (size_t i = 0; i < missing_indices.size(); i++) { + res[missing_indices[i]] = missing_cells[i]; + } + return res; + }; void print_stats() const { - LOG(WARNING) << "CachedCellDbReader stats : " << total_reqs_ << " reads, " << cached_reqs_ << " cached"; + LOG(WARNING) << "CachedCellDbReader stats : " << total_reqs_ << " reads, " << cached_reqs_ << " cached, " + << bulk_reqs_ << " bulk reqs"; } private: std::shared_ptr parent_; @@ -291,6 +321,7 @@ class CachedCellDbReader : public vm::CellDbReader { td::uint64 total_reqs_ = 0; td::uint64 cached_reqs_ = 0; + td::uint64 bulk_reqs_ = 0; }; void AsyncStateSerializer::PreviousStateCache::prepare_cache(ShardIdFull shard) { @@ -373,7 +404,7 @@ void AsyncStateSerializer::got_masterchain_state(td::Ref state previous_state_cache->prepare_cache(shard); } auto new_cell_db_reader = std::make_shared(cell_db_reader, previous_state_cache->cache); - auto res = vm::std_boc_serialize_to_file_large(new_cell_db_reader, root->get_hash(), fd, 31, std::move(cancellation_token)); + auto res = vm::boc_serialize_to_file_large(new_cell_db_reader, root->get_hash(), fd, 31, std::move(cancellation_token)); new_cell_db_reader->print_stats(); return res; }; @@ -443,7 +474,7 @@ void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Refprepare_cache(shard); } auto new_cell_db_reader = std::make_shared(cell_db_reader, previous_state_cache->cache); - auto res = vm::std_boc_serialize_to_file_large(new_cell_db_reader, root->get_hash(), fd, 31, std::move(cancellation_token)); + auto res = vm::boc_serialize_to_file_large(new_cell_db_reader, root->get_hash(), fd, 31, std::move(cancellation_token)); new_cell_db_reader->print_stats(); return res; };