fix(avail): Avail potential bug fix (#1323)
Co-authored-by: PrathyushaLakkireddy <prathyusha@vitwit.com>
mtsitrin and PrathyushaLakkireddy authored Jan 22, 2025
1 parent f66ee1c commit 4f40cf0
Showing 3 changed files with 109 additions and 28 deletions.
119 changes: 97 additions & 22 deletions da/avail/avail.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"math"
 	"time"
 
 	"github.com/avast/retry-go/v4"
@@ -119,6 +120,8 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
 
 	// Set defaults
 	c.pubsubServer = pubsubServer
+
+	// TODO: Make configurable
 	c.txInclusionTimeout = defaultTxInculsionTimeout
 	c.batchRetryDelay = defaultBatchRetryDelay
 	c.batchRetryAttempts = defaultBatchRetryAttempts
@@ -128,6 +131,14 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
 		apply(c)
 	}
 
+	metrics.RollappConsecutiveFailedDASubmission.Set(0)
+	return nil
+}
+
+// Start starts DataAvailabilityLayerClient instance.
+func (c *DataAvailabilityLayerClient) Start() error {
+	c.logger.Info("Starting Avail Data Availability Layer Client.")
+	c.ctx, c.cancel = context.WithCancel(context.Background())
 	// If client wasn't set, create a new one
 	if c.client == nil {
 		substrateApiClient, err := gsrpc.NewSubstrateAPI(c.config.ApiURL)
@@ -141,15 +152,9 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
 		}
 	}
 
-	metrics.RollappConsecutiveFailedDASubmission.Set(0)
+	// check for synced client
+	go c.sync()
 
-	c.ctx, c.cancel = context.WithCancel(context.Background())
 	return nil
 }
-
-// Start starts DataAvailabilityLayerClient instance.
-func (c *DataAvailabilityLayerClient) Start() error {
-	c.synced <- struct{}{}
-	return nil
-}
 

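Editor's note: the hunks above split responsibilities that used to live entirely in `Init` — `Init` now only applies options and resets the failure metric, while `Start` owns the context, the Substrate client, and a background `sync` goroutine that signals readiness. A minimal sketch of that lifecycle, with hypothetical names (`Client`, `sync`) standing in for the real types, not the actual dymint API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Client is a hypothetical stand-in for DataAvailabilityLayerClient,
// illustrating the Init/Start split introduced by this commit.
type Client struct {
	ctx    context.Context
	cancel context.CancelFunc
	synced chan struct{}
}

// Init only wires configuration; it must not block or touch the network.
func (c *Client) Init() error {
	c.synced = make(chan struct{}, 1)
	return nil
}

// Start owns the context and launches the background sync check.
func (c *Client) Start() error {
	c.ctx, c.cancel = context.WithCancel(context.Background())
	go c.sync() // signals c.synced once the node has caught up
	return nil
}

func (c *Client) sync() {
	time.Sleep(10 * time.Millisecond) // placeholder for the real height check
	c.synced <- struct{}{}
}

func main() {
	c := &Client{}
	_ = c.Init()
	_ = c.Start()
	<-c.synced // callers block here until the DA node reports synced
	fmt.Println("DA client ready")
	c.cancel()
}
```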
@@ -179,7 +184,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
 			BaseResult: da.BaseResult{
 				Code:    da.StatusError,
 				Message: err.Error(),
-				Error:   err,
+				Error:   errors.Join(da.ErrRetrieval, err),
 			},
 		}
 	}
@@ -189,7 +194,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
 			BaseResult: da.BaseResult{
 				Code:    da.StatusError,
 				Message: err.Error(),
-				Error:   err,
+				Error:   errors.Join(da.ErrRetrieval, err),
 			},
 		}
 	}
@@ -207,24 +212,34 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
 				err := proto.Unmarshal(data, &pbBatch)
 				if err != nil {
 					c.logger.Error("unmarshal batch", "daHeight", daMetaData.Height, "error", err)
-					continue
+					break
 				}
 				// Convert the proto batch to a batch
 				batch := &types.Batch{}
 				err = batch.FromProto(&pbBatch)
 				if err != nil {
 					c.logger.Error("batch from proto", "daHeight", daMetaData.Height, "error", err)
-					continue
+					break
 				}
 				// Add the batch to the list
 				batches = append(batches, batch)
 				// Remove the bytes we just decoded.
 				data = data[proto.Size(&pbBatch):]
 			}
 		}
 	}
 
+	// if no batches, return error
+	if len(batches) == 0 {
+		return da.ResultRetrieveBatch{
+			BaseResult: da.BaseResult{
+				Code:    da.StatusError,
+				Message: "Blob not found",
+				Error:   da.ErrBlobNotFound,
+			},
+		}
+	}
+
 	return da.ResultRetrieveBatch{
 		BaseResult: da.BaseResult{
 			Code:    da.StatusSuccess,
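Editor's note on the hunk above: the decode loop consumes a blob holding several concatenated batches, slicing off `proto.Size(&pbBatch)` bytes only after a *successful* decode. The `continue` → `break` change matters because after a failed decode the offset into `data` is unknown, so continuing would re-read the same corrupt prefix forever. A hedged sketch of the same advance-on-success pattern, using hypothetical length-prefixed frames rather than the real protobuf batch type:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// decodeFrames consumes [4-byte big-endian length | payload] frames.
// Like the loop above, it must break (not continue) on a bad frame:
// the offset is untrustworthy after a failed decode, so looping again
// would spin on the same corrupt prefix.
func decodeFrames(data []byte) ([][]byte, error) {
	var frames [][]byte
	for len(data) > 0 {
		if len(data) < 4 {
			break // corrupt or truncated header: cannot advance safely
		}
		n := binary.BigEndian.Uint32(data[:4])
		if int(n) > len(data)-4 {
			break // declared length overruns the buffer
		}
		frames = append(frames, data[4:4+n])
		data = data[4+n:] // advance only after a successful decode
	}
	if len(frames) == 0 {
		// mirrors the new len(batches) == 0 -> da.ErrBlobNotFound check
		return nil, errors.New("blob not found")
	}
	return frames, nil
}

func main() {
	buf := []byte{0, 0, 0, 2, 'h', 'i', 0, 0, 0, 5, 'w', 'o', 'r', 'l', 'd'}
	frames, err := decodeFrames(buf)
	fmt.Println(len(frames), err) // 2 <nil>
}
```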
@@ -259,13 +274,7 @@ func (c *DataAvailabilityLayerClient) submitBatchLoop(dataBlob []byte) da.Result
 	for {
 		select {
 		case <-c.ctx.Done():
-			return da.ResultSubmitBatch{
-				BaseResult: da.BaseResult{
-					Code:    da.StatusError,
-					Message: "context done",
-					Error:   c.ctx.Err(),
-				},
-			}
+			return da.ResultSubmitBatch{}
 		default:
 			var daBlockHeight uint64
 			err := retry.Do(
@@ -381,11 +390,10 @@ func (c *DataAvailabilityLayerClient) broadcastTx(tx []byte) (uint64, error) {
 	if err != nil {
 		return 0, fmt.Errorf("%w: %s", da.ErrTxBroadcastNetworkError, err)
 	}
+	defer sub.Unsubscribe()
 
 	c.logger.Info("Submitted batch to avail. Waiting for inclusion event")
 
-	defer sub.Unsubscribe()
-
 	inclusionTimer := time.NewTimer(c.txInclusionTimeout)
 	defer inclusionTimer.Stop()

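Editor's note: moving `defer sub.Unsubscribe()` directly under the error check follows the usual Go discipline of registering cleanup the moment the resource exists, so every later return path (including timeouts) releases it. A small sketch of the pattern, with hypothetical `subscribe`/`Subscription` names standing in for the Substrate subscription:

```go
package main

import "fmt"

// Subscription is a hypothetical stand-in for the extrinsic-status
// subscription that broadcastTx opens.
type Subscription struct{}

func (s *Subscription) Unsubscribe() { fmt.Println("unsubscribed") }

func subscribe() (*Subscription, error) { return &Subscription{}, nil }

func waitForInclusion(timeout bool) error {
	sub, err := subscribe()
	if err != nil {
		return fmt.Errorf("subscribe: %w", err)
	}
	// Register the cleanup immediately after acquiring the resource:
	// every return below, including early timeouts, now unsubscribes.
	defer sub.Unsubscribe()

	if timeout {
		return fmt.Errorf("tx inclusion timeout")
	}
	return nil
}

func main() {
	_ = waitForInclusion(true)  // prints "unsubscribed" even on the error path
	_ = waitForInclusion(false) // and on success
}
```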
@@ -451,3 +459,70 @@ func (d *DataAvailabilityLayerClient) GetMaxBlobSizeBytes() uint32 {
 func (c *DataAvailabilityLayerClient) GetSignerBalance() (da.Balance, error) {
 	return da.Balance{}, nil
 }
+
+func (c *DataAvailabilityLayerClient) sync() {
+	// wrapper to get finalized height and current height from the client
+	getHeights := func() (uint64, uint64, error) {
+		finalizedHash, err := c.client.GetFinalizedHead()
+		if err != nil {
+			return 0, 0, fmt.Errorf("failed to get finalized head: %w", err)
+		}
+
+		finalizedHeader, err := c.client.GetHeader(finalizedHash)
+		if err != nil {
+			return 0, 0, fmt.Errorf("failed to get finalized header: %w", err)
+		}
+		finalizedHeight := uint64(finalizedHeader.Number)
+
+		currentBlock, err := c.client.GetBlockLatest()
+		if err != nil {
+			return 0, 0, fmt.Errorf("failed to get current block: %w", err)
+		}
+		currentHeight := uint64(currentBlock.Block.Header.Number)
+
+		return finalizedHeight, currentHeight, nil
+	}
+
+	checkSync := func() error {
+		finalizedHeight, currentHeight, err := getHeights()
+		if err != nil {
+			return err
+		}
+
+		// Calculate blocks behind
+		blocksBehind := uint64(math.Abs(float64(currentHeight - finalizedHeight)))
+		defaultSyncThreshold := uint64(3)
+
+		// Check if within sync threshold
+		if blocksBehind <= defaultSyncThreshold && currentHeight > 0 {
+			c.logger.Info("Node is synced",
+				"current_height", currentHeight,
+				"finalized_height", finalizedHeight,
+				"blocks_behind", blocksBehind)
+			return nil
+		}
+
+		c.logger.Debug("Node is not yet synced",
+			"current_height", currentHeight,
+			"finalized_height", finalizedHeight,
+			"blocks_behind", blocksBehind)
+
+		return fmt.Errorf("node not synced: current=%d, finalized=%d, behind=%d",
+			currentHeight, finalizedHeight, blocksBehind)
+	}
+
+	// Start sync with retry mechanism
+	err := retry.Do(checkSync,
+		retry.Attempts(0), // try forever
+		retry.Context(c.ctx),
+		retry.Delay(5*time.Second), // TODO: make configurable
+		retry.LastErrorOnly(true),
+		retry.DelayType(retry.FixedDelay),
+		retry.OnRetry(func(n uint, err error) {
+			c.logger.Error("sync Avail DA", "attempt", n, "error", err)
+		}),
+	)
+
+	c.logger.Info("Avail-node sync completed.", "err", err)
+	c.synced <- struct{}{}
+}
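Editor's note: one caveat worth flagging in the new `sync` routine. `currentHeight - finalizedHeight` is unsigned, so if the finalized head ever leads the latest-seen block, the subtraction wraps around before `math.Abs` can see a negative number, and `blocksBehind` becomes astronomically large (in this routine that only delays the "synced" verdict, since a huge value fails the threshold check). A hedged sketch of an overflow-safe distance computation under that assumption:

```go
package main

import "fmt"

// heightDistance returns |a-b| for unsigned heights without the
// wrap-around that uint64 subtraction inside math.Abs would produce.
func heightDistance(a, b uint64) uint64 {
	if a > b {
		return a - b
	}
	return b - a
}

func main() {
	current, finalized := uint64(100), uint64(103)
	fmt.Println(heightDistance(current, finalized)) // 3
	// By contrast, the unsigned subtraction wraps long before any
	// float conversion can take an absolute value:
	fmt.Println(current - finalized) // 18446744073709551613
}
```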
14 changes: 12 additions & 2 deletions da/avail/avail_test.go
@@ -43,14 +43,20 @@ func TestRetrieveBatches(t *testing.T) {
 	pubsubServer := pubsub.NewServer()
 	err = pubsubServer.Start()
 	assert.NoError(err)
+
+	// set mocks for sync flow
+	// Set the mock functions
+	mockSubstrateApiClient.On("GetFinalizedHead", mock.Anything).Return(availtypes.NewHash([]byte("123")), nil)
+	mockSubstrateApiClient.On("GetHeader", mock.Anything).Return(&availtypes.Header{Number: 1}, nil)
+	mockSubstrateApiClient.On("GetBlockLatest", mock.Anything).Return(&availtypes.SignedBlock{Block: availtypes.Block{Header: availtypes.Header{Number: 1}}}, nil)
+
 	// Start the DALC
 	dalc := avail.DataAvailabilityLayerClient{}
 	err = dalc.Init(configBytes, pubsubServer, nil, testutil.NewLogger(t), options...)
 	require.NoError(err)
 	err = dalc.Start()
 	require.NoError(err)
 	// Set the mock functions
 	mockSubstrateApiClient.On("GetBlockHash", mock.Anything).Return(availtypes.NewHash([]byte("123")), nil)
 
 	// Build batches for the block extrinsics
 	batch1 := testutil.MustGenerateBatchAndKey(0, 1)
 	batch2 := testutil.MustGenerateBatchAndKey(2, 3)
@@ -86,7 +92,11 @@ func TestRetrieveBatches(t *testing.T) {
 			},
 		},
 	}
+
+	// Set the mock functions
+	mockSubstrateApiClient.On("GetBlockHash", mock.Anything).Return(availtypes.NewHash([]byte("123")), nil)
 	mockSubstrateApiClient.On("GetBlock", mock.Anything).Return(signedBlock, nil)
+
 	// Retrieve the batches and make sure we only get the batches relevant for our app id
 	daMetaData := &da.DASubmitMetaData{
 		Height: 1,
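Editor's note: because `Start` now launches the sync goroutine immediately, the test registers the `GetFinalizedHead`/`GetHeader`/`GetBlockLatest` expectations before `dalc.Start()` runs; otherwise the goroutine would race an unconfigured mock. A minimal sketch of that ordering with testify's mock package — the `MockClient` here is a hypothetical slice of the Substrate client surface, not dymint's real mock:

```go
package main

import (
	"fmt"

	"github.com/stretchr/testify/mock"
)

// MockClient stands in for the mocked Substrate API client.
type MockClient struct{ mock.Mock }

func (m *MockClient) GetFinalizedHead() (string, error) {
	args := m.Called()
	return args.String(0), args.Error(1)
}

func main() {
	m := new(MockClient)
	// Register expectations BEFORE any goroutine can call the mock;
	// an unregistered call panics inside the calling goroutine.
	m.On("GetFinalizedHead").Return("0xabc", nil)

	head, err := m.GetFinalizedHead()
	fmt.Println(head, err) // 0xabc <nil>
}
```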
4 changes: 0 additions & 4 deletions da/celestia/celestia.go
@@ -157,10 +157,6 @@ func (c *DataAvailabilityLayerClient) Start() (err error) {
 // Stop stops DataAvailabilityLayerClient.
 func (c *DataAvailabilityLayerClient) Stop() error {
 	c.logger.Info("Stopping Celestia Data Availability Layer Client.")
-	err := c.pubsubServer.Stop()
-	if err != nil {
-		return err
-	}
 	c.cancel()
 	close(c.synced)
 	return nil
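Editor's note: the Celestia client's `Stop` no longer stops the pubsub server — presumably (an assumption, not stated in the diff) because the server is injected via `Init` and owned by the caller, so shutting it down here could tear subscriptions away from other components. `Stop` now releases only what the client created itself. A sketch of that ownership rule under the same assumption:

```go
package main

import (
	"context"
	"fmt"
)

// Server is a hypothetical stand-in for the injected pubsub server.
type Server struct{}

func (s *Server) Stop() { fmt.Println("pubsub stopped by its owner") }

type DAClient struct {
	pubsub *Server // injected, NOT owned by the client
	cancel context.CancelFunc
	synced chan struct{}
}

// Stop tears down only what the client itself created: its context and
// its synced channel. The shared pubsub server is left to its owner.
func (c *DAClient) Stop() error {
	c.cancel()
	close(c.synced)
	return nil
}

func main() {
	srv := &Server{}
	_, cancel := context.WithCancel(context.Background())
	c := &DAClient{pubsub: srv, cancel: cancel, synced: make(chan struct{})}
	_ = c.Stop()
	srv.Stop() // the component that created the server shuts it down
}
```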
