Batch fetching storage values again #1199

Merged 4 commits on Oct 11, 2023
subxt/src/backend/legacy/mod.rs (128 changes: 75 additions & 53 deletions)
@@ -71,15 +71,29 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
key: Vec<u8>,
at: T::Hash,
) -> Result<StreamOfResults<Vec<u8>>, Error> {
Ok(StreamOf(Box::pin(StorageFetchDescendantKeysStream {
let keys = StorageFetchDescendantKeysStream {
at,
key,
methods: self.methods.clone(),
done: Default::default(),
keys: Default::default(),
keys_fut: Default::default(),
pagination_start_key: None,
})))
};

let keys = keys.flat_map(|keys| {
match keys {
Err(e) => {
// If there's an error, return that next:
Either::Left(stream::iter(std::iter::once(Err(e))))
}
Ok(keys) => {
// Or, stream each "ok" value:
Either::Right(stream::iter(keys.into_iter().map(Ok)))
}
}
});

Ok(StreamOf(Box::pin(keys)))
}

async fn storage_fetch_descendant_values(
@@ -92,15 +106,14 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
key,
methods: self.methods.clone(),
done: Default::default(),
keys: Default::default(),
keys_fut: Default::default(),
pagination_start_key: Default::default(),
pagination_start_key: None,
};

Ok(StreamOf(Box::pin(StorageFetchDescendantValuesStream {
keys: keys_stream,
next_key: None,
value_fut: Default::default(),
results_fut: None,
results: Default::default(),
})))
}

@@ -319,6 +332,9 @@ where
})
}

/// How many keys/values to fetch at once.
const STORAGE_PAGE_SIZE: u32 = 32;

/// This provides a stream of values given some prefix `key`. It
/// internally manages pagination and such.
pub struct StorageFetchDescendantKeysStream<T: Config> {
@@ -329,33 +345,23 @@ pub struct StorageFetchDescendantKeysStream<T: Config> {
pagination_start_key: Option<Vec<u8>>,
// Keys, future and cached:
keys_fut: Option<Pin<Box<dyn Future<Output = Result<Vec<Vec<u8>>, Error>> + Send + 'static>>>,
keys: VecDeque<Vec<u8>>,
// Set to true when we're done:
done: bool,
}

impl<T: Config> std::marker::Unpin for StorageFetchDescendantKeysStream<T> {}

// How many storage keys to ask for each time.
const STORAGE_FETCH_PAGE_SIZE: u32 = 32;

impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
type Item = Result<Vec<u8>, Error>;
type Item = Result<Vec<Vec<u8>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut();
loop {
let mut this = self.as_mut();

// We're already done.
if this.done {
return Poll::Ready(None);
}

// We have some keys to hand back already, so do that.
if let Some(key) = this.keys.pop_front() {
return Poll::Ready(Some(Ok(key)));
}

// Else, we don't have any keys, but we have a fut to get more so poll it.
// Poll future to fetch next keys.
if let Some(mut keys_fut) = this.keys_fut.take() {
let Poll::Ready(keys) = keys_fut.poll_unpin(cx) else {
this.keys_fut = Some(keys_fut);
@@ -371,9 +377,8 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
}
// The last key is where we want to paginate from next time.
this.pagination_start_key = keys.last().cloned();
// Got new keys; loop around to start returning them.
this.keys = keys.into_iter().collect();
continue;
// return all of the keys from this run.
return Poll::Ready(Some(Ok(keys)));
}
Err(e) => {
// Error getting keys? Return it.
@@ -391,7 +396,7 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
methods
.state_get_keys_paged(
&key,
STORAGE_FETCH_PAGE_SIZE,
STORAGE_PAGE_SIZE,
pagination_start_key.as_deref(),
Some(at),
)
@@ -406,58 +411,75 @@ impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
pub struct StorageFetchDescendantValuesStream<T: Config> {
// Stream of keys.
keys: StorageFetchDescendantKeysStream<T>,
next_key: Option<Vec<u8>>,
// Then we track the next value:
value_fut:
Option<Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, Error>> + Send + 'static>>>,
// Then we track the future to get the values back for each key:
results_fut: Option<
Pin<
Box<
dyn Future<Output = Result<Option<VecDeque<(Vec<u8>, Vec<u8>)>>, Error>>
+ Send
+ 'static,
>,
>,
>,
// And finally we return each result back one at a time:
results: VecDeque<(Vec<u8>, Vec<u8>)>,
}

impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
type Item = Result<StorageResponse, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut();
loop {
// If we're waiting on the next value then poll that future:
if let Some(mut value_fut) = this.value_fut.take() {
match value_fut.poll_unpin(cx) {
Poll::Ready(Ok(Some(value))) => {
let key = this.next_key.take().expect("key should exist");
return Poll::Ready(Some(Ok(StorageResponse { key, value })));
// If we have results back, return them one by one
if let Some((key, value)) = this.results.pop_front() {
let res = StorageResponse { key, value };
return Poll::Ready(Some(Ok(res)));
}

// If we're waiting on the next results then poll that future:
if let Some(mut results_fut) = this.results_fut.take() {
match results_fut.poll_unpin(cx) {
Poll::Ready(Ok(Some(results))) => {
this.results = results;
continue;
}
Poll::Ready(Ok(None)) => {
// No value back for some key? Skip.
// No values back for some keys? Skip.
continue;
}
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
Poll::Pending => {
this.value_fut = Some(value_fut);
this.results_fut = Some(results_fut);
return Poll::Pending;
}
}
}

// Else, if we have the next key then let's start waiting on the next value.
if let Some(key) = &this.next_key {
let key = key.clone();
let methods = this.keys.methods.clone();
let at = this.keys.at;
let fut = async move { methods.state_get_storage(&key, Some(at)).await };

this.value_fut = Some(Box::pin(fut));
continue;
}

// Else, poll the keys stream to get the next key.
match this.keys.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(key))) => {
this.next_key = Some(key);
Poll::Ready(Some(Ok(keys))) => {
let methods = this.keys.methods.clone();
let at = this.keys.at;
let results_fut = async move {
let keys = keys.iter().map(|k| &**k);
let values = methods.state_query_storage_at(keys, Some(at)).await?;
let values: VecDeque<_> = values
.into_iter()
.flat_map(|v| {
v.changes.into_iter().filter_map(|(k, v)| {
let v = v?;
Some((k.0, v.0))
})
})
.collect();
Ok(Some(values))
};

this.results_fut = Some(Box::pin(results_fut));
continue;
}
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => {
return Poll::Pending;
}
Poll::Pending => return Poll::Pending,
}
}
}
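As a side note on the key-stream change in this file: `StorageFetchDescendantKeysStream` now yields whole pages of keys (`Vec<Vec<u8>>`), and `storage_fetch_descendant_keys` flattens those pages back into single keys. Below is a minimal standalone sketch of that flattening pattern, using `flat_map` and `Either` from the `futures` crate (the function name is illustrative, not part of the PR):

use futures::{future::Either, stream, Stream, StreamExt};

/// Flatten a stream of key pages (`Result<Vec<K>, E>`) into a stream of
/// individual keys (`Result<K, E>`): an errored page becomes a single error
/// item, and an ok page becomes one `Ok` item per key.
fn flatten_key_pages<K, E>(
    pages: impl Stream<Item = Result<Vec<K>, E>>,
) -> impl Stream<Item = Result<K, E>> {
    pages.flat_map(|page| match page {
        // If there's an error, return that next:
        Err(e) => Either::Left(stream::iter(std::iter::once(Err(e)))),
        // Or, stream each "ok" key:
        Ok(keys) => Either::Right(stream::iter(keys.into_iter().map(Ok))),
    })
}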
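And to make the overall behaviour of the two streams easier to follow, here is a rough sketch of the same batching written as a plain async loop rather than as hand-rolled `Stream` implementations. It is illustrative only: the constant, function name, and import paths are assumptions, and unlike the PR it collects everything eagerly instead of streaming results lazily.

use std::collections::VecDeque;
use subxt::backend::legacy::LegacyRpcMethods;
use subxt::{Config, Error};

/// How many keys/values to fetch per round trip (mirrors `STORAGE_PAGE_SIZE`).
const PAGE_SIZE: u32 = 32;

/// Fetch every descendant value under `prefix` at block `at`: one RPC call per
/// page of keys and one RPC call per page of values, rather than one call per key.
async fn fetch_descendant_values<T: Config>(
    methods: &LegacyRpcMethods<T>,
    prefix: Vec<u8>,
    at: T::Hash,
) -> Result<VecDeque<(Vec<u8>, Vec<u8>)>, Error> {
    let mut out = VecDeque::new();
    let mut pagination_start_key: Option<Vec<u8>> = None;

    loop {
        // Fetch the next page of keys under the prefix.
        let keys = methods
            .state_get_keys_paged(&prefix, PAGE_SIZE, pagination_start_key.as_deref(), Some(at))
            .await?;
        if keys.is_empty() {
            break;
        }
        // The last key is where we paginate from next time.
        pagination_start_key = keys.last().cloned();

        // Fetch all of the values for this page of keys in one call.
        let change_sets = methods
            .state_query_storage_at(keys.iter().map(|k| &**k), Some(at))
            .await?;
        for set in change_sets {
            for (key, value) in set.changes {
                if let Some(value) = value {
                    out.push_back((key.0, value.0));
                }
            }
        }
    }

    Ok(out)
}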
subxt/src/backend/legacy/rpc_methods.rs (9 changes: 7 additions & 2 deletions)
@@ -73,7 +73,10 @@ impl<T: Config> LegacyRpcMethods<T> {
Ok(data.into_iter().map(|b| b.0).collect())
}

/// Query historical storage entries
/// Query historical storage entries in the range from the start block to the end block,
/// defaulting the end block to the current best block if it's not given. The first
/// [`StorageChangeSet`] returned has all of the values for each key, and subsequent ones
/// only contain values for any keys which have changed since the last.
pub async fn state_query_storage(
&self,
keys: impl IntoIterator<Item = &[u8]>,
@@ -88,7 +91,9 @@ impl<T: Config> LegacyRpcMethods<T> {
.map_err(Into::into)
}

/// Query historical storage entries
/// Query storage entries at some block, using the best block if none is given.
/// This essentially provides a way to ask for a batch of values given a batch of keys,
/// despite the name of the [`StorageChangeSet`] type.
pub async fn state_query_storage_at(
&self,
keys: impl IntoIterator<Item = &[u8]>,
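To illustrate the documented difference between the two methods above: `state_query_storage_at` batch-fetches values at a single block, while `state_query_storage` walks a block range where the first [`StorageChangeSet`] carries every key's value and later ones only carry keys that changed. Here is a hedged sketch of folding such a range query into the latest value per key (the function name is made up, and the exact parameter order of `state_query_storage` is assumed):

use std::collections::HashMap;
use subxt::backend::legacy::LegacyRpcMethods;
use subxt::{Config, Error};

/// Fold historical change sets into the most recent value seen for each key
/// over the queried block range. The first change set holds every key's value;
/// later change sets only hold keys that changed, so later entries overwrite
/// earlier ones.
async fn latest_values_in_range<T: Config>(
    methods: &LegacyRpcMethods<T>,
    keys: &[Vec<u8>],
    from: T::Hash,
    to: Option<T::Hash>,
) -> Result<HashMap<Vec<u8>, Option<Vec<u8>>>, Error> {
    let change_sets = methods
        .state_query_storage(keys.iter().map(|k| &**k), from, to)
        .await?;

    let mut latest = HashMap::new();
    for set in change_sets {
        for (key, value) in set.changes {
            // `None` means the key had no value (e.g. was deleted) at this point.
            latest.insert(key.0, value.map(|v| v.0));
        }
    }
    Ok(latest)
}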