From f45f484134f00fffadca62722be39ac408069a2d Mon Sep 17 00:00:00 2001
From: James Wilson
Date: Mon, 9 Oct 2023 12:34:16 +0200
Subject: [PATCH 1/4] re-batchify fetching storage values

---
 subxt/src/backend/legacy/mod.rs         | 145 +++++++++++++++---------
 subxt/src/backend/legacy/rpc_methods.rs |   9 +-
 2 files changed, 99 insertions(+), 55 deletions(-)

diff --git a/subxt/src/backend/legacy/mod.rs b/subxt/src/backend/legacy/mod.rs
index 98f0f20559..14c12b8284 100644
--- a/subxt/src/backend/legacy/mod.rs
+++ b/subxt/src/backend/legacy/mod.rs
@@ -71,15 +71,30 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
         key: Vec<u8>,
         at: T::Hash,
     ) -> Result<StreamOfResults<Vec<u8>>, Error> {
-        Ok(StreamOf(Box::pin(StorageFetchDescendantKeysStream {
+        let keys = StorageFetchDescendantKeysStream {
             at,
             key,
             methods: self.methods.clone(),
             done: Default::default(),
-            keys: Default::default(),
             keys_fut: Default::default(),
             pagination_start_key: None,
-        })))
+            page_size: Default::default()
+        };
+
+        let keys = keys.flat_map(|keys| {
+            match keys {
+                Err(e) => {
+                    // If there's an error, return that next:
+                    Either::Left(stream::iter(std::iter::once(Err(e))))
+                },
+                Ok(keys) => {
+                    // Or, stream each "ok" value:
+                    Either::Right(stream::iter(keys.into_iter().map(|k| Ok(k))))
+                }
+            }
+        });
+
+        Ok(StreamOf(Box::pin(keys)))
     }
 
     async fn storage_fetch_descendant_values(
@@ -92,15 +107,15 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
             key,
             methods: self.methods.clone(),
             done: Default::default(),
-            keys: Default::default(),
             keys_fut: Default::default(),
-            pagination_start_key: Default::default(),
+            pagination_start_key: None,
+            page_size: Default::default()
         };
 
         Ok(StreamOf(Box::pin(StorageFetchDescendantValuesStream {
             keys: keys_stream,
-            next_key: None,
-            value_fut: Default::default(),
+            results_fut: None,
+            results: Default::default(),
         })))
     }
 
@@ -329,33 +344,26 @@ pub struct StorageFetchDescendantKeysStream<T: Config> {
     pagination_start_key: Option<Vec<u8>>,
     // Keys, future and cached:
     keys_fut: Option<Pin<Box<dyn Future<Output = Result<Vec<Vec<u8>>, Error>> + Send + 'static>>>,
-    keys: VecDeque<Vec<u8>>,
     // Set to true when we're done:
     done: bool,
+    // Number of keys to fetch next time:
+    page_size: PageSize
 }
 
 impl<T: Config> std::marker::Unpin for StorageFetchDescendantKeysStream<T> {}
 
-// How many storage keys to ask for each time.
-const STORAGE_FETCH_PAGE_SIZE: u32 = 32;
-
 impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
-    type Item = Result<Vec<u8>, Error>;
+    type Item = Result<Vec<Vec<u8>>, Error>;
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.as_mut();
         loop {
-            let mut this = self.as_mut();
 
             // We're already done.
            if this.done {
                return Poll::Ready(None);
            }
 
-            // We have some keys to hand back already, so do that.
-            if let Some(key) = this.keys.pop_front() {
-                return Poll::Ready(Some(Ok(key)));
-            }
-
-            // Else, we don't have any keys, but we have a fut to get more so poll it.
+            // Poll future to fetch next keys.
             if let Some(mut keys_fut) = this.keys_fut.take() {
                 let Poll::Ready(keys) = keys_fut.poll_unpin(cx) else {
                     this.keys_fut = Some(keys_fut);
@@ -371,9 +379,8 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
                         }
                         // The last key is where we want to paginate from next time.
                         this.pagination_start_key = keys.last().cloned();
-                        // Got new keys; loop around to start returning them.
-                        this.keys = keys.into_iter().collect();
-                        continue;
+                        // return all of the keys from this run.
+                        return Poll::Ready(Some(Ok(keys)))
                     }
                     Err(e) => {
                         // Error getting keys? Return it.
@@ -387,11 +394,12 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
             let key = this.key.clone();
             let at = this.at;
             let pagination_start_key = this.pagination_start_key.take();
+            let page_size = this.page_size.next();
             let keys_fut = async move {
                 methods
                     .state_get_keys_paged(
                         &key,
-                        STORAGE_FETCH_PAGE_SIZE,
+                        page_size,
                         pagination_start_key.as_deref(),
                         Some(at),
                     )
@@ -406,10 +414,11 @@ pub struct StorageFetchDescendantValuesStream<T: Config> {
     // Stream of keys.
     keys: StorageFetchDescendantKeysStream<T>,
-    next_key: Option<Vec<u8>>,
-    // Then we track the next value:
-    value_fut:
-        Option<Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, Error>> + Send + 'static>>>,
+    // Then we track the future to get the values back for each key:
+    results_fut:
+        Option<Pin<Box<dyn Future<Output = Result<Option<VecDeque<(Vec<u8>, Vec<u8>)>>, Error>> + Send + 'static>>>,
+    // And finally we return each result back one at a time:
+    results: VecDeque<(Vec<u8>, Vec<u8>)>
 }
 
 impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
@@ -417,48 +426,78 @@ impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let mut this = self.as_mut();
         loop {
-            // If we're waiting on the next value then poll that future:
-            if let Some(mut value_fut) = this.value_fut.take() {
-                match value_fut.poll_unpin(cx) {
-                    Poll::Ready(Ok(Some(value))) => {
-                        let key = this.next_key.take().expect("key should exist");
-                        return Poll::Ready(Some(Ok(StorageResponse { key, value })));
+            // If we have results back, return them one by one
+            if let Some((key, value)) = this.results.pop_front() {
+                let res = StorageResponse {
+                    key,
+                    value
+                };
+                return Poll::Ready(Some(Ok(res)))
+            }
+
+            // If we're waiting on the next results then poll that future:
+            if let Some(mut results_fut) = this.results_fut.take() {
+                match results_fut.poll_unpin(cx) {
+                    Poll::Ready(Ok(Some(results))) => {
+                        this.results = results;
+                        continue;
                     }
                     Poll::Ready(Ok(None)) => {
-                        // No value back for some key? Skip.
+                        // No values back for some keys? Skip.
                         continue;
                     }
                     Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                     Poll::Pending => {
-                        this.value_fut = Some(value_fut);
+                        this.results_fut = Some(results_fut);
                         return Poll::Pending;
                     }
                 }
             }
 
-            // Else, if we have the next key then let's start waiting on the next value.
-            if let Some(key) = &this.next_key {
-                let key = key.clone();
-                let methods = this.keys.methods.clone();
-                let at = this.keys.at;
-                let fut = async move { methods.state_get_storage(&key, Some(at)).await };
-
-                this.value_fut = Some(Box::pin(fut));
-                continue;
-            }
-
-            // Else, poll the keys stream to get the next key.
             match this.keys.poll_next_unpin(cx) {
-                Poll::Ready(Some(Ok(key))) => {
-                    this.next_key = Some(key);
+                Poll::Ready(Some(Ok(keys))) => {
+                    let methods = this.keys.methods.clone();
+                    let at = this.keys.at;
+                    let results_fut = async move {
+                        let keys = keys.iter().map(|k| &**k);
+                        let values = methods.state_query_storage_at(keys, Some(at)).await?;
+                        let values: VecDeque<_> = values.into_iter().flat_map(|v| {
+                            v.changes.into_iter().filter_map(|(k, v)| {
+                                let v = v?;
+                                Some((k.0, v.0))
+                            })
+                        }).collect();
+                        Ok(Some(values))
+                    };
+
+                    this.results_fut = Some(Box::pin(results_fut));
                     continue;
-                }
+                },
                 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                 Poll::Ready(None) => return Poll::Ready(None),
-                Poll::Pending => {
-                    return Poll::Pending;
-                }
+                Poll::Pending => return Poll::Pending
             }
         }
     }
 }
+
+/// An iterator which returns the next page size to fetch each time.
+struct PageSize {
+    page_size: u8
+}
+
+impl Default for PageSize {
+    fn default() -> Self {
+        // Fetch a fairly small batch size to begin with.
+        Self { page_size: 8 }
+    }
+}
+
+impl PageSize {
+    fn next(&mut self) -> u32 {
+        let v = self.page_size;
+        // Double the batch size each time but don't exceed 128.
+        self.page_size = self.page_size.saturating_mul(2).min(128);
+        v as u32
+    }
+}
diff --git a/subxt/src/backend/legacy/rpc_methods.rs b/subxt/src/backend/legacy/rpc_methods.rs
index 951bf3f585..0adb2a0f18 100644
--- a/subxt/src/backend/legacy/rpc_methods.rs
+++ b/subxt/src/backend/legacy/rpc_methods.rs
@@ -73,7 +73,10 @@ impl<T: Config> LegacyRpcMethods<T> {
         Ok(data.into_iter().map(|b| b.0).collect())
     }
 
-    /// Query historical storage entries
+    /// Query historical storage entries in the range from the start block to the end block,
+    /// defaulting the end block to the current best block if it's not given. The first
+    /// [`StorageChangeSet`] returned has all of the values for each key, and subsequent ones
+    /// only contain values for any keys which have changed since the last.
     pub async fn state_query_storage(
         &self,
         keys: impl IntoIterator<Item = &[u8]>,
@@ -88,7 +91,9 @@ impl<T: Config> LegacyRpcMethods<T> {
         .map_err(Into::into)
     }
 
-    /// Query historical storage entries
+    /// Query storage entries at some block, using the best block if none is given.
+    /// This essentially provides a way to ask for a batch of values given a batch of keys,
+    /// despite the name of the [`StorageChangeSet`] type.
     pub async fn state_query_storage_at(
         &self,
         keys: impl IntoIterator<Item = &[u8]>,

From 21ea861686f10fdadfd7934a1854b8b1373ba8b8 Mon Sep 17 00:00:00 2001
From: James Wilson
Date: Mon, 9 Oct 2023 12:36:13 +0200
Subject: [PATCH 2/4] cargo fmt

---
 subxt/src/backend/legacy/mod.rs | 50 ++++++++++++++++++----------------
 1 file changed, 28 insertions(+), 22 deletions(-)

diff --git a/subxt/src/backend/legacy/mod.rs b/subxt/src/backend/legacy/mod.rs
index 14c12b8284..bf062f65fe 100644
--- a/subxt/src/backend/legacy/mod.rs
+++ b/subxt/src/backend/legacy/mod.rs
@@ -78,7 +78,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
             done: Default::default(),
             keys_fut: Default::default(),
             pagination_start_key: None,
-            page_size: Default::default()
+            page_size: Default::default(),
         };
 
         let keys = keys.flat_map(|keys| {
@@ -86,7 +86,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
                 Err(e) => {
                     // If there's an error, return that next:
                     Either::Left(stream::iter(std::iter::once(Err(e))))
-                },
+                }
                 Ok(keys) => {
                     // Or, stream each "ok" value:
                     Either::Right(stream::iter(keys.into_iter().map(|k| Ok(k))))
                 }
             }
         });
@@ -109,7 +109,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
             done: Default::default(),
             keys_fut: Default::default(),
             pagination_start_key: None,
-            page_size: Default::default()
+            page_size: Default::default(),
         };
 
         Ok(StreamOf(Box::pin(StorageFetchDescendantValuesStream {
@@ -347,7 +347,7 @@ pub struct StorageFetchDescendantKeysStream<T: Config> {
     // Set to true when we're done:
     done: bool,
     // Number of keys to fetch next time:
-    page_size: PageSize
+    page_size: PageSize,
 }
 
 impl<T: Config> std::marker::Unpin for StorageFetchDescendantKeysStream<T> {}
@@ -357,7 +357,6 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let mut this = self.as_mut();
         loop {
-
             // We're already done.
             if this.done {
                 return Poll::Ready(None);
@@ -380,7 +379,7 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
                         }
                         // The last key is where we want to paginate from next time.
                         this.pagination_start_key = keys.last().cloned();
                         // return all of the keys from this run.
-                        return Poll::Ready(Some(Ok(keys)))
+                        return Poll::Ready(Some(Ok(keys)));
                     }
                     Err(e) => {
                         // Error getting keys? Return it.
@@ -415,10 +414,17 @@ pub struct StorageFetchDescendantValuesStream<T: Config> {
     // Stream of keys.
     keys: StorageFetchDescendantKeysStream<T>,
     // Then we track the future to get the values back for each key:
-    results_fut:
-        Option<Pin<Box<dyn Future<Output = Result<Option<VecDeque<(Vec<u8>, Vec<u8>)>>, Error>> + Send + 'static>>>,
+    results_fut: Option<
+        Pin<
+            Box<
+                dyn Future<Output = Result<Option<VecDeque<(Vec<u8>, Vec<u8>)>>, Error>>
+                    + Send
+                    + 'static,
+            >,
+        >,
+    >,
     // And finally we return each result back one at a time:
-    results: VecDeque<(Vec<u8>, Vec<u8>)>
+    results: VecDeque<(Vec<u8>, Vec<u8>)>,
 }
 
 impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
@@ -428,11 +434,8 @@ impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
         loop {
             // If we have results back, return them one by one
             if let Some((key, value)) = this.results.pop_front() {
-                let res = StorageResponse {
-                    key,
-                    value
-                };
-                return Poll::Ready(Some(Ok(res)))
+                let res = StorageResponse { key, value };
+                return Poll::Ready(Some(Ok(res)));
             }
 
             // If we're waiting on the next results then poll that future:
@@ -461,21 +464,24 @@ impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
                     let results_fut = async move {
                         let keys = keys.iter().map(|k| &**k);
                         let values = methods.state_query_storage_at(keys, Some(at)).await?;
-                        let values: VecDeque<_> = values.into_iter().flat_map(|v| {
-                            v.changes.into_iter().filter_map(|(k, v)| {
-                                let v = v?;
-                                Some((k.0, v.0))
+                        let values: VecDeque<_> = values
+                            .into_iter()
+                            .flat_map(|v| {
+                                v.changes.into_iter().filter_map(|(k, v)| {
+                                    let v = v?;
+                                    Some((k.0, v.0))
+                                })
                             })
-                        }).collect();
+                            .collect();
                         Ok(Some(values))
                     };
 
                     this.results_fut = Some(Box::pin(results_fut));
                     continue;
-                },
+                }
                 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                 Poll::Ready(None) => return Poll::Ready(None),
-                Poll::Pending => return Poll::Pending
+                Poll::Pending => return Poll::Pending,
             }
         }
     }
@@ -483,7 +489,7 @@ impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
 
 /// An iterator which returns the next page size to fetch each time.
 struct PageSize {
-    page_size: u8
+    page_size: u8,
 }
 
 impl Default for PageSize {

From 82fb2b390e25a63884f77409385162bd1e156347 Mon Sep 17 00:00:00 2001
From: James Wilson
Date: Mon, 9 Oct 2023 12:49:33 +0200
Subject: [PATCH 3/4] make page size static again; probably makes more sense to
 make it configurable if that is needed enough

---
 subxt/src/backend/legacy/mod.rs | 31 ++++---------------------------
 1 file changed, 4 insertions(+), 27 deletions(-)

diff --git a/subxt/src/backend/legacy/mod.rs b/subxt/src/backend/legacy/mod.rs
index bf062f65fe..4ce526ffa1 100644
--- a/subxt/src/backend/legacy/mod.rs
+++ b/subxt/src/backend/legacy/mod.rs
@@ -78,7 +78,6 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
             done: Default::default(),
             keys_fut: Default::default(),
             pagination_start_key: None,
-            page_size: Default::default(),
         };
 
         let keys = keys.flat_map(|keys| {
@@ -109,7 +108,6 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
             done: Default::default(),
             keys_fut: Default::default(),
             pagination_start_key: None,
-            page_size: Default::default(),
         };
 
         Ok(StreamOf(Box::pin(StorageFetchDescendantValuesStream {
@@ -334,6 +332,9 @@ where
     })
 }
 
+/// How many keys/values to fetch at once.
+const STORAGE_PAGE_SIZE: u32 = 32;
+
 /// This provides a stream of values given some prefix `key`. It
 /// internally manages pagination and such.
 pub struct StorageFetchDescendantKeysStream<T: Config> {
@@ -346,8 +347,6 @@ pub struct StorageFetchDescendantKeysStream<T: Config> {
     keys_fut: Option<Pin<Box<dyn Future<Output = Result<Vec<Vec<u8>>, Error>> + Send + 'static>>>,
     // Set to true when we're done:
     done: bool,
-    // Number of keys to fetch next time:
-    page_size: PageSize,
 }
 
 impl<T: Config> std::marker::Unpin for StorageFetchDescendantKeysStream<T> {}
@@ -393,12 +392,11 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
             let key = this.key.clone();
             let at = this.at;
             let pagination_start_key = this.pagination_start_key.take();
-            let page_size = this.page_size.next();
             let keys_fut = async move {
                 methods
                     .state_get_keys_paged(
                         &key,
-                        page_size,
+                        STORAGE_PAGE_SIZE,
                         pagination_start_key.as_deref(),
                         Some(at),
                     )
@@ -486,24 +484,3 @@ impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
         }
     }
 }
-
-/// An iterator which returns the next page size to fetch each time.
-struct PageSize {
-    page_size: u8,
-}
-
-impl Default for PageSize {
-    fn default() -> Self {
-        // Fetch a fairly small batch size to begin with.
-        Self { page_size: 8 }
-    }
-}
-
-impl PageSize {
-    fn next(&mut self) -> u32 {
-        let v = self.page_size;
-        // Double the batch size each time but don't exceed 128.
-        self.page_size = self.page_size.saturating_mul(2).min(128);
-        v as u32
-    }
-}

From 2faf200ab993025e5a09f0a7079366b0eee07edc Mon Sep 17 00:00:00 2001
From: James Wilson
Date: Mon, 9 Oct 2023 13:09:59 +0200
Subject: [PATCH 4/4] clippy

---
 subxt/src/backend/legacy/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/subxt/src/backend/legacy/mod.rs b/subxt/src/backend/legacy/mod.rs
index 4ce526ffa1..1a008127b9 100644
--- a/subxt/src/backend/legacy/mod.rs
+++ b/subxt/src/backend/legacy/mod.rs
@@ -88,7 +88,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
             }
             Ok(keys) => {
                 // Or, stream each "ok" value:
-                Either::Right(stream::iter(keys.into_iter().map(|k| Ok(k))))
+                Either::Right(stream::iter(keys.into_iter().map(Ok)))
             }
         }
     });
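For reference, the core idea in these patches -- fetching a whole page of storage values with one batched `state_query_storage_at` call rather than one `state_get_storage` call per key -- can be sketched in isolation as below. This is a standalone illustration, not code from the patches: the function name, the choice of `PolkadotConfig`, and the surrounding setup are assumptions made for the example.

// Standalone sketch (not from the patches): turn one page of storage keys into
// (key, value) pairs using the same batched RPC call that the patches adopt.
use std::collections::VecDeque;

use subxt::backend::legacy::LegacyRpcMethods;
use subxt::{Config, Error, PolkadotConfig};

// Hypothetical helper for illustration; `methods`, `keys` and `at` are assumed
// to come from the caller (e.g. a LegacyRpcMethods built over an RpcClient).
async fn fetch_page_of_values(
    methods: &LegacyRpcMethods<PolkadotConfig>,
    keys: &[Vec<u8>],
    at: <PolkadotConfig as Config>::Hash,
) -> Result<VecDeque<(Vec<u8>, Vec<u8>)>, Error> {
    // One RPC round trip for the whole page of keys, instead of one call per key.
    let change_sets = methods
        .state_query_storage_at(keys.iter().map(|k| &**k), Some(at))
        .await?;

    // Flatten the returned change sets into (key, value) pairs, skipping any keys
    // that have no value, mirroring what StorageFetchDescendantValuesStream does.
    let values = change_sets
        .into_iter()
        .flat_map(|set| {
            set.changes.into_iter().filter_map(|(k, v)| {
                let v = v?;
                Some((k.0, v.0))
            })
        })
        .collect();

    Ok(values)
}

The batching is what removes the per-key round trip; the page size then becomes the knob that trades per-request latency against the total number of requests, which is why the series first experiments with a growing PageSize and then settles on a fixed STORAGE_PAGE_SIZE of 32.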