Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Speed up LargeBocSerializer with bulk cells reading #1533

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions crypto/test/test-db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2991,10 +2991,13 @@ TEST(TonDb, LargeBocSerializer) {
td::unlink(path).ignore();
fd = td::FileFd::open(path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write)
.move_as_ok();
std_boc_serialize_to_file_large(dboc->get_cell_db_reader(), root->get_hash(), fd, 31);
boc_serialize_to_file_large(dboc->get_cell_db_reader(), root->get_hash(), fd, 31);
fd.close();
auto b = td::read_file_str(path).move_as_ok();
CHECK(a == b);

auto a_cell = vm::deserialize_boc(td::BufferSlice(a));
auto b_cell = vm::deserialize_boc(td::BufferSlice(b));
ASSERT_EQ(a_cell->get_hash(), b_cell->get_hash());
}

TEST(TonDb, DoNotMakeListsPrunned) {
Expand Down
6 changes: 3 additions & 3 deletions crypto/vm/boc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ td::Result<int> BagOfCells::import_cell(td::Ref<vm::Cell> cell, int depth) {
return td::Status::Error("error while importing a cell into a bag of cells: cell is null");
}
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cell_processed());
TRY_STATUS(logger_ptr_->on_cells_processed(1));
}
auto it = cells.find(cell->get_hash());
if (it != cells.end()) {
Expand Down Expand Up @@ -555,7 +555,7 @@ td::Result<std::size_t> BagOfCells::serialize_to_impl(WriterT& writer, int mode)
}
store_offset(fixed_offset);
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cell_processed());
TRY_STATUS(logger_ptr_->on_cells_processed(1));
}
}
if (logger_ptr_) {
Expand Down Expand Up @@ -588,7 +588,7 @@ td::Result<std::size_t> BagOfCells::serialize_to_impl(WriterT& writer, int mode)
}
// std::cerr << std::endl;
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cell_processed());
TRY_STATUS(logger_ptr_->on_cells_processed(1));
}
}
writer.chk();
Expand Down
19 changes: 13 additions & 6 deletions crypto/vm/boc.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,22 +210,27 @@ class BagOfCellsLogger {

void start_stage(std::string stage) {
log_speed_at_ = td::Timestamp::in(LOG_SPEED_PERIOD);
last_speed_log_ = td::Timestamp::now();
processed_cells_ = 0;
timer_ = {};
stage_ = std::move(stage);
}
void finish_stage(td::Slice desc) {
LOG(ERROR) << "serializer: " << stage_ << " took " << timer_.elapsed() << "s, " << desc;
}
td::Status on_cell_processed() {
++processed_cells_;
if (processed_cells_ % 1000 == 0) {
td::Status on_cells_processed(size_t count) {
processed_cells_ += count;
if (processed_cells_ / 1000 > last_token_check_) {
TRY_STATUS(cancellation_token_.check());
last_token_check_ = processed_cells_ / 1000;
}
if (log_speed_at_.is_in_past()) {
log_speed_at_ += LOG_SPEED_PERIOD;
LOG(WARNING) << "serializer: " << stage_ << " " << (double)processed_cells_ / LOG_SPEED_PERIOD << " cells/s";
double period = td::Timestamp::now().at() - last_speed_log_.at();

LOG(WARNING) << "serializer: " << stage_ << " " << (double)processed_cells_ / period << " cells/s";
processed_cells_ = 0;
last_speed_log_ = td::Timestamp::now();
log_speed_at_ = td::Timestamp::in(LOG_SPEED_PERIOD);
}
return td::Status::OK();
}
Expand All @@ -236,6 +241,8 @@ class BagOfCellsLogger {
td::CancellationToken cancellation_token_;
td::Timestamp log_speed_at_;
size_t processed_cells_ = 0;
size_t last_token_check_ = 0;
td::Timestamp last_speed_log_;
static constexpr double LOG_SPEED_PERIOD = 120.0;
};
class BagOfCells {
Expand Down Expand Up @@ -390,7 +397,7 @@ td::Result<td::BufferSlice> std_boc_serialize_multi(std::vector<Ref<Cell>> root,

td::Status std_boc_serialize_to_file(Ref<Cell> root, td::FileFd& fd, int mode = 0,
td::CancellationToken cancellation_token = {});
td::Status std_boc_serialize_to_file_large(std::shared_ptr<CellDbReader> reader, Cell::Hash root_hash, td::FileFd& fd,
td::Status boc_serialize_to_file_large(std::shared_ptr<CellDbReader> reader, Cell::Hash root_hash, td::FileFd& fd,
int mode = 0, td::CancellationToken cancellation_token = {});

} // namespace vm
22 changes: 22 additions & 0 deletions crypto/vm/db/CellStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,28 @@ td::Result<CellLoader::LoadResult> CellLoader::load(td::Slice hash, bool need_da
return res;
}

td::Result<std::vector<CellLoader::LoadResult>> CellLoader::load_bulk(td::Span<td::Slice> hashes, bool need_data,
ExtCellCreator &ext_cell_creator) {
std::vector<std::string> values;
TRY_RESULT(get_statuses, reader_->get_multi(hashes, &values));
std::vector<LoadResult> res;
res.reserve(hashes.size());
for (size_t i = 0; i < hashes.size(); i++) {
auto get_status = get_statuses[i];
if (get_status != KeyValue::GetStatus::Ok) {
DCHECK(get_status == KeyValue::GetStatus::NotFound);
res.push_back(LoadResult{});
continue;
}
TRY_RESULT(load_res, load(hashes[i], values[i], need_data, ext_cell_creator));
if (on_load_callback_) {
on_load_callback_(load_res);
}
res.push_back(std::move(load_res));
}
return res;
}

td::Result<CellLoader::LoadResult> CellLoader::load(td::Slice hash, td::Slice value, bool need_data,
ExtCellCreator &ext_cell_creator) {
LoadResult res;
Expand Down
1 change: 1 addition & 0 deletions crypto/vm/db/CellStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class CellLoader {
};
CellLoader(std::shared_ptr<KeyValueReader> reader, std::function<void(const LoadResult &)> on_load_callback = {});
td::Result<LoadResult> load(td::Slice hash, bool need_data, ExtCellCreator &ext_cell_creator);
td::Result<std::vector<LoadResult>> load_bulk(td::Span<td::Slice> hashes, bool need_data, ExtCellCreator &ext_cell_creator);
static td::Result<LoadResult> load(td::Slice hash, td::Slice value, bool need_data, ExtCellCreator &ext_cell_creator);
td::Result<LoadResult> load_refcnt(td::Slice hash); // This only loads refcnt_, cell_ == null
KeyValueReader &key_value_reader() const {
Expand Down
46 changes: 34 additions & 12 deletions crypto/vm/db/DynamicBagOfCellsDb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,22 +138,24 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
return td::Status::OK();
}
td::Result<Ref<DataCell>> load_cell(td::Slice hash) override {
auto info = hash_table_.get_if_exists(hash);
if (info && info->sync_with_db) {
TRY_RESULT(loaded_cell, info->cell->load_cell());
return std::move(loaded_cell.data_cell);
}
TRY_RESULT(res, loader_->load(hash, true, *this));
if (res.status != CellLoader::LoadResult::Ok) {
return td::Status::Error("cell not found");
}
Ref<DataCell> cell = res.cell();
hash_table_.apply(hash, [&](CellInfo &info) { update_cell_info_loaded(info, hash, std::move(res)); });
return cell;
TRY_RESULT(loaded_cell, get_cell_info_force(hash).cell->load_cell());
return std::move(loaded_cell.data_cell);
}
td::Result<Ref<DataCell>> load_root(td::Slice hash) override {
return load_cell(hash);
}
td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) override {
std::vector<Ref<DataCell>> result;
result.reserve(hashes.size());
for (auto &hash : hashes) {
auto cell = load_cell(hash);
if (cell.is_error()) {
return cell.move_as_error();
}
result.push_back(cell.move_as_ok());
}
return result;
}
td::Result<Ref<DataCell>> load_root_thread_safe(td::Slice hash) const override {
return td::Status::Error("Not implemented");
}
Expand Down Expand Up @@ -193,6 +195,9 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
promise->set_result(std::move(cell));
});
}
CellInfo &get_cell_info_force(td::Slice hash) {
return hash_table_.apply(hash, [&](CellInfo &info) { update_cell_info_force(info, hash); });
}
CellInfo &get_cell_info_lazy(Cell::LevelMask level_mask, td::Slice hash, td::Slice depth) {
return hash_table_.apply(hash.substr(hash.size() - Cell::hash_bytes),
[&](CellInfo &info) { update_cell_info_lazy(info, level_mask, hash, depth); });
Expand Down Expand Up @@ -383,6 +388,23 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
return std::move(load_result.cell());
}

td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) override {
if (db_) {
return db_->load_bulk(hashes);
}
TRY_RESULT(load_result, cell_loader_->load_bulk(hashes, true, *this));

std::vector<Ref<DataCell>> res;
res.reserve(load_result.size());
for (auto &load_res : load_result) {
if (load_res.status != CellLoader::LoadResult::Ok) {
return td::Status::Error("cell not found");
}
res.push_back(std::move(load_res.cell()));
}
return res;
}

private:
static td::NamedThreadSafeCounter::CounterRef get_thread_safe_counter() {
static auto res = td::NamedThreadSafeCounter::get_default().get_counter("DynamicBagOfCellsDbLoader");
Expand Down
2 changes: 2 additions & 0 deletions crypto/vm/db/DynamicBagOfCellsDb.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class CellDbReader {
public:
virtual ~CellDbReader() = default;
virtual td::Result<Ref<DataCell>> load_cell(td::Slice hash) = 0;
virtual td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) = 0;
};

class DynamicBagOfCellsDb {
Expand All @@ -57,6 +58,7 @@ class DynamicBagOfCellsDb {
virtual td::Status meta_erase(td::Slice key) = 0;

virtual td::Result<Ref<DataCell>> load_cell(td::Slice hash) = 0;
virtual td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) = 0;
virtual td::Result<Ref<DataCell>> load_root(td::Slice hash) = 0;
virtual td::Result<Ref<DataCell>> load_root_thread_safe(td::Slice hash) const = 0;
virtual td::Result<std::vector<Ref<DataCell>>> load_known_roots() const {
Expand Down
18 changes: 18 additions & 0 deletions crypto/vm/db/DynamicBagOfCellsDbV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,10 @@ class DynamicBagOfCellsDbImplV2 : public DynamicBagOfCellsDb {
td::Result<Ref<DataCell>> load_root(td::Slice hash) override {
return load_cell(hash);
}
td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) override {
CHECK(cell_db_reader_);
return cell_db_reader_->load_bulk(hashes);
}
td::Result<Ref<DataCell>> load_root_thread_safe(td::Slice hash) const override {
// TODO: it is better to use AtomicRef, or atomic shared pointer
// But to use AtomicRef we need a little refactoring
Expand Down Expand Up @@ -1102,6 +1106,20 @@ class DynamicBagOfCellsDbImplV2 : public DynamicBagOfCellsDb {
return load_cell_slow_path(hash);
}

td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) override {
// thread safe function
std::vector<Ref<DataCell>> result;
result.reserve(hashes.size());
for (auto &hash : hashes) {
auto maybe_cell = load_cell(hash);
if (maybe_cell.is_error()) {
return maybe_cell.move_as_error();
}
result.push_back(maybe_cell.move_as_ok());
}
return result;
}

td::Result<Ref<DataCell>> load_ext_cell(Ref<DynamicBocExtCell> ext_cell) override {
// thread safe function.
// Called by external cell
Expand Down
13 changes: 13 additions & 0 deletions crypto/vm/db/InMemoryBagOfCellsDb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,16 @@ class CellStorage {
return td::Status::Error("not found");
}

td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<CellHash> hashes) const {
std::vector<Ref<DataCell>> res;
res.reserve(hashes.size());
for (auto &hash : hashes) {
TRY_RESULT(cell, load_cell(hash));
res.push_back(std::move(cell));
}
return res;
}

td::Result<Ref<DataCell>> load_root_local(const CellHash &hash) const {
auto lock = local_access_.lock();
if (auto it = local_roots_.find(hash); it != local_roots_.end()) {
Expand Down Expand Up @@ -839,6 +849,9 @@ class InMemoryBagOfCellsDb : public DynamicBagOfCellsDb {
td::Result<Ref<DataCell>> load_root(td::Slice hash) override {
return storage_->load_root_local(CellHash::from_slice(hash));
}
td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) override {
return storage_->load_bulk(td::transform(hashes, [](auto &hash) { return CellHash::from_slice(hash); }));
}
td::Result<Ref<DataCell>> load_root_thread_safe(td::Slice hash) const override {
return storage_->load_root_shared(CellHash::from_slice(hash));
}
Expand Down
Loading