Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feature(collator): int queue sync partition router #519

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions block-util/src/queue/proto.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::fmt::{Debug, Formatter};

use anyhow::bail;
use bytes::Bytes;
Expand All @@ -9,7 +11,7 @@ use tl_proto::{TlError, TlRead, TlResult, TlWrite};
use crate::tl;

/// Representation of an internal messages queue diff.
#[derive(Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueDiff {
/// Computed hash of this diff.
///
Expand Down Expand Up @@ -62,6 +64,7 @@ impl TlWrite for QueueDiff {
+ processed_to_map::size_hint(&self.processed_to)
+ 2 * QueueKey::SIZE_HINT
+ messages_list::size_hint(&self.messages)
+ partition_router_list::size_hint(&self.partition_router)
}

fn write_to<P>(&self, packet: &mut P)
Expand Down Expand Up @@ -140,7 +143,7 @@ pub struct QueueStateRef<'tl> {
}

/// A header for a persistent internal messages queue state.
#[derive(Clone, PartialEq, Eq, TlWrite, TlRead)]
#[derive(Debug, Clone, PartialEq, Eq, TlWrite, TlRead)]
#[tl(boxed, id = "block.queueStateHeader", scheme = "proto.tl")]
pub struct QueueStateHeader {
#[tl(with = "tl::shard_ident")]
Expand Down Expand Up @@ -618,6 +621,23 @@ mod tests {

#[test]
fn queue_diff_binary_repr() {
let mut partition_router = BTreeMap::new();

let addr1 = DestAddr {
workchain: 0,
account: HashBytes::from([0x01; 32]),
};
let addr2 = DestAddr {
workchain: 1,
account: HashBytes::from([0x02; 32]),
};

partition_router.insert(QueuePartition::LowPriority, {
let mut set = BTreeSet::new();
set.insert(addr1);
set
});

let mut diff = QueueDiff {
hash: HashBytes::ZERO, // NOTE: Uninitialized
prev_hash: HashBytes::from([0x33; 32]),
Expand Down Expand Up @@ -646,7 +666,7 @@ mod tests {
HashBytes::from([0x02; 32]),
HashBytes::from([0x03; 32]),
],
partition_router: Default::default(),
partition_router,
};

let bytes = tl_proto::serialize(&diff);
Expand All @@ -665,6 +685,23 @@ mod tests {
for seqno in 1..=10 {
let prev_hash = queue_diffs.last().map(|diff| diff.hash).unwrap_or_default();

let mut partition_router = BTreeMap::new();

let addr1 = DestAddr {
workchain: 0,
account: HashBytes::from([0x01; 32]),
};
let addr2 = DestAddr {
workchain: 1,
account: HashBytes::from([0x02; 32]),
};

partition_router.insert(QueuePartition::LowPriority, {
let mut set = BTreeSet::new();
set.insert(addr2);
set
});

let mut diff = QueueDiff {
hash: HashBytes::ZERO, // NOTE: Uninitialized
prev_hash,
Expand Down Expand Up @@ -693,7 +730,7 @@ mod tests {
HashBytes::from([0x02; 32]),
HashBytes::from([0x03; 32]),
],
partition_router: Default::default(),
partition_router,
};

// NOTE: We need this for the hash computation.
Expand Down
3 changes: 1 addition & 2 deletions collator/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1295,7 +1295,6 @@ where
queue_diff_with_messages,
*queue_diff_stuff.diff_hash(),
prev_block_id,
queue_diff_stuff.diff().max_message,
));

let prev_ids_info = block_stuff.construct_prev_id()?;
Expand All @@ -1308,7 +1307,7 @@ where
}

// apply required previous queue diffs
while let Some((diff, diff_hash, block_id, max_message_key)) = prev_queue_diffs.pop() {
while let Some((diff, diff_hash, block_id)) = prev_queue_diffs.pop() {
let statistics = (&diff, block_id.shard).into();
self.mq_adapter
.apply_diff(diff, block_id.as_short_id(), &diff_hash, statistics)?;
Expand Down
2 changes: 1 addition & 1 deletion storage/src/db/kv_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ weedb::tables! {
pub block_connections: tables::BlockConnections,
pub shards_internal_messages: tables::ShardsInternalMessages,
pub shards_internal_messages_uncommitted: tables::ShardsInternalMessagesSession,
pub internal_messages_statistics_commited: tables::InternalMessagesDestStat,
pub internal_messages_statistics_committed: tables::InternalMessagesDestStat,
pub internal_messages_statistics_uncommitted: tables::InternalMessagesDestStatUncommitted,
}
}
Expand Down
146 changes: 105 additions & 41 deletions storage/src/store/internal_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,18 @@ impl InternalQueueStorage {
count: u64,
) -> Result<()> {
let cf = self.db.internal_messages_statistics_uncommitted.cf();
let mut key_buffer = Vec::with_capacity(StatKey::SIZE_HINT);
key.serialize(&mut key_buffer);

let mut value_buffer = Vec::with_capacity(std::mem::size_of::<u64>() + dest.len());

value_buffer.extend_from_slice(&count.to_be_bytes());
value_buffer.extend_from_slice(dest);

batch.put_cf(&cf, &key_buffer, &value_buffer);
self.insert_statistics(batch, &cf, key, dest, count)
}

Ok(())
pub fn insert_statistics_committed(
&self,
batch: &mut WriteBatchWithTransaction<false>,
key: &StatKey,
dest: &[u8],
count: u64,
) -> Result<()> {
let cf = self.db.internal_messages_statistics_committed.cf();
self.insert_statistics(batch, &cf, key, dest, count)
}

pub fn collect_committed_stats_in_range(
Expand All @@ -56,10 +57,10 @@ impl InternalQueueStorage {
) -> Result<()> {
let mut read_config = self
.db
.internal_messages_statistics_commited
.internal_messages_statistics_committed
.new_read_config();
read_config.set_snapshot(snapshot);
let cf = self.db.internal_messages_statistics_commited.cf();
let cf = self.db.internal_messages_statistics_committed.cf();

let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config);

Expand Down Expand Up @@ -166,36 +167,79 @@ impl InternalQueueStorage {

let mut reader = QueueStateReader::begin_from_mapped(mapped.as_slice(), &top_update)?;

let cf = this.db.shards_internal_messages.cf();
let messages_cf = this.db.shards_internal_messages.cf();
let mut batch = weedb::rocksdb::WriteBatch::default();

let mut buffer = Vec::new();
while let Some(cell) = reader.read_next_message()? {
let msg_hash = cell.repr_hash();
let msg = cell.parse::<Message<'_>>()?;
let MsgInfo::Int(int_msg_info) = &msg.info else {
anyhow::bail!("non-internal message in the queue in msg {msg_hash}");
};

let IntAddr::Std(dest) = &int_msg_info.dst else {
anyhow::bail!("non-std destination address in msg {msg_hash}");
};

let key = ShardsInternalMessagesKey {
// TODO !!! read it
partition: QueuePartition::NormalPriority,
shard_ident,
internal_message_key: QueueKey {
lt: int_msg_info.created_lt,
hash: *msg_hash,
},
};

buffer.clear();
buffer.push(dest.workchain as u8);
buffer.extend_from_slice(&dest.prefix().to_be_bytes());
BocHeader::<ahash::RandomState>::with_root(cell.as_ref()).encode(&mut buffer);
batch.put_cf(&cf, key.to_vec(), &buffer);
let mut statistics: FastHashMap<QueuePartition, FastHashMap<DestAddr, u64>> =
FastHashMap::default();
while let Some(_) = reader.read_next_diff()? {
let current_diff_index = reader.next_queue_diff_index() - 1;

while let Some(cell) = reader.read_next_message()? {
let msg_hash = cell.repr_hash();
let msg = cell.parse::<Message<'_>>()?;
let MsgInfo::Int(int_msg_info) = &msg.info else {
anyhow::bail!("non-internal message in the queue in msg {msg_hash}");
};

let IntAddr::Std(dest) = &int_msg_info.dst else {
anyhow::bail!("non-std destination address in msg {msg_hash}");
};

let dest_addr = DestAddr {
workchain: dest.workchain,
account: dest.address,
};
let current_diff = &reader.state().header.queue_diffs[current_diff_index];
let partition = current_diff
.partition_router
.iter()
.find_map(|(key, value)| {
if value.contains(&dest_addr) {
Some(key)
} else {
None
}
})
.cloned()
.unwrap_or(QueuePartition::NormalPriority);

let key = ShardsInternalMessagesKey {
partition,
shard_ident,
internal_message_key: QueueKey {
lt: int_msg_info.created_lt,
hash: *msg_hash,
},
};

buffer.clear();
buffer.push(dest.workchain as u8);
buffer.extend_from_slice(&dest.prefix().to_be_bytes());
BocHeader::<ahash::RandomState>::with_root(cell.as_ref()).encode(&mut buffer);
batch.put_cf(&messages_cf, key.to_vec(), &buffer);

let partition_stats = statistics.entry(partition).or_default();
*partition_stats.entry(dest_addr).or_insert(0) += 1;
}

let current_diff = &reader.state().header.queue_diffs[current_diff_index];

for (partition, statistics) in statistics.drain() {
for (index, (dest, count)) in statistics.iter().enumerate() {
let key = StatKey {
shard_ident,
partition,
min_message: current_diff.min_message,
max_message: current_diff.max_message,
index: index as u64,
};

let acc = tl_proto::serialize(dest);
this.insert_statistics_committed(&mut batch, &key, &acc, *count)?;
}
}
}

reader.finish()?;
Expand Down Expand Up @@ -265,7 +309,7 @@ impl InternalQueueStorage {
&from_stat_key.to_vec(),
&to_stat_key.to_vec(),
&self.db.internal_messages_statistics_uncommitted.cf(),
&self.db.internal_messages_statistics_commited.cf(),
&self.db.internal_messages_statistics_committed.cf(),
)?;
}

Expand Down Expand Up @@ -342,7 +386,7 @@ impl InternalQueueStorage {

self.delete_range(
&mut batch,
&self.db.internal_messages_statistics_commited.cf(),
&self.db.internal_messages_statistics_committed.cf(),
&start_stat_key,
&end_stat_key,
)?;
Expand Down Expand Up @@ -464,4 +508,24 @@ impl InternalQueueStorage {

Ok(())
}

fn insert_statistics(
&self,
batch: &mut WriteBatchWithTransaction<false>,
cf: &BoundedCfHandle<'_>,
key: &StatKey,
dest: &[u8],
count: u64,
) -> Result<()> {
let mut key_buffer = Vec::with_capacity(StatKey::SIZE_HINT);
key.serialize(&mut key_buffer);

let mut value_buffer = Vec::with_capacity(std::mem::size_of::<u64>() + dest.len());
value_buffer.extend_from_slice(&count.to_be_bytes());
value_buffer.extend_from_slice(dest);

batch.put_cf(cf, &key_buffer, &value_buffer);

Ok(())
}
}
Loading
Loading