Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Disable move key range in LocalCoordinator + refactoring #780

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 47 additions & 50 deletions pkg/coord/local/clocal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package local
import (
"context"
"fmt"
"github.com/pg-sharding/spqr/pkg/models/spqrerror"
"math/rand"
"sync"

Expand Down Expand Up @@ -439,21 +440,21 @@ func (lc *LocalCoordinator) WorldShards() []string {
//
// Returns:
// - error: an error if the move operation encounters any issues.
func (qr *LocalCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error {
func (lc *LocalCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error {
var krmv *qdb.KeyRange
var err error
if krmv, err = qr.qdb.CheckLockedKeyRange(ctx, req.Krid); err != nil {
if krmv, err = lc.qdb.CheckLockedKeyRange(ctx, req.Krid); err != nil {
return err
}

ds, err := qr.qdb.GetDistribution(ctx, krmv.DistributionId)
ds, err := lc.qdb.GetDistribution(ctx, krmv.DistributionId)
if err != nil {
return err
}

var reqKr = kr.KeyRangeFromDB(krmv, ds.ColTypes)
reqKr.ShardID = req.ShardId
return ops.ModifyKeyRangeWithChecks(ctx, qr.qdb, reqKr)
return ops.ModifyKeyRangeWithChecks(ctx, lc.qdb, reqKr)
}

// TODO : unit tests
Expand All @@ -466,12 +467,12 @@ func (qr *LocalCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) erro
//
// Returns:
// - error: an error if the unite operation encounters any issues.
func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) error {
func (lc *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) error {
var krBase *qdb.KeyRange
var krAppendage *qdb.KeyRange
var err error

if krBase, err = qr.qdb.LockKeyRange(ctx, req.BaseKeyRangeId); err != nil { //nolint:all TODO
if krBase, err = lc.qdb.LockKeyRange(ctx, req.BaseKeyRangeId); err != nil { //nolint:all TODO
return err
}

Expand All @@ -481,19 +482,19 @@ func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) er
spqrlog.Zero.Error().Err(err).Msg("")
return
}
}(qr.qdb, ctx, req.BaseKeyRangeId)
}(lc.qdb, ctx, req.BaseKeyRangeId)

ds, err := qr.qdb.GetDistribution(ctx, krBase.DistributionId)
ds, err := lc.qdb.GetDistribution(ctx, krBase.DistributionId)
if err != nil {
return err
}

// TODO: krRight seems to be empty.
if krAppendage, err = qr.qdb.GetKeyRange(ctx, req.AppendageKeyRangeId); err != nil {
if krAppendage, err = lc.qdb.GetKeyRange(ctx, req.AppendageKeyRangeId); err != nil {
return err
}

if err = qr.qdb.DropKeyRange(ctx, krAppendage.KeyRangeID); err != nil {
if err = lc.qdb.DropKeyRange(ctx, krAppendage.KeyRangeID); err != nil {
return err
}

Expand All @@ -506,7 +507,7 @@ func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) er
krBaseCopy.LowerBound = newBound
united := kr.KeyRangeFromDB(krBaseCopy, ds.ColTypes)

return ops.ModifyKeyRangeWithChecks(ctx, qr.qdb, united)
return ops.ModifyKeyRangeWithChecks(ctx, lc.qdb, united)
}

// Caller should lock key range
Expand All @@ -520,7 +521,7 @@ func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) er
//
// Returns:
// - error: an error if the split operation encounters any issues.
func (qr *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) error {
func (lc *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) error {
var krOld *qdb.KeyRange
var err error

Expand All @@ -530,17 +531,17 @@ func (qr *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) er
Str("source-id", req.SourceID).
Msg("split request is")

if krOld, err = qr.qdb.LockKeyRange(ctx, req.SourceID); err != nil {
if krOld, err = lc.qdb.LockKeyRange(ctx, req.SourceID); err != nil {
return err
}

defer func() {
if err := qr.qdb.UnlockKeyRange(ctx, req.SourceID); err != nil {
if err := lc.qdb.UnlockKeyRange(ctx, req.SourceID); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
}()

ds, err := qr.qdb.GetDistribution(ctx, krOld.DistributionId)
ds, err := lc.qdb.GetDistribution(ctx, krOld.DistributionId)
if err != nil {
return err
}
Expand Down Expand Up @@ -570,11 +571,11 @@ func (qr *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) er
krOld.LowerBound = req.Bound // TODO: fix
}

if err := ops.ModifyKeyRangeWithChecks(ctx, qr.qdb, kr.KeyRangeFromDB(krOld, ds.ColTypes)); err != nil {
if err := ops.ModifyKeyRangeWithChecks(ctx, lc.qdb, kr.KeyRangeFromDB(krOld, ds.ColTypes)); err != nil {
return err
}

if err := ops.CreateKeyRangeWithChecks(ctx, qr.qdb, krNew); err != nil {
if err := ops.CreateKeyRangeWithChecks(ctx, lc.qdb, krNew); err != nil {
return fmt.Errorf("failed to add a new key range: %w", err)
}

Expand All @@ -592,13 +593,13 @@ func (qr *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) er
// Returns:
// - *kr.KeyRange: the locked KeyRange object.
// - error: an error if the lock operation encounters any issues.
func (qr *LocalCoordinator) LockKeyRange(ctx context.Context, krid string) (*kr.KeyRange, error) {
keyRangeDB, err := qr.qdb.LockKeyRange(ctx, krid)
func (lc *LocalCoordinator) LockKeyRange(ctx context.Context, krid string) (*kr.KeyRange, error) {
keyRangeDB, err := lc.qdb.LockKeyRange(ctx, krid)
if err != nil {
return nil, err
}

ds, err := qr.qdb.GetDistribution(ctx, keyRangeDB.DistributionId)
ds, err := lc.qdb.GetDistribution(ctx, keyRangeDB.DistributionId)
if err != nil {
return nil, err
}
Expand All @@ -616,8 +617,8 @@ func (qr *LocalCoordinator) LockKeyRange(ctx context.Context, krid string) (*kr.
//
// Returns:
// - error: an error if the unlock operation encounters any issues.
func (qr *LocalCoordinator) UnlockKeyRange(ctx context.Context, krid string) error {
return qr.qdb.UnlockKeyRange(ctx, krid)
func (lc *LocalCoordinator) UnlockKeyRange(ctx context.Context, krid string) error {
return lc.qdb.UnlockKeyRange(ctx, krid)
}

// TODO : unit tests
Expand Down Expand Up @@ -652,10 +653,10 @@ func (lc *LocalCoordinator) AddDataShard(ctx context.Context, ds *datashards.Dat
//
// Returns:
// - []string: a slice of strings containing the names of the data shards.
func (qr *LocalCoordinator) Shards() []string {
func (lc *LocalCoordinator) Shards() []string {
var ret []string

for name := range qr.DataShardCfgs {
for name := range lc.DataShardCfgs {
ret = append(ret, name)
}

Expand Down Expand Up @@ -697,13 +698,13 @@ func (lc *LocalCoordinator) GetKeyRange(ctx context.Context, krId string) (*kr.K
// Returns:
// - []*kr.KeyRange: a slice of KeyRange objects retrieved.
// - error: an error if the retrieval encounters any issues.
func (qr *LocalCoordinator) ListKeyRanges(ctx context.Context, distribution string) ([]*kr.KeyRange, error) {
func (lc *LocalCoordinator) ListKeyRanges(ctx context.Context, distribution string) ([]*kr.KeyRange, error) {
var ret []*kr.KeyRange
if krs, err := qr.qdb.ListKeyRanges(ctx, distribution); err != nil {
if krs, err := lc.qdb.ListKeyRanges(ctx, distribution); err != nil {
return nil, err
} else {
for _, keyRange := range krs {
ds, err := qr.qdb.GetDistribution(ctx, keyRange.DistributionId)
ds, err := lc.qdb.GetDistribution(ctx, keyRange.DistributionId)

if err != nil {
return nil, err
Expand All @@ -726,8 +727,8 @@ func (qr *LocalCoordinator) ListKeyRanges(ctx context.Context, distribution stri
// Returns:
// - []*kr.KeyRange: a slice of KeyRange objects representing all key ranges.
// - error: an error if the retrieval encounters any issues.
func (qr *LocalCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRange, error) {
if krs, err := qr.qdb.ListAllKeyRanges(ctx); err != nil {
func (lc *LocalCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRange, error) {
if krs, err := lc.qdb.ListAllKeyRanges(ctx); err != nil {
return nil, err
} else {
var ret []*kr.KeyRange
Expand All @@ -738,7 +739,7 @@ func (qr *LocalCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRang
var err error
var ok bool
if ds, ok = cache[keyRange.DistributionId]; !ok {
ds, err = qr.qdb.GetDistribution(ctx, keyRange.DistributionId)
ds, err = lc.qdb.GetDistribution(ctx, keyRange.DistributionId)
if err != nil {
return nil, err
}
Expand All @@ -761,7 +762,7 @@ func (qr *LocalCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRang
// Returns:
// - []*topology.Router: a slice of Router objects representing all routers.
// - error: an error if the retrieval encounters any issues.
func (qr *LocalCoordinator) ListRouters(ctx context.Context) ([]*topology.Router, error) {
func (lc *LocalCoordinator) ListRouters(ctx context.Context) ([]*topology.Router, error) {
return []*topology.Router{{
ID: "local",
}}, nil
Expand All @@ -775,20 +776,16 @@ func (qr *LocalCoordinator) ListRouters(ctx context.Context) ([]*topology.Router
//
// Returns:
// - error: An error if the creation encounters any issues.
func (qr *LocalCoordinator) CreateKeyRange(ctx context.Context, kr *kr.KeyRange) error {
return ops.CreateKeyRangeWithChecks(ctx, qr.qdb, kr)
func (lc *LocalCoordinator) CreateKeyRange(ctx context.Context, kr *kr.KeyRange) error {
return ops.CreateKeyRangeWithChecks(ctx, lc.qdb, kr)
}

// MoveKeyRange moves a key range in the LocalCoordinator.
//
// Parameters:
// - ctx (context.Context): The context of the operation.
// - kr (*kr.KeyRange): The key range object to be moved.
// MoveKeyRange is disabled in LocalCoordinator
//
// Returns:
// - error: An error if the move encounters any issues.
func (qr *LocalCoordinator) MoveKeyRange(ctx context.Context, kr *kr.KeyRange) error {
return ops.ModifyKeyRangeWithChecks(ctx, qr.qdb, kr)
// - error: SPQR_INVALID_REQUEST error
func (lc *LocalCoordinator) MoveKeyRange(_ context.Context, _ *kr.KeyRange) error {
return spqrerror.New(spqrerror.SPQR_INVALID_REQUEST, "MoveKeyRange is not available in local coordinator")
}

var ErrNotCoordinator = fmt.Errorf("request is unprocessable in router")
Expand All @@ -801,7 +798,7 @@ var ErrNotCoordinator = fmt.Errorf("request is unprocessable in router")
//
// Returns:
// - error: An error indicating the registration status.
func (qr *LocalCoordinator) RegisterRouter(ctx context.Context, r *topology.Router) error {
func (lc *LocalCoordinator) RegisterRouter(ctx context.Context, r *topology.Router) error {
return ErrNotCoordinator
}

Expand All @@ -813,7 +810,7 @@ func (qr *LocalCoordinator) RegisterRouter(ctx context.Context, r *topology.Rout
//
// Returns:
// - error: An error indicating the unregistration status.
func (qr *LocalCoordinator) UnregisterRouter(ctx context.Context, id string) error {
func (lc *LocalCoordinator) UnregisterRouter(ctx context.Context, id string) error {
return ErrNotCoordinator
}

Expand All @@ -825,7 +822,7 @@ func (qr *LocalCoordinator) UnregisterRouter(ctx context.Context, id string) err
//
// Returns:
// - error: An error indicating the synchronization status. In this case, it returns ErrNotCoordinator.
func (qr *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topology.Router) error {
func (lc *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topology.Router) error {
return ErrNotCoordinator
}

Expand All @@ -837,7 +834,7 @@ func (qr *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topo
//
// Returns:
// - error: An error indicating the update status.
func (qr *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error {
func (lc *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error {
return ErrNotCoordinator
}

Expand All @@ -849,8 +846,8 @@ func (qr *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, ro
//
// Returns:
// - error: An error indicating the update status.
func (qr *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string) error {
return qr.qdb.UpdateCoordinator(ctx, addr)
func (lc *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string) error {
return lc.qdb.UpdateCoordinator(ctx, addr)
}

// GetCoordinator retrieves the coordinator address from the local coordinator.
Expand All @@ -861,8 +858,8 @@ func (qr *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string)
// Returns:
// - string: The address of the coordinator.
// - error: An error indicating the retrieval status.
func (qr *LocalCoordinator) GetCoordinator(ctx context.Context) (string, error) {
addr, err := qr.qdb.GetCoordinator(ctx)
func (lc *LocalCoordinator) GetCoordinator(ctx context.Context) (string, error) {
addr, err := lc.qdb.GetCoordinator(ctx)
spqrlog.Zero.Debug().Str("address", addr).Msg("resp local coordiantor: get coordinator")
return addr, err
}
Expand All @@ -876,7 +873,7 @@ func (qr *LocalCoordinator) GetCoordinator(ctx context.Context) (string, error)
// Returns:
// - *datashards.DataShard: The retrieved DataShard, or nil if it doesn't exist.
// - error: An error indicating the retrieval status, or ErrNotCoordinator if the operation is not supported by the LocalCoordinator.
func (qr *LocalCoordinator) GetShard(ctx context.Context, shardID string) (*datashards.DataShard, error) {
func (lc *LocalCoordinator) GetShard(ctx context.Context, shardID string) (*datashards.DataShard, error) {
return nil, ErrNotCoordinator
}

Expand Down
Loading