From 95cec45c3840864c37327abee2d06ad56a6a2ad0 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 3 Feb 2025 03:07:45 -0300 Subject: [PATCH] Use data column batch verification consistently (#6851) Resolve a `TODO(das)` to use KZG batch verification in `put_rpc_custody_columns` Uses `verify_kzg_for_data_column_list_with_scoring` in all paths that send more than one column. To use batch verification and have attributability of which peer is sending a bad column. Needs to move `verify_kzg_for_data_column_list_with_scoring` into the type's module to convert to the KZG verified type. --- .../src/data_availability_checker.rs | 57 +++++-------------- .../src/data_availability_checker/error.rs | 2 +- .../src/data_column_verification.rs | 44 ++++++++++++++ .../network/src/sync/block_lookups/mod.rs | 12 +++- 4 files changed, 69 insertions(+), 46 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index aa4689121ce..f10d59ca1a5 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -27,8 +27,8 @@ mod overflow_lru_cache; mod state_lru_cache; use crate::data_column_verification::{ - verify_kzg_for_data_column, verify_kzg_for_data_column_list, CustodyDataColumn, - GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, + verify_kzg_for_data_column_list_with_scoring, CustodyDataColumn, GossipVerifiedDataColumn, + KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, }; use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, @@ -230,19 +230,14 @@ impl DataAvailabilityChecker { block_root: Hash256, custody_columns: DataColumnSidecarList, ) -> Result, AvailabilityCheckError> { - // TODO(das): report which column is invalid for proper peer scoring - // TODO(das): batch KZG verification here, but fallback into checking each column - // individually to report which column(s) are invalid. - let verified_custody_columns = custody_columns + // Attributes fault to the specific peer that sent an invalid column + let kzg_verified_columns = KzgVerifiedDataColumn::from_batch(custody_columns, &self.kzg) + .map_err(AvailabilityCheckError::InvalidColumn)?; + + let verified_custody_columns = kzg_verified_columns .into_iter() - .map(|column| { - let index = column.index; - Ok(KzgVerifiedCustodyDataColumn::from_asserted_custody( - KzgVerifiedDataColumn::new(column, &self.kzg) - .map_err(|e| AvailabilityCheckError::InvalidColumn(index, e))?, - )) - }) - .collect::, AvailabilityCheckError>>()?; + .map(KzgVerifiedCustodyDataColumn::from_asserted_custody) + .collect::>(); self.availability_cache.put_kzg_verified_data_columns( block_root, @@ -365,7 +360,8 @@ impl DataAvailabilityChecker { .iter() .map(|custody_column| custody_column.as_data_column()), &self.kzg, - )?; + ) + .map_err(AvailabilityCheckError::InvalidColumn)?; Ok(MaybeAvailableBlock::Available(AvailableBlock { block_root, block, @@ -432,8 +428,9 @@ impl DataAvailabilityChecker { // verify kzg for all data columns at once if !all_data_columns.is_empty() { - // TODO: Need to also attribute which specific block is faulty - verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg)?; + // Attributes fault to the specific peer that sent an invalid column + verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg) + .map_err(AvailabilityCheckError::InvalidColumn)?; } for block in blocks { @@ -716,32 +713,6 @@ async fn availability_cache_maintenance_service( } } -fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>( - data_column_iter: I, - kzg: &'a Kzg, -) -> Result<(), AvailabilityCheckError> -where - I: Iterator>> + Clone, -{ - let Err(batch_err) = verify_kzg_for_data_column_list(data_column_iter.clone(), kzg) else { - return Ok(()); - }; - - let data_columns = data_column_iter.collect::>(); - // Find which column is invalid. If len is 1 or 0 continue to default case below. - // If len > 1 at least one column MUST fail. - if data_columns.len() > 1 { - for data_column in data_columns { - if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) { - return Err(AvailabilityCheckError::InvalidColumn(data_column.index, e)); - } - } - } - - // len 0 should never happen - Err(AvailabilityCheckError::InvalidColumn(0, batch_err)) -} - /// A fully available block that is ready to be imported into fork choice. #[derive(Clone, Debug, PartialEq)] pub struct AvailableBlock { diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index cfdb3cfe91e..1ab85ab1056 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -4,7 +4,7 @@ use types::{BeaconStateError, ColumnIndex, Hash256}; #[derive(Debug)] pub enum Error { InvalidBlobs(KzgError), - InvalidColumn(ColumnIndex, KzgError), + InvalidColumn(Vec<(ColumnIndex, KzgError)>), ReconstructColumnsError(KzgError), KzgCommitmentMismatch { blob_commitment: KzgCommitment, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 565e76704ef..1262fcdeb8e 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -239,6 +239,18 @@ impl KzgVerifiedDataColumn { pub fn new(data_column: Arc>, kzg: &Kzg) -> Result { verify_kzg_for_data_column(data_column, kzg) } + + pub fn from_batch( + data_columns: Vec>>, + kzg: &Kzg, + ) -> Result, Vec<(ColumnIndex, KzgError)>> { + verify_kzg_for_data_column_list_with_scoring(data_columns.iter(), kzg)?; + Ok(data_columns + .into_iter() + .map(|column| Self { data: column }) + .collect()) + } + pub fn to_data_column(self) -> Arc> { self.data } @@ -378,6 +390,38 @@ where Ok(()) } +/// Complete kzg verification for a list of `DataColumnSidecar`s. +/// +/// If there's at least one invalid column, it re-verifies all columns individually to identify the +/// first column that is invalid. This is necessary to attribute fault to the specific peer that +/// sent bad data. The re-verification cost should not be significant. If a peer sends invalid data it +/// will be quickly banned. +pub fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>( + data_column_iter: I, + kzg: &'a Kzg, +) -> Result<(), Vec<(ColumnIndex, KzgError)>> +where + I: Iterator>> + Clone, +{ + if verify_kzg_for_data_column_list(data_column_iter.clone(), kzg).is_ok() { + return Ok(()); + }; + + // Find all columns that are invalid and identify by index. If we hit this condition there + // should be at least one invalid column + let errors = data_column_iter + .filter_map(|data_column| { + if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) { + Some((data_column.index, e)) + } else { + None + } + }) + .collect::>(); + + Err(errors) +} + pub fn validate_data_column_sidecar_for_gossip( data_column: Arc>, subnet: u64, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index ac4df42a4e8..2172c8dcd8d 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -36,6 +36,7 @@ use beacon_chain::data_availability_checker::{ use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; pub use common::RequestState; use fnv::FnvHashMap; +use itertools::Itertools; use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; @@ -644,8 +645,15 @@ impl BlockLookups { // but future errors may follow the same pattern. Generalize this // pattern with https://github.com/sigp/lighthouse/pull/6321 BlockError::AvailabilityCheck( - AvailabilityCheckError::InvalidColumn(index, _), - ) => peer_group.of_index(index as usize).collect(), + AvailabilityCheckError::InvalidColumn(errors), + ) => errors + .iter() + // Collect all peers that sent a column that was invalid. Must + // run .unique as a single peer can send multiple invalid + // columns. Penalize once to avoid insta-bans + .flat_map(|(index, _)| peer_group.of_index((*index) as usize)) + .unique() + .collect(), _ => peer_group.all().collect(), }; for peer in peers_to_penalize {