diff --git a/pkg/planner/core/casetest/instanceplancache/BUILD.bazel b/pkg/planner/core/casetest/instanceplancache/BUILD.bazel index 7f76bc5536fa2..bcb5b8cb919b7 100644 --- a/pkg/planner/core/casetest/instanceplancache/BUILD.bazel +++ b/pkg/planner/core/casetest/instanceplancache/BUILD.bazel @@ -12,7 +12,7 @@ go_test( "others_test.go", ], flaky = True, - shard_count = 32, + shard_count = 34, deps = [ "//pkg/parser/auth", "//pkg/testkit", diff --git a/pkg/planner/core/casetest/instanceplancache/concurrency_test.go b/pkg/planner/core/casetest/instanceplancache/concurrency_test.go index 59ffb18478e78..5c10ed613a1ec 100644 --- a/pkg/planner/core/casetest/instanceplancache/concurrency_test.go +++ b/pkg/planner/core/casetest/instanceplancache/concurrency_test.go @@ -335,6 +335,75 @@ func TestInstancePlanCacheTableIndexScan(t *testing.T) { wg.Wait() } +func TestInstancePlanCacheConcurrencyPointPartitioning(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(`use test`) + tk.MustExec(`create table t1 (a int, primary key(a)) partition by hash(a) partitions 10`) + tk.MustExec(`create table t2 (a int, primary key(a)) partition by range(a) ( + partition p0 values less than (10), + partition p1 values less than (20), + partition p2 values less than (30), + partition p3 values less than (40), + partition p4 values less than (50), + partition p5 values less than (60), + partition p6 values less than (70), + partition p7 values less than (80), + partition p8 values less than (90), + partition p9 values less than (100))`) + tk.MustExec(`set global tidb_enable_instance_plan_cache=1`) + for i := 0; i < 100; i++ { + tk.MustExec(fmt.Sprintf("insert into t1 values (%v)", i)) + tk.MustExec(fmt.Sprintf("insert into t2 values (%v)", i)) + } + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + tki := testkit.NewTestKit(t, store) + tki.MustExec(`use test`) + for k := 0; k < 100; k++ { + tName := fmt.Sprintf("t%v", rand.Intn(2)+1) + tki.MustExec(fmt.Sprintf("prepare st from 'select * from %v where a=?'", tName)) + a := rand.Intn(100) + tki.MustExec("set @a = ?", a) + tki.MustQuery("execute st using @a").Check(testkit.Rows(fmt.Sprintf("%v", a))) + } + }() + } + wg.Wait() +} + +func TestInstancePlanCacheConcurrencyPointMultipleColPKNoTxn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(`use test`) + tk.MustExec(`create table t (a int, b int, primary key(a, b))`) + tk.MustExec(`set global tidb_enable_instance_plan_cache=1`) + for i := 0; i < 100; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%v, %v)", i, i)) + } + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + tki := testkit.NewTestKit(t, store) + tki.MustExec(`use test`) + tki.MustExec(`prepare st from 'select * from t where a=? and b=?'`) + for k := 0; k < 100; k++ { + a := rand.Intn(100) + tki.MustExec("set @a = ?, @b = ?", a, a) + tki.MustQuery("execute st using @a, @b").Check(testkit.Rows(fmt.Sprintf("%v %v", a, a))) + } + }() + } + wg.Wait() +} + func TestInstancePlanCacheConcurrencyPointNoTxn(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/pkg/planner/core/plan_cache.go b/pkg/planner/core/plan_cache.go index b2126ae22ed34..731ac256d28e6 100644 --- a/pkg/planner/core/plan_cache.go +++ b/pkg/planner/core/plan_cache.go @@ -226,6 +226,9 @@ func GetPlanFromPlanCache(ctx context.Context, sctx sessionctx.Context, if stmtCtx.UseCache() { plan, outputCols, stmtHints, hit := lookupPlanCache(ctx, sctx, cacheKey, paramTypes) skipPrivCheck := stmt.PointGet.Executor != nil // this case is specially handled + if hit && instancePlanCacheEnabled(ctx) { + plan, hit = clonePlanForInstancePlanCache(ctx, sctx, stmt, plan) + } if hit { if plan, ok, err := adjustCachedPlan(ctx, sctx, plan, stmtHints, isNonPrepared, skipPrivCheck, binding, is, stmt); err != nil || ok { return plan, outputCols, err @@ -236,6 +239,29 @@ func GetPlanFromPlanCache(ctx context.Context, sctx sessionctx.Context, return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, paramTypes) } +func clonePlanForInstancePlanCache(ctx context.Context, sctx sessionctx.Context, + stmt *PlanCacheStmt, plan base.Plan) (clonedPlan base.Plan, ok bool) { + // TODO: add metrics to record the time cost of this clone operation. + fastPoint := stmt.PointGet.Executor != nil // this case is specially handled + pointPlan, isPoint := plan.(*PointGetPlan) + if fastPoint && isPoint { // special optimization for fast point plans + if stmt.PointGet.FastPlan == nil { + stmt.PointGet.FastPlan = new(PointGetPlan) + } + FastClonePointGetForPlanCache(sctx.GetPlanCtx(), pointPlan, stmt.PointGet.FastPlan) + clonedPlan = stmt.PointGet.FastPlan + } else { + clonedPlan, ok = plan.CloneForPlanCache(sctx.GetPlanCtx()) + if !ok { // clone the value to solve concurrency problem + return nil, false + } + } + if intest.InTest && ctx.Value(PlanCacheKeyTestClone{}) != nil { + ctx.Value(PlanCacheKeyTestClone{}).(func(plan, cloned base.Plan))(plan, clonedPlan) + } + return clonedPlan, true +} + func instancePlanCacheEnabled(ctx context.Context) bool { if intest.InTest && ctx.Value(PlanCacheKeyEnableInstancePlanCache{}) != nil { return true @@ -252,25 +278,17 @@ func lookupPlanCache(ctx context.Context, sctx sessionctx.Context, cacheKey stri core_metrics.GetPlanCacheLookupDuration(useInstanceCache).Observe(time.Since(begin).Seconds()) } }(time.Now()) + var v any if useInstanceCache { - if v, hit := domain.GetDomain(sctx).GetInstancePlanCache().Get(cacheKey, paramTypes); hit { - pcv := v.(*PlanCacheValue) - clonedPlan, ok := pcv.Plan.CloneForPlanCache(sctx.GetPlanCtx()) - if !ok { // clone the value to solve concurrency problem - return nil, nil, nil, false - } - if intest.InTest && ctx.Value(PlanCacheKeyTestClone{}) != nil { - ctx.Value(PlanCacheKeyTestClone{}).(func(plan, cloned base.Plan))(pcv.Plan, clonedPlan) - } - return clonedPlan, pcv.OutputColumns, pcv.stmtHints, true - } + v, hit = domain.GetDomain(sctx).GetInstancePlanCache().Get(cacheKey, paramTypes) } else { - if v, hit := sctx.GetSessionPlanCache().Get(cacheKey, paramTypes); hit { - pcv := v.(*PlanCacheValue) - return pcv.Plan, pcv.OutputColumns, pcv.stmtHints, true - } + v, hit = sctx.GetSessionPlanCache().Get(cacheKey, paramTypes) + } + if !hit { + return nil, nil, nil, false } - return nil, nil, nil, false + pcv := v.(*PlanCacheValue) + return pcv.Plan, pcv.OutputColumns, pcv.stmtHints, true } func adjustCachedPlan(ctx context.Context, sctx sessionctx.Context, diff --git a/pkg/planner/core/plan_cache_rebuild_test.go b/pkg/planner/core/plan_cache_rebuild_test.go index e356d382fd885..f502d1d863b7d 100644 --- a/pkg/planner/core/plan_cache_rebuild_test.go +++ b/pkg/planner/core/plan_cache_rebuild_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math/rand" + "os" "reflect" "strings" "testing" @@ -25,8 +26,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/expression" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/planner" "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/planner/core/base" + "github.com/pingcap/tidb/pkg/planner/core/resolve" "github.com/pingcap/tidb/pkg/planner/util" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" @@ -441,3 +445,86 @@ type visit struct { a2 unsafe.Pointer typ reflect.Type } + +func TestFastPointGetClone(t *testing.T) { + codeFile := "plan_clone_utils.go" + codeData, err := os.ReadFile(codeFile) + require.NoError(t, err) + codeLines := strings.Split(string(codeData), "\n") + beginPrefix := `func FastClonePointGetForPlanCache(` + endPrefix := `}` + beginIdx, endIdx := -1, -1 + for i, line := range codeLines { + if strings.HasPrefix(line, beginPrefix) { + beginIdx = i + } + if beginIdx != -1 && strings.HasPrefix(line, endPrefix) { + endIdx = i + break + } + } + cloneFuncCode := strings.Join(codeLines[beginIdx:endIdx+1], "\n") + fieldNoNeedToClone := map[string]struct{}{ + "cost": {}, + "planCostInit": {}, + "planCost": {}, + "planCostVer2": {}, + "accessCols": {}, + } + + pointPlan := reflect.TypeOf(core.PointGetPlan{}) + for i := 0; i < pointPlan.NumField(); i++ { + fieldName := pointPlan.Field(i).Name + if _, ok := fieldNoNeedToClone[fieldName]; ok { + continue + } + assignFieldCode := fmt.Sprintf("%v =", fieldName) + if !strings.Contains(cloneFuncCode, assignFieldCode) { + errMsg := fmt.Sprintf("field %v might not be set in FastClonePointGetForPlanCache correctly", fieldName) + t.Fatal(errMsg) + } + } +} + +func BenchmarkPointGetCloneFast(b *testing.B) { + store, domain := testkit.CreateMockStoreAndDomain(b) + tk := testkit.NewTestKit(b, store) + tk.MustExec(`use test`) + tk.MustExec(`create table t (a int, b int, primary key(a, b))`) + + p := parser.New() + stmt, err := p.ParseOneStmt("select a, b from t where a=1 and b=1", "", "") + require.NoError(b, err) + nodeW := resolve.NewNodeW(stmt) + plan, _, err := planner.Optimize(context.TODO(), tk.Session(), nodeW, domain.InfoSchema()) + require.NoError(b, err) + + b.ResetTimer() + src := plan.(*core.PointGetPlan) + dst := new(core.PointGetPlan) + sctx := tk.Session().GetPlanCtx() + for i := 0; i < b.N; i++ { + core.FastClonePointGetForPlanCache(sctx, src, dst) + } +} + +func BenchmarkPointGetClone(b *testing.B) { + store, domain := testkit.CreateMockStoreAndDomain(b) + tk := testkit.NewTestKit(b, store) + tk.MustExec(`use test`) + tk.MustExec(`create table t (a int, b int, primary key(a, b))`) + + p := parser.New() + stmt, err := p.ParseOneStmt("select a, b from t where a=1 and b=1", "", "") + require.NoError(b, err) + nodeW := resolve.NewNodeW(stmt) + plan, _, err := planner.Optimize(context.TODO(), tk.Session(), nodeW, domain.InfoSchema()) + require.NoError(b, err) + + b.ResetTimer() + src := plan.(*core.PointGetPlan) + sctx := tk.Session().GetPlanCtx() + for i := 0; i < b.N; i++ { + src.CloneForPlanCache(sctx) + } +} diff --git a/pkg/planner/core/plan_cache_utils.go b/pkg/planner/core/plan_cache_utils.go index b599c7c1cac0b..6901f152b323a 100644 --- a/pkg/planner/core/plan_cache_utils.go +++ b/pkg/planner/core/plan_cache_utils.go @@ -523,6 +523,11 @@ type PointGetExecutorCache struct { // Notice that we should only cache the PointGetExecutor that have a snapshot with MaxTS in it. // If the current plan is not PointGet or does not use MaxTS optimization, this value should be nil here. Executor any + + // FastPlan is only used for instance plan cache. + // To ensure thread-safe, we have to clone each plan before reusing if using instance plan cache. + // To reduce the memory allocation and increase performance, we cache the FastPlan here. + FastPlan *PointGetPlan } // PlanCacheStmt store prepared ast from PrepareExec and other related fields diff --git a/pkg/planner/core/plan_clone_utils.go b/pkg/planner/core/plan_clone_utils.go index b402c444c3a08..2a3c9646aa998 100644 --- a/pkg/planner/core/plan_clone_utils.go +++ b/pkg/planner/core/plan_clone_utils.go @@ -17,6 +17,7 @@ package core import ( "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/planner/core/base" + "github.com/pingcap/tidb/pkg/types" ) func clonePhysicalPlansForPlanCache(newCtx base.PlanContext, plans []base.PhysicalPlan) ([]base.PhysicalPlan, bool) { @@ -172,3 +173,51 @@ func cloneConstant2DForPlanCache(constants [][]*expression.Constant) [][]*expres } return cloned } + +// FastClonePointGetForPlanCache is a fast path to clone a PointGetPlan for plan cache. +func FastClonePointGetForPlanCache(newCtx base.PlanContext, src, dst *PointGetPlan) *PointGetPlan { + if dst == nil { + dst = new(PointGetPlan) + } + dst.Plan = src.Plan + dst.Plan.SetSCtx(newCtx) + dst.probeParents = src.probeParents + dst.PartitionNames = src.PartitionNames + dst.dbName = src.dbName + dst.schema = src.schema + dst.TblInfo = src.TblInfo + dst.IndexInfo = src.IndexInfo + dst.PartitionIdx = nil // partition prune will be triggered during execution phase + dst.Handle = nil // handle will be set during rebuild phase + if src.HandleConstant == nil { + dst.HandleConstant = nil + } else { + if src.HandleConstant.SafeToShareAcrossSession() { + dst.HandleConstant = src.HandleConstant + } else { + dst.HandleConstant = src.HandleConstant.Clone().(*expression.Constant) + } + } + dst.handleFieldType = src.handleFieldType + dst.HandleColOffset = src.HandleColOffset + if len(dst.IndexValues) < len(src.IndexValues) { // actually set during rebuild phase + dst.IndexValues = make([]types.Datum, len(src.IndexValues)) + } else { + dst.IndexValues = dst.IndexValues[:len(src.IndexValues)] + } + dst.IndexConstants = cloneConstantsForPlanCache(src.IndexConstants, dst.IndexConstants) + dst.ColsFieldType = src.ColsFieldType + dst.IdxCols = cloneColumnsForPlanCache(src.IdxCols, dst.IdxCols) + dst.IdxColLens = src.IdxColLens + dst.AccessConditions = cloneExpressionsForPlanCache(src.AccessConditions, dst.AccessConditions) + dst.UnsignedHandle = src.UnsignedHandle + dst.IsTableDual = src.IsTableDual + dst.Lock = src.Lock + dst.outputNames = src.outputNames + dst.LockWaitTime = src.LockWaitTime + dst.Columns = src.Columns + + // remaining fields are unnecessary to clone: + // cost, planCostInit, planCost, planCostVer2, accessCols + return dst +} diff --git a/pkg/planner/core/point_get_plan.go b/pkg/planner/core/point_get_plan.go index 5599d7bd72139..4cc0d5f0d14bd 100644 --- a/pkg/planner/core/point_get_plan.go +++ b/pkg/planner/core/point_get_plan.go @@ -105,14 +105,16 @@ type PointGetPlan struct { outputNames []*types.FieldName `plan-cache-clone:"shallow"` LockWaitTime int64 Columns []*model.ColumnInfo `plan-cache-clone:"shallow"` - cost float64 // required by cost model + cost float64 planCostInit bool planCost float64 planCostVer2 costusage.CostVer2 `plan-cache-clone:"shallow"` // accessCols represents actual columns the PointGet will access, which are used to calculate row-size accessCols []*expression.Column + + // NOTE: please update FastClonePointGetForPlanCache accordingly if you add new fields here. } // GetEstRowCountForDisplay implements PhysicalPlan interface.