Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: Layr-Labs/eigenda
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 113b4c623b89e9c9e909563c0ef8ba9482a18670
Choose a base ref
..
head repository: Layr-Labs/eigenda
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: c6da8ce7158ec944c24fc9f6ac5ff81e7e6dc570
Choose a head ref
Showing with 2,431 additions and 739 deletions.
  1. +1 −0 Dockerfile
  2. +5 −4 api/clients/mock/relay_client.go
  3. +90 −25 api/clients/relay_client.go
  4. +328 −0 api/grpc/common/v2/frame.pb.go
  5. +239 −423 api/grpc/relay/relay.pb.go
  6. +17 −17 api/grpc/relay/relay_grpc.pb.go
  7. +32 −0 api/proto/common/v2/frame.proto
  8. +37 −62 api/proto/relay/relay.proto
  9. +2 −1 common/aws/cli.go
  10. +1 −1 common/aws/s3/client.go
  11. +5 −4 common/aws/s3/scoped_keys.go
  12. +11 −0 core/data_test.go
  13. +22 −0 core/utils.go
  14. +11 −0 core/v2/types.go
  15. +386 −0 disperser/api/grpc/encoder/v2/encoder.pb.go
  16. +115 −0 disperser/api/grpc/encoder/v2/encoder_grpc.pb.go
  17. +35 −0 disperser/api/proto/encoder/v2/encoder.proto
  18. +7 −7 disperser/apiserver/disperse_blob_v2.go
  19. +5 −5 disperser/apiserver/server_v2.go
  20. +2 −2 disperser/apiserver/server_v2_test.go
  21. +33 −4 disperser/cmd/encoder/config.go
  22. +0 −45 disperser/cmd/encoder/encoder.go
  23. +17 −0 disperser/cmd/encoder/flags/flags.go
  24. +53 −10 disperser/cmd/encoder/main.go
  25. +3 −4 disperser/common/v2/blobstore/s3_blob_store.go
  26. +5 −4 disperser/common/v2/blobstore/s3_blob_store_test.go
  27. +12 −12 disperser/controller/encoding_manager_test.go
  28. +69 −0 disperser/encoder/client_v2.go
  29. +42 −47 disperser/encoder/server.go
  30. +5 −5 disperser/encoder/server_test.go
  31. +231 −0 disperser/encoder/server_v2.go
  32. +169 −0 disperser/encoder/server_v2_test.go
  33. +131 −0 disperser/encoder/setup_test.go
  34. +2 −2 disperser/encoder_client_v2.go
  35. +2 −2 disperser/mock/encoder_v2.go
  36. +97 −1 encoding/data.go
  37. +2 −0 encoding/encoding.go
  38. +50 −9 encoding/kzg/prover/parametrized_prover.go
  39. +19 −3 encoding/kzg/prover/prover.go
  40. +0 −1 encoding/kzg/prover/prover_cpu.go
  41. +6 −0 encoding/mock/encoder.go
  42. +1 −0 encoding/rs/frame.go
  43. +2 −2 go.mod
  44. +4 −3 inabox/deploy/config.go
  45. +89 −5 inabox/deploy/env_vars.go
  46. +6 −5 relay/chunkstore/chunk_reader.go
  47. +11 −10 relay/chunkstore/chunk_store_test.go
  48. +12 −12 relay/chunkstore/chunk_writer.go
  49. +5 −0 relay/chunkstore/config.go
  50. +2 −2 test/integration_test.go
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ COPY api /app/api
COPY contracts /app/contracts
COPY indexer /app/indexer
COPY encoding /app/encoding
COPY relay /app/relay

# Churner build stage
FROM common-builder AS churner-builder
9 changes: 5 additions & 4 deletions api/clients/mock/relay_client.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"context"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
)
@@ -23,14 +24,14 @@ func (c *MockRelayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey,
return args.Get(0).([]byte), args.Error(1)
}

func (c *MockRelayClient) GetChunksByRange(ctx context.Context, requests []*clients.ChunkRequestByRange) ([]clients.BundleResult, error) {
func (c *MockRelayClient) GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*clients.ChunkRequestByRange) ([]core.Bundle, error) {
args := c.Called()
return args.Get(0).([]clients.BundleResult), args.Error(1)
return args.Get(0).([]core.Bundle), args.Error(1)
}

