Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Change key range splitting/unification logic #549

Merged
merged 7 commits into from
Mar 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 38 additions & 27 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
@@ -547,8 +547,8 @@ func (qc *qdbCoordinator) UnlockKeyRange(ctx context.Context, keyRangeID string)
})
}

// Split splits key range by req.bound
// TODO : unit tests
// Split TODO: check bounds and keyRangeID (sourceID)
func (qc *qdbCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) error {
spqrlog.Zero.Debug().
Str("krid", req.Krid).
@@ -593,23 +593,29 @@ func (qc *qdbCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) erro
}
}

krNew := kr.KeyRangeFromDB(
&qdb.KeyRange{
LowerBound: req.Bound,
UpperBound: krOld.UpperBound,
KeyRangeID: req.Krid,
ShardID: krOld.ShardID,
DistributionId: krOld.DistributionId,
},
)
krNew := &kr.KeyRange{
LowerBound: func() []byte {
if req.SplitLeft {
return krOld.LowerBound
}
return req.Bound
}(),
ID: req.Krid,
ShardID: krOld.ShardID,
Distribution: krOld.DistributionId,
}

spqrlog.Zero.Debug().
Bytes("lower-bound", krNew.LowerBound).
Str("shard-id", krNew.ShardID).
Str("id", krNew.ID).
Msg("new key range")

