From e09336f5a4ba7480ccf78fa230dd51b325eac280 Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Tue, 12 Nov 2024 22:42:05 -0800 Subject: [PATCH] v2 disperser GetBlobStatus endpoint --- core/v2/types.go | 50 +++++++++ disperser/apiserver/get_blob_status_v2.go | 89 +++++++++++++++ disperser/apiserver/server_test.go | 46 ++++++++ disperser/apiserver/server_v2.go | 4 - disperser/apiserver/server_v2_test.go | 104 +++++++++++++++++- .../v2/blobstore/dynamo_metadata_store.go | 42 +++++++ .../blobstore/dynamo_metadata_store_test.go | 6 + 7 files changed, 334 insertions(+), 7 deletions(-) create mode 100644 disperser/apiserver/get_blob_status_v2.go diff --git a/core/v2/types.go b/core/v2/types.go index 9e01049fa5..258ab861f7 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -9,6 +9,7 @@ import ( "strings" commonpb "github.com/Layr-Labs/eigenda/api/grpc/common/v2" + disperserpb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" "github.com/consensys/gnark-crypto/ecc/bn254" @@ -206,6 +207,13 @@ type BatchHeader struct { ReferenceBlockNumber uint64 } +func (h *BatchHeader) ToProtobuf() *commonpb.BatchHeader { + return &commonpb.BatchHeader{ + BatchRoot: h.BatchRoot[:], + ReferenceBlockNumber: h.ReferenceBlockNumber, + } +} + type Batch struct { BatchHeader *BatchHeader BlobCertificates []*BlobCertificate @@ -228,6 +236,36 @@ type Attestation struct { QuorumNumbers []core.QuorumID } +func (a *Attestation) ToProtobuf() *disperserpb.Attestation { + nonSignerPubKeys := make([][]byte, len(a.NonSignerPubKeys)) + for i, p := range a.NonSignerPubKeys { + pubkeyBytes := p.Bytes() + nonSignerPubKeys[i] = pubkeyBytes[:] + } + + quorumAPKs := make([][]byte, len(a.QuorumAPKs)) + for i, p := range a.QuorumAPKs { + apkBytes := p.Bytes() + quorumAPKs[i] = apkBytes[:] + } + + quorumNumbers := make([]uint32, len(a.QuorumNumbers)) + for i, q := range a.QuorumNumbers { + quorumNumbers[i] = uint32(q) + } + + apkG2Bytes := a.APKG2.Bytes() + sigmaBytes := a.Sigma.Bytes() + + return &disperserpb.Attestation{ + NonSignerPubkeys: nonSignerPubKeys, + ApkG2: apkG2Bytes[:], + QuorumApks: quorumAPKs, + Sigma: sigmaBytes[:], + QuorumNumbers: quorumNumbers, + } +} + type BlobVerificationInfo struct { *BatchHeader @@ -236,6 +274,18 @@ type BlobVerificationInfo struct { InclusionProof []byte } +func (v *BlobVerificationInfo) ToProtobuf(blobCert *BlobCertificate) (*disperserpb.BlobVerificationInfo, error) { + blobCertProto, err := blobCert.ToProtobuf() + if err != nil { + return nil, err + } + return &disperserpb.BlobVerificationInfo{ + BlobCertificate: blobCertProto, + BlobIndex: v.BlobIndex, + InclusionProof: v.InclusionProof, + }, nil +} + type BlobVersionParameters struct { CodingRate uint32 ReconstructionThreshold float64 diff --git a/disperser/apiserver/get_blob_status_v2.go b/disperser/apiserver/get_blob_status_v2.go new file mode 100644 index 0000000000..ddb31ddbb9 --- /dev/null +++ b/disperser/apiserver/get_blob_status_v2.go @@ -0,0 +1,89 @@ +package apiserver + +import ( + "context" + "fmt" + + "github.com/Layr-Labs/eigenda/api" + pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" +) + +func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatusRequest) (*pb.BlobStatusReply, error) { + if req.GetBlobKey() == nil || len(req.GetBlobKey()) != 32 { + return nil, api.NewErrorInvalidArg("invalid blob key") + } + + blobKey, err := corev2.BytesToBlobKey(req.GetBlobKey()) + if err != nil { + return nil, api.NewErrorInvalidArg("invalid blob key") + } + + metadata, err := s.blobMetadataStore.GetBlobMetadata(ctx, blobKey) + if err != nil { + s.logger.Error("failed to get blob metadata", "err", err, "blobKey", blobKey.Hex()) + return nil, api.NewErrorInternal(fmt.Sprintf("failed to get blob metadata: %s", err.Error())) + } + + if metadata.BlobStatus != dispv2.Certified { + return &pb.BlobStatusReply{ + Status: metadata.BlobStatus.ToProfobuf(), + }, nil + } + + cert, _, err := s.blobMetadataStore.GetBlobCertificate(ctx, blobKey) + if err != nil { + s.logger.Error("failed to get blob certificate", "err", err, "blobKey", blobKey.Hex()) + return nil, api.NewErrorInternal(fmt.Sprintf("failed to get blob certificate: %s", err.Error())) + } + + // For certified blobs, include signed batch and blob verification info + blobVerificationInfos, err := s.blobMetadataStore.GetBlobVerificationInfos(ctx, blobKey) + if err != nil { + s.logger.Error("failed to get blob verification info", "err", err, "blobKey", blobKey.Hex()) + return nil, api.NewErrorInternal(fmt.Sprintf("failed to get blob verification info: %s", err.Error())) + } + + if len(blobVerificationInfos) == 0 { + s.logger.Error("no verification info found for certified blob", "blobKey", blobKey.Hex()) + return nil, api.NewErrorInternal("no verification info found") + } + + if len(blobVerificationInfos) > 1 { + s.logger.Warn("multiple verification info found for certified blob", "blobKey", blobKey.Hex()) + } + + for _, verificationInfo := range blobVerificationInfos { + // get the signed batch from this verification info + batchHeaderHash, err := verificationInfo.BatchHeader.Hash() + if err != nil { + s.logger.Error("failed to get batch header hash", "err", err, "blobKey", blobKey.Hex()) + continue + } + batchHeader, attestation, err := s.blobMetadataStore.GetSignedBatch(ctx, batchHeaderHash) + if err != nil { + s.logger.Error("failed to get signed batch", "err", err, "blobKey", blobKey.Hex()) + continue + } + + blobVerificationInfoProto, err := verificationInfo.ToProtobuf(cert) + if err != nil { + s.logger.Error("failed to convert blob verification info to protobuf", "err", err, "blobKey", blobKey.Hex()) + continue + } + + // return the first signed batch found + return &pb.BlobStatusReply{ + Status: metadata.BlobStatus.ToProfobuf(), + SignedBatch: &pb.SignedBatch{ + Header: batchHeader.ToProtobuf(), + NonSignerStakesAndSignature: attestation.ToProtobuf(), + }, + BlobVerificationInfo: blobVerificationInfoProto, + }, nil + } + + s.logger.Error("no signed batch found for certified blob", "blobKey", blobKey.Hex()) + return nil, api.NewErrorInternal("no signed batch found") +} diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index ce5bce89cc..73fabaf3c5 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -39,6 +39,7 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/inabox/deploy" + "github.com/consensys/gnark-crypto/ecc/bn254" "github.com/consensys/gnark-crypto/ecc/bn254/fp" "github.com/ory/dockertest/v3" "github.com/stretchr/testify/assert" @@ -64,6 +65,7 @@ var ( localStackPort = "4569" allowlistFile *os.File testMaxBlobSize = 2 * 1024 * 1024 + mockCommitment = encoding.BlobCommitments{} ) func TestMain(m *testing.M) { @@ -657,6 +659,50 @@ func setup() { } dispersalServer = newTestServer(transactor, "setup") + + var X1, Y1 fp.Element + X1 = *X1.SetBigInt(big.NewInt(1)) + Y1 = *Y1.SetBigInt(big.NewInt(2)) + + var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element + _, err = lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + _, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + _, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + _, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + + var lengthProof, lengthCommitment bn254.G2Affine + lengthProof.X.A0 = lengthXA0 + lengthProof.X.A1 = lengthXA1 + lengthProof.Y.A0 = lengthYA0 + lengthProof.Y.A1 = lengthYA1 + + lengthCommitment = lengthProof + + mockCommitment = encoding.BlobCommitments{ + Commitment: &encoding.G1Commitment{ + X: X1, + Y: Y1, + }, + LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment), + LengthProof: (*encoding.G2Commitment)(&lengthProof), + Length: 16, + } } func teardown() { diff --git a/disperser/apiserver/server_v2.go b/disperser/apiserver/server_v2.go index 2b4b1858fa..3a4e709112 100644 --- a/disperser/apiserver/server_v2.go +++ b/disperser/apiserver/server_v2.go @@ -144,10 +144,6 @@ func (s *DispersalServerV2) Start(ctx context.Context) error { return nil } -func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatusRequest) (*pb.BlobStatusReply, error) { - return &pb.BlobStatusReply{}, api.NewErrorUnimplemented() -} - func (s *DispersalServerV2) GetBlobCommitment(ctx context.Context, req *pb.BlobCommitmentRequest) (*pb.BlobCommitmentReply, error) { return &pb.BlobCommitmentReply{}, api.NewErrorUnimplemented() } diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go index e5a27f1573..e004212ff8 100644 --- a/disperser/apiserver/server_v2_test.go +++ b/disperser/apiserver/server_v2_test.go @@ -12,6 +12,7 @@ import ( "github.com/Layr-Labs/eigenda/common/aws" "github.com/Layr-Labs/eigenda/common/aws/dynamodb" "github.com/Layr-Labs/eigenda/common/aws/s3" + "github.com/Layr-Labs/eigenda/core" auth "github.com/Layr-Labs/eigenda/core/auth/v2" "github.com/Layr-Labs/eigenda/core/mock" corev2 "github.com/Layr-Labs/eigenda/core/v2" @@ -20,6 +21,7 @@ import ( "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/consensys/gnark-crypto/ecc/bn254" "google.golang.org/grpc/peer" pbcommon "github.com/Layr-Labs/eigenda/api/grpc/common" @@ -28,6 +30,7 @@ import ( "github.com/Layr-Labs/eigenda/disperser" "github.com/stretchr/testify/assert" tmock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) type testComponents struct { @@ -201,10 +204,105 @@ func TestV2DisperseBlobRequestValidation(t *testing.T) { func TestV2GetBlobStatus(t *testing.T) { c := newTestServerV2(t) - _, err := c.DispersalServerV2.GetBlobStatus(context.Background(), &pbv2.BlobStatusRequest{ - BlobKey: []byte{1}, + ctx := peer.NewContext(context.Background(), c.Peer) + + blobHeader := &corev2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: mockCommitment, + QuorumNumbers: []core.QuorumID{0}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x1234", + BinIndex: 0, + CumulativePayment: big.NewInt(532), + }, + } + blobKey, err := blobHeader.BlobKey() + require.NoError(t, err) + now := time.Now() + metadata := &dispv2.BlobMetadata{ + BlobHeader: blobHeader, + BlobStatus: dispv2.Queued, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + UpdatedAt: uint64(now.UnixNano()), + } + err = c.BlobMetadataStore.PutBlobMetadata(ctx, metadata) + require.NoError(t, err) + blobCert := &corev2.BlobCertificate{ + BlobHeader: blobHeader, + RelayKeys: []corev2.RelayKey{0, 1, 2}, + } + err = c.BlobMetadataStore.PutBlobCertificate(ctx, blobCert, nil) + require.NoError(t, err) + + // Non ceritified blob status + status, err := c.DispersalServerV2.GetBlobStatus(ctx, &pbv2.BlobStatusRequest{ + BlobKey: blobKey[:], }) - assert.ErrorContains(t, err, "not implemented") + require.NoError(t, err) + require.Equal(t, pbv2.BlobStatus_QUEUED, status.Status) + err = c.BlobMetadataStore.UpdateBlobStatus(ctx, blobKey, dispv2.Encoded) + require.NoError(t, err) + status, err = c.DispersalServerV2.GetBlobStatus(ctx, &pbv2.BlobStatusRequest{ + BlobKey: blobKey[:], + }) + require.NoError(t, err) + require.Equal(t, pbv2.BlobStatus_ENCODED, status.Status) + + // Certified blob status + err = c.BlobMetadataStore.UpdateBlobStatus(ctx, blobKey, dispv2.Certified) + require.NoError(t, err) + batchHeader := &corev2.BatchHeader{ + BatchRoot: [32]byte{1, 2, 3}, + ReferenceBlockNumber: 100, + } + err = c.BlobMetadataStore.PutBatchHeader(ctx, batchHeader) + require.NoError(t, err) + verificationInfo0 := &corev2.BlobVerificationInfo{ + BatchHeader: batchHeader, + BlobKey: blobKey, + BlobIndex: 123, + InclusionProof: []byte("inclusion proof"), + } + err = c.BlobMetadataStore.PutBlobVerificationInfo(ctx, verificationInfo0) + require.NoError(t, err) + + attestation := &corev2.Attestation{ + BatchHeader: batchHeader, + NonSignerPubKeys: []*core.G1Point{ + core.NewG1Point(big.NewInt(1), big.NewInt(2)), + core.NewG1Point(big.NewInt(3), big.NewInt(4)), + }, + APKG2: &core.G2Point{ + G2Affine: &bn254.G2Affine{ + X: mockCommitment.LengthCommitment.X, + Y: mockCommitment.LengthCommitment.Y, + }, + }, + Sigma: &core.Signature{ + G1Point: core.NewG1Point(big.NewInt(5), big.NewInt(6)), + }, + } + err = c.BlobMetadataStore.PutAttestation(ctx, attestation) + require.NoError(t, err) + + reply, err := c.DispersalServerV2.GetBlobStatus(context.Background(), &pbv2.BlobStatusRequest{ + BlobKey: blobKey[:], + }) + require.NoError(t, err) + require.Equal(t, pbv2.BlobStatus_CERTIFIED, reply.GetStatus()) + blobHeaderProto, err := blobHeader.ToProtobuf() + require.NoError(t, err) + blobCertProto, err := blobCert.ToProtobuf() + require.NoError(t, err) + require.Equal(t, blobHeaderProto, reply.GetBlobVerificationInfo().GetBlobCertificate().GetBlobHeader()) + require.Equal(t, blobCertProto.Relays, reply.GetBlobVerificationInfo().GetBlobCertificate().GetRelays()) + require.Equal(t, verificationInfo0.BlobIndex, reply.GetBlobVerificationInfo().GetBlobIndex()) + require.Equal(t, verificationInfo0.InclusionProof, reply.GetBlobVerificationInfo().GetInclusionProof()) + require.Equal(t, batchHeader.BatchRoot[:], reply.GetSignedBatch().GetHeader().BatchRoot) + require.Equal(t, batchHeader.ReferenceBlockNumber, reply.GetSignedBatch().GetHeader().ReferenceBlockNumber) + attestationProto := attestation.ToProtobuf() + require.Equal(t, attestationProto, reply.GetSignedBatch().GetNonSignerStakesAndSignature()) } func TestV2GetBlobCommitment(t *testing.T) { diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store.go b/disperser/common/v2/blobstore/dynamo_metadata_store.go index 847dfc93ab..0bdec5a1a4 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store.go @@ -524,6 +524,48 @@ func (s *BlobMetadataStore) GetBlobVerificationInfos(ctx context.Context, blobKe return responses, nil } +func (s *BlobMetadataStore) GetSignedBatch(ctx context.Context, batchHeaderHash [32]byte) (*corev2.BatchHeader, *corev2.Attestation, error) { + items, err := s.dynamoDBClient.Query(ctx, s.tableName, "PK = :pk", commondynamodb.ExpressionValues{ + ":pk": &types.AttributeValueMemberS{ + Value: batchHeaderKeyPrefix + hex.EncodeToString(batchHeaderHash[:]), + }, + }) + + if err != nil { + return nil, nil, err + } + + if len(items) == 0 { + return nil, nil, fmt.Errorf("%w: no records found for batch header hash %x", common.ErrMetadataNotFound, batchHeaderHash) + } + + var header *corev2.BatchHeader + var attestation *corev2.Attestation + for _, item := range items { + if strings.HasPrefix(item["SK"].(*types.AttributeValueMemberS).Value, batchHeaderSK) { + header, err = UnmarshalBatchHeader(item) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal batch header: %w", err) + } + } else if strings.HasPrefix(item["SK"].(*types.AttributeValueMemberS).Value, attestationSK) { + attestation, err = UnmarshalAttestation(item) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal attestation: %w", err) + } + } + } + + if header == nil { + return nil, nil, fmt.Errorf("%w: batch header not found for hash %x", common.ErrMetadataNotFound, batchHeaderHash) + } + + if attestation == nil { + return nil, nil, fmt.Errorf("%w: attestation not found for hash %x", common.ErrMetadataNotFound, batchHeaderHash) + } + + return header, attestation, nil +} + func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { return &dynamodb.CreateTableInput{ AttributeDefinitions: []types.AttributeDefinition{ diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go index cbe192d30b..f1c2aa6da1 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go @@ -448,6 +448,12 @@ func TestBlobMetadataStoreBatchAttestation(t *testing.T) { err = blobMetadataStore.PutAttestation(ctx, attestation) assert.ErrorIs(t, err, common.ErrAlreadyExists) + // attempt to retrieve batch header and attestation at the same time + fetchedHeader, fetchedAttestation, err = blobMetadataStore.GetSignedBatch(ctx, bhh) + assert.NoError(t, err) + assert.Equal(t, h, fetchedHeader) + assert.Equal(t, attestation, fetchedAttestation) + deleteItems(t, []commondynamodb.Key{ { "PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + hex.EncodeToString(bhh[:])},