func (c *MockRelayClient) GetChunksByIndex(ctx context.Context, requests []*clients.ChunkRequestByIndex) ([]clients.BundleResult, error) {
func (c *MockRelayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*clients.ChunkRequestByIndex) ([]core.Bundle, error) {
args := c.Called()
return args.Get(0).([]clients.BundleResult), args.Error(1)
return args.Get(0).([]core.Bundle), args.Error(1)
}

func (c *MockRelayClient) Close() error {
115 changes: 90 additions & 25 deletions api/clients/relay_client.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ import (
relaygrpc "github.com/Layr-Labs/eigenda/api/grpc/relay"
"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/hashicorp/go-multierror"
"google.golang.org/grpc"
@@ -19,32 +20,25 @@ type RelayClientConfig struct {
}

type ChunkRequestByRange struct {
BlobKey corev2.BlobKey
RelayKey corev2.RelayKey
Start uint32
End uint32
BlobKey corev2.BlobKey
Start uint32
End uint32
}

type ChunkRequestByIndex struct {
BlobKey corev2.BlobKey
RelayKey corev2.RelayKey
Indexes []uint32
}

type BundleResult struct {
Bundle core.Bundle
Err error
BlobKey corev2.BlobKey
Indexes []uint32
}

type RelayClient interface {
// GetBlob retrieves a blob from a relay
GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error)
// GetChunksByRange retrieves blob chunks from a relay by chunk index range
// The returned slice has the same length and ordering as the input slice, and the i-th element is the bundle for the i-th request.
GetChunksByRange(ctx context.Context, requests []*ChunkRequestByRange) ([]BundleResult, error)
GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByRange) ([]core.Bundle, error)
// GetChunksByIndex retrieves blob chunks from a relay by index
// The returned slice has the same length and ordering as the input slice, and the i-th element is the bundle for the i-th request.
GetChunksByIndex(ctx context.Context, requests []*ChunkRequestByIndex) ([]BundleResult, error)
GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByIndex) ([]core.Bundle, error)
Close() error
}

@@ -89,27 +83,98 @@ func (c *relayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blo
return nil, err
}

return nil, fmt.Errorf("not implemented")
client, ok := c.grpcClients[relayKey]
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
}

res, err := client.GetBlob(ctx, &relaygrpc.GetBlobRequest{
BlobKey: blobKey[:],
})
if err != nil {
return nil, err
}

return res.GetBlob(), nil
}

func (c *relayClient) GetChunksByRange(ctx context.Context, requests []*ChunkRequestByRange) ([]BundleResult, error) {
for _, req := range requests {
if err := c.initOnceGrpcConnection(req.RelayKey); err != nil {
return nil, err
func (c *relayClient) GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByRange) ([]core.Bundle, error) {
if len(requests) == 0 {
return nil, fmt.Errorf("no requests")
}
if err := c.initOnceGrpcConnection(relayKey); err != nil {
return nil, err
}

client, ok := c.grpcClients[relayKey]
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
}

grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests))
for i, req := range requests {
grpcRequests[i] = &relaygrpc.ChunkRequest{
Request: &relaygrpc.ChunkRequest_ByRange{
ByRange: &relaygrpc.ChunkRequestByRange{
BlobKey: req.BlobKey[:],
StartIndex: req.Start,
EndIndex: req.End,
},
},
}
}
res, err := client.GetChunks(ctx, &relaygrpc.GetChunksRequest{
ChunkRequests: grpcRequests,
})

if err != nil {
return nil, err
}

return nil, fmt.Errorf("not implemented")
bundles := make([]core.Bundle, len(res.GetData()))
for i, chunks := range res.GetData() {
bundles[i] = encoding.FramesFromProtobuf(chunks.GetData())
}
return bundles, nil
}

func (c *relayClient) GetChunksByIndex(ctx context.Context, requests []*ChunkRequestByIndex) ([]BundleResult, error) {
for _, req := range requests {
if err := c.initOnceGrpcConnection(req.RelayKey); err != nil {
return nil, err
func (c *relayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByIndex) ([]core.Bundle, error) {
if len(requests) == 0 {
return nil, fmt.Errorf("no requests")
}
if err := c.initOnceGrpcConnection(relayKey); err != nil {
return nil, err
}

client, ok := c.grpcClients[relayKey]
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
}

grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests))
for i, req := range requests {
grpcRequests[i] = &relaygrpc.ChunkRequest{
Request: &relaygrpc.ChunkRequest_ByIndex{
ByIndex: &relaygrpc.ChunkRequestByIndex{
BlobKey: req.BlobKey[:],
ChunkIndices: req.Indexes,
},
},
}
}
res, err := client.GetChunks(ctx, &relaygrpc.GetChunksRequest{
ChunkRequests: grpcRequests,
})

if err != nil {
return nil, err
}

return nil, fmt.Errorf("not implemented")
bundles := make([]core.Bundle, len(res.GetData()))
for i, chunks := range res.GetData() {
bundles[i] = encoding.FramesFromProtobuf(chunks.GetData())
}
return bundles, nil
}

func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error {
Loading