Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

chore: refactor error handling and tests #1360

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 112 additions & 36 deletions da/weavevm/wvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
pb "github.com/dymensionxyz/dymint/types/pb/dymint"
uretry "github.com/dymensionxyz/dymint/utils/retry"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"github.com/ethereum/go-ethereum"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -286,13 +287,15 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
err := retry.Do(
func() error {
resultRetrieveBatch = c.retrieveBatches(daMetaData)

if errors.Is(resultRetrieveBatch.Error, da.ErrRetrieval) {
c.logger.Error("Retrieve batch.", "error", resultRetrieveBatch.Error)
return resultRetrieveBatch.Error
switch resultRetrieveBatch.Error {
case da.ErrRetrieval:
c.logger.Error("Retrieve batch failed with retrieval error. Retrying retrieve attempt.", "error", resultRetrieveBatch.Error)
return resultRetrieveBatch.Error // Trigger retry
case da.ErrBlobNotFound, da.ErrBlobNotIncluded, da.ErrProofNotMatching:
return retry.Unrecoverable(resultRetrieveBatch.Error)
default:
return retry.Unrecoverable(resultRetrieveBatch.Error)
}

return nil
},
retry.Attempts(uint(*c.config.RetryAttempts)), //nolint:gosec // RetryAttempts should be always positive
retry.DelayType(retry.FixedDelay),
Expand All @@ -314,25 +317,41 @@ func (c *DataAvailabilityLayerClient) retrieveBatches(daMetaData *da.DASubmitMet

// 1. Try WeaveVM RPC first
data, errRpc := c.retrieveFromWeaveVM(ctx, daMetaData.WvmTxHash)
if errRpc != nil {
c.logger.Error("Failed to retrieve blob from weavevm rpc, we will try to use weavevm gateway",
"wvm_tx_hash", daMetaData.WvmTxHash, "error", errRpc)
errRpc = fmt.Errorf("unable to retrieve data from weavevm chain rpc: %w", errRpc)
}
if errRpc == nil {
return c.processRetrievedData(data, daMetaData)
}
if isRpcTransactionNotFoundErr(errRpc) {
return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: fmt.Errorf("failed to find transaction data in weavevm: %w", errRpc).Error(),
Error: da.ErrBlobNotFound,
},
}
}

c.logger.Error("Failed to retrieve blob from weavevm rpc, we will try to use weavevm gateway",
"wvm_tx_hash", daMetaData.WvmTxHash, "error", errRpc)

// 2. Try gateway
data, errGateway := c.gateway.RetrieveFromGateway(ctx, daMetaData.WvmTxHash)
if errGateway == nil {
if isGatewayTransactionNotFoundErr(data) {
return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: "failed to find transaction data in weavevm using gateway",
Error: da.ErrBlobNotFound,
},
}
}
return c.processRetrievedData(data, daMetaData)
}

