From 18654624976b870cf070c0e4f5c26109b15d72b0 Mon Sep 17 00:00:00 2001 From: StemCll Date: Wed, 2 Aug 2023 13:39:33 +0200 Subject: [PATCH 1/5] Draft: remove fast_message_id_fn --- protocols/gossipsub/src/behaviour.rs | 58 +++------------ protocols/gossipsub/src/behaviour/tests.rs | 87 +--------------------- protocols/gossipsub/src/config.rs | 34 +-------- protocols/gossipsub/src/lib.rs | 2 +- protocols/gossipsub/src/time_cache.rs | 4 - protocols/gossipsub/src/types.rs | 8 -- 6 files changed, 12 insertions(+), 181 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 402420f378e..05bd561f630 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -55,12 +55,12 @@ use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; use crate::protocol::SIGNING_PREFIX; use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}; -use crate::time_cache::{DuplicateCache, TimeCache}; +use crate::time_cache::DuplicateCache; use crate::topic::{Hasher, Topic, TopicHash}; use crate::transform::{DataTransform, IdentityTransform}; use crate::types::{ - ControlAction, FastMessageId, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, - Subscription, SubscriptionAction, + ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription, + SubscriptionAction, }; use crate::types::{PeerConnections, PeerKind, Rpc}; use crate::{rpc_proto::proto, TopicScoreParams}; @@ -323,9 +323,6 @@ pub struct Behaviour { /// our own messages back if the messages are anonymous or use a random author. published_message_ids: DuplicateCache, - /// Short term cache for fast message ids mapping them to the real message ids - fast_message_id_cache: TimeCache, - /// The filter used to handle message subscriptions. subscription_filter: F, @@ -446,7 +443,6 @@ where control_pool: HashMap::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), - fast_message_id_cache: TimeCache::new(config.duplicate_cache_time()), topic_peers: HashMap::new(), peer_topics: HashMap::new(), explicit_peers: HashSet::new(), @@ -1755,31 +1751,6 @@ where metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len()); } - let fast_message_id = self.config.fast_message_id(&raw_message); - - if let Some(fast_message_id) = fast_message_id.as_ref() { - if let Some(msg_id) = self.fast_message_id_cache.get(fast_message_id) { - let msg_id = msg_id.clone(); - // Report the duplicate - if self.message_is_valid(&msg_id, &mut raw_message, propagation_source) { - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.duplicated_message( - propagation_source, - &msg_id, - &raw_message.topic, - ); - } - // Update the cache, informing that we have received a duplicate from another peer. - // The peers in this cache are used to prevent us forwarding redundant messages onto - // these peers. - self.mcache.observe_duplicate(&msg_id, propagation_source); - } - - // This message has been seen previously. Ignore it - return; - } - } - // Try and perform the data transform to the message. If it fails, consider it invalid. let message = match self.data_transform.inbound_transform(raw_message.clone()) { Ok(message) => message, @@ -1805,14 +1776,6 @@ where return; } - // Add the message to the duplicate caches - if let Some(fast_message_id) = fast_message_id { - // add id to cache - self.fast_message_id_cache - .entry(fast_message_id) - .or_insert_with(|| msg_id.clone()); - } - if !self.duplicate_cache.insert(msg_id.clone()) { debug!("Message already received, ignoring. Message: {}", msg_id); if let Some((peer_score, ..)) = &mut self.peer_score { @@ -1887,20 +1850,17 @@ where metrics.register_invalid_message(&raw_message.topic); } - let fast_message_id_cache = &self.fast_message_id_cache; + if let Ok(message) = self.data_transform.inbound_transform(raw_message.clone()) { + let message_id = self.config.message_id(&message); - if let Some(msg_id) = self - .config - .fast_message_id(raw_message) - .and_then(|id| fast_message_id_cache.get(&id)) - { peer_score.reject_message( propagation_source, - msg_id, - &raw_message.topic, + &message_id, + &message.topic, reject_reason, ); - gossip_promises.reject_message(msg_id, &reject_reason); + + gossip_promises.reject_message(&message_id, &reject_reason); } else { // The message is invalid, we reject it ignoring any gossip promises. If a peer is // advertising this message via an IHAVE and it's invalid it will be double diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index b2414fd7afc..885058da373 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -24,17 +24,12 @@ use super::*; use crate::protocol::ProtocolConfig; use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; -use crate::types::FastMessageId; use crate::ValidationError; -use crate::{ - config::Config, config::ConfigBuilder, IdentTopic as Topic, Message, TopicScoreParams, -}; +use crate::{config::Config, config::ConfigBuilder, IdentTopic as Topic, TopicScoreParams}; use async_std::net::Ipv4Addr; use byteorder::{BigEndian, ByteOrder}; use libp2p_core::{ConnectedPoint, Endpoint}; use rand::Rng; -use std::collections::hash_map::DefaultHasher; -use std::hash::{Hash, Hasher}; use std::thread::sleep; use std::time::Duration; @@ -5064,86 +5059,6 @@ fn test_public_api() { ); } -#[test] -fn test_msg_id_fn_only_called_once_with_fast_message_ids() { - struct Pointers { - slow_counter: u32, - fast_counter: u32, - } - - let mut counters = Pointers { - slow_counter: 0, - fast_counter: 0, - }; - - let counters_pointer: *mut Pointers = &mut counters; - - let counters_address = counters_pointer as u64; - - macro_rules! get_counters_pointer { - ($m: expr) => {{ - let mut address_bytes: [u8; 8] = Default::default(); - address_bytes.copy_from_slice($m.as_slice()); - let address = u64::from_be_bytes(address_bytes); - address as *mut Pointers - }}; - } - - macro_rules! get_counters_and_hash { - ($m: expr) => {{ - let mut hasher = DefaultHasher::new(); - $m.hash(&mut hasher); - let id = hasher.finish().to_be_bytes().into(); - (id, get_counters_pointer!($m)) - }}; - } - - let message_id_fn = |m: &Message| -> MessageId { - let (mut id, counters_pointer): (MessageId, *mut Pointers) = - get_counters_and_hash!(&m.data); - unsafe { - (*counters_pointer).slow_counter += 1; - } - id.0.reverse(); - id - }; - let fast_message_id_fn = |m: &RawMessage| -> FastMessageId { - let (id, counters_pointer) = get_counters_and_hash!(&m.data); - unsafe { - (*counters_pointer).fast_counter += 1; - } - id - }; - let config = ConfigBuilder::default() - .message_id_fn(message_id_fn) - .fast_message_id_fn(fast_message_id_fn) - .build() - .unwrap(); - let (mut gs, _, topic_hashes) = inject_nodes1() - .peer_no(0) - .topics(vec![String::from("topic1")]) - .to_subscribe(true) - .gs_config(config) - .create_network(); - - let message = RawMessage { - source: None, - data: counters_address.to_be_bytes().to_vec(), - sequence_number: None, - topic: topic_hashes[0].clone(), - signature: None, - key: None, - validated: true, - }; - - for _ in 0..5 { - gs.handle_received_message(message.clone(), &PeerId::random()); - } - - assert_eq!(counters.fast_counter, 5); - assert_eq!(counters.slow_counter, 1); -} - #[test] fn test_subscribe_to_invalid_topic() { let t1 = Topic::new("t1"); diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index a5d31071538..983b5e65eec 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use std::time::Duration; use crate::protocol::{ProtocolConfig, ProtocolId, FLOODSUB_PROTOCOL}; -use crate::types::{FastMessageId, Message, MessageId, PeerKind, RawMessage}; +use crate::types::{Message, MessageId, PeerKind}; use libp2p_identity::PeerId; use libp2p_swarm::StreamProtocol; @@ -78,7 +78,6 @@ pub struct Config { duplicate_cache_time: Duration, validate_messages: bool, message_id_fn: Arc MessageId + Send + Sync + 'static>, - fast_message_id_fn: Option FastMessageId + Send + Sync + 'static>>, allow_self_origin: bool, do_px: bool, prune_peers: usize, @@ -225,20 +224,6 @@ impl Config { (self.message_id_fn)(message) } - /// A user-defined optional function that computes fast ids from raw messages. This can be used - /// to avoid possibly expensive transformations from [`RawMessage`] to - /// [`Message`] for duplicates. Two semantically different messages must always - /// have different fast message ids, but it is allowed that two semantically identical messages - /// have different fast message ids as long as the message_id_fn produces the same id for them. - /// - /// The function takes a [`RawMessage`] as input and outputs a String to be - /// interpreted as the fast message id. Default is None. - pub fn fast_message_id(&self, message: &RawMessage) -> Option { - self.fast_message_id_fn - .as_ref() - .map(|fast_message_id_fn| fast_message_id_fn(message)) - } - /// By default, gossipsub will reject messages that are sent to us that have the same message /// source as we have specified locally. Enabling this, allows these messages and prevents /// penalizing the peer that sent us the message. Default is false. @@ -423,7 +408,6 @@ impl Default for ConfigBuilder { .push_str(&message.sequence_number.unwrap_or_default().to_string()); MessageId::from(source_string) }), - fast_message_id_fn: None, allow_self_origin: false, do_px: false, prune_peers: 0, // NOTE: Increasing this currently has little effect until Signed records are implemented. @@ -650,22 +634,6 @@ impl ConfigBuilder { self } - /// A user-defined optional function that computes fast ids from raw messages. This can be used - /// to avoid possibly expensive transformations from [`RawMessage`] to - /// [`Message`] for duplicates. Two semantically different messages must always - /// have different fast message ids, but it is allowed that two semantically identical messages - /// have different fast message ids as long as the message_id_fn produces the same id for them. - /// - /// The function takes a [`Message`] as input and outputs a String to be interpreted - /// as the fast message id. Default is None. - pub fn fast_message_id_fn(&mut self, fast_id_fn: F) -> &mut Self - where - F: Fn(&RawMessage) -> FastMessageId + Send + Sync + 'static, - { - self.config.fast_message_id_fn = Some(Arc::new(fast_id_fn)); - self - } - /// Enables Peer eXchange. This should be enabled in bootstrappers and other well /// connected/trusted nodes. The default is false. /// diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index e065319c4c3..3b74de31263 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -171,7 +171,7 @@ pub use self::subscription_filter::{ }; pub use self::topic::{Hasher, Topic, TopicHash}; pub use self::transform::{DataTransform, IdentityTransform}; -pub use self::types::{FastMessageId, Message, MessageAcceptance, MessageId, RawMessage, Rpc}; +pub use self::types::{Message, MessageAcceptance, MessageId, RawMessage, Rpc}; pub type IdentTopic = Topic; pub type Sha256Topic = Topic; diff --git a/protocols/gossipsub/src/time_cache.rs b/protocols/gossipsub/src/time_cache.rs index ffc95a474f4..94078e3f5b6 100644 --- a/protocols/gossipsub/src/time_cache.rs +++ b/protocols/gossipsub/src/time_cache.rs @@ -159,10 +159,6 @@ where pub(crate) fn contains_key(&self, key: &Key) -> bool { self.map.contains_key(key) } - - pub(crate) fn get(&self, key: &Key) -> Option<&Value> { - self.map.get(key).map(|e| &e.element) - } } pub(crate) struct DuplicateCache(TimeCache); diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index f1865635454..046e3d03f1e 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -79,14 +79,6 @@ macro_rules! declare_message_id_type { // A type for gossipsub message ids. declare_message_id_type!(MessageId, "MessageId"); -// A type for gossipsub fast messsage ids, not to confuse with "real" message ids. -// -// A fast-message-id is an optional message_id that can be used to filter duplicates quickly. On -// high intensive networks with lots of messages, where the message_id is based on the result of -// decompressed traffic, it is beneficial to specify a `fast-message-id` that can identify and -// filter duplicates quickly without performing the overhead of decompression. -declare_message_id_type!(FastMessageId, "FastMessageId"); - #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct PeerConnections { /// The kind of protocol the peer supports. From be7eeb6d197ab6341653b06fb82b483d1edab7dd Mon Sep 17 00:00:00 2001 From: StemCll Date: Wed, 2 Aug 2023 17:04:18 +0200 Subject: [PATCH 2/5] Remove declare_message_id_type macro --- protocols/gossipsub/src/types.rs | 52 ++++++++++++++------------------ 1 file changed, 22 insertions(+), 30 deletions(-) diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 046e3d03f1e..196468b8d32 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -43,41 +43,33 @@ pub enum MessageAcceptance { Ignore, } -/// Macro for declaring message id types -macro_rules! declare_message_id_type { - ($name: ident, $name_string: expr) => { - #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] - #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] - pub struct $name(pub Vec); - - impl $name { - pub fn new(value: &[u8]) -> Self { - Self(value.to_vec()) - } - } +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct MessageId(pub Vec); - impl>> From for $name { - fn from(value: T) -> Self { - Self(value.into()) - } - } +impl MessageId { + pub fn new(value: &[u8]) -> Self { + Self(value.to_vec()) + } +} - impl std::fmt::Display for $name { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", hex_fmt::HexFmt(&self.0)) - } - } +impl>> From for MessageId { + fn from(value: T) -> Self { + Self(value.into()) + } +} - impl std::fmt::Debug for $name { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}({})", $name_string, hex_fmt::HexFmt(&self.0)) - } - } - }; +impl std::fmt::Display for MessageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex_fmt::HexFmt(&self.0)) + } } -// A type for gossipsub message ids. -declare_message_id_type!(MessageId, "MessageId"); +impl std::fmt::Debug for MessageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0)) + } +} #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct PeerConnections { From 001c5bc4a77515be20e3639bcb2db892dc4d66ee Mon Sep 17 00:00:00 2001 From: StemCll Date: Sat, 5 Aug 2023 18:44:06 +0200 Subject: [PATCH 3/5] Add changelog entry; bump gossipsub version to 0.46.0 --- protocols/gossipsub/CHANGELOG.md | 7 +++++++ protocols/gossipsub/Cargo.toml | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index a1f4ef6c973..529ab47b633 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,3 +1,10 @@ +## 0.46.0 - unreleased + +- Remove `fast_message_id_fn` mechanism from `Config`. + See [PR 4285]. + +[PR 4285]: https://github.com/libp2p/rust-libp2p/pull/4285 + ## 0.45.1 - Add getter function to obtain `TopicScoreParams`. diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index bf9500140b6..538e81d7a16 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-gossipsub" edition = "2021" rust-version = { workspace = true } description = "Gossipsub protocol for libp2p" -version = "0.45.1" +version = "0.46.0" authors = ["Age Manning "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From 8cc131d1bcf4876b43b8ab6bb205cf2867a40725 Mon Sep 17 00:00:00 2001 From: StemCll Date: Tue, 8 Aug 2023 22:35:33 +0200 Subject: [PATCH 4/5] Change gossipsub version to 0.46.0 in main Cargo.toml --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index baaeecae080..b863d87e82e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2572,7 +2572,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" -version = "0.45.1" +version = "0.46.0" dependencies = [ "async-std", "asynchronous-codec", diff --git a/Cargo.toml b/Cargo.toml index 73ae23d70ff..37ba97d2a08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,7 @@ libp2p-dcutr = { version = "0.10.0", path = "protocols/dcutr" } libp2p-deflate = { version = "0.40.1", path = "transports/deflate" } libp2p-dns = { version = "0.40.1", path = "transports/dns" } libp2p-floodsub = { version = "0.43.0", path = "protocols/floodsub" } -libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" } +libp2p-gossipsub = { version = "0.46.0", path = "protocols/gossipsub" } libp2p-identify = { version = "0.43.1", path = "protocols/identify" } libp2p-identity = { version = "0.2.3" } libp2p-kad = { version = "0.44.6", path = "protocols/kad" } From 1ad6e36a47de20c2bc05fb9a86326e726755f86b Mon Sep 17 00:00:00 2001 From: StemCll Date: Thu, 21 Sep 2023 17:11:05 +0200 Subject: [PATCH 5/5] Remove unused function in gossipsub time_cache --- protocols/gossipsub/src/time_cache.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/protocols/gossipsub/src/time_cache.rs b/protocols/gossipsub/src/time_cache.rs index 94078e3f5b6..89fd4afee09 100644 --- a/protocols/gossipsub/src/time_cache.rs +++ b/protocols/gossipsub/src/time_cache.rs @@ -93,12 +93,6 @@ impl<'a, K: 'a, V: 'a> Entry<'a, K, V> where K: Eq + std::hash::Hash + Clone, { - pub(crate) fn or_insert_with V>(self, default: F) -> &'a mut V { - match self { - Entry::Occupied(entry) => entry.into_mut(), - Entry::Vacant(entry) => entry.insert(default()), - } - } pub(crate) fn or_default(self) -> &'a mut V where V: Default,