Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

*: support cancel query like 'select * from information_schema.tables' (#57766) #57806

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func CheckPlacementPolicyNotInUseFromMeta(t *meta.Mutator, policy *model.PolicyI
return dbterror.ErrPlacementPolicyInUse.GenWithStackByArgs(policy.Name)
}

tables, err := t.ListTables(dbInfo.ID)
tables, err := t.ListTables(context.Background(), dbInfo.ID)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/placement_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ func testGetPolicyDependency(storage kv.Storage, name string) []int64 {
return err
}
for _, db := range dbs {
tbls, err := t.ListTables(db.ID)
tbls, err := t.ListTables(context.Background(), db.ID)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (w *worker) onDropSchema(jobCtx *jobContext, job *model.Job) (ver int64, _
return ver, errors.Trace(err)
}
var tables []*model.TableInfo
tables, err = metaMut.ListTables(job.SchemaID)
tables, err = metaMut.ListTables(jobCtx.stepCtx, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -205,7 +205,7 @@ func (w *worker) onDropSchema(jobCtx *jobContext, job *model.Job) (ver int64, _
case model.StateDeleteOnly:
dbInfo.State = model.StateNone
var tables []*model.TableInfo
tables, err = metaMut.ListTables(job.SchemaID)
tables, err = metaMut.ListTables(jobCtx.stepCtx, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func (w *worker) onRecoverSchema(jobCtx *jobContext, job *model.Job) (ver int64,
sid := recoverSchemaInfo.DBInfo.ID
snap := w.store.GetSnapshot(kv.NewVersion(recoverSchemaInfo.SnapshotTS))
snapMeta := meta.NewReader(snap)
tables, err2 := snapMeta.ListTables(sid)
tables, err2 := snapMeta.ListTables(jobCtx.stepCtx, sid)
if err2 != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err2)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,7 +1239,7 @@ func checkConstraintNamesNotExists(t *meta.Mutator, schemaID int64, constraints
if len(constraints) == 0 {
return nil
}
tbInfos, err := t.ListTables(schemaID)
tbInfos, err := t.ListTables(context.Background(), schemaID)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func (*Domain) fetchSchemasWithTables(ctx context.Context, schemas []*model.DBIn
di.TableName2ID = name2ID
tables = specialTableInfos
} else {
tables, err = m.ListTables(di.ID)
tables, err = m.ListTables(ctx, di.ID)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,9 @@ func (e *memtableRetriever) setDataFromTables(ctx context.Context, sctx sessionc
if err != nil {
return errors.Trace(err)
}
if ctx.Err() != nil {
return errors.Trace(ctx.Err())
}
}
e.rows = rows
return nil
Expand Down Expand Up @@ -1191,6 +1194,10 @@ func (e *memtableRetriever) setDataFromPartitions(ctx context.Context, sctx sess
}
createTime := types.NewTime(types.FromGoTime(table.GetUpdateTime()), createTimeTp, types.DefaultFsp)

if ctx.Err() != nil {
return errors.Trace(ctx.Err())
}

var rowCount, dataLength, indexLength uint64
if useStatsCache {
if table.GetPartitionInfo() == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ retry:
// the meta region leader is slow.
snapshot.SetOption(kv.TiKVClientReadTimeout, uint64(3000)) // 3000ms.
m := meta.NewReader(snapshot)
tblInfos, err := m.ListTables(dbInfo.ID)
tblInfos, err := m.ListTables(ctx, dbInfo.ID)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
return nil, nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package meta

import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -1131,7 +1132,7 @@ func GetTableInfoWithAttributes(m *Mutator, dbID int64, filterAttrs ...string) (
}

// ListTables shows all tables in database.
func (m *Mutator) ListTables(dbID int64) ([]*model.TableInfo, error) {
func (m *Mutator) ListTables(ctx context.Context, dbID int64) ([]*model.TableInfo, error) {
res, err := m.GetMetasByDBID(dbID)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -1144,6 +1145,9 @@ func (m *Mutator) ListTables(dbID int64) ([]*model.TableInfo, error) {
if !strings.HasPrefix(tableKey, mTablePrefix) {
continue
}
if ctx.Err() != nil {
return nil, errors.Trace(ctx.Err())
}

tbInfo := &model.TableInfo{}
err = json.Unmarshal(r.Value, tbInfo)
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func TestMeta(t *testing.T) {
tableNames, err := m.ListSimpleTables(1)
require.NoError(t, err)
require.Equal(t, []*model.TableNameInfo{tblName, tblName2}, tableNames)
tables, err := m.ListTables(1)
tables, err := m.ListTables(context.Background(), 1)
require.NoError(t, err)
require.Equal(t, []*model.TableInfo{tbInfo, tbInfo2}, tables)
{
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestMeta(t *testing.T) {
tableNames, err = m.ListSimpleTables(1)
require.NoError(t, err)
require.Equal(t, []*model.TableNameInfo{tblName}, tableNames)
tables, err = m.ListTables(1)
tables, err = m.ListTables(context.Background(), 1)
require.NoError(t, err)
require.Equal(t, []*model.TableInfo{tbInfo}, tables)
{
Expand Down
4 changes: 3 additions & 1 deletion pkg/meta/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package meta

import (
"context"

"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/structure"
Expand All @@ -25,7 +27,7 @@ type Reader interface {
GetDatabase(dbID int64) (*model.DBInfo, error)
ListDatabases() ([]*model.DBInfo, error)
GetTable(dbID int64, tableID int64) (*model.TableInfo, error)
ListTables(dbID int64) ([]*model.TableInfo, error)
ListTables(ctx context.Context, dbID int64) ([]*model.TableInfo, error)
ListSimpleTables(dbID int64) ([]*model.TableNameInfo, error)
IterTables(dbID int64, fn func(info *model.TableInfo) error) error
GetAutoIDAccessors(dbID, tableID int64) AutoIDAccessors
Expand Down
3 changes: 3 additions & 0 deletions pkg/planner/core/memtable_infoschema_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,9 @@ func listTablesForEachSchema(
if err != nil {
return nil, nil, errors.Trace(err)
}
if ctx.Err() != nil {
return nil, nil, errors.Trace(err)
}
tables = filterSchemaObjectByRegexp(e, ec.table, tables, extractStrTableInfo)
for _, t := range tables {
schemaSlice = append(schemaSlice, s)
Expand Down