Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

planner: Limit fine grained shuffle usage for mpp join operators to ensure shuffle keys are the same with actual join keys #59884

Merged
merged 2 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/executor/test/tiflashtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 47,
shard_count = 48,
deps = [
"//pkg/config",
"//pkg/domain",
Expand Down
56 changes: 56 additions & 0 deletions pkg/executor/test/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2246,3 +2246,59 @@ func TestIssue59703(t *testing.T) {
require.Contains(t, err.Error(), "mock mpp error")
require.Equal(t, mppcoordmanager.InstanceMPPCoordinatorManager.GetCoordCount(), 0)
}

func TestIssue59877(t *testing.T) {
store := testkit.CreateMockStore(t, withMockTiFlash(1))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2, t3")
tk.MustExec("create table t1(id bigint, v1 int)")
tk.MustExec("alter table t1 set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t1")
err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("create table t2(id bigint unsigned, v1 int)")
tk.MustExec("alter table t2 set tiflash replica 1")
tb = external.GetTableByName(t, tk, "test", "t2")
err = domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("create table t3(id bigint, v1 int)")
tk.MustExec("alter table t3 set tiflash replica 1")
tb = external.GetTableByName(t, tk, "test", "t3")
err = domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)

tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
// unistore does not support later materialization
tk.MustExec("set tidb_opt_enable_late_materialization=0")
tk.MustExec("set @@session.tidb_allow_mpp=ON")
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
tk.MustExec("set tidb_broadcast_join_threshold_size=0")
tk.MustExec("set tidb_broadcast_join_threshold_count=0")
tk.MustExec("set tiflash_fine_grained_shuffle_stream_count=8")
tk.MustExec("set tidb_enforce_mpp=1")
tk.MustQuery("explain format=\"brief\" select /*+ hash_join_build(t3) */ count(*) from t1 straight_join t2 on t1.id = t2.id straight_join t3 on t1.id = t3.id").Check(
testkit.Rows("HashAgg 1.00 root funcs:count(Column#18)->Column#10",
"└─TableReader 1.00 root MppVersion: 3, data:ExchangeSender",
" └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashAgg 1.00 mpp[tiflash] funcs:count(1)->Column#18",
" └─Projection 15609.38 mpp[tiflash] test.t1.id, Column#14",
" └─HashJoin 15609.38 mpp[tiflash] inner join, equal:[eq(test.t1.id, test.t3.id)]",
" ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ",
" │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#17, collate: binary]",
" │ └─Projection 9990.00 mpp[tiflash] test.t3.id, cast(test.t3.id, decimal(20,0))->Column#17",
" │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t3.id))",
" │ └─TableFullScan 10000.00 mpp[tiflash] table:t3 keep order:false, stats:pseudo",
" └─Projection(Probe) 12487.50 mpp[tiflash] test.t1.id, Column#14",
" └─HashJoin 12487.50 mpp[tiflash] inner join, equal:[eq(test.t1.id, test.t2.id)]",
" ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ",
" │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#13, collate: binary]",
" │ └─Projection 9990.00 mpp[tiflash] test.t1.id, cast(test.t1.id, decimal(20,0))->Column#13",
" │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t1.id))",
" │ └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo",
" └─ExchangeReceiver(Probe) 9990.00 mpp[tiflash] ",
" └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#14, collate: binary]",
" └─Projection 9990.00 mpp[tiflash] test.t2.id, cast(test.t2.id, decimal(20,0) UNSIGNED)->Column#14",
" └─Selection 9990.00 mpp[tiflash] not(isnull(test.t2.id))",
" └─TableFullScan 10000.00 mpp[tiflash] table:t2 keep order:false, stats:pseudo"))
}
20 changes: 16 additions & 4 deletions pkg/planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ const (
type fineGrainedShuffleHelper struct {
shuffleTarget shuffleTarget
plans []*physicalop.BasePhysicalPlan
joinKeysCount int
joinKeys []*expression.Column
}

type tiflashClusterInfoStatus uint8
Expand All @@ -687,7 +687,7 @@ type tiflashClusterInfo struct {
func (h *fineGrainedShuffleHelper) clear() {
h.shuffleTarget = unknown
h.plans = h.plans[:0]
h.joinKeysCount = 0
h.joinKeys = nil
}

func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *physicalop.BasePhysicalPlan) {
Expand Down Expand Up @@ -888,7 +888,7 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx base.PlanContext,
if len(joinKeys) > 0 { // Not cross join
buildHelper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*physicalop.BasePhysicalPlan{}}
buildHelper.plans = append(buildHelper.plans, &x.BasePhysicalPlan)
buildHelper.joinKeysCount = len(joinKeys)
buildHelper.joinKeys = joinKeys
setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo)
} else {
buildHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*physicalop.BasePhysicalPlan{}}
Expand Down Expand Up @@ -918,7 +918,19 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx base.PlanContext,
}
case joinBuild:
// Support hashJoin only when shuffle hash keys equals to join keys due to tiflash implementations
if len(x.HashCols) != helper.joinKeysCount {
if len(x.HashCols) != len(helper.joinKeys) {
break
}
// Check the shuffle key should be equal to joinKey, otherwise the shuffle hash code may not be equal to
// actual join hash code due to type cast
applyFlag := true
for i, joinKey := range helper.joinKeys {
if !x.HashCols[i].Col.EqualColumn(joinKey) {
applyFlag = false
break
}
}
if !applyFlag {
break
}
applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 600) // 600: performance test result
Expand Down