diff --git a/subxt/src/backend/legacy/mod.rs b/subxt/src/backend/legacy/mod.rs index 98f0f20559..1a008127b9 100644 --- a/subxt/src/backend/legacy/mod.rs +++ b/subxt/src/backend/legacy/mod.rs @@ -71,15 +71,29 @@ impl Backend for LegacyBackend { key: Vec, at: T::Hash, ) -> Result>, Error> { - Ok(StreamOf(Box::pin(StorageFetchDescendantKeysStream { + let keys = StorageFetchDescendantKeysStream { at, key, methods: self.methods.clone(), done: Default::default(), - keys: Default::default(), keys_fut: Default::default(), pagination_start_key: None, - }))) + }; + + let keys = keys.flat_map(|keys| { + match keys { + Err(e) => { + // If there's an error, return that next: + Either::Left(stream::iter(std::iter::once(Err(e)))) + } + Ok(keys) => { + // Or, stream each "ok" value: + Either::Right(stream::iter(keys.into_iter().map(Ok))) + } + } + }); + + Ok(StreamOf(Box::pin(keys))) } async fn storage_fetch_descendant_values( @@ -92,15 +106,14 @@ impl Backend for LegacyBackend { key, methods: self.methods.clone(), done: Default::default(), - keys: Default::default(), keys_fut: Default::default(), - pagination_start_key: Default::default(), + pagination_start_key: None, }; Ok(StreamOf(Box::pin(StorageFetchDescendantValuesStream { keys: keys_stream, - next_key: None, - value_fut: Default::default(), + results_fut: None, + results: Default::default(), }))) } @@ -319,6 +332,9 @@ where }) } +/// How many keys/values to fetch at once. +const STORAGE_PAGE_SIZE: u32 = 32; + /// This provides a stream of values given some prefix `key`. It /// internally manages pagination and such. pub struct StorageFetchDescendantKeysStream { @@ -329,33 +345,23 @@ pub struct StorageFetchDescendantKeysStream { pagination_start_key: Option>, // Keys, future and cached: keys_fut: Option>, Error>> + Send + 'static>>>, - keys: VecDeque>, // Set to true when we're done: done: bool, } impl std::marker::Unpin for StorageFetchDescendantKeysStream {} -// How many storage keys to ask for each time. -const STORAGE_FETCH_PAGE_SIZE: u32 = 32; - impl Stream for StorageFetchDescendantKeysStream { - type Item = Result, Error>; + type Item = Result>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut(); loop { - let mut this = self.as_mut(); - // We're already done. if this.done { return Poll::Ready(None); } - // We have some keys to hand back already, so do that. - if let Some(key) = this.keys.pop_front() { - return Poll::Ready(Some(Ok(key))); - } - - // Else, we don't have any keys, but we have a fut to get more so poll it. + // Poll future to fetch next keys. if let Some(mut keys_fut) = this.keys_fut.take() { let Poll::Ready(keys) = keys_fut.poll_unpin(cx) else { this.keys_fut = Some(keys_fut); @@ -371,9 +377,8 @@ impl Stream for StorageFetchDescendantKeysStream { } // The last key is where we want to paginate from next time. this.pagination_start_key = keys.last().cloned(); - // Got new keys; loop around to start returning them. - this.keys = keys.into_iter().collect(); - continue; + // return all of the keys from this run. + return Poll::Ready(Some(Ok(keys))); } Err(e) => { // Error getting keys? Return it. @@ -391,7 +396,7 @@ impl Stream for StorageFetchDescendantKeysStream { methods .state_get_keys_paged( &key, - STORAGE_FETCH_PAGE_SIZE, + STORAGE_PAGE_SIZE, pagination_start_key.as_deref(), Some(at), ) @@ -406,10 +411,18 @@ impl Stream for StorageFetchDescendantKeysStream { pub struct StorageFetchDescendantValuesStream { // Stream of keys. keys: StorageFetchDescendantKeysStream, - next_key: Option>, - // Then we track the next value: - value_fut: - Option>, Error>> + Send + 'static>>>, + // Then we track the future to get the values back for each key: + results_fut: Option< + Pin< + Box< + dyn Future, Vec)>>, Error>> + + Send + + 'static, + >, + >, + >, + // And finally we return each result back one at a time: + results: VecDeque<(Vec, Vec)>, } impl Stream for StorageFetchDescendantValuesStream { @@ -417,47 +430,56 @@ impl Stream for StorageFetchDescendantValuesStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.as_mut(); loop { - // If we're waiting on the next value then poll that future: - if let Some(mut value_fut) = this.value_fut.take() { - match value_fut.poll_unpin(cx) { - Poll::Ready(Ok(Some(value))) => { - let key = this.next_key.take().expect("key should exist"); - return Poll::Ready(Some(Ok(StorageResponse { key, value }))); + // If we have results back, return them one by one + if let Some((key, value)) = this.results.pop_front() { + let res = StorageResponse { key, value }; + return Poll::Ready(Some(Ok(res))); + } + + // If we're waiting on the next results then poll that future: + if let Some(mut results_fut) = this.results_fut.take() { + match results_fut.poll_unpin(cx) { + Poll::Ready(Ok(Some(results))) => { + this.results = results; + continue; } Poll::Ready(Ok(None)) => { - // No value back for some key? Skip. + // No values back for some keys? Skip. continue; } Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))), Poll::Pending => { - this.value_fut = Some(value_fut); + this.results_fut = Some(results_fut); return Poll::Pending; } } } - // Else, if we have the next key then let's start waiting on the next value. - if let Some(key) = &this.next_key { - let key = key.clone(); - let methods = this.keys.methods.clone(); - let at = this.keys.at; - let fut = async move { methods.state_get_storage(&key, Some(at)).await }; - - this.value_fut = Some(Box::pin(fut)); - continue; - } - - // Else, poll the keys stream to get the next key. match this.keys.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(key))) => { - this.next_key = Some(key); + Poll::Ready(Some(Ok(keys))) => { + let methods = this.keys.methods.clone(); + let at = this.keys.at; + let results_fut = async move { + let keys = keys.iter().map(|k| &**k); + let values = methods.state_query_storage_at(keys, Some(at)).await?; + let values: VecDeque<_> = values + .into_iter() + .flat_map(|v| { + v.changes.into_iter().filter_map(|(k, v)| { + let v = v?; + Some((k.0, v.0)) + }) + }) + .collect(); + Ok(Some(values)) + }; + + this.results_fut = Some(Box::pin(results_fut)); continue; } Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), Poll::Ready(None) => return Poll::Ready(None), - Poll::Pending => { - return Poll::Pending; - } + Poll::Pending => return Poll::Pending, } } } diff --git a/subxt/src/backend/legacy/rpc_methods.rs b/subxt/src/backend/legacy/rpc_methods.rs index 951bf3f585..0adb2a0f18 100644 --- a/subxt/src/backend/legacy/rpc_methods.rs +++ b/subxt/src/backend/legacy/rpc_methods.rs @@ -73,7 +73,10 @@ impl LegacyRpcMethods { Ok(data.into_iter().map(|b| b.0).collect()) } - /// Query historical storage entries + /// Query historical storage entries in the range from the start block to the end block, + /// defaulting the end block to the current best block if it's not given. The first + /// [`StorageChangeSet`] returned has all of the values for each key, and subsequent ones + /// only contain values for any keys which have changed since the last. pub async fn state_query_storage( &self, keys: impl IntoIterator, @@ -88,7 +91,9 @@ impl LegacyRpcMethods { .map_err(Into::into) } - /// Query historical storage entries + /// Query storage entries at some block, using the best block if none is given. + /// This essentially provides a way to ask for a batch of values given a batch of keys, + /// despite the name of the [`StorageChangeSet`] type. pub async fn state_query_storage_at( &self, keys: impl IntoIterator,