diff --git a/block-util/src/queue/proto.rs b/block-util/src/queue/proto.rs index 5519ee0df..c91e7cd47 100644 --- a/block-util/src/queue/proto.rs +++ b/block-util/src/queue/proto.rs @@ -1,4 +1,6 @@ use std::collections::{BTreeMap, BTreeSet}; +use std::fmt; +use std::fmt::{Debug, Formatter}; use anyhow::bail; use bytes::Bytes; @@ -9,7 +11,7 @@ use tl_proto::{TlError, TlRead, TlResult, TlWrite}; use crate::tl; /// Representation of an internal messages queue diff. -#[derive(Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct QueueDiff { /// Computed hash of this diff. /// @@ -62,6 +64,7 @@ impl TlWrite for QueueDiff { + processed_to_map::size_hint(&self.processed_to) + 2 * QueueKey::SIZE_HINT + messages_list::size_hint(&self.messages) + + partition_router_list::size_hint(&self.partition_router) } fn write_to
<P>
(&self, packet: &mut P) @@ -140,7 +143,7 @@ pub struct QueueStateRef<'tl> { } /// A header for a persistent internal messages queue state. -#[derive(Clone, PartialEq, Eq, TlWrite, TlRead)] +#[derive(Debug, Clone, PartialEq, Eq, TlWrite, TlRead)] #[tl(boxed, id = "block.queueStateHeader", scheme = "proto.tl")] pub struct QueueStateHeader { #[tl(with = "tl::shard_ident")] @@ -552,6 +555,18 @@ pub struct DestAddr { pub account: HashBytes, } +impl DestAddr { + pub const MIN: Self = Self { + workchain: i8::MIN, + account: HashBytes::ZERO, + }; + + pub const MAX: Self = Self { + workchain: i8::MAX, + account: HashBytes([0xff; 32]), + }; +} + impl TryFrom for DestAddr { type Error = anyhow::Error; @@ -618,6 +633,23 @@ mod tests { #[test] fn queue_diff_binary_repr() { + let mut partition_router = BTreeMap::new(); + + let addr1 = DestAddr { + workchain: 0, + account: HashBytes::from([0x01; 32]), + }; + let addr2 = DestAddr { + workchain: 1, + account: HashBytes::from([0x02; 32]), + }; + + partition_router.insert(QueuePartition::LowPriority, { + let mut set = BTreeSet::new(); + set.insert(addr1); + set + }); + let mut diff = QueueDiff { hash: HashBytes::ZERO, // NOTE: Uninitialized prev_hash: HashBytes::from([0x33; 32]), @@ -646,7 +678,7 @@ mod tests { HashBytes::from([0x02; 32]), HashBytes::from([0x03; 32]), ], - partition_router: Default::default(), + partition_router, }; let bytes = tl_proto::serialize(&diff); @@ -665,6 +697,23 @@ mod tests { for seqno in 1..=10 { let prev_hash = queue_diffs.last().map(|diff| diff.hash).unwrap_or_default(); + let mut partition_router = BTreeMap::new(); + + let addr1 = DestAddr { + workchain: 0, + account: HashBytes::from([0x01; 32]), + }; + let addr2 = DestAddr { + workchain: 1, + account: HashBytes::from([0x02; 32]), + }; + + partition_router.insert(QueuePartition::LowPriority, { + let mut set = BTreeSet::new(); + set.insert(addr2); + set + }); + let mut diff = QueueDiff { hash: HashBytes::ZERO, // NOTE: Uninitialized prev_hash, @@ -693,7 +742,7 @@ mod tests { HashBytes::from([0x02; 32]), HashBytes::from([0x03; 32]), ], - partition_router: Default::default(), + partition_router, }; // NOTE: We need this for the hash computation. 
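The new `DestAddr::MIN`/`DestAddr::MAX` constants are used further down (storage/src/store/internal_queue/mod.rs) as inclusive sentinels when building statistics key ranges. A minimal sketch of the intended ordering property, using a simplified stand-in type — the real `DestAddr` lives in block-util/src/queue/proto.rs; the sketch assumes it derives `Ord` over `(workchain, account)` in that field order:

// Simplified stand-in for DestAddr; not the real type from proto.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct DestAddr {
    workchain: i8,
    account: [u8; 32],
}

impl DestAddr {
    const MIN: Self = Self { workchain: i8::MIN, account: [0x00; 32] };
    const MAX: Self = Self { workchain: i8::MAX, account: [0xff; 32] };
}

fn main() {
    let dest = DestAddr { workchain: -1, account: [0x42; 32] };
    // Any concrete destination compares between the two sentinels, so a
    // [MIN, MAX] pair can bound the full per-diff statistics key range.
    assert!(DestAddr::MIN <= dest && dest <= DestAddr::MAX);
}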
diff --git a/collator/src/collator/messages_reader/mod.rs b/collator/src/collator/messages_reader/mod.rs index f19b6f604..9a608abae 100644 --- a/collator/src/collator/messages_reader/mod.rs +++ b/collator/src/collator/messages_reader/mod.rs @@ -308,7 +308,9 @@ impl MessagesReader { partition_router.clear(); for (account_addr, msgs_count) in stats { if msgs_count > MAX_PAR_0_MSGS_COUNT_LIMIT { - partition_router.insert(account_addr, QueuePartition::LowPriority); + partition_router + .insert(account_addr, QueuePartition::LowPriority) + .unwrap(); } } } diff --git a/collator/src/collator/messages_reader/new_messages.rs b/collator/src/collator/messages_reader/new_messages.rs index 9207eda4f..39f20c636 100644 --- a/collator/src/collator/messages_reader/new_messages.rs +++ b/collator/src/collator/messages_reader/new_messages.rs @@ -51,7 +51,8 @@ impl NewMessagesState { for stats in partition_all_ranges_msgs_stats { for account_addr in stats.statistics().keys() { self.partition_router - .insert(account_addr.clone(), partition_id.try_into().unwrap()); + .insert(account_addr.clone(), partition_id.try_into().unwrap()) + .unwrap(); } } } diff --git a/collator/src/internal_queue/state/uncommitted_state.rs b/collator/src/internal_queue/state/uncommitted_state.rs index 163711383..e25f7d70d 100644 --- a/collator/src/internal_queue/state/uncommitted_state.rs +++ b/collator/src/internal_queue/state/uncommitted_state.rs @@ -229,22 +229,21 @@ impl UncommittedStateStdImpl { let min_message = diff_statistics.min_message(); let max_message = diff_statistics.max_message(); - for (index, (partition, values)) in diff_statistics.iter().enumerate() { - for value in values { + for (partition, values) in diff_statistics.iter() { + for (index, value) in values.iter().enumerate() { let (addr, count) = value; - let dest_addr = DestAddr::try_from(addr.clone())?; - let addr = tl_proto::serialize(dest_addr); + let dest = DestAddr::try_from(addr.clone())?; let key = StatKey { shard_ident: *shard_ident, partition: *partition, min_message: *min_message, max_message: *max_message, - index: index as u64, + dest, }; self.storage .internal_queue_storage() - .insert_statistics_uncommitted(batch, &key, &addr, *count)?; + .insert_statistics_uncommitted(batch, &key, *count)?; } } diff --git a/collator/src/internal_queue/types.rs b/collator/src/internal_queue/types.rs index 669a9f30d..b4fe83064 100644 --- a/collator/src/internal_queue/types.rs +++ b/collator/src/internal_queue/types.rs @@ -37,8 +37,14 @@ impl PartitionRouter { .unwrap_or_default() } - pub fn insert(&mut self, addr: IntAddr, partition: QueuePartition) { + pub fn insert(&mut self, addr: IntAddr, partition: QueuePartition) -> Result<()> { + if partition == QueuePartition::NormalPriority { + bail!("Attempt to insert address into normal priority partition"); + } + let _ = self.inner.entry(partition).or_default().insert(addr); + + Ok(()) } pub fn clear(&mut self) { @@ -250,7 +256,7 @@ pub struct QueueRange { pub to: QueueKey, } -#[derive(Default, Clone)] +#[derive(Debug, Default, Clone)] pub struct QueueStatistics { statistics: FastHashMap, } @@ -295,6 +301,14 @@ impl QueueStatistics { } } +impl PartialEq for QueueStatistics { + fn eq(&self, other: &Self) -> bool { + self.statistics == other.statistics + } +} + +impl Eq for QueueStatistics {} + impl IntoIterator for QueueStatistics { type Item = (IntAddr, u64); type IntoIter = hash_map::IntoIter; @@ -368,3 +382,30 @@ impl From<(&QueueDiffWithMessages, ShardIdent)> for } } } + +#[test] +fn test_insert() { + let mut map = 
PartitionRouter::new(); + + let addr1 = IntAddr::Std(StdAddr::new(1, HashBytes::from([1; 32]))); + let addr2 = IntAddr::Std(StdAddr::new(2, HashBytes::from([2; 32]))); + let partition = QueuePartition::LowPriority; + + // First insert + map.insert(addr1.clone(), partition).unwrap(); + + // Second insert + map.insert(addr2.clone(), partition).unwrap(); + + let partition1 = map.get_partition(&addr1); + + assert_eq!(partition, partition1); + + let partition2 = map.get_partition(&addr2); + + assert_eq!(partition, partition2); + + // Check the contents (requires access to the private `inner` field) + // assert!(map.inner.get(&partition).unwrap().contains(&addr1)); + // assert!(map.inner.get(&partition).unwrap().contains(&addr2)); +}
diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 4c592a765..51d8c67cc 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -1295,7 +1295,6 @@ where queue_diff_with_messages, *queue_diff_stuff.diff_hash(), prev_block_id, - queue_diff_stuff.diff().max_message, )); let prev_ids_info = block_stuff.construct_prev_id()?; @@ -1308,7 +1307,7 @@ where } // apply required previous queue diffs - while let Some((diff, diff_hash, block_id, max_message_key)) = prev_queue_diffs.pop() { + while let Some((diff, diff_hash, block_id)) = prev_queue_diffs.pop() { let statistics = (&diff, block_id.shard).into(); self.mq_adapter .apply_diff(diff, block_id.as_short_id(), &diff_hash, statistics)?;
diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index 435b831cb..e38c95233 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -1,5 +1,4 @@ use std::collections::{BTreeMap, BTreeSet}; -use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -26,8 +25,7 @@ use tycho_collator::internal_queue::state::uncommitted_state::{ use tycho_collator::internal_queue::types::{ DiffStatistics, InternalMessageValue, PartitionRouter, QueueDiffWithMessages, QueueShardRange, }; -use tycho_collator::test_utils::prepare_test_storage; -use tycho_util::FastHashMap; +use tycho_storage::Storage; #[derive(Clone, Debug, PartialEq, Eq)] struct StoredObject { @@ -86,20 +84,128 @@ impl InternalMessageValue for StoredObject { } } -fn create_stored_object(key: u64, dest_str: &str) -> anyhow::Result<Arc<StoredObject>> { - let dest = IntAddr::Std(StdAddr::from_str(dest_str)?); +fn create_stored_object(key: u64, dest_addr: DestAddr) -> anyhow::Result<Arc<StoredObject>> { + let dest = dest_addr.to_int_addr(); Ok(Arc::new(StoredObject { key, dest })) } +fn test_statistics_check_statistics( + queue: &QueueImpl, + dest_1_low_priority: DestAddr, + dest_2_low_priority: DestAddr, + dest_3_normal_priority: DestAddr, +) -> anyhow::Result<()> { + // check combined statistics across both diffs + let statistics_low_priority_partition = + queue.load_statistics(QueuePartition::LowPriority, vec![QueueShardRange { + shard_ident: ShardIdent::new_full(0), + from: QueueKey { + lt: 1, + hash: HashBytes::default(), + }, + to: QueueKey { + lt: 36000, + hash: HashBytes::default(), + }, + }])?; + + let addr_1_stat = statistics_low_priority_partition + .statistics() + .get(&dest_1_low_priority.to_int_addr()) + .unwrap(); + let addr_2_stat = statistics_low_priority_partition + .statistics() + .get(&dest_2_low_priority.to_int_addr()) + .unwrap(); + + assert_eq!(*addr_1_stat, 20000); + assert_eq!(*addr_2_stat, 10000); + + let statistics_normal_priority_partition = + queue.load_statistics(QueuePartition::NormalPriority, vec![QueueShardRange { + shard_ident: ShardIdent::new_full(0), +
from: QueueKey { + lt: 1, + hash: HashBytes::default(), + }, + to: QueueKey { + lt: 36000, + hash: HashBytes::default(), + }, + }])?; + + let addr_3_stat = statistics_normal_priority_partition + .statistics() + .get(&dest_3_normal_priority.to_int_addr()) + .unwrap(); + assert_eq!(*addr_3_stat, 2000); + + // check first diff + let statistics_low_priority_partition = + queue.load_statistics(QueuePartition::LowPriority, vec![QueueShardRange { + shard_ident: ShardIdent::new_full(0), + from: QueueKey { + lt: 1, + hash: HashBytes::default(), + }, + to: QueueKey { + lt: 16000, + hash: HashBytes::default(), + }, + }])?; + + let addr_1_stat = statistics_low_priority_partition + .statistics() + .get(&dest_1_low_priority.to_int_addr()) + .unwrap(); + let addr_2_stat = statistics_low_priority_partition + .statistics() + .get(&dest_2_low_priority.to_int_addr()) + .unwrap(); + + assert_eq!(*addr_1_stat, 10000); + assert_eq!(*addr_2_stat, 5000); + + // check second diff + let statistics_low_priority_partition = + queue.load_statistics(QueuePartition::LowPriority, vec![QueueShardRange { + shard_ident: ShardIdent::new_full(0), + from: QueueKey { + lt: 20000, + hash: HashBytes::default(), + }, + to: QueueKey { + lt: 36000, + hash: HashBytes::default(), + }, + }])?; + + let addr_1_stat = statistics_low_priority_partition + .statistics() + .get(&dest_1_low_priority.to_int_addr()) + .unwrap(); + let addr_2_stat = statistics_low_priority_partition + .statistics() + .get(&dest_2_low_priority.to_int_addr()) + .unwrap(); + + assert_eq!(*addr_1_stat, 10000); + assert_eq!(*addr_2_stat, 5000); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_queue() -> anyhow::Result<()> { - let (storage, _tmp_dir) = prepare_test_storage().await.unwrap(); + let (storage, _tmp_dir) = Storage::new_temp().await?; let queue_factory = QueueFactoryStdImpl { uncommitted_state_factory: UncommittedStateImplFactory { storage: storage.clone(), }, - committed_state_factory: CommittedStateImplFactory { storage }, + committed_state_factory: CommittedStateImplFactory { + storage: storage.clone(), + }, config: QueueConfig { gc_interval: Duration::from_secs(1), }, @@ -109,166 +215,322 @@ async fn test_queue() -> anyhow::Result<()> { queue_factory.create(); // create first block with queue diff - let block = BlockIdShort { + let block1 = BlockIdShort { shard: ShardIdent::new_full(0), seqno: 0, }; let mut diff = QueueDiffWithMessages::new(); - let stored_objects = vec![ - create_stored_object( - 1, - "-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - create_stored_object( - 2, - "-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - create_stored_object( - 3, - "0:7d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - create_stored_object( - 4, - "-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - create_stored_object( - 5, - "-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - ]; + let mut partition_router = PartitionRouter::default(); - for stored_object in &stored_objects { + let dest_1_low_priority = + DestAddr::try_from(IntAddr::Std(StdAddr::new(-1, HashBytes::from([1; 32]))))?; + let dest_2_low_priority = + DestAddr::try_from(IntAddr::Std(StdAddr::new(-1, HashBytes::from([2; 32]))))?; + + // low priority + for i in 1..=10000 { + let stored_object = create_stored_object(i, dest_1_low_priority)?; diff.messages .insert(stored_object.key(), stored_object.clone()); + 
partition_router.insert(stored_object.dest.clone(), QueuePartition::LowPriority)?; } - let mut partition_router = PartitionRouter::default(); + for i in 10001..=15000 { + let stored_object = create_stored_object(i, dest_2_low_priority)?; + diff.messages + .insert(stored_object.key(), stored_object.clone()); + partition_router.insert(stored_object.dest.clone(), QueuePartition::LowPriority)?; + } - for stored_object in &stored_objects { - partition_router.insert(stored_object.dest.clone(), QueuePartition::LowPriority); + let dest_3_normal_priority = + DestAddr::try_from(IntAddr::Std(StdAddr::new(0, HashBytes::from([3; 32]))))?; + + // normal priority + for i in 15001..=16000 { + let stored_object = create_stored_object(i, dest_3_normal_priority)?; + diff.messages + .insert(stored_object.key(), stored_object.clone()); } + assert_eq!( + partition_router.get_partition(&dest_1_low_priority.to_int_addr()), + QueuePartition::LowPriority + ); + assert_eq!( + partition_router.get_partition(&dest_2_low_priority.to_int_addr()), + QueuePartition::LowPriority + ); + assert_eq!( + partition_router.get_partition(&dest_3_normal_priority.to_int_addr()), + QueuePartition::NormalPriority + ); + let diff_with_messages = QueueDiffWithMessages { messages: diff.messages, processed_to: diff.processed_to, partition_router, }; - let statistics = (&diff_with_messages, block.shard).into(); + let diff_statistics: DiffStatistics = (&diff_with_messages, block1.shard).into(); + assert_eq!(diff_with_messages.messages.len(), 16000); + + // check low priority statistics + diff_statistics + .iter() + .filter(|(partition, _)| partition == &&QueuePartition::LowPriority) + .for_each(|(_partition, statistics)| { + assert_eq!(statistics.iter().count(), 2); + + let addr1_count = statistics.get(&dest_1_low_priority.to_int_addr()).unwrap(); + assert_eq!(*addr1_count, 10000); + + let addr2_count = statistics.get(&dest_2_low_priority.to_int_addr()).unwrap(); + assert_eq!(*addr2_count, 5000); + }); + + // check normal priority statistics + diff_statistics + .iter() + .filter(|(partition, _)| partition == &&QueuePartition::NormalPriority) + .for_each(|(_partition, statistics)| { + assert_eq!(statistics.iter().count(), 1); + + let addr3_count = statistics + .get(&dest_3_normal_priority.to_int_addr()) + .unwrap(); + assert_eq!(*addr3_count, 1000); + }); queue.apply_diff( diff_with_messages, - block, + block1, &HashBytes::from([1; 32]), - statistics, + diff_statistics, )?; - - let top_blocks = vec![(block, true)]; - - queue.commit_diff(&top_blocks)?; + // end block 1 diff // create second block with queue diff let block2 = BlockIdShort { - shard: ShardIdent::new_full(1), + shard: ShardIdent::new_full(0), seqno: 1, }; - let mut diff = QueueDiffWithMessages::new(); - let stored_objects2 = vec![ - create_stored_object( - 1, - "0:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - create_stored_object( - 2, - "0:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - create_stored_object( - 3, - "0:7d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - create_stored_object( - 4, - "0:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - create_stored_object( - 5, - "0:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - ]; + let mut partition_router = PartitionRouter::default(); - for stored_object in &stored_objects2 { + let dest_1_low_priority = + DestAddr::try_from(IntAddr::Std(StdAddr::new(-1, HashBytes::from([1; 32]))))?; + let 
dest_2_low_priority = + DestAddr::try_from(IntAddr::Std(StdAddr::new(-1, HashBytes::from([2; 32]))))?; + + // low priority + for i in 20001..=30000 { + let stored_object = create_stored_object(i, dest_1_low_priority)?; diff.messages .insert(stored_object.key(), stored_object.clone()); + partition_router.insert(stored_object.dest.clone(), QueuePartition::LowPriority)?; } - let top_blocks = vec![(block2, true)]; + for i in 30001..=35000 { + let stored_object = create_stored_object(i, dest_2_low_priority)?; + diff.messages + .insert(stored_object.key(), stored_object.clone()); + partition_router.insert(stored_object.dest.clone(), QueuePartition::LowPriority)?; + } - let mut partition_router = PartitionRouter::default(); + let dest_3_normal_priority = + DestAddr::try_from(IntAddr::Std(StdAddr::new(0, HashBytes::from([3; 32]))))?; - for stored_object in &stored_objects2 { - partition_router.insert(stored_object.dest.clone(), QueuePartition::LowPriority); + // normal priority + for i in 35001..=36000 { + let stored_object = create_stored_object(i, dest_3_normal_priority)?; + diff.messages + .insert(stored_object.key(), stored_object.clone()); } + assert_eq!( + partition_router.get_partition(&dest_1_low_priority.to_int_addr()), + QueuePartition::LowPriority + ); + assert_eq!( + partition_router.get_partition(&dest_2_low_priority.to_int_addr()), + QueuePartition::LowPriority + ); + assert_eq!( + partition_router.get_partition(&dest_3_normal_priority.to_int_addr()), + QueuePartition::NormalPriority + ); + let diff_with_messages = QueueDiffWithMessages { messages: diff.messages, processed_to: diff.processed_to, partition_router, }; - let statistics = (&diff_with_messages, block2.shard).into(); + let diff_statistics: DiffStatistics = (&diff_with_messages, block2.shard).into(); + assert_eq!(diff_with_messages.messages.len(), 16000); + + // check low priority statistics + diff_statistics + .iter() + .filter(|(partition, _)| partition == &&QueuePartition::LowPriority) + .for_each(|(_partition, statistics)| { + assert_eq!(statistics.iter().count(), 2); + + let addr1_count = statistics.get(&dest_1_low_priority.to_int_addr()).unwrap(); + assert_eq!(*addr1_count, 10000); + + let addr2_count = statistics.get(&dest_2_low_priority.to_int_addr()).unwrap(); + assert_eq!(*addr2_count, 5000); + }); + + // check normal priority statistics + diff_statistics + .iter() + .filter(|(partition, _)| partition == &&QueuePartition::NormalPriority) + .for_each(|(_partition, statistics)| { + assert_eq!(statistics.iter().count(), 1); + + let addr3_count = statistics + .get(&dest_3_normal_priority.to_int_addr()) + .unwrap(); + assert_eq!(*addr3_count, 1000); + }); queue.apply_diff( diff_with_messages, block2, - &HashBytes::from([0; 32]), - statistics, + &HashBytes::from([1; 32]), + diff_statistics, + )?; + + // end block 2 diff + + test_statistics_check_statistics( + &queue, + dest_1_low_priority, + dest_2_low_priority, + dest_3_normal_priority, + )?; + + queue.commit_diff(&vec![(block1, true)])?; + test_statistics_check_statistics( + &queue, + dest_1_low_priority, + dest_2_low_priority, + dest_3_normal_priority, )?; - queue.commit_diff(&top_blocks)?; + // test iterator + // test first diff iterator let mut ranges = Vec::new(); let queue_range = QueueShardRange { shard_ident: ShardIdent::new_full(0), from: QueueKey { - lt: 1, + lt: 0, hash: HashBytes::default(), }, to: QueueKey { - lt: 4, + lt: 16000, hash: HashBytes::default(), }, }; ranges.push(queue_range); - let partition = QueuePartition::LowPriority; - let iterators = 
queue.iterator(partition, ranges, ShardIdent::new_full(-1))?; + let iterators = queue.iterator( + QueuePartition::LowPriority, + ranges.clone(), + ShardIdent::new_full(-1), + )?; let mut iterator_manager = StatesIteratorsManager::new(iterators); - iterator_manager.next().ok(); - let loaded_stored_object = iterator_manager.next(); + let mut read_count = 0; + while let Some(_) = iterator_manager.next()? { + read_count += 1; + } + assert_eq!(read_count, 15000); - let loaded_stored_object = loaded_stored_object.unwrap().unwrap(); - assert_eq!(stored_objects[3], loaded_stored_object.message); + let iterators = queue.iterator( + QueuePartition::NormalPriority, + ranges, + ShardIdent::new_full(0), + )?; + let mut iterator_manager = StatesIteratorsManager::new(iterators); + let mut read_count = 0; + while let Some(_) = iterator_manager.next()? { + read_count += 1; + } + + assert_eq!(read_count, 1000); - let current_position = iterator_manager.current_position(); - let mut expected_position = FastHashMap::default(); - expected_position.insert(ShardIdent::new_full(0), QueueKey { - lt: 4, - hash: HashBytes::default(), - }); + // check two diff iterator + let mut ranges = Vec::new(); - assert_eq!(expected_position, current_position); + let queue_range = QueueShardRange { + shard_ident: ShardIdent::new_full(0), + from: QueueKey { + lt: 0, + hash: HashBytes::default(), + }, + to: QueueKey { + lt: 36000, + hash: HashBytes::default(), + }, + }; + + ranges.push(queue_range); + + let iterators = queue.iterator( + QueuePartition::LowPriority, + ranges.clone(), + ShardIdent::new_full(-1), + )?; + + let mut iterator_manager = StatesIteratorsManager::new(iterators); + let mut read_count = 0; + while let Some(_) = iterator_manager.next()? { + read_count += 1; + } + assert_eq!(read_count, 30000); + + let iterators = queue.iterator( + QueuePartition::NormalPriority, + ranges, + ShardIdent::new_full(0), + )?; + let mut iterator_manager = StatesIteratorsManager::new(iterators); + let mut read_count = 0; + while let Some(_) = iterator_manager.next()? 
{ + read_count += 1; + } + + assert_eq!(read_count, 2000); + + // test commit all diffs and check statistics + queue.commit_diff(&vec![(block2, true)])?; + test_statistics_check_statistics( + &queue, + dest_1_low_priority, + dest_2_low_priority, + dest_3_normal_priority, + )?; + queue.clear_uncommitted_state()?; + test_statistics_check_statistics( + &queue, + dest_1_low_priority, + dest_2_low_priority, + dest_3_normal_priority, + )?; Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_queue_clear() -> anyhow::Result<()> { - let (storage, _tmp_dir) = prepare_test_storage().await.unwrap(); + let (storage, _tmp_dir) = Storage::new_temp().await?; let queue_factory = QueueFactoryStdImpl { uncommitted_state_factory: UncommittedStateImplFactory { @@ -288,10 +550,10 @@ async fn test_queue_clear() -> anyhow::Result<()> { }; let mut diff = QueueDiffWithMessages::new(); - let stored_objects = vec![create_stored_object( - 1, - "1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?]; + let stored_objects = vec![create_stored_object(1, DestAddr { + workchain: 1, + account: HashBytes::from([1; 32]), + })?]; for stored_object in &stored_objects { diff.messages @@ -330,7 +592,6 @@ async fn test_queue_clear() -> anyhow::Result<()> { ranges.push(queue_range); let partition = QueuePartition::NormalPriority; - let iterators = queue.iterator(partition, ranges.clone(), ShardIdent::new_full(1))?; let mut iterator_manager = StatesIteratorsManager::new(iterators); @@ -346,86 +607,6 @@ async fn test_queue_clear() -> anyhow::Result<()> { Ok(()) } -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn test_statistics() -> anyhow::Result<()> { - let (storage, _tmp_dir) = prepare_test_storage().await.unwrap(); - - let queue_factory = QueueFactoryStdImpl { - uncommitted_state_factory: UncommittedStateImplFactory { - storage: storage.clone(), - }, - committed_state_factory: CommittedStateImplFactory { storage }, - config: QueueConfig { - gc_interval: Duration::from_secs(1), - }, - }; - - let queue: QueueImpl = - queue_factory.create(); - let block = BlockIdShort { - shard: ShardIdent::new_full(0), - seqno: 0, - }; - let mut diff = QueueDiffWithMessages::new(); - - let stored_objects = vec![create_stored_object( - 1, - "1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?]; - - for stored_object in &stored_objects { - diff.messages - .insert(stored_object.key(), stored_object.clone()); - } - - let start_key = *diff.messages.iter().next().unwrap().0; - let end_key = *diff.messages.iter().last().unwrap().0; - let diff_with_messages = QueueDiffWithMessages { - messages: diff.messages, - processed_to: diff.processed_to, - partition_router: Default::default(), - }; - - let statistics: DiffStatistics = (&diff_with_messages, block.shard).into(); - - for stat in statistics.iter() { - assert_eq!(stat.1.len(), 1); - } - - queue.apply_diff( - diff_with_messages, - block, - &HashBytes::from([1; 32]), - statistics, - )?; - - let top_blocks = vec![(block, true)]; - - queue.commit_diff(&top_blocks)?; - - let partition = QueuePartition::NormalPriority; - - let range = QueueShardRange { - shard_ident: ShardIdent::new_full(0), - from: start_key, - to: end_key, - }; - - let ranges = vec![range.clone()]; - - let stat = queue.load_statistics(partition, ranges)?; - - assert_eq!(*stat.statistics().iter().next().unwrap().1, 1); - - let ranges = vec![range.clone(), range]; - - let stat = queue.load_statistics(partition, ranges)?; - - 
assert_eq!(*stat.statistics().iter().next().unwrap().1, 2); - - Ok(()) -} - #[test] fn test_queue_diff_with_messages_from_queue_diff_stuff() -> anyhow::Result<()> { let mut out_msg = OutMsgDescr::default(); @@ -608,10 +789,9 @@ fn create_dump_msg_envelope(message: Lazy) -> Lazy { }) .unwrap() } - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_queue_tail() -> anyhow::Result<()> { - let (storage, _tmp_dir) = prepare_test_storage().await.unwrap(); + let (storage, _tmp_dir) = Storage::new_temp().await?; let queue_factory = QueueFactoryStdImpl { uncommitted_state_factory: UncommittedStateImplFactory { @@ -638,26 +818,22 @@ async fn test_queue_tail() -> anyhow::Result<()> { let mut diff_mc2 = QueueDiffWithMessages::new(); let stored_objects = vec![ - create_stored_object( - 1, - "-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - create_stored_object( - 2, - "-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - create_stored_object( - 3, - "0:7d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - create_stored_object( - 4, - "-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, - create_stored_object( - 5, - "-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", - )?, + create_stored_object(1, DestAddr { + workchain: -1, + account: HashBytes::from([1; 32]), + })?, + create_stored_object(2, DestAddr { + workchain: -1, + account: HashBytes::from([2; 32]), + })?, + create_stored_object(3, DestAddr { + workchain: 0, + account: HashBytes::from([3; 32]), + })?, + create_stored_object(4, DestAddr { + workchain: -1, + account: HashBytes::from([4; 32]), + })?, ]; if let Some(stored_object) = stored_objects.first() { diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs index 37182d2be..fb6e4cc4d 100644 --- a/storage/src/db/kv_db/mod.rs +++ b/storage/src/db/kv_db/mod.rs @@ -134,7 +134,7 @@ weedb::tables! 
{ pub block_connections: tables::BlockConnections, pub shards_internal_messages: tables::ShardsInternalMessages, pub shards_internal_messages_uncommitted: tables::ShardsInternalMessagesSession, - pub internal_messages_statistics_commited: tables::InternalMessagesDestStat, + pub internal_messages_statistics_committed: tables::InternalMessagesDestStat, pub internal_messages_statistics_uncommitted: tables::InternalMessagesDestStatUncommitted, } } diff --git a/storage/src/store/internal_queue/mod.rs b/storage/src/store/internal_queue/mod.rs index 6d6468e78..d128aa205 100644 --- a/storage/src/store/internal_queue/mod.rs +++ b/storage/src/store/internal_queue/mod.rs @@ -28,21 +28,20 @@ impl InternalQueueStorage { &self, batch: &mut WriteBatchWithTransaction, key: &StatKey, - dest: &[u8], count: u64, ) -> Result<()> { let cf = self.db.internal_messages_statistics_uncommitted.cf(); - let mut key_buffer = Vec::with_capacity(StatKey::SIZE_HINT); - key.serialize(&mut key_buffer); - - let mut value_buffer = Vec::with_capacity(std::mem::size_of::() + dest.len()); - - value_buffer.extend_from_slice(&count.to_be_bytes()); - value_buffer.extend_from_slice(dest); - - batch.put_cf(&cf, &key_buffer, &value_buffer); + self.insert_statistics(batch, &cf, key, count) + } - Ok(()) + pub fn insert_statistics_committed( + &self, + batch: &mut WriteBatchWithTransaction, + key: &StatKey, + count: u64, + ) -> Result<()> { + let cf = self.db.internal_messages_statistics_committed.cf(); + self.insert_statistics(batch, &cf, key, count) } pub fn collect_committed_stats_in_range( @@ -56,10 +55,10 @@ impl InternalQueueStorage { ) -> Result<()> { let mut read_config = self .db - .internal_messages_statistics_commited + .internal_messages_statistics_committed .new_read_config(); read_config.set_snapshot(snapshot); - let cf = self.db.internal_messages_statistics_commited.cf(); + let cf = self.db.internal_messages_statistics_committed.cf(); let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config); @@ -101,7 +100,7 @@ impl InternalQueueStorage { partition, min_message: from, max_message: QueueKey::MIN, - index: 0, + dest: DestAddr::MIN, }; let from_key_bytes = { @@ -129,11 +128,11 @@ impl InternalQueueStorage { break; } - let (count_bytes, dest_bytes) = v.split_at(8); - let dest_addr = tl_proto::deserialize::(dest_bytes)?; + let count_bytes = v; + let count = u64::from_be_bytes(count_bytes.try_into().unwrap()); - let entry = result.entry(dest_addr.to_int_addr()).or_insert(0); + let entry = result.entry(current_key.dest.to_int_addr()).or_insert(0); *entry += count; } _ => { @@ -166,36 +165,78 @@ impl InternalQueueStorage { let mut reader = QueueStateReader::begin_from_mapped(mapped.as_slice(), &top_update)?; - let cf = this.db.shards_internal_messages.cf(); + let messages_cf = this.db.shards_internal_messages.cf(); let mut batch = weedb::rocksdb::WriteBatch::default(); let mut buffer = Vec::new(); - while let Some(cell) = reader.read_next_message()? { - let msg_hash = cell.repr_hash(); - let msg = cell.parse::>()?; - let MsgInfo::Int(int_msg_info) = &msg.info else { - anyhow::bail!("non-internal message in the queue in msg {msg_hash}"); - }; - - let IntAddr::Std(dest) = &int_msg_info.dst else { - anyhow::bail!("non-std destination address in msg {msg_hash}"); - }; - - let key = ShardsInternalMessagesKey { - // TODO !!! 
read it - partition: QueuePartition::NormalPriority, - shard_ident, - internal_message_key: QueueKey { - lt: int_msg_info.created_lt, - hash: *msg_hash, - }, - }; - - buffer.clear(); - buffer.push(dest.workchain as u8); - buffer.extend_from_slice(&dest.prefix().to_be_bytes()); - BocHeader::::with_root(cell.as_ref()).encode(&mut buffer); - batch.put_cf(&cf, key.to_vec(), &buffer); + let mut statistics: FastHashMap> = + FastHashMap::default(); + while let Some(_) = reader.read_next_diff()? { + let current_diff_index = reader.next_queue_diff_index() - 1; + + while let Some(cell) = reader.read_next_message()? { + let msg_hash = cell.repr_hash(); + let msg = cell.parse::>()?; + let MsgInfo::Int(int_msg_info) = &msg.info else { + anyhow::bail!("non-internal message in the queue in msg {msg_hash}"); + }; + + let IntAddr::Std(dest) = &int_msg_info.dst else { + anyhow::bail!("non-std destination address in msg {msg_hash}"); + }; + + let dest_addr = DestAddr { + workchain: dest.workchain, + account: dest.address, + }; + let current_diff = &reader.state().header.queue_diffs[current_diff_index]; + let partition = current_diff + .partition_router + .iter() + .find_map(|(key, value)| { + if value.contains(&dest_addr) { + Some(key) + } else { + None + } + }) + .cloned() + .unwrap_or(QueuePartition::NormalPriority); + + let key = ShardsInternalMessagesKey { + partition, + shard_ident, + internal_message_key: QueueKey { + lt: int_msg_info.created_lt, + hash: *msg_hash, + }, + }; + + buffer.clear(); + buffer.push(dest.workchain as u8); + buffer.extend_from_slice(&dest.prefix().to_be_bytes()); + BocHeader::::with_root(cell.as_ref()).encode(&mut buffer); + batch.put_cf(&messages_cf, key.to_vec(), &buffer); + + let partition_stats = statistics.entry(partition).or_default(); + *partition_stats.entry(dest_addr).or_insert(0) += 1; + } + + let current_diff = &reader.state().header.queue_diffs[current_diff_index]; + + for (partition, statistics) in statistics.drain() { + for (dest, count) in statistics.iter() { + let key = StatKey { + shard_ident, + partition, + min_message: current_diff.min_message, + max_message: current_diff.max_message, + dest: *dest, + }; + + this.insert_statistics_committed(&mut batch, &key, *count)?; + } + } } reader.finish()?; @@ -249,14 +290,14 @@ impl InternalQueueStorage { partition: range.partition, min_message: range.from, max_message: QueueKey::MIN, - index: 0, + dest: DestAddr::MIN, }; let to_stat_key = StatKey { shard_ident: range.shard_ident, partition: range.partition, min_message: range.to, max_message: QueueKey::MAX, - index: u64::MAX, + dest: DestAddr::MAX, }; self.commit_range( @@ -265,7 +306,7 @@ impl InternalQueueStorage { &from_stat_key.to_vec(), &to_stat_key.to_vec(), &self.db.internal_messages_statistics_uncommitted.cf(), - &self.db.internal_messages_statistics_commited.cf(), + &self.db.internal_messages_statistics_committed.cf(), )?; } @@ -320,7 +361,7 @@ impl InternalQueueStorage { partition: range.partition, min_message: range.from, max_message: QueueKey::MIN, - index: 0, + dest: DestAddr::MIN, } .to_vec(); @@ -329,7 +370,7 @@ impl InternalQueueStorage { partition: range.partition, min_message: range.to, max_message: QueueKey::MAX, - index: u64::MAX, + dest: DestAddr::MAX, } .to_vec(); @@ -342,7 +383,7 @@ impl InternalQueueStorage { self.delete_range( &mut batch, - &self.db.internal_messages_statistics_commited.cf(), + &self.db.internal_messages_statistics_committed.cf(), &start_stat_key, &end_stat_key, )?; @@ -464,4 +505,22 @@ impl InternalQueueStorage { Ok(()) } + + 
fn insert_statistics( + &self, + batch: &mut WriteBatchWithTransaction, + cf: &BoundedCfHandle<'_>, + key: &StatKey, + count: u64, + ) -> Result<()> { + let mut key_buffer = Vec::with_capacity(StatKey::SIZE_HINT); + key.serialize(&mut key_buffer); + + let mut value_buffer = Vec::with_capacity(std::mem::size_of::()); + value_buffer.extend_from_slice(&count.to_be_bytes()); + + batch.put_cf(cf, &key_buffer, &value_buffer); + + Ok(()) + } } diff --git a/storage/src/store/internal_queue/model.rs b/storage/src/store/internal_queue/model.rs index cc9ad9ad6..f67476336 100644 --- a/storage/src/store/internal_queue/model.rs +++ b/storage/src/store/internal_queue/model.rs @@ -1,6 +1,6 @@ use everscale_types::cell::HashBytes; use everscale_types::models::ShardIdent; -use tycho_block_util::queue::{QueueKey, QueuePartition}; +use tycho_block_util::queue::{DestAddr, QueueKey, QueuePartition}; use crate::util::{StoredValue, StoredValueBuffer}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] @@ -120,13 +120,42 @@ impl StoredValue for QueueKey { } } +impl StoredValue for DestAddr { + const SIZE_HINT: usize = 1 + 32; + type OnStackSlice = [u8; Self::SIZE_HINT]; + + fn serialize(&self, buffer: &mut T) { + buffer.write_raw_slice(&self.workchain.to_be_bytes()); + buffer.write_raw_slice(&self.account.0); + } + + fn deserialize(reader: &mut &[u8]) -> Self + where + Self: Sized, + { + if reader.len() < Self::SIZE_HINT { + panic!("Insufficient data for deserialization"); + } + + let workchain = reader[0] as i8; + *reader = &reader[1..]; + + let mut account_bytes = [0u8; 32]; + account_bytes.copy_from_slice(&reader[..32]); + let account = HashBytes(account_bytes); + *reader = &reader[32..]; + + Self { workchain, account } + } +} + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct StatKey { pub shard_ident: ShardIdent, pub partition: QueuePartition, pub min_message: QueueKey, pub max_message: QueueKey, - pub index: u64, + pub dest: DestAddr, } impl StatKey { @@ -135,14 +164,14 @@ impl StatKey { partition: QueuePartition, min_message: QueueKey, max_message: QueueKey, - index: u64, + dest: DestAddr, ) -> Self { Self { shard_ident, partition, min_message, max_message, - index, + dest, } } } @@ -151,19 +180,19 @@ impl StoredValue for StatKey { const SIZE_HINT: usize = ShardIdent::SIZE_HINT + QueuePartition::SIZE_HINT + QueueKey::SIZE_HINT * 2 - + std::mem::size_of::(); + + DestAddr::SIZE_HINT; type OnStackSlice = [u8; ShardIdent::SIZE_HINT + QueuePartition::SIZE_HINT + QueueKey::SIZE_HINT * 2 - + std::mem::size_of::()]; + + DestAddr::SIZE_HINT]; fn serialize(&self, buffer: &mut T) { self.shard_ident.serialize(buffer); self.partition.serialize(buffer); self.min_message.serialize(buffer); self.max_message.serialize(buffer); - buffer.write_raw_slice(&self.index.to_be_bytes()); + self.dest.serialize(buffer); } fn deserialize(reader: &mut &[u8]) -> Self { @@ -175,18 +204,14 @@ impl StoredValue for StatKey { let partition = QueuePartition::deserialize(reader); let min_message = QueueKey::deserialize(reader); let max_message = QueueKey::deserialize(reader); - - let mut index_bytes = [0u8; std::mem::size_of::()]; - index_bytes.copy_from_slice(&reader[..std::mem::size_of::()]); - let index = u64::from_be_bytes(index_bytes); - *reader = &reader[std::mem::size_of::()..]; + let dest = DestAddr::deserialize(reader); Self { shard_ident, partition, min_message, max_message, - index, + dest, } } } diff --git a/storage/src/store/persistent_state/queue_state/reader.rs 
b/storage/src/store/persistent_state/queue_state/reader.rs index 0cae254b5..4d3ffac56 100644 --- a/storage/src/store/persistent_state/queue_state/reader.rs +++ b/storage/src/store/persistent_state/queue_state/reader.rs @@ -3,7 +3,7 @@ use everscale_types::boc::de::ProcessedCells; use everscale_types::cell::{Cell, CellFamily}; use everscale_types::models::OutMsgQueueUpdates; use tl_proto::TlRead; -use tycho_block_util::queue::{QueueStateHeader, QueueStateRef}; +use tycho_block_util::queue::{QueueDiff, QueueStateHeader, QueueStateRef}; pub struct QueueStateReader<'a> { state: QueueStateRef<'a>, @@ -52,55 +52,74 @@ impl<'a> QueueStateReader<'a> { &self.state.header } + pub fn next_queue_diff_index(&self) -> usize { + self.queue_diff_index + } + + pub fn read_next_diff(&mut self) -> Result> { + if let Some(queue_diff) = self.state.header.queue_diffs.get(self.queue_diff_index) { + self.queue_diff_index += 1; + self.message_index = 0; + Ok(Some(queue_diff)) + } else { + anyhow::ensure!(self.parsed_boc.is_none(), "too many messages"); + Ok(None) + } + } + pub fn read_next_message(&mut self) -> Result> { use everscale_types::boc::de; const MAX_ALLOWED_ROOTS_PER_CHUNK: usize = 10000; - loop { - let Some(queue_diff) = self.state.header.queue_diffs.get(self.queue_diff_index) else { - anyhow::ensure!(self.parsed_boc.is_none(), "too many messages"); - return Ok(None); - }; + // get current queue diff + let queue_diff = match self + .state + .header + .queue_diffs + .get(self.queue_diff_index.saturating_sub(1)) + { + Some(diff) => diff, + None => return Ok(None), + }; - let Some(expected_hash) = queue_diff.messages.get(self.message_index) else { - // Move to the queue diff - self.queue_diff_index += 1; - self.message_index = 0; - continue; - }; + // get message hash + let expected_hash = match queue_diff.messages.get(self.message_index) { + Some(hash) => hash, + None => return Ok(None), + }; - loop { - if let Some(boc) = &mut self.parsed_boc { - if let Some(cell) = boc.next() { - if boc.roots.is_empty() { - self.parsed_boc = None; - self.boc_index += 1; - } - - self.message_index += 1; - anyhow::ensure!(cell.repr_hash() == expected_hash, "message hash mismatch"); - return Ok(Some(cell)); + loop { + if let Some(boc) = &mut self.parsed_boc { + if let Some(cell) = boc.next() { + if boc.roots.is_empty() { + self.parsed_boc = None; + self.boc_index += 1; } - } - let Some(data) = self.state.messages.get(self.boc_index) else { - anyhow::bail!("not enough messages"); - }; - let boc = de::BocHeader::decode(data, &de::Options { - min_roots: None, - // NOTE: We must specify the max number of roots to avoid the default - // limit (which is quite low since it is rarely used in practice). - max_roots: Some(MAX_ALLOWED_ROOTS_PER_CHUNK), - })?; - - let mut roots = boc.roots().to_vec(); - let cells = boc.finalize(&mut Cell::empty_context())?; - - // NOTE: Reverse root indices here to allow the `ParsedBoc` iterator to just pop them. - roots.reverse(); - self.parsed_boc = Some(ParsedBoc { roots, cells }); + self.message_index += 1; + anyhow::ensure!(cell.repr_hash() == expected_hash, "message hash mismatch"); + + return Ok(Some(cell)); + } } + + let Some(data) = self.state.messages.get(self.boc_index) else { + anyhow::bail!("not enough messages"); + }; + let boc = de::BocHeader::decode(data, &de::Options { + min_roots: None, + // NOTE: We must specify the max number of roots to avoid the default + // limit (which is quite low since it is rarely used in practice). 
+ max_roots: Some(MAX_ALLOWED_ROOTS_PER_CHUNK), + })?; + + let mut roots = boc.roots().to_vec(); + let cells = boc.finalize(&mut Cell::empty_context())?; + + // NOTE: Reverse root indices here to allow the `ParsedBoc` iterator to just pop them. + roots.reverse(); + self.parsed_boc = Some(ParsedBoc { roots, cells }); } }
diff --git a/storage/src/store/persistent_state/tests.rs b/storage/src/store/persistent_state/tests.rs index e8c78cf30..9e674d544 100644 --- a/storage/src/store/persistent_state/tests.rs +++ b/storage/src/store/persistent_state/tests.rs @@ -370,17 +370,25 @@ async fn persistent_queue_state_read_write() -> Result<()> { tracing::info!(i, chunk_size = %ByteSize(chunk.len() as u64)); } - let mut read_messages = 0; - while let Some(cell) = reader.read_next_message()? { - let msg = cell.parse::>()?; - let MsgInfo::Int(_) = msg.info else { - panic!("unexpected message type"); - }; - assert!(msg.init.is_none()); - assert!(msg.body.is_empty()); - read_messages += 1; + let mut read_messages = FastHashSet::default(); + let mut next_diff_index = 0; + while let Some(_) = reader.read_next_diff()? { + next_diff_index += 1; + while let Some(cell) = reader.read_next_message()? { + let is_new = read_messages.insert(*cell.repr_hash()); + assert!(is_new, "duplicate message"); + + let msg = cell.parse::>()?; + + assert!(matches!(msg.info, MsgInfo::Int(_)), "unexpected message type"); + + assert!(msg.init.is_none()); + assert!(msg.body.is_empty()); + } + assert_eq!(reader.next_queue_diff_index(), next_diff_index); + assert_eq!(read_messages.len(), next_diff_index * 5000); } - assert_eq!(read_messages, target_message_count); + assert_eq!(read_messages.len(), target_message_count); reader.finish()?;
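With this change, `QueueStateReader` is consumed diff-by-diff: callers first advance with `read_next_diff` and then drain that diff's messages with `read_next_message`, as both the restore path in storage/src/store/internal_queue/mod.rs and the test above do. A minimal sketch of that consumption pattern against a hypothetical stand-in reader (method names mirror the real API; the stub below is not the actual implementation):

// Hypothetical stand-in mirroring QueueStateReader's new two-level API.
struct MockReader {
    diffs: Vec<Vec<u32>>, // each inner Vec stands for one diff's messages
    diff_index: usize,
    message_index: usize,
}

impl MockReader {
    // Advance to the next diff, resetting the per-diff message cursor.
    fn read_next_diff(&mut self) -> Option<usize> {
        if self.diff_index < self.diffs.len() {
            self.diff_index += 1;
            self.message_index = 0;
            Some(self.diff_index - 1)
        } else {
            None
        }
    }

    // Return the next message of the current diff, or None when it is drained.
    fn read_next_message(&mut self) -> Option<u32> {
        let diff = self.diffs.get(self.diff_index.checked_sub(1)?)?;
        let msg = diff.get(self.message_index).copied();
        if msg.is_some() {
            self.message_index += 1;
        }
        msg
    }
}

fn main() {
    let mut reader = MockReader {
        diffs: vec![vec![1, 2], vec![3]],
        diff_index: 0,
        message_index: 0,
    };
    let mut total = 0;
    // Outer loop advances per diff, inner loop drains that diff's messages.
    while reader.read_next_diff().is_some() {
        while let Some(_msg) = reader.read_next_message() {
            total += 1;
        }
    }
    assert_eq!(total, 3);
}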