Skip to content

Commit

Permalink
Use data column batch verification consistently (#6851)
Browse files Browse the repository at this point in the history
Resolve a `TODO(das)` to use KZG batch verification in `put_rpc_custody_columns`


  Uses `verify_kzg_for_data_column_list_with_scoring` in all paths that send more than one column. To use batch verification and have attributability of which peer is sending a bad column.

Needs to move `verify_kzg_for_data_column_list_with_scoring` into the type's module to convert to the KZG verified type.
  • Loading branch information
dapplion authored Feb 3, 2025
1 parent 1e2b547 commit 95cec45
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 46 deletions.
57 changes: 14 additions & 43 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ mod overflow_lru_cache;
mod state_lru_cache;

use crate::data_column_verification::{
verify_kzg_for_data_column, verify_kzg_for_data_column_list, CustodyDataColumn,
GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
verify_kzg_for_data_column_list_with_scoring, CustodyDataColumn, GossipVerifiedDataColumn,
KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
};
use crate::metrics::{
KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES,
Expand Down Expand Up @@ -230,19 +230,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
// TODO(das): report which column is invalid for proper peer scoring
// TODO(das): batch KZG verification here, but fallback into checking each column
// individually to report which column(s) are invalid.
let verified_custody_columns = custody_columns
// Attributes fault to the specific peer that sent an invalid column
let kzg_verified_columns = KzgVerifiedDataColumn::from_batch(custody_columns, &self.kzg)
.map_err(AvailabilityCheckError::InvalidColumn)?;

let verified_custody_columns = kzg_verified_columns
.into_iter()
.map(|column| {
let index = column.index;
Ok(KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::new(column, &self.kzg)
.map_err(|e| AvailabilityCheckError::InvalidColumn(index, e))?,
))
})
.collect::<Result<Vec<_>, AvailabilityCheckError>>()?;
.map(KzgVerifiedCustodyDataColumn::from_asserted_custody)
.collect::<Vec<_>>();

self.availability_cache.put_kzg_verified_data_columns(
block_root,
Expand Down Expand Up @@ -365,7 +360,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.iter()
.map(|custody_column| custody_column.as_data_column()),
&self.kzg,
)?;
)
.map_err(AvailabilityCheckError::InvalidColumn)?;
Ok(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
Expand Down Expand Up @@ -432,8 +428,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

// verify kzg for all data columns at once
if !all_data_columns.is_empty() {
// TODO: Need to also attribute which specific block is faulty
verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg)?;
// Attributes fault to the specific peer that sent an invalid column
verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg)
.map_err(AvailabilityCheckError::InvalidColumn)?;
}

for block in blocks {
Expand Down Expand Up @@ -716,32 +713,6 @@ async fn availability_cache_maintenance_service<T: BeaconChainTypes>(
}
}

fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>(
data_column_iter: I,
kzg: &'a Kzg,
) -> Result<(), AvailabilityCheckError>
where
I: Iterator<Item = &'a Arc<DataColumnSidecar<E>>> + Clone,
{
let Err(batch_err) = verify_kzg_for_data_column_list(data_column_iter.clone(), kzg) else {
return Ok(());
};

let data_columns = data_column_iter.collect::<Vec<_>>();
// Find which column is invalid. If len is 1 or 0 continue to default case below.
// If len > 1 at least one column MUST fail.
if data_columns.len() > 1 {
for data_column in data_columns {
if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) {
return Err(AvailabilityCheckError::InvalidColumn(data_column.index, e));
}
}
}

// len 0 should never happen
Err(AvailabilityCheckError::InvalidColumn(0, batch_err))
}

/// A fully available block that is ready to be imported into fork choice.
#[derive(Clone, Debug, PartialEq)]
pub struct AvailableBlock<E: EthSpec> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use types::{BeaconStateError, ColumnIndex, Hash256};
#[derive(Debug)]
pub enum Error {
InvalidBlobs(KzgError),
InvalidColumn(ColumnIndex, KzgError),
InvalidColumn(Vec<(ColumnIndex, KzgError)>),
ReconstructColumnsError(KzgError),
KzgCommitmentMismatch {
blob_commitment: KzgCommitment,
Expand Down
44 changes: 44 additions & 0 deletions beacon_node/beacon_chain/src/data_column_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,18 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
pub fn new(data_column: Arc<DataColumnSidecar<E>>, kzg: &Kzg) -> Result<Self, KzgError> {
verify_kzg_for_data_column(data_column, kzg)
}

pub fn from_batch(
data_columns: Vec<Arc<DataColumnSidecar<E>>>,
kzg: &Kzg,
) -> Result<Vec<Self>, Vec<(ColumnIndex, KzgError)>> {
verify_kzg_for_data_column_list_with_scoring(data_columns.iter(), kzg)?;
Ok(data_columns
.into_iter()
.map(|column| Self { data: column })
.collect())
}

pub fn to_data_column(self) -> Arc<DataColumnSidecar<E>> {
self.data
}
Expand Down Expand Up @@ -378,6 +390,38 @@ where
Ok(())
}

/// Complete kzg verification for a list of `DataColumnSidecar`s.
///
/// If there's at least one invalid column, it re-verifies all columns individually to identify the
/// first column that is invalid. This is necessary to attribute fault to the specific peer that
/// sent bad data. The re-verification cost should not be significant. If a peer sends invalid data it
/// will be quickly banned.
pub fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>(
data_column_iter: I,
kzg: &'a Kzg,
) -> Result<(), Vec<(ColumnIndex, KzgError)>>
where
I: Iterator<Item = &'a Arc<DataColumnSidecar<E>>> + Clone,
{
if verify_kzg_for_data_column_list(data_column_iter.clone(), kzg).is_ok() {
return Ok(());
};

// Find all columns that are invalid and identify by index. If we hit this condition there
// should be at least one invalid column
let errors = data_column_iter
.filter_map(|data_column| {
if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) {
Some((data_column.index, e))
} else {
None
}
})
.collect::<Vec<_>>();

Err(errors)
}

pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes, O: ObservationStrategy>(
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
subnet: u64,
Expand Down
12 changes: 10 additions & 2 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use beacon_chain::data_availability_checker::{
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
pub use common::RequestState;
use fnv::FnvHashMap;
use itertools::Itertools;
use lighthouse_network::service::api_types::SingleLookupReqId;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
Expand Down Expand Up @@ -644,8 +645,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// but future errors may follow the same pattern. Generalize this
// pattern with https://github.com/sigp/lighthouse/pull/6321
BlockError::AvailabilityCheck(
AvailabilityCheckError::InvalidColumn(index, _),
) => peer_group.of_index(index as usize).collect(),
AvailabilityCheckError::InvalidColumn(errors),
) => errors
.iter()
// Collect all peers that sent a column that was invalid. Must
// run .unique as a single peer can send multiple invalid
// columns. Penalize once to avoid insta-bans
.flat_map(|(index, _)| peer_group.of_index((*index) as usize))
.unique()
.collect(),
_ => peer_group.all().collect(),
};
for peer in peers_to_penalize {
Expand Down

0 comments on commit 95cec45

Please # to comment.