Skip to content

Commit 7bc50e6

Browse files
authored
tonNode.getOutMsgQueueProof query in public shard overlays (#1413)
* tonNode.getOutMsgQueueProof query in public shard overlays * Allow responding to getOutMsgQueueProof requests one at a time only
1 parent 9ae88d8 commit 7bc50e6

37 files changed

+729
-51
lines changed

create-hardfork/create-hardfork.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,10 @@ class HardforkCreator : public td::actor::Actor {
272272
void download_archive(ton::BlockSeqno masterchain_seqno, ton::ShardIdFull shard_prefix, std::string tmp_dir,
273273
td::Timestamp timeout, td::Promise<std::string> promise) override {
274274
}
275+
void download_out_msg_queue_proof(
276+
ton::ShardIdFull dst_shard, std::vector<ton::BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
277+
td::Timestamp timeout, td::Promise<std::vector<td::Ref<ton::validator::OutMsgQueueProof>>> promise) override {
278+
}
275279

276280
void new_key_block(ton::validator::BlockHandle handle) override {
277281
}

crypto/block/block.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,12 @@ bool EnqueuedMsgDescr::check_key(td::ConstBitPtr key) const {
660660
hash_ == key + 96;
661661
}
662662

663+
bool ImportedMsgQueueLimits::deserialize(vm::CellSlice& cs) {
664+
return cs.fetch_ulong(8) == 0xd3 // imported_msg_queue_limits#d3
665+
&& cs.fetch_uint_to(32, max_bytes) // max_bytes:#
666+
&& cs.fetch_uint_to(32, max_msgs); // max_msgs:#
667+
}
668+
663669
bool ParamLimits::deserialize(vm::CellSlice& cs) {
664670
return cs.fetch_ulong(8) == 0xc3 // param_limits#c3
665671
&& cs.fetch_uint_to(32, limits_[0]) // underload:uint32

crypto/block/block.h

+10
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,16 @@ static inline std::ostream& operator<<(std::ostream& os, const MsgProcessedUptoC
216216
return proc_coll.print(os);
217217
}
218218

219+
struct ImportedMsgQueueLimits {
220+
// Default values
221+
td::uint32 max_bytes = 1 << 16;
222+
td::uint32 max_msgs = 30;
223+
bool deserialize(vm::CellSlice& cs);
224+
ImportedMsgQueueLimits operator*(td::uint32 x) const {
225+
return {max_bytes * x, max_msgs * x};
226+
}
227+
};
228+
219229
struct ParamLimits {
220230
enum { limits_cnt = 4 };
221231
enum { cl_underload = 0, cl_normal = 1, cl_soft = 2, cl_medium = 3, cl_hard = 4 };

tdutils/td/utils/StringBuilder.h

+15
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,19 @@ std::enable_if_t<std::is_arithmetic<T>::value, string> to_string(const T &x) {
149149
return sb.as_cslice().str();
150150
}
151151

152+
template <class SB>
153+
struct LambdaPrintHelper {
154+
SB& sb;
155+
};
156+
template <class SB, class F>
157+
SB& operator<<(const LambdaPrintHelper<SB>& helper, F&& f) {
158+
f(helper.sb);
159+
return helper.sb;
160+
}
161+
struct LambdaPrint {};
162+
163+
inline LambdaPrintHelper<td::StringBuilder> operator<<(td::StringBuilder& sb, const LambdaPrint&) {
164+
return LambdaPrintHelper<td::StringBuilder>{sb};
165+
}
166+
152167
} // namespace td

tdutils/td/utils/logging.h

+7-3
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474

7575
#define LOG(level) LOG_IMPL(level, level, true, ::td::Slice())
7676
#define LOG_IF(level, condition) LOG_IMPL(level, level, condition, #condition)
77+
#define FLOG(level) LOG_IMPL(level, level, true, ::td::Slice()) << td::LambdaPrint{} << [&](auto &sb)
7778

7879
#define VLOG(level) LOG_IMPL(DEBUG, level, true, TD_DEFINE_STR(level))
7980
#define VLOG_IF(level, condition) LOG_IMPL(DEBUG, level, condition, TD_DEFINE_STR(level) " " #condition)
@@ -95,13 +96,13 @@ inline bool no_return_func() {
9596
#define DUMMY_LOG_CHECK(condition) LOG_IF(NEVER, !(condition))
9697

9798
#ifdef TD_DEBUG
98-
#if TD_MSVC
99+
#if TD_MSVC
99100
#define LOG_CHECK(condition) \
100101
__analysis_assume(!!(condition)); \
101102
LOG_IMPL(FATAL, FATAL, !(condition), #condition)
102-
#else
103+
#else
103104
#define LOG_CHECK(condition) LOG_IMPL(FATAL, FATAL, !(condition) && no_return_func(), #condition)
104-
#endif
105+
#endif
105106
#else
106107
#define LOG_CHECK DUMMY_LOG_CHECK
107108
#endif
@@ -263,6 +264,9 @@ class Logger {
263264
sb_ << other;
264265
return *this;
265266
}
267+
LambdaPrintHelper<td::Logger> operator<<(const LambdaPrint &) {
268+
return LambdaPrintHelper<td::Logger>{*this};
269+
}
266270

267271
MutableCSlice as_cslice() {
268272
return sb_.as_cslice();

test/test-ton-collator.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,10 @@ class TestNode : public td::actor::Actor {
373373
void download_archive(ton::BlockSeqno masterchain_seqno, ton::ShardIdFull shard_prefix, std::string tmp_dir,
374374
td::Timestamp timeout, td::Promise<std::string> promise) override {
375375
}
376+
void download_out_msg_queue_proof(
377+
ton::ShardIdFull dst_shard, std::vector<ton::BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
378+
td::Timestamp timeout, td::Promise<std::vector<td::Ref<ton::validator::OutMsgQueueProof>>> promise) override {
379+
}
376380

377381
void new_key_block(ton::validator::BlockHandle handle) override {
378382
}

tl/generate/scheme/ton_api.tl

+6
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,10 @@ tonNode.success = tonNode.Success;
454454
tonNode.archiveNotFound = tonNode.ArchiveInfo;
455455
tonNode.archiveInfo id:long = tonNode.ArchiveInfo;
456456

457+
tonNode.importedMsgQueueLimits max_bytes:int max_msgs:int = ImportedMsgQueueLimits;
458+
tonNode.outMsgQueueProof queue_proofs:bytes block_state_proofs:bytes msg_counts:(vector int) = tonNode.OutMsgQueueProof;
459+
tonNode.outMsgQueueProofEmpty = tonNode.OutMsgQueueProof;
460+
457461
tonNode.forgetPeer = tonNode.ForgetPeer;
458462

459463
---functions---
@@ -483,6 +487,8 @@ tonNode.downloadKeyBlockProofLink block:tonNode.blockIdExt = tonNode.Data;
483487
tonNode.getArchiveInfo masterchain_seqno:int = tonNode.ArchiveInfo;
484488
tonNode.getShardArchiveInfo masterchain_seqno:int shard_prefix:tonNode.shardId = tonNode.ArchiveInfo;
485489
tonNode.getArchiveSlice archive_id:long offset:long max_size:int = tonNode.Data;
490+
tonNode.getOutMsgQueueProof dst_shard:tonNode.shardId blocks:(vector tonNode.blockIdExt)
491+
limits:tonNode.importedMsgQueueLimits = tonNode.OutMsgQueueProof;
486492

487493
tonNode.getCapabilities = tonNode.Capabilities;
488494

tl/generate/scheme/ton_api.tlo

720 Bytes
Binary file not shown.

validator/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ set(VALIDATOR_HEADERS
4646
interfaces/db.h
4747
interfaces/external-message.h
4848
interfaces/liteserver.h
49+
interfaces/out-msg-queue-proof.h
4950
interfaces/proof.h
5051
interfaces/shard.h
5152
interfaces/signature-set.h

validator/full-node-shard.cpp

+98
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "net/download-proof.hpp"
3838
#include "net/get-next-key-blocks.hpp"
3939
#include "net/download-archive-slice.hpp"
40+
#include "impl/out-msg-queue-proof.hpp"
4041

4142
#include "td/utils/Random.h"
4243

@@ -669,6 +670,62 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
669670
query.offset_, query.max_size_, std::move(promise));
670671
}
671672

673+
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query,
674+
td::Promise<td::BufferSlice> promise) {
675+
std::vector<BlockIdExt> blocks;
676+
for (const auto &x : query.blocks_) {
677+
BlockIdExt id = create_block_id(x);
678+
if (!id.is_valid_ext()) {
679+
promise.set_error(td::Status::Error("invalid block_id"));
680+
return;
681+
}
682+
if (!shard_is_ancestor(shard_, id.shard_full())) {
683+
promise.set_error(td::Status::Error("query in wrong overlay"));
684+
return;
685+
}
686+
blocks.push_back(create_block_id(x));
687+
}
688+
ShardIdFull dst_shard = create_shard_id(query.dst_shard_);
689+
if (!dst_shard.is_valid_ext()) {
690+
promise.set_error(td::Status::Error("invalid shard"));
691+
return;
692+
}
693+
block::ImportedMsgQueueLimits limits{(td::uint32)query.limits_->max_bytes_, (td::uint32)query.limits_->max_msgs_};
694+
if (limits.max_msgs > 512) {
695+
promise.set_error(td::Status::Error("max_msgs is too big"));
696+
return;
697+
}
698+
if (limits.max_bytes > (1 << 21)) {
699+
promise.set_error(td::Status::Error("max_bytes is too big"));
700+
return;
701+
}
702+
FLOG(DEBUG) {
703+
sb << "Got query getOutMsgQueueProof to shard " << dst_shard.to_str() << " from blocks";
704+
for (const BlockIdExt &id : blocks) {
705+
sb << " " << id.id.to_str();
706+
}
707+
sb << " from " << src;
708+
};
709+
td::actor::send_closure(
710+
full_node_, &FullNode::get_out_msg_queue_query_token,
711+
[=, manager = validator_manager_, blocks = std::move(blocks),
712+
promise = std::move(promise)](td::Result<std::unique_ptr<ActionToken>> R) mutable {
713+
TRY_RESULT_PROMISE(promise, token, std::move(R));
714+
auto P =
715+
td::PromiseCreator::lambda([promise = std::move(promise), token = std::move(token)](
716+
td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> R) mutable {
717+
if (R.is_error()) {
718+
promise.set_result(create_serialize_tl_object<ton_api::tonNode_outMsgQueueProofEmpty>());
719+
} else {
720+
promise.set_result(serialize_tl_object(R.move_as_ok(), true));
721+
}
722+
});
723+
td::actor::create_actor<BuildOutMsgQueueProof>("buildqueueproof", dst_shard, std::move(blocks), limits, manager,
724+
std::move(P))
725+
.release();
726+
});
727+
}
728+
672729
void FullNodeShardImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query,
673730
td::Promise<td::BufferSlice> promise) {
674731
if (!active_) {
@@ -944,6 +1001,47 @@ void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFu
9441001
.release();
9451002
}
9461003

1004+
void FullNodeShardImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
1005+
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
1006+
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
1007+
// TODO: maybe more complex download (like other requests here)
1008+
auto &b = choose_neighbour();
1009+
if (b.adnl_id == adnl::AdnlNodeIdShort::zero()) {
1010+
promise.set_error(td::Status::Error(ErrorCode::notready, "no nodes"));
1011+
return;
1012+
}
1013+
std::vector<tl_object_ptr<ton_api::tonNode_blockIdExt>> blocks_tl;
1014+
for (const BlockIdExt &id : blocks) {
1015+
blocks_tl.push_back(create_tl_block_id(id));
1016+
}
1017+
td::BufferSlice query = create_serialize_tl_object<ton_api::tonNode_getOutMsgQueueProof>(
1018+
create_tl_shard_id(dst_shard), std::move(blocks_tl),
1019+
create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(limits.max_bytes, limits.max_msgs));
1020+
1021+
auto P = td::PromiseCreator::lambda(
1022+
[=, promise = std::move(promise), blocks = std::move(blocks)](td::Result<td::BufferSlice> R) mutable {
1023+
if (R.is_error()) {
1024+
promise.set_result(R.move_as_error());
1025+
return;
1026+
}
1027+
TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::tonNode_OutMsgQueueProof>(R.move_as_ok(), true));
1028+
ton_api::downcast_call(
1029+
*f, td::overloaded(
1030+
[&](ton_api::tonNode_outMsgQueueProofEmpty &x) {
1031+
promise.set_error(td::Status::Error("node doesn't have this block"));
1032+
},
1033+
[&](ton_api::tonNode_outMsgQueueProof &x) {
1034+
delay_action(
1035+
[=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable {
1036+
promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x));
1037+
},
1038+
td::Timestamp::now());
1039+
}));
1040+
});
1041+
td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_,
1042+
"get_msg_queue", std::move(P), timeout, std::move(query), 1 << 22, rldp_);
1043+
}
1044+
9471045
void FullNodeShardImpl::set_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
9481046
CHECK(!handle_);
9491047
handle_ = std::move(handle);

validator/full-node-shard.h

+3
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ class FullNodeShard : public td::actor::Actor {
6666
td::Promise<std::vector<BlockIdExt>> promise) = 0;
6767
virtual void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
6868
td::Timestamp timeout, td::Promise<std::string> promise) = 0;
69+
virtual void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
70+
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
71+
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) = 0;
6972

