diff --git a/collator/src/collator/execution_manager.rs b/collator/src/collator/execution_manager.rs index 62f8f0c8e..766131a67 100644 --- a/collator/src/collator/execution_manager.rs +++ b/collator/src/collator/execution_manager.rs @@ -17,10 +17,6 @@ use tycho_util::metrics::HistogramGuard; use tycho_util::sync::rayon_run_fifo; use tycho_util::FastHashMap; -#[cfg(not(feature = "new-message-groups"))] -use super::message_group::MessageGroup; -#[cfg(feature = "new-message-groups")] -use super::message_group_new::MessageGroupNew; use super::mq_iterator_adapter::{InitIteratorMode, QueueIteratorAdapter}; use super::types::{ AccountId, AnchorsCache, BlockCollationData, Dequeued, MessageGroup, MessagesBuffer, @@ -377,689 +373,6 @@ impl ExecutionManager { } } - // when all existing internals and externals were processed should read new internals - if group_opt.is_none() && self.read_new_messages { - // when processing new messages we return group immediately when the next message does not fit it - - // first new messages epoch is from existing internals and externals - // then we read next epoch of new messages only when the previous epoch processed - mq_iterator_adapter - .try_update_new_messages_read_to(max_new_message_key_to_current_shard)?; - - let timer = std::time::Instant::now(); - let mut add_to_groups_elapsed = Duration::ZERO; - - let mut new_internals_read_count = 0; - while let Some(int_msg) = mq_iterator_adapter.next_new_message()? { - assert!(int_msg.is_new); - - new_internals_read_count += 1; - - let timer_add_to_groups = std::time::Instant::now(); - msgs_buffer - .message_groups - .add_message(Box::new(ParsedMessage { - info: MsgInfo::Int(int_msg.item.message.info.clone()), - dst_in_current_shard: true, - cell: int_msg.item.message.cell.clone(), - special_origin: None, - dequeued: None, - })); - add_to_groups_elapsed += timer_add_to_groups.elapsed(); - - if msgs_buffer.message_groups.messages_count() >= self.messages_buffer_limit { - tracing::debug!(target: tracing_targets::COLLATOR, - "message_groups buffer filled on {}/{}, stop reading new internals", - msgs_buffer.message_groups.messages_count(), self.messages_buffer_limit, - ); - break; - } - - if msgs_buffer.message_groups.len() > 1 { - tracing::debug!(target: tracing_targets::COLLATOR, - "next new message does not fit first group, stop reading new internals", - ); - break; - } - } - collation_data.read_new_msgs_from_iterator += new_internals_read_count; - - tracing::debug!(target: tracing_targets::COLLATOR, - "new_internals_read_count={}, buffer int={}, ext={}", - new_internals_read_count, - msgs_buffer.message_groups.int_messages_count(), msgs_buffer.message_groups.ext_messages_count(), - ); - - // when we have 2 groups, the second one contains only one message - // that does not fit first group, - // so append this one message to first group (merge) - group_opt = msgs_buffer.message_groups.extract_merged_group(); - - self.read_new_messages_total_elapsed += timer.elapsed(); - self.read_new_messages_total_elapsed -= add_to_groups_elapsed; - self.add_to_message_groups_total_elapsed += add_to_groups_elapsed; - - // actually, we process all message groups with new messages in one step, - // so we update internals processed_upto each step - if msgs_buffer.message_groups.is_empty() - && msgs_buffer.message_groups.max_message_key() > &QueueKey::MIN - { - // set_int_upto_all_processed(&mut collation_data.processed_upto)?; - update_internals_processed_upto( - &mut collation_data.processed_upto, - self.shard_id, - Some(ProcessedUptoUpdate::Force( - *msgs_buffer.message_groups.max_message_key(), - )), - Some(ProcessedUptoUpdate::Force( - *msgs_buffer.message_groups.max_message_key(), - )), - ); - - // commit processed message to iterator - mq_iterator_adapter.iterator().commit(vec![( - self.shard_id, - *msgs_buffer.message_groups.max_message_key(), - )])?; - - msgs_buffer.message_groups.reset(); - } - } - - // store actual offset of current interator range - if collation_data.processed_upto.processed_offset != msgs_buffer.message_groups.offset() { - collation_data.processed_upto.processed_offset = msgs_buffer.message_groups.offset(); - tracing::debug!(target: tracing_targets::COLLATOR, "updated processed_upto.offset = {}", - collation_data.processed_upto.processed_offset, - ); - } - - Ok(group_opt) - } - - #[cfg(feature = "new-message-groups")] - #[tracing::instrument(skip_all)] - pub async fn get_next_message_group( - &mut self, - msgs_buffer: &mut MessagesBuffer, - anchors_cache: &mut AnchorsCache, - collation_data: &mut BlockCollationData, - mq_iterator_adapter: &mut QueueIteratorAdapter, - max_new_message_key_to_current_shard: &QueueKey, - working_state: &WorkingState, - ) -> Result> { - // messages polling logic differs regarding existing and new messages - - let mut group_opt = None; - - // here iterator may not exist (on the first method call during collation) - // so init iterator for current not fully processed ranges or next available - if mq_iterator_adapter.iterator_is_none() { - tracing::debug!(target: tracing_targets::COLLATOR, - "current iterator not exist, \ - will init iterator for current not fully processed ranges or next available" - ); - mq_iterator_adapter - .try_init_next_range_iterator(&mut collation_data.processed_upto, working_state) - .await?; - } - - // when buffer contains externals from prev collation - // we should process them all before reading existing internals - if msgs_buffer.message_groups.ext_messages_count() > 0 && !self.read_ext_messages { - // just extract message group with externals from buffer - group_opt = msgs_buffer.message_groups.extract_first_group(); - - if msgs_buffer.message_groups.is_empty() { - tracing::debug!(target: tracing_targets::COLLATOR, - "all externals from message_groups buffer where processed, will read existing internals" - ); - - // set all read externals as processed - if let Some(externals) = collation_data.processed_upto.externals.as_mut() { - if externals.processed_to != externals.read_to { - externals.processed_to = externals.read_to; - tracing::debug!(target: tracing_targets::COLLATOR, "updated processed_upto.externals = {:?}", - collation_data.processed_upto.externals, - ); - } - } - - self.last_read_to_anchor_chain_time = None; - - msgs_buffer.message_groups.reset(); - } - } - - // when all externals from prev collation were processed should read existing internals - if group_opt.is_none() && !self.read_ext_messages && !self.read_new_messages { - // for existing messages we use ranged iterator and process maximum possible groups in parallel - - let timer = std::time::Instant::now(); - let mut add_to_groups_elapsed = Duration::ZERO; - - // read messages from iterator and fill messages groups - // until the first group fully loaded - // or max messages buffer limit reached - let mut existing_internals_read_count = 0; - while let Some(int_msg) = mq_iterator_adapter.next_existing_message()? { - assert!(!int_msg.is_new); - - existing_internals_read_count += 1; - - let timer_add_to_groups = std::time::Instant::now(); - msgs_buffer - .message_groups - .add_message(Box::new(ParsedMessage { - info: MsgInfo::Int(int_msg.item.message.info.clone()), - dst_in_current_shard: true, - cell: int_msg.item.message.cell.clone(), - special_origin: None, - dequeued: Some(Dequeued { - same_shard: int_msg.item.source == self.shard_id, - }), - })); - add_to_groups_elapsed += timer_add_to_groups.elapsed(); - - if msgs_buffer.message_groups.messages_count() >= self.messages_buffer_limit { - tracing::debug!(target: tracing_targets::COLLATOR, - "message_groups buffer filled on {}/{}, stop reading existing internals", - msgs_buffer.message_groups.messages_count(), self.messages_buffer_limit, - ); - break; - } - - if msgs_buffer.message_groups.first_group_is_full() { - tracing::debug!(target: tracing_targets::COLLATOR, - "first message group is full, stop reading existing internals", - ); - break; - } - } - collation_data.read_int_msgs_from_iterator += existing_internals_read_count; - - tracing::debug!(target: tracing_targets::COLLATOR, - "existing_internals_read_count={}, buffer int={}, ext={}", - existing_internals_read_count, - msgs_buffer.message_groups.int_messages_count(), msgs_buffer.message_groups.ext_messages_count(), - ); - - group_opt = msgs_buffer.message_groups.extract_first_group(); - - self.read_existing_messages_total_elapsed += timer.elapsed(); - self.read_existing_messages_total_elapsed -= add_to_groups_elapsed; - self.add_to_message_groups_total_elapsed += add_to_groups_elapsed; - - // when message_groups buffer is empty and no more existing internals in current iterator - // then set all read messages as processed - // and try to init iterator for the next available ranges - if msgs_buffer.message_groups.is_empty() - && mq_iterator_adapter.no_pending_existing_internals() - { - tracing::debug!(target: tracing_targets::COLLATOR, - "message_groups buffer is empty and there are no pending existing internals, \ - will try to init iterator for next available ranges" - ); - - // set all read existing internals as processed - let updated_processed_to = - set_int_upto_all_processed(&mut collation_data.processed_upto); - - // commit processed messages to iterator - mq_iterator_adapter - .iterator() - .commit(updated_processed_to)?; - - msgs_buffer.message_groups.reset(); - - let next_range_iterator_initialized = mq_iterator_adapter - .try_init_next_range_iterator( - &mut collation_data.processed_upto, - working_state, - init_iterator_mode, - ) - .await?; - if !next_range_iterator_initialized { - tracing::debug!(target: tracing_targets::COLLATOR, - "next available ranges for internals are not exist or skipped, \ - will read externals" - ); - self.read_ext_messages = true; - } - } - } - - // when all available existing internals were processed should read externals - if group_opt.is_none() && self.read_ext_messages && !self.read_new_messages { - let timer = std::time::Instant::now(); - let mut add_to_groups_elapsed = Duration::ZERO; - - let next_chain_time = collation_data.get_gen_chain_time(); - - let read_next_externals_mode = match mode { - GetNextMessageGroupMode::Continue => ReadNextExternalsMode::ToTheEnd, - GetNextMessageGroupMode::Refill => ReadNextExternalsMode::ToPreviuosReadTo, - }; - - let mut externals_read_count = 0; - loop { - let ParsedExternals { - ext_messages, - current_reader_position, - last_read_to_anchor_chain_time, - was_stopped_on_prev_read_to_reached, - } = CollatorStdImpl::read_next_externals( - &self.shard_id, - anchors_cache, - 3, - next_chain_time, - &mut collation_data.processed_upto.externals, - msgs_buffer.current_ext_reader_position, - read_next_externals_mode, - )?; - msgs_buffer.current_ext_reader_position = current_reader_position; - self.last_read_to_anchor_chain_time = last_read_to_anchor_chain_time; - - externals_read_count += ext_messages.len() as u64; - - let timer_add_to_groups = std::time::Instant::now(); - for ext_msg in ext_messages { - msgs_buffer.message_groups.add_message(ext_msg); - } - add_to_groups_elapsed += timer_add_to_groups.elapsed(); - - if msgs_buffer.message_groups.messages_count() >= self.messages_buffer_limit { - tracing::debug!(target: tracing_targets::COLLATOR, - "message_groups buffer filled on {}/{}, stop reading externals", - msgs_buffer.message_groups.messages_count(), self.messages_buffer_limit, - ); - break; - } - - if msgs_buffer.message_groups.first_group_is_full() { - tracing::debug!(target: tracing_targets::COLLATOR, - "first message group is full, stop reading externals", - ); - break; - } - - if was_stopped_on_prev_read_to_reached { - break; - } - - if !anchors_cache.has_pending_externals() { - break; - } - } - collation_data.read_ext_msgs += externals_read_count; - - tracing::debug!(target: tracing_targets::COLLATOR, - "externals_read_count={}, buffer int={}, ext={}", - externals_read_count, - msgs_buffer.message_groups.int_messages_count(), msgs_buffer.message_groups.ext_messages_count(), - ); - - group_opt = msgs_buffer.message_groups.extract_first_group(); - - self.read_ext_messages_total_elapsed += timer.elapsed(); - self.read_ext_messages_total_elapsed -= add_to_groups_elapsed; - self.add_to_message_groups_total_elapsed += add_to_groups_elapsed; - - if msgs_buffer.message_groups.is_empty() && !anchors_cache.has_pending_externals() { - tracing::debug!(target: tracing_targets::COLLATOR, - "message_groups buffer is empty and there are no pending externals, will read new internals" - ); - - // set all read externals as processed - if let Some(externals) = collation_data.processed_upto.externals.as_mut() { - if externals.processed_to != externals.read_to { - externals.processed_to = externals.read_to; - tracing::debug!(target: tracing_targets::COLLATOR, "updated processed_upto.externals = {:?}", - collation_data.processed_upto.externals, - ); - } - } - - self.last_read_to_anchor_chain_time = None; - - msgs_buffer.message_groups.reset(); - - self.read_new_messages = true; - } - } - - // when all existing internals and externals were processed should read new internals - if group_opt.is_none() && self.read_new_messages { - // when processing new messages we return group immediately when the next message does not fit it - - // first new messages epoch is from existing internals and externals - // then we read next epoch of new messages only when the previous epoch processed - mq_iterator_adapter - .try_update_new_messages_read_to(max_new_message_key_to_current_shard)?; - - let timer = std::time::Instant::now(); - let mut add_to_groups_elapsed = Duration::ZERO; - - let mut new_internals_read_count = 0; - while let Some(int_msg) = mq_iterator_adapter.next_new_message()? { - assert!(int_msg.is_new); - - new_internals_read_count += 1; - - let timer_add_to_groups = std::time::Instant::now(); - msgs_buffer - .message_groups - .add_message(Box::new(ParsedMessage { - info: MsgInfo::Int(int_msg.item.message.info.clone()), - dst_in_current_shard: true, - cell: int_msg.item.message.cell.clone(), - special_origin: None, - dequeued: None, - })); - add_to_groups_elapsed += timer_add_to_groups.elapsed(); - - if msgs_buffer.message_groups.messages_count() >= self.messages_buffer_limit { - tracing::debug!(target: tracing_targets::COLLATOR, - "message_groups buffer filled on {}/{}, stop reading new internals", - msgs_buffer.message_groups.messages_count(), self.messages_buffer_limit, - ); - break; - } - - if msgs_buffer.message_groups.len() > 1 { - tracing::debug!(target: tracing_targets::COLLATOR, - "next new message does not fit first group, stop reading new internals", - ); - break; - } - } - collation_data.read_new_msgs_from_iterator += new_internals_read_count; - - tracing::debug!(target: tracing_targets::COLLATOR, - "new_internals_read_count={}, buffer int={}, ext={}", - new_internals_read_count, - msgs_buffer.message_groups.int_messages_count(), msgs_buffer.message_groups.ext_messages_count(), - ); - - // when we have 2 groups, the second one contains only one message - // that does not fit first group, - // so append this one message to first group (merge) - group_opt = msgs_buffer.message_groups.extract_merged_group(); - - self.read_new_messages_total_elapsed += timer.elapsed(); - self.read_new_messages_total_elapsed -= add_to_groups_elapsed; - self.add_to_message_groups_total_elapsed += add_to_groups_elapsed; - - // actually, we process all message groups with new messages in one step, - // so we update internals processed_upto each step - if msgs_buffer.message_groups.is_empty() - && msgs_buffer.message_groups.max_message_key() > &QueueKey::MIN - { - // set_int_upto_all_processed(&mut collation_data.processed_upto)?; - update_internals_processed_upto( - &mut collation_data.processed_upto, - self.shard_id, - Some(ProcessedUptoUpdate::Force( - *msgs_buffer.message_groups.max_message_key(), - )), - Some(ProcessedUptoUpdate::Force( - *msgs_buffer.message_groups.max_message_key(), - )), - ); - - // commit processed message to iterator - mq_iterator_adapter.iterator().commit(vec![( - self.shard_id, - *msgs_buffer.message_groups.max_message_key(), - )])?; - - msgs_buffer.message_groups.reset(); - } - } - - // store actual offset of current interator range - if collation_data.processed_upto.processed_offset != msgs_buffer.message_groups.offset() { - collation_data.processed_upto.processed_offset = msgs_buffer.message_groups.offset(); - tracing::debug!(target: tracing_targets::COLLATOR, "updated processed_upto.offset = {}", - collation_data.processed_upto.processed_offset, - ); - } - - Ok(group_opt) - } - - #[cfg(feature = "new-message-groups")] - #[tracing::instrument(skip_all)] - pub async fn get_next_message_group( - &mut self, - msgs_buffer: &mut MessagesBuffer, - anchors_cache: &mut AnchorsCache, - collation_data: &mut BlockCollationData, - mq_iterator_adapter: &mut QueueIteratorAdapter, - max_new_message_key_to_current_shard: &QueueKey, - working_state: &WorkingState, - ) -> Result> { - // messages polling logic differs regarding existing and new messages - - let mut group_opt = None; - - // here iterator may not exist (on the first method call during collation) - // so init iterator for current not fully processed ranges or next available - if mq_iterator_adapter.iterator_is_none() { - tracing::debug!(target: tracing_targets::COLLATOR, - "current iterator not exist, \ - will init iterator for current not fully processed ranges or next available" - ); - mq_iterator_adapter - .try_init_next_range_iterator(&mut collation_data.processed_upto, working_state) - .await?; - } - - // when buffer contains externals from prev collation - // we should process them all before reading existing internals - if msgs_buffer.message_groups.ext_messages_count() > 0 && !self.read_ext_messages { - // just extract message group with externals from buffer - group_opt = msgs_buffer.message_groups.extract_first_group(); - - if msgs_buffer.message_groups.is_empty() { - tracing::debug!(target: tracing_targets::COLLATOR, - "all externals from message_groups buffer where processed, will read existing internals" - ); - - // set all read externals as processed - if let Some(externals) = collation_data.processed_upto.externals.as_mut() { - if externals.processed_to != externals.read_to { - externals.processed_to = externals.read_to; - tracing::debug!(target: tracing_targets::COLLATOR, "updated processed_upto.externals = {:?}", - collation_data.processed_upto.externals, - ); - } - } - - self.last_read_to_anchor_chain_time = None; - - msgs_buffer.message_groups.reset(); - } - } - - // when all externals from prev collation were processed should read existing internals - if group_opt.is_none() && !self.read_ext_messages && !self.read_new_messages { - // for existing messages we use ranged iterator and process maximum possible groups in parallel - - let timer = std::time::Instant::now(); - let mut add_to_groups_elapsed = Duration::ZERO; - - // read messages from iterator and fill messages groups - // until the first group fully loaded - // or max messages buffer limit reached - let mut existing_internals_read_count = 0; - while let Some(int_msg) = mq_iterator_adapter.next_existing_message()? { - assert!(!int_msg.is_new); - - existing_internals_read_count += 1; - - let timer_add_to_groups = std::time::Instant::now(); - msgs_buffer - .message_groups - .add_message(Box::new(ParsedMessage { - info: MsgInfo::Int(int_msg.item.message.info.clone()), - dst_in_current_shard: true, - cell: int_msg.item.message.cell.clone(), - special_origin: None, - dequeued: Some(Dequeued { - same_shard: int_msg.item.source == self.shard_id, - }), - })); - add_to_groups_elapsed += timer_add_to_groups.elapsed(); - - if msgs_buffer.message_groups.messages_count() >= self.messages_buffer_limit { - tracing::debug!(target: tracing_targets::COLLATOR, - "message_groups buffer filled on {}/{}, stop reading existing internals", - msgs_buffer.message_groups.messages_count(), self.messages_buffer_limit, - ); - break; - } - - if msgs_buffer.message_groups.first_group_is_full() { - tracing::debug!(target: tracing_targets::COLLATOR, - "first message group is full, stop reading existing internals", - ); - break; - } - } - collation_data.read_int_msgs_from_iterator += existing_internals_read_count; - - tracing::debug!(target: tracing_targets::COLLATOR, - "existing_internals_read_count={}, buffer int={}, ext={}", - existing_internals_read_count, - msgs_buffer.message_groups.int_messages_count(), msgs_buffer.message_groups.ext_messages_count(), - ); - - group_opt = msgs_buffer.message_groups.extract_first_group(); - - self.read_existing_messages_total_elapsed += timer.elapsed(); - self.read_existing_messages_total_elapsed -= add_to_groups_elapsed; - self.add_to_message_groups_total_elapsed += add_to_groups_elapsed; - - // when message_groups buffer is empty and no more existing internals in current iterator - // then set all read messages as processed - // and try to init iterator for the next available ranges - if msgs_buffer.message_groups.is_empty() - && mq_iterator_adapter.no_pending_existing_internals() - { - tracing::debug!(target: tracing_targets::COLLATOR, - "message_groups buffer is empty and there are no pending existing internals, \ - will try to init iterator for next available ranges" - ); - - // set all read existing internals as processed - let updated_processed_to = - set_int_upto_all_processed(&mut collation_data.processed_upto); - - // commit processed messages to iterator - mq_iterator_adapter - .iterator() - .commit(updated_processed_to)?; - - msgs_buffer.message_groups.reset(); - - let next_range_iterator_initialized = mq_iterator_adapter - .try_init_next_range_iterator(&mut collation_data.processed_upto, working_state) - .await?; - if !next_range_iterator_initialized { - tracing::debug!(target: tracing_targets::COLLATOR, - "there are no next available ranges for existing internals iterator, \ - will read externals" - ); - self.read_ext_messages = true; - } - } - } - - // when all available existing internals were processed should externals - if group_opt.is_none() && self.read_ext_messages && !self.read_new_messages { - let timer = std::time::Instant::now(); - let mut add_to_groups_elapsed = Duration::ZERO; - - let mut externals_read_count = 0; - loop { - let ParsedExternals { - ext_messages, - last_read_to_anchor_chain_time, - } = CollatorStdImpl::read_next_externals( - &self.shard_id, - anchors_cache, - 3, - collation_data, - self.ext_messages_reader_started, - )?; - self.ext_messages_reader_started = true; - self.last_read_to_anchor_chain_time = last_read_to_anchor_chain_time; - - externals_read_count += ext_messages.len() as u64; - - let timer_add_to_groups = std::time::Instant::now(); - for ext_msg in ext_messages { - msgs_buffer.message_groups.add_message(ext_msg); - } - add_to_groups_elapsed += timer_add_to_groups.elapsed(); - - if msgs_buffer.message_groups.messages_count() >= self.messages_buffer_limit { - tracing::debug!(target: tracing_targets::COLLATOR, - "message_groups buffer filled on {}/{}, stop reading externals", - msgs_buffer.message_groups.messages_count(), self.messages_buffer_limit, - ); - break; - } - - if msgs_buffer.message_groups.first_group_is_full() { - tracing::debug!(target: tracing_targets::COLLATOR, - "first message group is full, stop reading externals", - ); - break; - } - - if !anchors_cache.has_pending_externals() { - break; - } - } - collation_data.read_ext_msgs += externals_read_count; - - tracing::debug!(target: tracing_targets::COLLATOR, - "externals_read_count={}, buffer int={}, ext={}", - externals_read_count, - msgs_buffer.message_groups.int_messages_count(), msgs_buffer.message_groups.ext_messages_count(), - ); - - group_opt = msgs_buffer.message_groups.extract_first_group(); - - self.read_ext_messages_total_elapsed += timer.elapsed(); - self.read_ext_messages_total_elapsed -= add_to_groups_elapsed; - self.add_to_message_groups_total_elapsed += add_to_groups_elapsed; - - if msgs_buffer.message_groups.is_empty() && !anchors_cache.has_pending_externals() { - tracing::debug!(target: tracing_targets::COLLATOR, - "message_groups buffer is empty and there are no pending externals, will read new internals" - ); - - // set all read externals as processed - if let Some(externals) = collation_data.processed_upto.externals.as_mut() { - if externals.processed_to != externals.read_to { - externals.processed_to = externals.read_to; - tracing::debug!(target: tracing_targets::COLLATOR, "updated processed_upto.externals = {:?}", - collation_data.processed_upto.externals, - ); - } - } - - self.last_read_to_anchor_chain_time = None; - - msgs_buffer.message_groups.reset(); - - self.read_new_messages = true; - } - } - // when all existing internals and externals were processed should read new internals if group_opt.is_none() && self.read_new_messages { // when processing new messages we return group immediately when the next message does not fit it