Skip to content

Commit

Permalink
Some sync/backfill format nits (#6861)
Browse files Browse the repository at this point in the history
When working on unrelated changes I noted:

- An unnecessary closure left by a commit of some guy named @dapplion that can be removed
- match statements that can be simplified with the new let else syntax
- instead of mapping a result to ignore the Ok value, propagate the error with `?` and return the success value explicitly
  • Loading branch information
dapplion authored Jan 30, 2025
1 parent 4a07c08 commit 66c6552
Showing 1 changed file with 65 additions and 79 deletions.
144 changes: 65 additions & 79 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,67 +388,59 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
blocks: Vec<RpcBlock<T::EthSpec>>,
) -> Result<ProcessResult, BackFillError> {
// check if we have this batch
let batch = match self.batches.get_mut(&batch_id) {
None => {
if !matches!(self.state(), BackFillState::Failed) {
// A batch might get removed when the chain advances, so this is non fatal.
debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id);
}
return Ok(ProcessResult::Successful);
}
Some(batch) => {
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(ProcessResult::Successful);
}
batch
let Some(batch) = self.batches.get_mut(&batch_id) else {
if !matches!(self.state(), BackFillState::Failed) {
// A batch might get removed when the chain advances, so this is non fatal.
debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id);
}
return Ok(ProcessResult::Successful);
};

{
// A stream termination has been sent. This batch has ended. Process a completed batch.
// Remove the request from the peer's active batches
self.active_requests
.get_mut(peer_id)
.map(|active_requests| active_requests.remove(&batch_id));

match batch.download_completed(blocks) {
Ok(received) => {
let awaiting_batches =
self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH;
debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches);

// pre-emptively request more blocks from peers whilst we process current blocks,
self.request_batches(network)?;
self.process_completed_batches(network)
}
Err(result) => {
let (expected_boundary, received_boundary, outcome) = match result {
Err(e) => {
return self
.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))
.map(|_| ProcessResult::Successful);
}
Ok(v) => v,
};
warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary,
"peer_id" => %peer_id, batch);
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(ProcessResult::Successful);
}

if let BatchOperationOutcome::Failed { blacklist: _ } = outcome {
error!(self.log, "Backfill failed"; "epoch" => batch_id, "received_boundary" => received_boundary, "expected_boundary" => expected_boundary);
return self
.fail_sync(BackFillError::BatchDownloadFailed(batch_id))
.map(|_| ProcessResult::Successful);
// A stream termination has been sent. This batch has ended. Process a completed batch.
// Remove the request from the peer's active batches
self.active_requests
.get_mut(peer_id)
.map(|active_requests| active_requests.remove(&batch_id));

match batch.download_completed(blocks) {
Ok(received) => {
let awaiting_batches =
self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH;
debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches);

// pre-emptively request more blocks from peers whilst we process current blocks,
self.request_batches(network)?;
self.process_completed_batches(network)
}
Err(result) => {
let (expected_boundary, received_boundary, outcome) = match result {
Err(e) => {
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
return Ok(ProcessResult::Successful);
}
// this batch can't be used, so we need to request it again.
self.retry_batch_download(network, batch_id)
.map(|_| ProcessResult::Successful)
Ok(v) => v,
};
warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary,
"peer_id" => %peer_id, batch);

if let BatchOperationOutcome::Failed { blacklist: _ } = outcome {
error!(self.log, "Backfill failed"; "epoch" => batch_id, "received_boundary" => received_boundary, "expected_boundary" => expected_boundary);
self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))?;
return Ok(ProcessResult::Successful);
}
// this batch can't be used, so we need to request it again.
self.retry_batch_download(network, batch_id)?;
Ok(ProcessResult::Successful)
}
}
}
Expand Down Expand Up @@ -582,20 +574,16 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
};

let peer = match batch.current_peer() {
Some(v) => *v,
None => {
return self
.fail_sync(BackFillError::BatchInvalidState(
batch_id,
String::from("Peer does not exist"),
))
.map(|_| ProcessResult::Successful)
}
let Some(peer) = batch.current_peer() else {
self.fail_sync(BackFillError::BatchInvalidState(
batch_id,
String::from("Peer does not exist"),
))?;
return Ok(ProcessResult::Successful);
};

debug!(self.log, "Backfill batch processed"; "result" => ?result, &batch,
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(peer));

match result {
BatchProcessResult::Success {
Expand Down Expand Up @@ -679,8 +667,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
{
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
}
self.retry_batch_download(network, batch_id)
.map(|_| ProcessResult::Successful)
self.retry_batch_download(network, batch_id)?;
Ok(ProcessResult::Successful)
}
}
}
Expand Down Expand Up @@ -712,11 +700,10 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// - AwaitingDownload -> A recoverable failed batch should have been
// re-requested.
// - Processing -> `self.current_processing_batch` is None
return self
.fail_sync(BackFillError::InvalidSyncState(String::from(
"Invalid expected batch state",
)))
.map(|_| ProcessResult::Successful);
self.fail_sync(BackFillError::InvalidSyncState(String::from(
"Invalid expected batch state",
)))?;
return Ok(ProcessResult::Successful);
}
BatchState::AwaitingValidation(_) => {
// TODO: I don't think this state is possible, log a CRIT just in case.
Expand All @@ -731,12 +718,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
}
} else {
return self
.fail_sync(BackFillError::InvalidSyncState(format!(
"Batch not found for current processing target {}",
self.processing_target
)))
.map(|_| ProcessResult::Successful);
self.fail_sync(BackFillError::InvalidSyncState(format!(
"Batch not found for current processing target {}",
self.processing_target
)))?;
return Ok(ProcessResult::Successful);
}
Ok(ProcessResult::Successful)
}
Expand Down

0 comments on commit 66c6552

Please sign in to comment.