7073
virtual void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) = 0;
7174

validator/full-node-shard.hpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ class FullNodeShardImpl : public FullNodeShard {
139139
td::Promise<td::BufferSlice> promise);
140140
void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveSlice &query,
141141
td::Promise<td::BufferSlice> promise);
142-
// void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_prepareNextKeyBlockProof &query,
143-
// td::Promise<td::BufferSlice> promise);
142+
void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query,
143+
td::Promise<td::BufferSlice> promise);
144144
void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query, td::Promise<td::BufferSlice> promise);
145145
void receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data);
146146

@@ -183,6 +183,9 @@ class FullNodeShardImpl : public FullNodeShard {
183183
td::Promise<std::vector<BlockIdExt>> promise) override;
184184
void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
185185
td::Timestamp timeout, td::Promise<std::string> promise) override;
186+
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
187+
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
188+
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override;
186189

187190
void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) override;
188191

validator/full-node.cpp

+30
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "td/actor/MultiPromise.h"
2222
#include "full-node.h"
2323
#include "common/delay.h"
24+
#include "impl/out-msg-queue-proof.hpp"
2425
#include "td/utils/Random.h"
2526
#include "ton/ton-tl.hpp"
2627

@@ -430,6 +431,24 @@ void FullNodeImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFull sh
430431
timeout, std::move(promise));
431432
}
432433

