-
Notifications
You must be signed in to change notification settings - Fork 845
/
Copy pathfetch_blobs.rs
296 lines (265 loc) · 10.5 KB
/
fetch_blobs.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
//! This module implements an optimisation to fetch blobs via JSON-RPC from the EL.
//! If a blob has already been seen in the public mempool, then it is often unnecessary to wait for
//! it to arrive on P2P gossip. This module uses a new JSON-RPC method (`engine_getBlobsV1`) which
//! allows the CL to load the blobs quickly from the EL's blob pool.
//!
//! Once the node fetches the blobs from the EL, it then publishes the remaining blobs that haven't
//! been seen on P2P gossip to the network. From PeerDAS onwards, together with the increase in
//! blob count, broadcasting blobs requires a much higher bandwidth, and is only done by high
//! capacity supernodes.
use crate::kzg_utils::blobs_to_data_column_sidecars;
use crate::observed_data_sidecars::ObservableDataSidecar;
use crate::{metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError};
use execution_layer::json_structures::BlobAndProofV1;
use execution_layer::Error as ExecutionLayerError;
use itertools::Either;
use metrics::{inc_counter, inc_counter_by, TryExt};
use slog::{debug, error, o, Logger};
use ssz_types::FixedVector;
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList};
use types::{
BeaconStateError, BlobSidecar, DataColumnSidecar, DataColumnSidecarList, EthSpec, FullPayload,
Hash256, SignedBeaconBlock, SignedBeaconBlockHeader,
};
/// Sidecar data recovered from the EL, in the form appropriate for the active fork:
/// blob sidecars before PeerDAS, data column sidecars from PeerDAS onwards.
///
/// Passed to the caller-supplied `publish_fn` so unseen components can be broadcast on gossip.
pub enum BlobsOrDataColumns<E: EthSpec> {
    Blobs(Vec<Arc<BlobSidecar<E>>>),
    DataColumns(DataColumnSidecarList<E>),
}
/// Errors that can occur while fetching blobs from the EL and processing them.
#[derive(Debug)]
pub enum FetchEngineBlobError {
    /// Computing the KZG commitments inclusion proof from the block failed.
    BeaconStateError(BeaconStateError),
    /// `BeaconChain::process_engine_blobs` rejected the fetched blobs.
    BlobProcessingError(BlockError),
    /// Constructing a `BlobSidecar` from an EL blob/proof pair failed.
    BlobSidecarError(BlobSidecarError),
    /// The beacon chain has no execution layer configured.
    ExecutionLayerMissing,
    /// Invariant violation, e.g. the EL returned a blob at an out-of-range index.
    InternalError(String),
    /// The `engine_getBlobsV1` JSON-RPC request itself failed.
    RequestFailed(ExecutionLayerError),
    // NOTE(review): no constructor visible in this file — presumably returned by callers
    // when the task executor shuts down; confirm against call sites.
    RuntimeShutdown,
}
/// Fetches blobs from the EL mempool and processes them. It also broadcasts unseen blobs or
/// data columns (PeerDAS onwards) to the network, using the supplied `publish_fn`.
///
/// Returns `Ok(None)` when there is nothing to process: the block carries no KZG commitments,
/// the EL returned no blobs, or (PeerDAS only) the EL returned an incomplete set of blobs.
/// Otherwise returns the availability status produced by `process_engine_blobs`.
pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
    chain: Arc<BeaconChain<T>>,
    block_root: Hash256,
    block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
    publish_fn: impl Fn(BlobsOrDataColumns<T::EthSpec>) + Send + 'static,
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
    let block_root_str = format!("{:?}", block_root);
    let log = chain
        .log
        .new(o!("service" => "fetch_engine_blobs", "block_root" => block_root_str));

    // The versioned hashes to request from the EL are derived from the block's blob KZG
    // commitments. Blocks from forks without blob commitments need no fetching at all.
    let versioned_hashes =
        if let Ok(kzg_commitments) = block.message().body().blob_kzg_commitments() {
            kzg_commitments
                .iter()
                .map(kzg_commitment_to_versioned_hash)
                .collect::<Vec<_>>()
        } else {
            debug!(
                log,
                "Fetch blobs not triggered - none required";
            );
            return Ok(None);
        };

    let num_expected_blobs = versioned_hashes.len();

    let execution_layer = chain
        .execution_layer
        .as_ref()
        .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;

    debug!(
        log,
        "Fetching blobs from the EL";
        "num_expected_blobs" => num_expected_blobs,
    );
    let response = execution_layer
        .get_blobs(versioned_hashes)
        .await
        .map_err(FetchEngineBlobError::RequestFailed)?;

    // An empty response means the EL's blob pool had none of the requested blobs; record the
    // hit/miss metrics either way so the effectiveness of this optimisation is observable.
    if response.is_empty() {
        debug!(
            log,
            "No blobs fetched from the EL";
            "num_expected_blobs" => num_expected_blobs,
        );
        inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
        return Ok(None);
    } else {
        inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
    }

    // One header/proof pair is computed once and shared by every sidecar built below.
    let (signed_block_header, kzg_commitments_proof) = block
        .signed_block_header_and_kzg_commitments_proof()
        .map_err(FetchEngineBlobError::BeaconStateError)?;

    let fixed_blob_sidecar_list = build_blob_sidecars(
        &block,
        response,
        signed_block_header,
        &kzg_commitments_proof,
    )?;

    // The EL may return only a subset of the requested blobs (`None` entries in the list).
    let num_fetched_blobs = fixed_blob_sidecar_list
        .iter()
        .filter(|b| b.is_some())
        .count();

    inc_counter_by(
        &metrics::BLOBS_FROM_EL_EXPECTED_TOTAL,
        num_expected_blobs as u64,
    );
    inc_counter_by(
        &metrics::BLOBS_FROM_EL_RECEIVED_TOTAL,
        num_fetched_blobs as u64,
    );

    let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch());

    let data_columns_receiver_opt = if peer_das_enabled {
        // Partial blobs response isn't useful for PeerDAS, so we don't bother building and publishing data columns.
        if num_fetched_blobs != num_expected_blobs {
            debug!(
                log,
                "Not all blobs fetched from the EL";
                "info" => "Unable to compute data columns",
                "num_fetched_blobs" => num_fetched_blobs,
                "num_expected_blobs" => num_expected_blobs,
            );
            return Ok(None);
        }

        // Column computation is expensive, so it runs on a blocking task; the receiver hands
        // the finished columns to `process_engine_blobs` below.
        let data_columns_receiver = spawn_compute_and_publish_data_columns_task(
            &chain,
            block.clone(),
            fixed_blob_sidecar_list.clone(),
            publish_fn,
            log.clone(),
        );
        Some(data_columns_receiver)
    } else {
        // Pre-PeerDAS: publish the fetched blobs directly, skipping any indices the
        // availability checker has already seen arrive via gossip.
        let all_blobs = fixed_blob_sidecar_list.clone();
        let all_blobs_iter = all_blobs.into_iter().flat_map(|b| b.clone());
        let blobs_to_publish = match chain
            .data_availability_checker
            .cached_blob_indexes(&block_root)
        {
            None => Either::Left(all_blobs_iter),
            Some(imported_blob_indices) => Either::Right(
                all_blobs_iter.filter(move |b| !imported_blob_indices.contains(&b.index())),
            ),
        };
        publish_fn(BlobsOrDataColumns::Blobs(
            blobs_to_publish.collect::<Vec<_>>(),
        ));
        None
    };

    debug!(
        log,
        "Processing engine blobs";
        "num_fetched_blobs" => num_fetched_blobs,
    );

    let availability_processing_status = chain
        .process_engine_blobs(
            block.slot(),
            block_root,
            fixed_blob_sidecar_list.clone(),
            data_columns_receiver_opt,
        )
        .await
        .map_err(FetchEngineBlobError::BlobProcessingError)?;

    Ok(Some(availability_processing_status))
}
/// Spawn a blocking task here for long computation tasks, so it doesn't block processing, and it
/// allows blobs / data columns to propagate without waiting for processing.
///
/// An `mpsc::Sender` is then used to send the produced data columns to the `beacon_chain` for it
/// to be persisted, **after** the block is made attestable.
///
/// The reason for doing this is to make the block available and attestable as soon as possible,
/// while maintaining the invariant that block and data columns are persisted atomically.
///
/// On failure to compute the columns, the task logs an error and returns without sending,
/// leaving the returned receiver to observe a closed channel.
fn spawn_compute_and_publish_data_columns_task<T: BeaconChainTypes>(
    chain: &Arc<BeaconChain<T>>,
    block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
    blobs: FixedBlobSidecarList<T::EthSpec>,
    publish_fn: impl Fn(BlobsOrDataColumns<T::EthSpec>) + Send + 'static,
    log: Logger,
) -> Receiver<Vec<Arc<DataColumnSidecar<T::EthSpec>>>> {
    let chain_cloned = chain.clone();
    // Capacity 1 is sufficient: the task sends the full column set exactly once.
    let (data_columns_sender, data_columns_receiver) = tokio::sync::mpsc::channel(1);

    chain.task_executor.spawn_blocking(
        move || {
            // Timer is labelled by blob count so computation cost can be bucketed per load.
            let mut timer = metrics::start_timer_vec(
                &metrics::DATA_COLUMN_SIDECAR_COMPUTATION,
                &[&blobs.len().to_string()],
            );

            // Only the present (`Some`) entries are used; missing blobs are skipped.
            let blob_refs = blobs
                .iter()
                .filter_map(|b| b.as_ref().map(|b| &b.blob))
                .collect::<Vec<_>>();
            let data_columns_result = blobs_to_data_column_sidecars(
                &blob_refs,
                &block,
                &chain_cloned.kzg,
                &chain_cloned.spec,
            )
            .discard_timer_on_break(&mut timer);
            drop(timer);

            let all_data_columns = match data_columns_result {
                Ok(d) => d,
                Err(e) => {
                    error!(
                        log,
                        "Failed to build data column sidecars from blobs";
                        "error" => ?e
                    );
                    return;
                }
            };

            // Send to the beacon chain for persistence first; `try_send` cannot block and
            // the channel has room because this is the only send.
            if let Err(e) = data_columns_sender.try_send(all_data_columns.clone()) {
                error!(log, "Failed to send computed data columns"; "error" => ?e);
            };

            // Check indices from cache before sending the columns, to make sure we don't
            // publish components already seen on gossip.
            let is_supernode = chain_cloned.data_availability_checker.is_supernode();

            // At the moment non supernodes are not required to publish any columns.
            // TODO(das): we could experiment with having full nodes publish their custodied
            // columns here.
            if !is_supernode {
                return;
            }

            publish_fn(BlobsOrDataColumns::DataColumns(all_data_columns));
        },
        "compute_and_publish_data_columns",
    );

    data_columns_receiver
}
/// Builds a fixed-size list of blob sidecars from the EL's `engine_getBlobsV1` response.
///
/// `None` entries in `response` (blobs the EL did not have) are left as `None` in the output
/// list. Each present blob is paired with the shared block header and commitments inclusion
/// proof to form a `BlobSidecar` at its original index.
///
/// Returns `InternalError` if the EL reports a blob at an index beyond the list's capacity,
/// or `BlobSidecarError` if sidecar construction fails.
fn build_blob_sidecars<E: EthSpec>(
    block: &Arc<SignedBeaconBlock<E, FullPayload<E>>>,
    response: Vec<Option<BlobAndProofV1<E>>>,
    signed_block_header: SignedBeaconBlockHeader,
    kzg_commitments_proof: &FixedVector<Hash256, E::KzgCommitmentsInclusionProofDepth>,
) -> Result<FixedBlobSidecarList<E>, FetchEngineBlobError> {
    let mut sidecar_list = FixedBlobSidecarList::default();

    // Walk only the indices the EL actually returned a blob for.
    let present = response
        .into_iter()
        .enumerate()
        .filter_map(|(i, maybe_blob)| maybe_blob.map(|b| (i, b)));

    for (index, blob_and_proof) in present {
        let sidecar = BlobSidecar::new_with_existing_proof(
            index,
            blob_and_proof.blob,
            block,
            signed_block_header.clone(),
            kzg_commitments_proof,
            blob_and_proof.proof,
        )
        .map_err(FetchEngineBlobError::BlobSidecarError)?;

        let slot = sidecar_list.get_mut(index).ok_or_else(|| {
            FetchEngineBlobError::InternalError(format!(
                "Blobs from EL contains blob with invalid index {index}"
            ))
        })?;
        *slot = Some(Arc::new(sidecar));
    }

    Ok(sidecar_list)
}