Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix working with multiple distributions in balancer #572

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 59 additions & 40 deletions balancer/provider/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ type BalancerImpl struct {
coordinatorConn *grpc.ClientConn
threshold []float64

keyRanges []*kr.KeyRange
krIdx map[string]int
shardKr map[string][]int
dsToKeyRanges map[string][]*kr.KeyRange
dsToKrIdx map[string]map[string]int
shardKr map[string][]string
krToDs map[string]string
}

func NewBalancer() (*BalancerImpl, error) {
Expand All @@ -41,9 +42,10 @@ func NewBalancer() (*BalancerImpl, error) {
return &BalancerImpl{
coordinatorConn: conn,
threshold: threshold,
keyRanges: []*kr.KeyRange{},
krIdx: map[string]int{},
shardKr: map[string][]int{},
dsToKeyRanges: map[string][]*kr.KeyRange{},
dsToKrIdx: map[string]map[string]int{},
shardKr: map[string][]string{},
krToDs: map[string]string{},
}, nil
}

Expand Down Expand Up @@ -252,7 +254,7 @@ func (b *BalancerImpl) getStatsByKeyRange(ctx context.Context, shard *ShardMetri
if err = rows.Scan(&krId, &cpu); err != nil {
return err
}
if _, ok := b.krIdx[krId]; !ok {
if _, ok := b.dsToKrIdx[krId]; !ok {
continue
}
if _, ok := shard.MetricsKR[krId]; !ok {
Expand All @@ -267,8 +269,10 @@ func (b *BalancerImpl) getStatsByKeyRange(ctx context.Context, shard *ShardMetri
return err
}

for _, i := range b.shardKr[shard.ShardId] {
krg := b.keyRanges[i]
for _, krId := range b.shardKr[shard.ShardId] {
ds := b.krToDs[krId]
i := b.dsToKrIdx[ds][krId]
krg := b.dsToKeyRanges[ds][i]
if krg.ShardID != shard.ShardId {
continue
}
Expand All @@ -284,8 +288,8 @@ func (b *BalancerImpl) getStatsByKeyRange(ctx context.Context, shard *ShardMetri
WHERE %s;
`
var nextKR *kr.KeyRange
if i < len(b.keyRanges)-1 {
nextKR = b.keyRanges[i+1]
if i < len(b.dsToKeyRanges[ds])-1 {
nextKR = b.dsToKeyRanges[ds][i+1]
}
condition, err := b.getKRCondition(rel, krg, nextKR, "t")
if err != nil {
Expand Down Expand Up @@ -417,15 +421,16 @@ func (b *BalancerImpl) maxFitOnShard(krMetrics []float64, krKeyCount int64, shar

func (b *BalancerImpl) getAdjacentShards(krId string) map[string]struct{} {
res := make(map[string]struct{}, 0)
krIdx := b.krIdx[krId]
ds := b.krToDs[krId]
krIdx := b.dsToKrIdx[ds][krId]
if krIdx != 0 {
res[b.keyRanges[krIdx-1].ShardID] = struct{}{}
res[b.dsToKeyRanges[ds][krIdx-1].ShardID] = struct{}{}
}
if krIdx < len(b.keyRanges)-1 {
res[b.keyRanges[krIdx+1].ShardID] = struct{}{}
if krIdx < len(b.dsToKeyRanges)-1 {
res[b.dsToKeyRanges[ds][krIdx+1].ShardID] = struct{}{}
}
// do not include current shard
delete(res, b.keyRanges[krIdx].ShardID)
delete(res, b.dsToKeyRanges[ds][krIdx].ShardID)
return res
}

Expand Down Expand Up @@ -463,14 +468,18 @@ func (b *BalancerImpl) getTasks(ctx context.Context, shardFrom *ShardMetrics, kr
Int("key_count", keyCount).
Msg("generating move tasks")
// Move from beginning or the end of key range
krInd := b.krIdx[krId]
if _, ok := b.krToDs[krId]; !ok {
return nil, fmt.Errorf("unknown key range id \"%s\"", krId)
}
ds := b.krToDs[krId]
krInd := b.dsToKrIdx[ds][krId]
krIdTo := ""
var join tasks.JoinType = tasks.JoinNone
if krInd < len(b.keyRanges)-1 && b.keyRanges[krInd+1].ShardID == shardToId {
krIdTo = b.keyRanges[krInd+1].ID
if krInd < len(b.dsToKeyRanges[ds])-1 && b.dsToKeyRanges[ds][krInd+1].ShardID == shardToId {
krIdTo = b.dsToKeyRanges[ds][krInd+1].ID
join = tasks.JoinRight
} else if krInd > 0 && b.keyRanges[krInd-1].ShardID == shardToId {
krIdTo = b.keyRanges[krInd-1].ID
} else if krInd > 0 && b.dsToKeyRanges[ds][krInd-1].ShardID == shardToId {
krIdTo = b.dsToKeyRanges[ds][krInd-1].ID
join = tasks.JoinLeft
}

Expand All @@ -491,11 +500,8 @@ func (b *BalancerImpl) getTasks(ctx context.Context, shardFrom *ShardMetrics, kr
maxCount = count
}
}
if _, ok := b.krIdx[krId]; !ok {
return nil, fmt.Errorf("unknown key range id \"%s\"", krId)
}
var rel *distributions.DistributedRelation = nil
allRels, err := b.getKRRelations(ctx, b.keyRanges[b.krIdx[krId]])
allRels, err := b.getKRRelations(ctx, b.dsToKeyRanges[ds][krInd])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -664,22 +670,35 @@ func (b *BalancerImpl) updateKeyRanges(ctx context.Context) error {
if err != nil {
return err
}
keyRanges := make([]*kr.KeyRange, len(keyRangesProto.KeyRangesInfo))
for i, krProto := range keyRangesProto.KeyRangesInfo {
keyRanges[i] = kr.KeyRangeFromProto(krProto)
keyRanges := make(map[string][]*kr.KeyRange)
for _, krProto := range keyRangesProto.KeyRangesInfo {
if _, ok := keyRanges[krProto.DistributionId]; !ok {
keyRanges[krProto.DistributionId] = make([]*kr.KeyRange, 0)
}
keyRanges[krProto.DistributionId] = append(keyRanges[krProto.DistributionId], kr.KeyRangeFromProto(krProto))
}
sort.Slice(keyRanges, func(i, j int) bool {
return kr.CmpRangesLess(keyRanges[i].LowerBound, keyRanges[j].LowerBound)
})
b.keyRanges = keyRanges
b.krIdx = make(map[string]int)
b.shardKr = make(map[string][]int)
for i, krg := range b.keyRanges {
b.krIdx[krg.ID] = i
if _, ok := b.shardKr[krg.ShardID]; !ok {
b.shardKr[krg.ShardID] = make([]int, 0)
}
b.shardKr[krg.ShardID] = append(b.shardKr[krg.ShardID], i)
for _, krs := range keyRanges {
sort.Slice(krs, func(i, j int) bool {
return kr.CmpRangesLess(krs[i].LowerBound, krs[j].LowerBound)
})
}

b.dsToKeyRanges = keyRanges
b.dsToKrIdx = make(map[string]map[string]int)
b.shardKr = make(map[string][]string)
b.krToDs = make(map[string]string)
for ds, krs := range b.dsToKeyRanges {
for i, krg := range krs {
b.krToDs[krg.ID] = ds
if _, ok := b.dsToKrIdx[ds]; !ok {
b.dsToKrIdx[ds] = make(map[string]int)
}
b.dsToKrIdx[ds][krg.ID] = i
if _, ok := b.shardKr[krg.ShardID]; !ok {
b.shardKr[krg.ShardID] = make([]string, 0)
}
b.shardKr[krg.ShardID] = append(b.shardKr[krg.ShardID], krg.ID)
}
}

return nil
Expand Down
99 changes: 98 additions & 1 deletion test/feature/features/balancer.feature
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Feature: Balancer test
Given cluster is up and running
And host "coordinator2" is stopped
And host "coordinator2" is started

When I execute SQL on host "coordinator"
"""
REGISTER ROUTER r1 ADDRESS regress_router:7000;
Expand Down Expand Up @@ -357,3 +357,100 @@ Feature: Balancer test
"Shard ID":"sh2"
}]
"""

Scenario: balancer works with several distributions
When I execute SQL on host "coordinator"
"""
CREATE DISTRIBUTION ds2 COLUMN TYPES integer;
ALTER DISTRIBUTION ds2 ATTACH RELATION xMove2 DISTRIBUTION KEY w_id;
CREATE KEY RANGE kr1 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1;
CREATE KEY RANGE kr2 FROM 100000 ROUTE TO sh2 FOR DISTRIBUTION ds1;
CREATE KEY RANGE kr3 FROM 10 ROUTE TO sh1 FOR DISTRIBUTION ds2;
CREATE KEY RANGE kr4 FROM 10000 ROUTE TO sh2 FOR DISTRIBUTION ds2;
"""
Then command return code should be "0"
When I run SQL on host "router"
"""
CREATE TABLE xMove(w_id INT, s TEXT);
CREATE TABLE xMove2(w_id INT, s TEXT);
"""
Then command return code should be "0"
When I run SQL on host "shard1"
"""
INSERT INTO xMove (w_id, s) SELECT generate_series(0, 99999), 'sample text value';
INSERT INTO xMove2 (w_id, s) SELECT generate_series(10, 9999), 'sample text value';
"""
Then command return code should be "0"
When I run command on host "coordinator" with timeout "60" seconds
"""
/spqr/spqr-balancer --config /spqr/test/feature/conf/balancer_several_moves.yaml > /balancer.log
"""
Then command return code should be "0"
When I run SQL on host "shard2"
"""
SELECT count(*) FROM xMove2
"""
Then command return code should be "0"
And SQL result should match regexp
"""
0
"""
When I run SQL on host "shard1"
"""
SELECT count(*) FROM xMove2
"""
Then command return code should be "0"
And SQL result should match regexp
"""
9990
"""
When I run SQL on host "shard2"
"""
SELECT count(*) FROM xMove
"""
Then command return code should be "0"
And SQL result should match regexp
"""
30
"""
When I run SQL on host "shard1"
"""
SELECT count(*) FROM xMove
"""
Then command return code should be "0"
And SQL result should match regexp
"""
99970
"""
When I run SQL on host "coordinator"
"""
SHOW key_ranges;
"""
Then command return code should be "0"
And SQL result should match json
"""
[{
"Key range ID":"kr1",
"Distribution ID":"ds1",
"Lower bound":"0",
"Shard ID":"sh1"
},
{
"Key range ID":"kr2",
"Distribution ID":"ds1",
"Lower bound":"99970",
"Shard ID":"sh2"
},
{
"Key range ID":"kr3",
"Distribution ID":"ds2",
"Lower bound":"10",
"Shard ID":"sh1"
},
{
"Key range ID":"kr4",
"Distribution ID":"ds2",
"Lower bound":"10000",
"Shard ID":"sh2"
}]
"""
Loading