krOld.UpperBound = req.Bound
if req.SplitLeft {
krOld.LowerBound = req.Bound
} else {
krOld.UpperBound = req.Bound
}
if err := ops.ModifyKeyRangeWithChecks(ctx, qc.db, kr.KeyRangeFromDB(krOld)); err != nil {
return err
}
@@ -621,9 +627,9 @@ func (qc *qdbCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) erro
if err := qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
cl := routerproto.NewKeyRangeServiceClient(cc)
resp, err := cl.SplitKeyRange(ctx, &routerproto.SplitKeyRangeRequest{
Bound: req.Bound,
SourceId: req.SourceID,
KeyRangeInfo: krNew.ToProto(),
Bound: req.Bound,
SourceId: req.SourceID,
NewId: krNew.ID,
})
spqrlog.Zero.Debug().
Interface("response", resp).
@@ -679,39 +685,40 @@ func (qc *qdbCoordinator) DropKeyRange(ctx context.Context, id string) error {

// TODO : unit tests
func (qc *qdbCoordinator) Unite(ctx context.Context, uniteKeyRange *kr.UniteKeyRange) error {
krLeft, err := qc.db.LockKeyRange(ctx, uniteKeyRange.KeyRangeIDLeft)
krBase, err := qc.db.LockKeyRange(ctx, uniteKeyRange.BaseKeyRangeId)
if err != nil {
return err
}

defer func() {
if err := qc.db.UnlockKeyRange(ctx, uniteKeyRange.KeyRangeIDLeft); err != nil {
if err := qc.db.UnlockKeyRange(ctx, uniteKeyRange.BaseKeyRangeId); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
}()

krRight, err := qc.db.LockKeyRange(ctx, uniteKeyRange.KeyRangeIDRight)
krAppendage, err := qc.db.LockKeyRange(ctx, uniteKeyRange.AppendageKeyRangeId)
if err != nil {
return err
}

defer func() {
if err := qc.db.UnlockKeyRange(ctx, uniteKeyRange.KeyRangeIDRight); err != nil {
if err := qc.db.UnlockKeyRange(ctx, uniteKeyRange.AppendageKeyRangeId); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
}()

if krLeft.ShardID != krRight.ShardID {
if krBase.ShardID != krAppendage.ShardID {
return spqrerror.New(spqrerror.SPQR_KEYRANGE_ERROR, "failed to unite key ranges routing different shards")
}
if krLeft.DistributionId != krRight.DistributionId {
if krBase.DistributionId != krAppendage.DistributionId {
return spqrerror.New(spqrerror.SPQR_KEYRANGE_ERROR, "failed to unite key ranges of different distributions")
}
ds, err := qc.db.GetDistribution(ctx, krLeft.DistributionId)
ds, err := qc.db.GetDistribution(ctx, krBase.DistributionId)
if err != nil {
return err
}
// TODO: check all types when composite keys are supported
krLeft, krRight := krBase, krAppendage
if kr.CmpRangesLess(krRight.LowerBound, krLeft.LowerBound) {
krLeft, krRight = krRight, krLeft
}
@@ -729,19 +736,23 @@ func (qc *qdbCoordinator) Unite(ctx context.Context, uniteKeyRange *kr.UniteKeyR
}
}

if err := qc.db.DropKeyRange(ctx, krRight.KeyRangeID); err != nil {
if err := qc.db.DropKeyRange(ctx, krAppendage.KeyRangeID); err != nil {
return spqrerror.Newf(spqrerror.SPQR_KEYRANGE_ERROR, "failed to drop an old key range: %s", err.Error())
}

if err := ops.ModifyKeyRangeWithChecks(ctx, qc.db, kr.KeyRangeFromDB(krLeft)); err != nil {
if krLeft.KeyRangeID != krBase.KeyRangeID {
krBase.LowerBound = krAppendage.LowerBound
}

if err := ops.ModifyKeyRangeWithChecks(ctx, qc.db, kr.KeyRangeFromDB(krBase)); err != nil {
return spqrerror.Newf(spqrerror.SPQR_KEYRANGE_ERROR, "failed to update a new key range: %s", err.Error())
}

if err := qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
cl := routerproto.NewKeyRangeServiceClient(cc)
resp, err := cl.MergeKeyRange(ctx, &routerproto.MergeKeyRangeRequest{
Bound: krRight.LowerBound,
Distribution: krRight.DistributionId,
BaseId: uniteKeyRange.BaseKeyRangeId,
AppendageId: uniteKeyRange.AppendageKeyRangeId,
})

spqrlog.Zero.Debug().
@@ -778,10 +789,10 @@ func (qc *qdbCoordinator) RecordKeyRangeMove(ctx context.Context, m *qdb.MoveKey
return m.MoveId, nil
}

// TODO : unit tests
// Move key range from one logical shard to another
// This function reshards data by locking a portion of it,
// making it unavailable for read and write access during the process.
// TODO : unit tests
func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error {
// First, we create a record in the qdb to track the data movement.
// If the coordinator crashes during the process, we need to rerun this function.
@@ -849,7 +860,7 @@ func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error
if err := qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
cl := routerproto.NewKeyRangeServiceClient(cc)
moveResp, err := cl.MoveKeyRange(ctx, &routerproto.MoveKeyRangeRequest{
KeyRange: krmv.ToProto(),
Id: krmv.ID,
ToShardId: krmv.ShardID,
})
spqrlog.Zero.Debug().
51 changes: 9 additions & 42 deletions coordinator/provider/keyranges.go
Original file line number Diff line number Diff line change
@@ -5,8 +5,6 @@ import (

"github.com/pg-sharding/spqr/pkg/models/spqrerror"

"github.com/pg-sharding/spqr/pkg/spqrlog"

"github.com/pg-sharding/spqr/coordinator"
"github.com/pg-sharding/spqr/pkg/models/kr"
protos "github.com/pg-sharding/spqr/pkg/protos"
@@ -52,9 +50,10 @@ func (c *CoordinatorService) UnlockKeyRange(ctx context.Context, request *protos
// TODO : unit tests
func (c *CoordinatorService) SplitKeyRange(ctx context.Context, request *protos.SplitKeyRangeRequest) (*protos.ModifyReply, error) {
splitKR := &kr.SplitKeyRange{
Bound: request.Bound,
Krid: request.KeyRangeInfo.Krid,
SourceID: request.SourceId,
Bound: request.Bound,
Krid: request.NewId,
SourceID: request.SourceId,
SplitLeft: request.SplitLeft,
}

if err := c.impl.Split(ctx, splitKR); err != nil {
@@ -101,7 +100,7 @@ func (c *CoordinatorService) ListAllKeyRanges(ctx context.Context, _ *protos.Lis
// TODO : unit tests
func (c *CoordinatorService) MoveKeyRange(ctx context.Context, request *protos.MoveKeyRangeRequest) (*protos.ModifyReply, error) {
if err := c.impl.Move(ctx, &kr.MoveKeyRange{
Krid: request.KeyRange.Krid,
Krid: request.Id,
ShardId: request.ToShardId,
}); err != nil {
return nil, err
@@ -112,42 +111,10 @@ func (c *CoordinatorService) MoveKeyRange(ctx context.Context, request *protos.M

// TODO : unit tests
func (c *CoordinatorService) MergeKeyRange(ctx context.Context, request *protos.MergeKeyRangeRequest) (*protos.ModifyReply, error) {
krsqb, err := c.impl.ListAllKeyRanges(ctx)
if err != nil {
return nil, err
}

bound := request.GetBound()
uniteKeyRange := &kr.UniteKeyRange{}

var matched_low *kr.KeyRange = nil

for _, krqb := range krsqb {
if kr.CmpRangesEqual(krqb.LowerBound, bound) {
uniteKeyRange.KeyRangeIDRight = krqb.ID
}

if kr.CmpRangesLess(krqb.LowerBound, bound) {
if matched_low == nil || kr.CmpRangesLess(matched_low.LowerBound, krqb.LowerBound) {
uniteKeyRange.KeyRangeIDLeft = krqb.ID
matched_low = krqb
}
}
}

spqrlog.Zero.Debug().
Str("left", uniteKeyRange.KeyRangeIDLeft).
Str("right", uniteKeyRange.KeyRangeIDRight).
Msg("unite keyrange")

if uniteKeyRange.KeyRangeIDLeft == "" || uniteKeyRange.KeyRangeIDRight == "" {
spqrlog.Zero.Debug().
Bytes("bound", bound).
Msg("key ranges to merge by border not found")
return &protos.ModifyReply{}, nil
}

if err := c.impl.Unite(ctx, uniteKeyRange); err != nil {
if err := c.impl.Unite(ctx, &kr.UniteKeyRange{
BaseKeyRangeId: request.GetBaseId(),
AppendageKeyRangeId: request.GetAppendageId(),
}); err != nil {
return nil, spqrerror.Newf(spqrerror.SPQR_KEYRANGE_ERROR, "failed to unite key ranges: %s", err.Error())
}

2 changes: 1 addition & 1 deletion pkg/clientinteractor/interactor.go
Original file line number Diff line number Diff line change
@@ -562,7 +562,7 @@ func (pi *PSQLInteractor) MergeKeyRanges(_ context.Context, unite *kr.UniteKeyRa
},
},
},
&pgproto3.DataRow{Values: [][]byte{[]byte(fmt.Sprintf("merge key ranges %v and %v", unite.KeyRangeIDLeft, unite.KeyRangeIDRight))}},
&pgproto3.DataRow{Values: [][]byte{[]byte(fmt.Sprintf("merge key ranges %v and %v", unite.BaseKeyRangeId, unite.AppendageKeyRangeId))}},
&pgproto3.CommandComplete{},
&pgproto3.ReadyForQuery{},
} {
33 changes: 17 additions & 16 deletions pkg/coord/adapter.go
Original file line number Diff line number Diff line change
@@ -129,15 +129,10 @@ func (a *Adapter) Split(ctx context.Context, split *kr.SplitKeyRange) error {
if keyRange.ID == split.SourceID {
c := proto.NewKeyRangeServiceClient(a.conn)
_, err := c.SplitKeyRange(ctx, &proto.SplitKeyRangeRequest{
Bound: split.Bound,
SourceId: split.SourceID,
KeyRangeInfo: &proto.KeyRangeInfo{
Krid: split.Krid,
ShardId: keyRange.ShardID,
KeyRange: &proto.KeyRange{
LowerBound: string(keyRange.LowerBound),
},
},
Bound: split.Bound,
SourceId: split.SourceID,
NewId: split.Krid,
SplitLeft: split.SplitLeft,
})
return err
}
@@ -156,20 +151,25 @@ func (a *Adapter) Unite(ctx context.Context, unite *kr.UniteKeyRange) error {
var left *kr.KeyRange
var right *kr.KeyRange

// Check for in-between key ranges
for _, kr := range krs {
if kr.ID == unite.KeyRangeIDLeft {
if kr.ID == unite.BaseKeyRangeId {
left = kr
}
if kr.ID == unite.KeyRangeIDRight {
if kr.ID == unite.AppendageKeyRangeId {
right = kr
}
}

for _, krcurr := range krs {
if krcurr.ID == unite.KeyRangeIDLeft || krcurr.ID == unite.KeyRangeIDRight {
if kr.CmpRangesLess(right.LowerBound, left.LowerBound) {
left, right = right, left
}

for _, krCurr := range krs {
if krCurr.ID == unite.BaseKeyRangeId || krCurr.ID == unite.AppendageKeyRangeId {
continue
}
if kr.CmpRangesLess(krcurr.LowerBound, right.LowerBound) && kr.CmpRangesLess(left.LowerBound, krcurr.LowerBound) {
if kr.CmpRangesLess(krCurr.LowerBound, right.LowerBound) && kr.CmpRangesLess(left.LowerBound, krCurr.LowerBound) {
return spqrerror.New(spqrerror.SPQR_KEYRANGE_ERROR, "unvalid unite request")
}
}
@@ -180,7 +180,8 @@ func (a *Adapter) Unite(ctx context.Context, unite *kr.UniteKeyRange) error {

c := proto.NewKeyRangeServiceClient(a.conn)
_, err = c.MergeKeyRange(ctx, &proto.MergeKeyRangeRequest{
Bound: right.LowerBound,
BaseId: unite.BaseKeyRangeId,
AppendageId: unite.AppendageKeyRangeId,
})
return err
}
@@ -196,7 +197,7 @@ func (a *Adapter) Move(ctx context.Context, move *kr.MoveKeyRange) error {
if keyRange.ID == move.Krid {
c := proto.NewKeyRangeServiceClient(a.conn)
_, err := c.MoveKeyRange(ctx, &proto.MoveKeyRangeRequest{
KeyRange: keyRange.ToProto(),
Id: keyRange.ID,
ToShardId: move.ShardId,
})
return err
Loading
Loading