Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

refactor(int-queue) persistent sync & tests #520

Merged
merged 2 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 53 additions & 4 deletions block-util/src/queue/proto.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::fmt::{Debug, Formatter};

use anyhow::bail;
use bytes::Bytes;
Expand All @@ -9,7 +11,7 @@ use tl_proto::{TlError, TlRead, TlResult, TlWrite};
use crate::tl;

/// Representation of an internal messages queue diff.
#[derive(Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueDiff {
/// Computed hash of this diff.
///
Expand Down Expand Up @@ -62,6 +64,7 @@ impl TlWrite for QueueDiff {
+ processed_to_map::size_hint(&self.processed_to)
+ 2 * QueueKey::SIZE_HINT
+ messages_list::size_hint(&self.messages)
+ partition_router_list::size_hint(&self.partition_router)
}

fn write_to<P>(&self, packet: &mut P)
Expand Down Expand Up @@ -140,7 +143,7 @@ pub struct QueueStateRef<'tl> {
}

/// A header for a persistent internal messages queue state.
#[derive(Clone, PartialEq, Eq, TlWrite, TlRead)]
#[derive(Debug, Clone, PartialEq, Eq, TlWrite, TlRead)]
#[tl(boxed, id = "block.queueStateHeader", scheme = "proto.tl")]
pub struct QueueStateHeader {
#[tl(with = "tl::shard_ident")]
Expand Down Expand Up @@ -552,6 +555,18 @@ pub struct DestAddr {
pub account: HashBytes,
}

impl DestAddr {
pub const MIN: Self = Self {
workchain: i8::MIN,
account: HashBytes::ZERO,
};

pub const MAX: Self = Self {
workchain: i8::MAX,
account: HashBytes([0xff; 32]),
};
}

impl TryFrom<IntAddr> for DestAddr {
type Error = anyhow::Error;

Expand Down Expand Up @@ -618,6 +633,23 @@ mod tests {

#[test]
fn queue_diff_binary_repr() {
let mut partition_router = BTreeMap::new();

let addr1 = DestAddr {
workchain: 0,
account: HashBytes::from([0x01; 32]),
};
let addr2 = DestAddr {
workchain: 1,
account: HashBytes::from([0x02; 32]),
};

partition_router.insert(QueuePartition::LowPriority, {
let mut set = BTreeSet::new();
set.insert(addr1);
set
});

let mut diff = QueueDiff {
hash: HashBytes::ZERO, // NOTE: Uninitialized
prev_hash: HashBytes::from([0x33; 32]),
Expand Down Expand Up @@ -646,7 +678,7 @@ mod tests {
HashBytes::from([0x02; 32]),
HashBytes::from([0x03; 32]),
],
partition_router: Default::default(),
partition_router,
};

let bytes = tl_proto::serialize(&diff);
Expand All @@ -665,6 +697,23 @@ mod tests {
for seqno in 1..=10 {
let prev_hash = queue_diffs.last().map(|diff| diff.hash).unwrap_or_default();

let mut partition_router = BTreeMap::new();

let addr1 = DestAddr {
workchain: 0,
account: HashBytes::from([0x01; 32]),
};
let addr2 = DestAddr {
workchain: 1,
account: HashBytes::from([0x02; 32]),
};

partition_router.insert(QueuePartition::LowPriority, {
let mut set = BTreeSet::new();
set.insert(addr2);
set
});

let mut diff = QueueDiff {
hash: HashBytes::ZERO, // NOTE: Uninitialized
prev_hash,
Expand Down Expand Up @@ -693,7 +742,7 @@ mod tests {
HashBytes::from([0x02; 32]),
HashBytes::from([0x03; 32]),
],
partition_router: Default::default(),
partition_router,
};

// NOTE: We need this for the hash computation.
Expand Down
4 changes: 3 additions & 1 deletion collator/src/collator/messages_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,9 @@ impl MessagesReader {
partition_router.clear();
for (account_addr, msgs_count) in stats {
if msgs_count > MAX_PAR_0_MSGS_COUNT_LIMIT {
partition_router.insert(account_addr, QueuePartition::LowPriority);
partition_router
.insert(account_addr, QueuePartition::LowPriority)
.unwrap();
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion collator/src/collator/messages_reader/new_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ impl<V: InternalMessageValue> NewMessagesState<V> {
for stats in partition_all_ranges_msgs_stats {
for account_addr in stats.statistics().keys() {
self.partition_router
.insert(account_addr.clone(), partition_id.try_into().unwrap());
.insert(account_addr.clone(), partition_id.try_into().unwrap())
.unwrap();
}
}
}
Expand Down
11 changes: 5 additions & 6 deletions collator/src/internal_queue/state/uncommitted_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,22 +229,21 @@ impl UncommittedStateStdImpl {
let min_message = diff_statistics.min_message();
let max_message = diff_statistics.max_message();

for (index, (partition, values)) in diff_statistics.iter().enumerate() {
for value in values {
for (partition, values) in diff_statistics.iter() {
for (index, value) in values.iter().enumerate() {
let (addr, count) = value;
let dest_addr = DestAddr::try_from(addr.clone())?;
let addr = tl_proto::serialize(dest_addr);
let dest = DestAddr::try_from(addr.clone())?;
let key = StatKey {
shard_ident: *shard_ident,
partition: *partition,
min_message: *min_message,
max_message: *max_message,
index: index as u64,
dest,
};

self.storage
.internal_queue_storage()
.insert_statistics_uncommitted(batch, &key, &addr, *count)?;
.insert_statistics_uncommitted(batch, &key, *count)?;
}
}

Expand Down
45 changes: 43 additions & 2 deletions collator/src/internal_queue/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@ impl PartitionRouter {
.unwrap_or_default()
}

pub fn insert(&mut self, addr: IntAddr, partition: QueuePartition) {
pub fn insert(&mut self, addr: IntAddr, partition: QueuePartition) -> Result<()> {
if partition == QueuePartition::NormalPriority {
bail!("Attempt to insert address into normal priority partition");
}

let _ = self.inner.entry(partition).or_default().insert(addr);

Ok(())
}

pub fn clear(&mut self) {
Expand Down Expand Up @@ -250,7 +256,7 @@ pub struct QueueRange {
pub to: QueueKey,
}

#[derive(Default, Clone)]
#[derive(Debug, Default, Clone)]
pub struct QueueStatistics {
statistics: FastHashMap<IntAddr, u64>,
}
Expand Down Expand Up @@ -295,6 +301,14 @@ impl QueueStatistics {
}
}

impl PartialEq for QueueStatistics {
fn eq(&self, other: &Self) -> bool {
self.statistics == other.statistics
}
}

impl Eq for QueueStatistics {}

impl IntoIterator for QueueStatistics {
type Item = (IntAddr, u64);
type IntoIter = hash_map::IntoIter<IntAddr, u64>;
Expand Down Expand Up @@ -368,3 +382,30 @@ impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for
}
}
}

#[test]
fn test_insert() {
let mut map = PartitionRouter::new(); // Предположим, MyStruct имеет inner

let addr1 = IntAddr::Std(StdAddr::new(1, HashBytes::from([1; 32])));
let addr2 = IntAddr::Std(StdAddr::new(2, HashBytes::from([2; 32])));
let partition = QueuePartition::LowPriority;

// Первая вставка
map.insert(addr1.clone(), partition).unwrap();

// Вторая вставка
map.insert(addr2.clone(), partition).unwrap();

let partition1 = map.get_partition(&addr1);

assert_eq!(partition, partition1);

let partition2 = map.get_partition(&addr2);

assert_eq!(partition, partition2);

// Проверяем содержимое
// assert!(map.inner.get(&partition).unwrap().contains(&addr1));
// assert!(map.inner.get(&partition).unwrap().contains(&addr2));
}
3 changes: 1 addition & 2 deletions collator/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1295,7 +1295,6 @@ where
queue_diff_with_messages,
*queue_diff_stuff.diff_hash(),
prev_block_id,
queue_diff_stuff.diff().max_message,
));

let prev_ids_info = block_stuff.construct_prev_id()?;
Expand All @@ -1308,7 +1307,7 @@ where
}

// apply required previous queue diffs
while let Some((diff, diff_hash, block_id, max_message_key)) = prev_queue_diffs.pop() {
while let Some((diff, diff_hash, block_id)) = prev_queue_diffs.pop() {
let statistics = (&diff, block_id.shard).into();
self.mq_adapter
.apply_diff(diff, block_id.as_short_id(), &diff_hash, statistics)?;
Expand Down
Loading
Loading