Skip to content

Commit e7afbb9

Browse files
authored
Fix working with multiple distributions in balancer (#572)
* Added test for many distributions to balancer feature tests * Fixed working with many distributions in balancer * Fix feature tests
1 parent 35bca1b commit e7afbb9

File tree

2 files changed

+157
-41
lines changed

2 files changed

+157
-41
lines changed

balancer/provider/balancer.go

+59-40
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ type BalancerImpl struct {
2222
coordinatorConn *grpc.ClientConn
2323
threshold []float64
2424

25-
keyRanges []*kr.KeyRange
26-
krIdx map[string]int
27-
shardKr map[string][]int
25+
dsToKeyRanges map[string][]*kr.KeyRange
26+
dsToKrIdx map[string]map[string]int
27+
shardKr map[string][]string
28+
krToDs map[string]string
2829
}
2930

3031
func NewBalancer() (*BalancerImpl, error) {
@@ -41,9 +42,10 @@ func NewBalancer() (*BalancerImpl, error) {
4142
return &BalancerImpl{
4243
coordinatorConn: conn,
4344
threshold: threshold,
44-
keyRanges: []*kr.KeyRange{},
45-
krIdx: map[string]int{},
46-
shardKr: map[string][]int{},
45+
dsToKeyRanges: map[string][]*kr.KeyRange{},
46+
dsToKrIdx: map[string]map[string]int{},
47+
shardKr: map[string][]string{},
48+
krToDs: map[string]string{},
4749
}, nil
4850
}
4951

@@ -252,7 +254,7 @@ func (b *BalancerImpl) getStatsByKeyRange(ctx context.Context, shard *ShardMetri
252254
if err = rows.Scan(&krId, &cpu); err != nil {
253255
return err
254256
}
255-
if _, ok := b.krIdx[krId]; !ok {
257+
if _, ok := b.krToDs[krId]; !ok {
256258
continue
257259
}
258260
if _, ok := shard.MetricsKR[krId]; !ok {
@@ -267,8 +269,10 @@ func (b *BalancerImpl) getStatsByKeyRange(ctx context.Context, shard *ShardMetri
267269
return err
268270
}
269271

270-
for _, i := range b.shardKr[shard.ShardId] {
271-
krg := b.keyRanges[i]
272+
for _, krId := range b.shardKr[shard.ShardId] {
273+
ds := b.krToDs[krId]
274+
i := b.dsToKrIdx[ds][krId]
275+
krg := b.dsToKeyRanges[ds][i]
272276
if krg.ShardID != shard.ShardId {
273277
continue
274278
}
@@ -284,8 +288,8 @@ func (b *BalancerImpl) getStatsByKeyRange(ctx context.Context, shard *ShardMetri
284288
WHERE %s;
285289
`
286290
var nextKR *kr.KeyRange
287-
if i < len(b.keyRanges)-1 {
288-
nextKR = b.keyRanges[i+1]
291+
if i < len(b.dsToKeyRanges[ds])-1 {
292+
nextKR = b.dsToKeyRanges[ds][i+1]
289293
}
290294
condition, err := b.getKRCondition(rel, krg, nextKR, "t")
291295
if err != nil {
@@ -417,15 +421,16 @@ func (b *BalancerImpl) maxFitOnShard(krMetrics []float64, krKeyCount int64, shar
417421

418422
func (b *BalancerImpl) getAdjacentShards(krId string) map[string]struct{} {
419423
res := make(map[string]struct{}, 0)
420-
krIdx := b.krIdx[krId]
424+
ds := b.krToDs[krId]
425+
krIdx := b.dsToKrIdx[ds][krId]
421426
if krIdx != 0 {
422-
res[b.keyRanges[krIdx-1].ShardID] = struct{}{}
427+
res[b.dsToKeyRanges[ds][krIdx-1].ShardID] = struct{}{}
423428
}
424-
if krIdx < len(b.keyRanges)-1 {
425-
res[b.keyRanges[krIdx+1].ShardID] = struct{}{}
429+
if krIdx < len(b.dsToKeyRanges)-1 {
430+
res[b.dsToKeyRanges[ds][krIdx+1].ShardID] = struct{}{}
426431
}
427432
// do not include current shard
428-
delete(res, b.keyRanges[krIdx].ShardID)
433+
delete(res, b.dsToKeyRanges[ds][krIdx].ShardID)
429434
return res
430435
}
431436

@@ -463,14 +468,18 @@ func (b *BalancerImpl) getTasks(ctx context.Context, shardFrom *ShardMetrics, kr
463468
Int("key_count", keyCount).
464469
Msg("generating move tasks")
465470
// Move from beginning or the end of key range
466-
krInd := b.krIdx[krId]
471+
if _, ok := b.krToDs[krId]; !ok {
472+
return nil, fmt.Errorf("unknown key range id \"%s\"", krId)
473+
}
474+
ds := b.krToDs[krId]
475+
krInd := b.dsToKrIdx[ds][krId]
467476
krIdTo := ""
468477
var join tasks.JoinType = tasks.JoinNone
469-
if krInd < len(b.keyRanges)-1 && b.keyRanges[krInd+1].ShardID == shardToId {
470-
krIdTo = b.keyRanges[krInd+1].ID
478+
if krInd < len(b.dsToKeyRanges[ds])-1 && b.dsToKeyRanges[ds][krInd+1].ShardID == shardToId {
479+
krIdTo = b.dsToKeyRanges[ds][krInd+1].ID
471480
join = tasks.JoinRight
472-
} else if krInd > 0 && b.keyRanges[krInd-1].ShardID == shardToId {
473-
krIdTo = b.keyRanges[krInd-1].ID
481+
} else if krInd > 0 && b.dsToKeyRanges[ds][krInd-1].ShardID == shardToId {
482+
krIdTo = b.dsToKeyRanges[ds][krInd-1].ID
474483
join = tasks.JoinLeft
475484
}
476485

@@ -491,11 +500,8 @@ func (b *BalancerImpl) getTasks(ctx context.Context, shardFrom *ShardMetrics, kr
491500
maxCount = count
492501
}
493502
}
494-
if _, ok := b.krIdx[krId]; !ok {
495-
return nil, fmt.Errorf("unknown key range id \"%s\"", krId)
496-
}
497503
var rel *distributions.DistributedRelation = nil
498-
allRels, err := b.getKRRelations(ctx, b.keyRanges[b.krIdx[krId]])
504+
allRels, err := b.getKRRelations(ctx, b.dsToKeyRanges[ds][krInd])
499505
if err != nil {
500506
return nil, err
501507
}
@@ -664,22 +670,35 @@ func (b *BalancerImpl) updateKeyRanges(ctx context.Context) error {
664670
if err != nil {
665671
return err
666672
}
667-
keyRanges := make([]*kr.KeyRange, len(keyRangesProto.KeyRangesInfo))
668-
for i, krProto := range keyRangesProto.KeyRangesInfo {
669-
keyRanges[i] = kr.KeyRangeFromProto(krProto)
673+
keyRanges := make(map[string][]*kr.KeyRange)
674+
for _, krProto := range keyRangesProto.KeyRangesInfo {
675+
if _, ok := keyRanges[krProto.DistributionId]; !ok {
676+
keyRanges[krProto.DistributionId] = make([]*kr.KeyRange, 0)
677+
}
678+
keyRanges[krProto.DistributionId] = append(keyRanges[krProto.DistributionId], kr.KeyRangeFromProto(krProto))
670679
}
671-
sort.Slice(keyRanges, func(i, j int) bool {
672-
return kr.CmpRangesLess(keyRanges[i].LowerBound, keyRanges[j].LowerBound)
673-
})
674-
b.keyRanges = keyRanges
675-
b.krIdx = make(map[string]int)
676-
b.shardKr = make(map[string][]int)
677-
for i, krg := range b.keyRanges {
678-
b.krIdx[krg.ID] = i
679-
if _, ok := b.shardKr[krg.ShardID]; !ok {
680-
b.shardKr[krg.ShardID] = make([]int, 0)
681-
}
682-
b.shardKr[krg.ShardID] = append(b.shardKr[krg.ShardID], i)
680+
for _, krs := range keyRanges {
681+
sort.Slice(krs, func(i, j int) bool {
682+
return kr.CmpRangesLess(krs[i].LowerBound, krs[j].LowerBound)
683+
})
684+
}
685+
686+
b.dsToKeyRanges = keyRanges
687+
b.dsToKrIdx = make(map[string]map[string]int)
688+
b.shardKr = make(map[string][]string)
689+
b.krToDs = make(map[string]string)
690+
for ds, krs := range b.dsToKeyRanges {
691+
for i, krg := range krs {
692+
b.krToDs[krg.ID] = ds
693+
if _, ok := b.dsToKrIdx[ds]; !ok {
694+
b.dsToKrIdx[ds] = make(map[string]int)
695+
}
696+
b.dsToKrIdx[ds][krg.ID] = i
697+
if _, ok := b.shardKr[krg.ShardID]; !ok {
698+
b.shardKr[krg.ShardID] = make([]string, 0)
699+
}
700+
b.shardKr[krg.ShardID] = append(b.shardKr[krg.ShardID], krg.ID)
701+
}
683702
}
684703

685704
return nil

test/feature/features/balancer.feature

+98-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Feature: Balancer test
66
Given cluster is up and running
77
And host "coordinator2" is stopped
88
And host "coordinator2" is started
9-
9+
1010
When I execute SQL on host "coordinator"
1111
"""
1212
REGISTER ROUTER r1 ADDRESS regress_router:7000;
@@ -357,3 +357,100 @@ Feature: Balancer test
357357
"Shard ID":"sh2"
358358
}]
359359
"""
360+
361+
Scenario: balancer works with several distributions
362+
When I execute SQL on host "coordinator"
363+
"""
364+
CREATE DISTRIBUTION ds2 COLUMN TYPES integer;
365+
ALTER DISTRIBUTION ds2 ATTACH RELATION xMove2 DISTRIBUTION KEY w_id;
366+
CREATE KEY RANGE kr1 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1;
367+
CREATE KEY RANGE kr2 FROM 100000 ROUTE TO sh2 FOR DISTRIBUTION ds1;
368+
CREATE KEY RANGE kr3 FROM 10 ROUTE TO sh1 FOR DISTRIBUTION ds2;
369+
CREATE KEY RANGE kr4 FROM 10000 ROUTE TO sh2 FOR DISTRIBUTION ds2;
370+
"""
371+
Then command return code should be "0"
372+
When I run SQL on host "router"
373+
"""
374+
CREATE TABLE xMove(w_id INT, s TEXT);
375+
CREATE TABLE xMove2(w_id INT, s TEXT);
376+
"""
377+
Then command return code should be "0"
378+
When I run SQL on host "shard1"
379+
"""
380+
INSERT INTO xMove (w_id, s) SELECT generate_series(0, 99999), 'sample text value';
381+
INSERT INTO xMove2 (w_id, s) SELECT generate_series(10, 9999), 'sample text value';
382+
"""
383+
Then command return code should be "0"
384+
When I run command on host "coordinator" with timeout "60" seconds
385+
"""
386+
/spqr/spqr-balancer --config /spqr/test/feature/conf/balancer_several_moves.yaml > /balancer.log
387+
"""
388+
Then command return code should be "0"
389+
When I run SQL on host "shard2"
390+
"""
391+
SELECT count(*) FROM xMove2
392+
"""
393+
Then command return code should be "0"
394+
And SQL result should match regexp
395+
"""
396+
0
397+
"""
398+
When I run SQL on host "shard1"
399+
"""
400+
SELECT count(*) FROM xMove2
401+
"""
402+
Then command return code should be "0"
403+
And SQL result should match regexp
404+
"""
405+
9990
406+
"""
407+
When I run SQL on host "shard2"
408+
"""
409+
SELECT count(*) FROM xMove
410+
"""
411+
Then command return code should be "0"
412+
And SQL result should match regexp
413+
"""
414+
30
415+
"""
416+
When I run SQL on host "shard1"
417+
"""
418+
SELECT count(*) FROM xMove
419+
"""
420+
Then command return code should be "0"
421+
And SQL result should match regexp
422+
"""
423+
99970
424+
"""
425+
When I run SQL on host "coordinator"
426+
"""
427+
SHOW key_ranges;
428+
"""
429+
Then command return code should be "0"
430+
And SQL result should match json
431+
"""
432+
[{
433+
"Key range ID":"kr1",
434+
"Distribution ID":"ds1",
435+
"Lower bound":"0",
436+
"Shard ID":"sh1"
437+
},
438+
{
439+
"Key range ID":"kr2",
440+
"Distribution ID":"ds1",
441+
"Lower bound":"99970",
442+
"Shard ID":"sh2"
443+
},
444+
{
445+
"Key range ID":"kr3",
446+
"Distribution ID":"ds2",
447+
"Lower bound":"10",
448+
"Shard ID":"sh1"
449+
},
450+
{
451+
"Key range ID":"kr4",
452+
"Distribution ID":"ds2",
453+
"Lower bound":"10000",
454+
"Shard ID":"sh2"
455+
}]
456+
"""

0 commit comments

Comments
 (0)