434+
void FullNodeImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
435+
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
436+
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
437+
if (blocks.empty()) {
438+
promise.set_value({});
439+
return;
440+
}
441+
// All blocks are expected to have the same minsplit shard prefix
442+
auto shard = get_shard(blocks[0].shard_full());
443+
if (shard.empty()) {
444+
VLOG(FULL_NODE_WARNING) << "dropping download msg queue query to unknown shard";
445+
promise.set_error(td::Status::Error(ErrorCode::notready, "shard not ready"));
446+
return;
447+
}
448+
td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits,
449+
timeout, std::move(promise));
450+
}
451+
433452
td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(ShardIdFull shard) {
434453
if (shard.is_masterchain()) {
435454
return shards_[ShardIdFull{masterchainId}].actor.get();
@@ -557,6 +576,11 @@ void FullNodeImpl::process_block_candidate_broadcast(BlockIdExt block_id, Catcha
557576
std::move(data));
558577
}
559578

579+
void FullNodeImpl::get_out_msg_queue_query_token(td::Promise<std::unique_ptr<ActionToken>> promise) {
580+
td::actor::send_closure(out_msg_queue_query_token_manager_, &TokenManager::get_token, 1, 0, td::Timestamp::in(10.0),
581+
std::move(promise));
582+
}
583+
560584
void FullNodeImpl::set_validator_telemetry_filename(std::string value) {
561585
validator_telemetry_filename_ = std::move(value);
562586
update_validator_telemetry_collector();
@@ -645,6 +669,12 @@ void FullNodeImpl::start_up() {
645669
td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, shard_prefix, std::move(tmp_dir),
646670
timeout, std::move(promise));
647671
}
672+
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
673+
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
674+
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override {
675+
td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits,
676+
timeout, std::move(promise));
677+
}
648678