// if we are here it means that gateway call get some kinda of error
return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: fmt.Sprintf("failed to retrieve blob from both endpoints: %s :%s", errGateway.Error(), errRpc.Error()),
Message: fmt.Errorf("failed to retrieve data from weave vm gateway: %w", errGateway).Error(),
Error: da.ErrRetrieval,
},
}
Expand All @@ -349,12 +368,12 @@ func (c *DataAvailabilityLayerClient) retrieveFromWeaveVM(ctx context.Context, t

func (c *DataAvailabilityLayerClient) processRetrievedData(data *weaveVMtypes.WvmDymintBlob, daMetaData *da.DASubmitMetaData) da.ResultRetrieveBatch {
var batches []*types.Batch
if data.Blob == nil {
if len(data.Blob) == 0 {
return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: "Blob not found",
Error: da.ErrBlobNotFound,
Message: "Blob not included",
Error: da.ErrBlobNotIncluded,
},
}
}
Expand Down Expand Up @@ -421,13 +440,15 @@ func (c *DataAvailabilityLayerClient) CheckBatchAvailability(daMetaData *da.DASu
func() error {
result := c.checkBatchAvailability(daMetaData)
availabilityResult = result

if result.Code != da.StatusSuccess {
c.logger.Error("Blob submitted not found in DA. Retrying availability check.")
return result.Error
switch result.Error {
case da.ErrRetrieval:
c.logger.Error("CheckBatchAvailability failed with retrieval error. Retrying availability check.", "error", result.Error)
return result.Error // Trigger retry
case da.ErrBlobNotFound, da.ErrBlobNotIncluded, da.ErrProofNotMatching:
return retry.Unrecoverable(result.Error)
default:
return retry.Unrecoverable(result.Error)
}

return nil
},
retry.Attempts(uint(*c.config.RetryAttempts)), //nolint:gosec // RetryAttempts should be always positive
retry.DelayType(retry.FixedDelay),
Expand All @@ -450,54 +471,98 @@ func (c *DataAvailabilityLayerClient) checkBatchAvailability(daMetaData *da.DASu
Height: daMetaData.Height,
WvmTxHash: daMetaData.WvmTxHash,
WvmBlockHash: daMetaData.WvmBlockHash,
Commitment: daMetaData.Commitment,
}

wvmBlob, err := c.gateway.RetrieveFromGateway(ctx, daMetaData.WvmTxHash)
if err != nil {
data, errRpc := c.retrieveFromWeaveVM(ctx, daMetaData.WvmTxHash)
if errRpc == nil {
return c.processAvailabilityData(data, DACheckMetaData)
}
if isRpcTransactionNotFoundErr(errRpc) {
return da.ResultCheckBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
Message: fmt.Errorf("failed to find transaction data in weavevm: %w", errRpc).Error(),
Error: da.ErrBlobNotFound,
},
CheckMetaData: DACheckMetaData,
}
}

if err := c.verifyBlobData(daMetaData.Commitment, wvmBlob.Blob); err != nil {
c.logger.Error("Failed to retrieve blob from weavevm rpc, we will try to use weavevm gateway",
"wvm_tx_hash", daMetaData.WvmTxHash, "error", errRpc)

data, errGateway := c.gateway.RetrieveFromGateway(ctx, daMetaData.WvmTxHash)
if errGateway == nil {
if isGatewayTransactionNotFoundErr(data) {
return da.ResultCheckBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: "failed to find transaction data in weavevm using gateway",
Error: da.ErrBlobNotFound,
},
CheckMetaData: DACheckMetaData,
}
}
return c.processAvailabilityData(data, DACheckMetaData)
}
// if we are here it means that gateway call get some kinda of error
return da.ResultCheckBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: fmt.Errorf("failed to retrieve data from weave vm gateway: %w", errGateway).Error(),
Error: da.ErrRetrieval,
},
CheckMetaData: DACheckMetaData,
}
}

func (c *DataAvailabilityLayerClient) processAvailabilityData(data *weaveVMtypes.WvmDymintBlob, daMetaData *da.DACheckMetaData) da.ResultCheckBatch {
if len(data.Blob) == 0 {
return da.ResultCheckBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: "Blob not included",
Error: da.ErrBlobNotIncluded,
},
CheckMetaData: daMetaData,
}
}

if err := c.verifyBlobData(daMetaData.Commitment, data.Blob); err != nil {
return da.ResultCheckBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
Error: da.ErrProofNotMatching,
},
CheckMetaData: DACheckMetaData,
CheckMetaData: daMetaData,
}
}

