Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

multiple checkpoint saving fix #40

Merged
merged 6 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/data/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import "errors"

// Standard data package errors.
var (
ErrInvalidType = errors.New("invalid type")
ErrObjectNotFound = errors.New("object not found")
ErrVersionMismatch = errors.New("error version mismatch")
ErrInvalidType = errors.New("invalid type")
ErrObjectNotFound = errors.New("object not found")
)
1 change: 1 addition & 0 deletions pkg/hlf/chcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (cc *chCollector) loopProxyByEvents(ctx context.Context, startedFrom uint64
case <-ctx.Done():
return lastPushedBlockNum
case cc.stream <- data:
// sending data to channel executor
lastPushedBlockNum = &ev.Block.Header.Number
expectedBlockNum++
}
Expand Down
68 changes: 27 additions & 41 deletions pkg/hlf/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/go-errors/errors"
hlfcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/fabsdk"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
Expand Down Expand Up @@ -83,7 +82,7 @@ func NewPool(
pool.fabricSDK, err = createFabricSDK(profilePath)
if err != nil {
return nil, errorshlp.WrapWithDetails(
errors.Errorf("create connection to fabric: %w", err),
fmt.Errorf("create connection to fabric: %w", err),
nerrors.ErrTypeHlf,
nerrors.ComponentHLFStreamsPool,
)
Expand All @@ -109,7 +108,7 @@ func NewPool(
// if not, create the new one
gRPCClient, err = createGRPCClient(channel.TaskExecutor)
if err != nil {
return nil, errorshlp.WrapWithDetails(errors.Errorf("create gRPC client: %w", err), nerrors.ErrTypeHlf, nerrors.ComponentHLFStreamsPool)
return nil, errorshlp.WrapWithDetails(fmt.Errorf("create gRPC client: %w", err), nerrors.ErrTypeHlf, nerrors.ComponentHLFStreamsPool)
}
gRPCClients[channel.TaskExecutor.AddressGRPC] = gRPCClient
}
Expand All @@ -118,7 +117,7 @@ func NewPool(
err = pool.createExecutor(ctx, channel.Name, channelProvider, gRPCClient)
if err != nil {
return nil, errorshlp.WrapWithDetails(
errors.Errorf("create executor: %w", err),
fmt.Errorf("create executor: %w", err),
nerrors.ErrTypeHlf,
nerrors.ComponentHLFStreamsPool,
)
Expand Down Expand Up @@ -217,7 +216,7 @@ func (pool *Pool) createExecutor(
)
if err != nil {
pool.m.TotalReconnectsToFabric().Inc(metrics.Labels().Channel.Create(channel))
return errors.Errorf("start executor: %w", err)
return fmt.Errorf("start executor: %w", err)
}

if gRPCClient != nil {
Expand Down Expand Up @@ -268,7 +267,7 @@ func (pool *Pool) Expand(ctx context.Context, channel string) error {
ready, err := pool.Readiness(channel)
if err != nil {
return errorshlp.WrapWithDetails(
errors.Errorf("pool readiness: %w", err),
fmt.Errorf("pool readiness: %w", err),
nerrors.ErrTypeHlf,
nerrors.ComponentHLFStreamsPool,
)
Expand All @@ -289,19 +288,15 @@ func (pool *Pool) blockchainHeight(key channelKey) (*uint64, error) {

//nolint:funlen
func (pool *Pool) blockKeeper(key channelKey, provider hlfcontext.ChannelProvider) error {
var (
blockNumber uint64
checkPointVersion atomic.Int64
)
var blockNumber uint64

checkPoint, err := pool.checkPoint.CheckpointLoad(pool.gCtx, model.ID(key))
if err != nil {
if !errors.Is(err, data.ErrObjectNotFound) {
pool.log.Error(errors.Errorf("load checkpoint of %s: %w", string(key), err))
pool.log.Error(fmt.Errorf("load checkpoint of %s: %w", string(key), err))
}
} else {
blockNumber = checkPoint.SrcCollectFromBlockNums
checkPointVersion.Store(checkPoint.Ver)
}

collector := createChCollector(pool.gCtx, string(key), blockNumber, provider, pool.opts.BatchTxPreimagePrefix, pool.opts.CollectorsBufSize)
Expand All @@ -316,7 +311,7 @@ func (pool *Pool) blockKeeper(key channelKey, provider hlfcontext.ChannelProvide

bcHeight, err := pool.blockchainHeight(key)
if err != nil {
return errors.Errorf("create channel %s hasCollector, get blockchain height: %w", string(key), err)
return fmt.Errorf("create channel %s hasCollector, get blockchain height: %w", string(key), err)
}
readiness := func() {
if *bcHeight-blockNumber <= 1 {
Expand All @@ -325,8 +320,6 @@ func (pool *Pool) blockKeeper(key channelKey, provider hlfcontext.ChannelProvide
}
readiness()

saver := make(chan struct{}, 1)

pool.m.TotalReconnectsToFabric().Inc(metrics.Labels().Channel.Create(string(key)))

for pool.gCtx.Err() == nil {
Expand All @@ -339,40 +332,33 @@ func (pool *Pool) blockKeeper(key channelKey, provider hlfcontext.ChannelProvide
}
pool.log.Debugf("store block: %d", block.BlockNum)
if err = pool.storeTransfer(key, *block); err != nil {
return errors.Errorf("store block to redis: %w", err)
return fmt.Errorf("store block to redis: %w", err)
}
blockNumber = block.BlockNum
readiness()
sendSaver(saver)
case <-saver:
go func() {
ver := checkPointVersion.Load()
nCheckPoint, err := pool.checkPoint.CheckpointSave(
pool.gCtx,
model.Checkpoint{
Ver: ver,
Channel: model.ID(key),
SrcCollectFromBlockNums: blockNumber,
},
)
if err != nil {
pool.log.Errorf("save checkpoint of %s : %s", string(key), err.Error())
return
}

checkPointVersion.CompareAndSwap(ver, nCheckPoint.Ver)
}()
// saving event checkpoint
checkPoint = pool.saveCheckPoint(checkPoint, key, blockNumber)
}
}

return errors.New(pool.gCtx.Err())
}

func sendSaver(c chan struct{}) {
select {
case c <- struct{}{}:
default:
func (pool *Pool) saveCheckPoint(oldCheckPoint model.Checkpoint, key channelKey, blockNumber uint64) model.Checkpoint {
nCheckPoint, err := pool.checkPoint.CheckpointSave(
pool.gCtx,
model.Checkpoint{
Ver: oldCheckPoint.Ver + 1,
Channel: model.ID(key),
SrcCollectFromBlockNums: blockNumber,
},
)
if err != nil {
pool.log.Errorf("save checkpoint of %s : %s", string(key), err.Error())
return oldCheckPoint
}

return nCheckPoint
}

func (pool *Pool) storeTransfer(key channelKey, block model.BlockData) error {
Expand Down Expand Up @@ -421,7 +407,7 @@ func (pool *Pool) storeTransfer(key channelKey, block model.BlockData) error {

if canBeStored {
if err := pool.syncTransferRequest(*transferBlock, ttl); err != nil {
return errors.Errorf("sync transfer request: %w", err)
return fmt.Errorf("sync transfer request: %w", err)
}
}

Expand Down Expand Up @@ -544,7 +530,7 @@ func (pool *Pool) sendEvent(channel string) {
func createGRPCClient(options *config.TaskExecutor) (*grpc.ClientConn, error) {
gRPCClient, err := newGRPCClient(options)
if err != nil {
return nil, errors.Errorf("create grpc client: %w", err)
return nil, fmt.Errorf("create grpc client: %w", err)
}

return gRPCClient, nil
Expand Down
17 changes: 2 additions & 15 deletions pkg/transfer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package transfer

import (
"context"
"errors"
"fmt"

"github.com/anoideaopen/channel-transfer/pkg/data"
Expand All @@ -21,21 +20,9 @@ func NewBlockCheckpoint(storage *redis.Storage) *BlockCheckpoint {
}

func (ckp *BlockCheckpoint) CheckpointSave(ctx context.Context, checkpoint model.Checkpoint) (model.Checkpoint, error) {
existsCheckPoint := model.Checkpoint{}
if err := ckp.storage.Load(ctx, &existsCheckPoint, data.Key(checkpoint.Channel)); err != nil {
if !errors.Is(err, data.ErrObjectNotFound) {
return model.Checkpoint{}, fmt.Errorf("save checkpoint : %w", err)
}
} else {
if checkpoint.Ver > existsCheckPoint.Ver {
return model.Checkpoint{}, data.ErrVersionMismatch
}
if checkpoint.Ver < existsCheckPoint.Ver {
checkpoint.Ver = existsCheckPoint.Ver
}
checkpoint.Ver++
if checkpoint.Channel == "" || checkpoint.SrcCollectFromBlockNums == 0 || checkpoint.Ver == 0 {
return checkpoint, nil
}

if err := ckp.storage.Save(ctx, &checkpoint, data.Key(checkpoint.Channel)); err != nil {
return model.Checkpoint{}, fmt.Errorf("save checkpoint : %w", err)
}
Expand Down
53 changes: 34 additions & 19 deletions pkg/transfer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
"github.com/stretchr/testify/assert"
)

var (
channelName = model.ID("cc")
version1 = int64(1)
version2 = int64(2)
)

func TestBlockCheckpoint(t *testing.T) {
storage, err := redis.NewStorage(
redis2.NewUniversalClient(&redis2.UniversalOptions{
Expand All @@ -22,26 +28,35 @@ func TestBlockCheckpoint(t *testing.T) {
)
assert.NoError(t, err)

checkpoint := model.Checkpoint{
Channel: "cc",
Ver: 1,
SrcCollectFromBlockNums: 1,
}

blockCheckpoint := NewBlockCheckpoint(storage)

got, err := blockCheckpoint.CheckpointSave(context.TODO(), checkpoint)
assert.NoError(t, err)

got.SrcCollectFromBlockNums++
checkpoint.SrcCollectFromBlockNums++
checkpoint.Ver++

got, err = blockCheckpoint.CheckpointSave(context.TODO(), got)
assert.NoError(t, err)
assert.Equal(t, checkpoint, got)
checkpoint := model.Checkpoint{
Channel: channelName,
Ver: version1,
SrcCollectFromBlockNums: uint64(version1),
}

got, err = blockCheckpoint.CheckpointLoad(context.TODO(), checkpoint.Channel)
assert.NoError(t, err)
assert.Equal(t, int64(2), got.Ver)
resultCheckPoint := model.Checkpoint{}
t.Run("saving initial checkpoint", func(t *testing.T) {
resultCheckPoint, err = blockCheckpoint.CheckpointSave(context.TODO(), checkpoint)
assert.NoError(t, err)
})

resultCheckPoint.SrcCollectFromBlockNums++
resultCheckPoint.Ver++
checkpoint.SrcCollectFromBlockNums = uint64(version2)
checkpoint.Ver = version2

t.Run("saving new checkpoint", func(t *testing.T) {
resultCheckPoint, err = blockCheckpoint.CheckpointSave(context.TODO(), checkpoint)
assert.NoError(t, err)
assert.Equal(t, checkpoint, resultCheckPoint)
})

t.Run("loading checkpoint", func(t *testing.T) {
resultCheckPoint, err = blockCheckpoint.CheckpointLoad(context.TODO(), checkpoint.Channel)
assert.NoError(t, err)
assert.Equal(t, version2, resultCheckPoint.Ver)
assert.Equal(t, uint64(version2), resultCheckPoint.SrcCollectFromBlockNums)
})
}