649679
void new_key_block(BlockHandle handle) override {
650680
td::actor::send_closure(id_, &FullNodeImpl::new_key_block, std::move(handle));

validator/full-node.h

+1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class FullNode : public td::actor::Actor {
9191
virtual void process_block_broadcast(BlockBroadcast broadcast) = 0;
9292
virtual void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno,
9393
td::uint32 validator_set_hash, td::BufferSlice data) = 0;
94+
virtual void get_out_msg_queue_query_token(td::Promise<std::unique_ptr<ActionToken>> promise) = 0;
9495

9596
virtual void set_validator_telemetry_filename(std::string value) = 0;
9697

validator/full-node.hpp

+8
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <map>
2929
#include <set>
3030
#include <queue>
31+
#include <token-manager.h>
3132

3233
namespace ton {
3334

@@ -79,6 +80,9 @@ class FullNodeImpl : public FullNode {
7980
void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise<std::vector<BlockIdExt>> promise);
8081
void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
8182
td::Timestamp timeout, td::Promise<std::string> promise);
83+
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
84+
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
85+
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise);
8286

8387
void got_key_block_config(td::Ref<ConfigHolder> config);
8488
void new_key_block(BlockHandle handle);
@@ -87,6 +91,7 @@ class FullNodeImpl : public FullNode {
8791
void process_block_broadcast(BlockBroadcast broadcast) override;
8892
void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
8993
td::BufferSlice data) override;
94+
void get_out_msg_queue_query_token(td::Promise<std::unique_ptr<ActionToken>> promise) override;
9095

9196
void set_validator_telemetry_filename(std::string value) override;
9297

@@ -160,6 +165,9 @@ class FullNodeImpl : public FullNode {
160165
PublicKeyHash validator_telemetry_collector_key_ = PublicKeyHash::zero();
161166

162167
void update_validator_telemetry_collector();
168+
169+
td::actor::ActorOwn<TokenManager> out_msg_queue_query_token_manager_ =
170+
td::actor::create_actor<TokenManager>("tokens", /* max_tokens = */ 1);
163171
};
164172

165173
} // namespace fullnode

0 commit comments

Comments
 (0)