// If ArweaveBlockHash is missing in metadata but available in the blob, update it.
if DACheckMetaData.WvmArweaveBlockHash == "" && wvmBlob.ArweaveBlockHash != "" {
DACheckMetaData.WvmArweaveBlockHash = wvmBlob.ArweaveBlockHash
if daMetaData.WvmArweaveBlockHash == "" && data.ArweaveBlockHash != "" {
daMetaData.WvmArweaveBlockHash = data.ArweaveBlockHash
}

if DACheckMetaData.Height < wvmBlob.WvmBlockNumber {
if daMetaData.Height < data.WvmBlockNumber {
// Update metadata only if the blob represents a higher block (reorg case)
DACheckMetaData.WvmArweaveBlockHash = wvmBlob.ArweaveBlockHash
DACheckMetaData.WvmBlockHash = wvmBlob.WvmBlockHash
DACheckMetaData.Height = wvmBlob.WvmBlockNumber
daMetaData.WvmArweaveBlockHash = data.ArweaveBlockHash
daMetaData.WvmBlockHash = data.WvmBlockHash
daMetaData.Height = data.WvmBlockNumber
}

// Ensure WvmBlockHash matches the latest blob hash for consistency
if DACheckMetaData.WvmBlockHash != wvmBlob.WvmBlockHash {
DACheckMetaData.WvmBlockHash = wvmBlob.WvmBlockHash
if daMetaData.WvmBlockHash != data.WvmBlockHash {
daMetaData.WvmBlockHash = data.WvmBlockHash
}

return da.ResultCheckBatch{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Message: "batch available",
},
CheckMetaData: DACheckMetaData,
CheckMetaData: daMetaData,
}
}

Expand Down Expand Up @@ -598,6 +663,17 @@ func (c *DataAvailabilityLayerClient) verifyBlobData(commitment []byte, data []b
return nil
}

func isRpcTransactionNotFoundErr(err error) bool {
return errors.Is(err, ethereum.NotFound)
}

// isGatewayTransactionNotFoundErr checks if the transaction is absent in the Gateway.
// TODO: Gateway indicates a missing transaction by setting WvmBlockHash to "0x".
// it will be fixed in the future
func isGatewayTransactionNotFoundErr(data *weaveVMtypes.WvmDymintBlob) bool {
return data.WvmBlockHash == "0x"
}

func (c *DataAvailabilityLayerClient) DAPath() string {
return fmt.Sprintf("%d", c.config.ChainID)
}
16 changes: 12 additions & 4 deletions da/weavevm/wvm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func TestRetrieveBatches(t *testing.T) {
Return(nil, false, errors.New("retrieval failed")).Once()

mockGateway.On("RetrieveFromGateway", mock.Anything, testTxHash).
Return(nil, errors.New("blob not found")).Once()
Return(nil, errors.New("retrieval failed")).Once()
},
submitMeta: &da.DASubmitMetaData{
Client: da.WeaveVM,
Expand Down Expand Up @@ -375,6 +375,8 @@ func TestCheckBatchAvailability(t *testing.T) {
{
name: "Successful Availability Check",
setupMocks: func() {
mockWVM.On("GetTransactionByHash", mock.Anything, testTxHash).
Return(nil, false, errors.New("retrieval failed")).Once()
mockGateway.On("RetrieveFromGateway", mock.Anything, testTxHash).
Return(&weaveVMtypes.WvmDymintBlob{
Blob: batchData,
Expand All @@ -395,8 +397,10 @@ func TestCheckBatchAvailability(t *testing.T) {
{
name: "Blob Not Found",
setupMocks: func() {
mockWVM.On("GetTransactionByHash", mock.Anything, testTxHash).
Return(nil, false, errors.New("retrieval failed")).Once()
mockGateway.On("RetrieveFromGateway", mock.Anything, testTxHash).
Return(nil, errors.New("blob not found")).Once()
Return(nil, errors.New("retrieval failed")).Once()
},
submitMeta: &da.DASubmitMetaData{
Client: da.WeaveVM,
Expand All @@ -405,11 +409,13 @@ func TestCheckBatchAvailability(t *testing.T) {
Commitment: batchHash,
},
expectCode: da.StatusError,
expectError: da.ErrBlobNotFound.Error(),
expectError: da.ErrRetrieval.Error(),
},
{
name: "Verification Failure",
setupMocks: func() {
mockWVM.On("GetTransactionByHash", mock.Anything, testTxHash).
Return(nil, false, errors.New("retrieval failed")).Once()
mockGateway.On("RetrieveFromGateway", mock.Anything, testTxHash).
Return(&weaveVMtypes.WvmDymintBlob{
Blob: []byte("corrupted data"),
Expand All @@ -431,6 +437,8 @@ func TestCheckBatchAvailability(t *testing.T) {
{
name: "Context Timeout",
setupMocks: func() {
mockWVM.On("GetTransactionByHash", mock.Anything, testTxHash).
Return(nil, false, errors.New("retrieval failed")).Once()
mockGateway.On("RetrieveFromGateway", mock.Anything, testTxHash).
Return(nil, context.DeadlineExceeded).Once()
},
Expand All @@ -441,7 +449,7 @@ func TestCheckBatchAvailability(t *testing.T) {
Commitment: batchHash,
},
expectCode: da.StatusError,
expectError: da.ErrBlobNotFound.Error(),
expectError: da.ErrRetrieval.Error(),
},
}

Expand Down
Loading