dxf: refactor error handling in task executor #57837

Merged · 8 commits · Dec 3, 2024
Changes from 5 commits
2 changes: 1 addition & 1 deletion pkg/disttask/framework/integrationtests/BUILD.bazel
@@ -16,7 +16,7 @@ go_test(
],
flaky = True,
race = "off",
shard_count = 22,
shard_count = 21,
deps = [
"//pkg/config",
"//pkg/ddl",
50 changes: 25 additions & 25 deletions pkg/disttask/framework/integrationtests/framework_ha_test.go
@@ -36,26 +36,25 @@ func submitTaskAndCheckSuccessForHA(ctx context.Context, t *testing.T, taskKey s
}

func TestHANodeRandomShutdown(t *testing.T) {
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBShutdown", "return()")
c := testutil.NewDXFContextWithRandomNodes(t, 4, 15)
registerExampleTask(t, c.MockCtrl, testutil.GetMockHATestSchedulerExt(c.MockCtrl), c.TestContext, nil)

// we keep [1, 10] nodes running, as we only have 10 subtask at stepOne
keepCount := int(math.Min(float64(c.NodeCount()-1), float64(c.Rand.Intn(10)+1)))
nodeNeedDown := c.GetRandNodeIDs(c.NodeCount() - keepCount)
t.Logf("started %d nodes, and we keep %d nodes, nodes that need shutdown: %v", c.NodeCount(), keepCount, nodeNeedDown)
taskexecutor.MockTiDBDown = func(execID string, _ *proto.TaskBase) bool {
if _, ok := nodeNeedDown[execID]; ok {
c.AsyncShutdown(execID)
return true
}
return false
}
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBShutdown",
func(e taskexecutor.TaskExecutor, execID string, _ *proto.TaskBase) {
if _, ok := nodeNeedDown[execID]; ok {
c.AsyncShutdown(execID)
e.Cancel()
}
},
)
submitTaskAndCheckSuccessForHA(c.Ctx, t, "😊", c.TestContext)
}

func TestHARandomShutdownInDifferentStep(t *testing.T) {
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBShutdown", "return()")
c := testutil.NewDXFContextWithRandomNodes(t, 6, 15)

registerExampleTask(t, c.MockCtrl, testutil.GetMockHATestSchedulerExt(c.MockCtrl), c.TestContext, nil)
@@ -64,22 +63,23 @@ func TestHARandomShutdownInDifferentStep(t *testing.T) {
nodeNeedDownAtStepTwo := c.GetRandNodeIDs(c.NodeCount()/2 - 1)
t.Logf("started %d nodes, shutdown nodes at step 1: %v, shutdown nodes at step 2: %v",
c.NodeCount(), nodeNeedDownAtStepOne, nodeNeedDownAtStepTwo)
taskexecutor.MockTiDBDown = func(execID string, task *proto.TaskBase) bool {
var targetNodes map[string]struct{}
switch task.Step {
case proto.StepOne:
targetNodes = nodeNeedDownAtStepOne
case proto.StepTwo:
targetNodes = nodeNeedDownAtStepTwo
default:
return false
}
if _, ok := targetNodes[execID]; ok {
c.AsyncShutdown(execID)
return true
}
return false
}
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBShutdown",
func(e taskexecutor.TaskExecutor, execID string, task *proto.TaskBase) {
var targetNodes map[string]struct{}
switch task.Step {
case proto.StepOne:
targetNodes = nodeNeedDownAtStepOne
case proto.StepTwo:
targetNodes = nodeNeedDownAtStepTwo
default:
return
}
if _, ok := targetNodes[execID]; ok {
c.AsyncShutdown(execID)
e.Cancel()
}
},
)
submitTaskAndCheckSuccessForHA(c.Ctx, t, "😊", c.TestContext)
}
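
The pattern in the two hunks above replaces the package-level `taskexecutor.MockTiDBDown` hook (armed via `testfailpoint.Enable` with `"return()"`) with `testfailpoint.EnableCall`, which registers a typed callback and hands it the executor, so a test can call `e.Cancel()` directly instead of mutating shared global state. A minimal, self-contained sketch of that shape; the registry and types below are illustrative stand-ins, not TiDB's actual failpoint implementation:

```go
package main

import "fmt"

// Executor is an illustrative stand-in for taskexecutor.TaskExecutor.
type Executor struct{ id string }

// Cancel mimics cancelling the executor's running task.
func (e *Executor) Cancel() { fmt.Printf("executor on %s cancelled\n", e.id) }

// callbacks mimics a failpoint registry keyed by failpoint name.
var callbacks = map[string]func(e *Executor, execID string){}

// enableCall registers a typed callback, in the spirit of testfailpoint.EnableCall.
func enableCall(name string, cb func(e *Executor, execID string)) {
	callbacks[name] = cb
}

// injectCall marks the spot where production code would fire the failpoint.
func injectCall(name string, e *Executor, execID string) {
	if cb, ok := callbacks[name]; ok {
		cb(e, execID)
	}
}

func main() {
	nodeNeedDown := map[string]struct{}{"node-1": {}}
	enableCall("mockTiDBShutdown", func(e *Executor, execID string) {
		// the shutdown decision and the executor cancellation live in one
		// callback, rather than a global hook plus a boolean failpoint
		if _, ok := nodeNeedDown[execID]; ok {
			fmt.Printf("shutting down %s\n", execID)
			e.Cancel()
		}
	})
	injectCall("mockTiDBShutdown", &Executor{id: "node-1"}, "node-1")
}
```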

16 changes: 0 additions & 16 deletions pkg/disttask/framework/integrationtests/framework_test.go
@@ -234,22 +234,6 @@ func TestGC(t *testing.T) {
}, 10*time.Second, 500*time.Millisecond)
}

func TestFrameworkSubtaskFinishedCancel(t *testing.T) {
Contributor (author) commented:

This case tests what we do when a subtask is cancelled by another routine via CancelRunningSubtask. It actually duplicates TestFrameworkRunSubtaskCancelOrFailed, so the two are merged.

c := testutil.NewTestDXFContext(t, 3, 16, true)

registerExampleTask(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
var counter atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterOnFinishedCalled",
func(e *taskexecutor.BaseTaskExecutor) {
if counter.Add(1) == 1 {
e.CancelRunningSubtask()
}
},
)
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", "", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
}

func TestFrameworkRunSubtaskCancelOrFailed(t *testing.T) {
c := testutil.NewTestDXFContext(t, 3, 16, true)

5 changes: 1 addition & 4 deletions pkg/disttask/framework/taskexecutor/BUILD.bazel
@@ -19,7 +19,6 @@ go_library(
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/taskexecutor/execute",
"//pkg/lightning/common",
"//pkg/lightning/log",
"//pkg/metrics",
"//pkg/sessionctx/variable",
@@ -51,7 +50,7 @@ go_test(
],
embed = [":taskexecutor"],
flaky = True,
shard_count = 17,
shard_count = 16,
deps = [
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/mock/execute",
@@ -74,7 +73,5 @@ go_test(
"@org_golang_google_grpc//status",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zaptest/observer",
],
)
12 changes: 9 additions & 3 deletions pkg/disttask/framework/taskexecutor/execute/interface.go
@@ -33,7 +33,9 @@ type StepExecutor interface {
StepExecFrameworkInfo

// Init is used to initialize the environment.
// if failed, task executor will retry later.
// The task executor will retry if the returned error is retryable (see
// IsRetryableError in TaskExecutor.Extension); otherwise the framework marks a
// random subtask as failed to trigger task failure.
Init(context.Context) error
// RunSubtask is used to run the subtask.
RunSubtask(ctx context.Context, subtask *proto.Subtask) error
@@ -42,9 +44,13 @@ type StepExecutor interface {
RealtimeSummary() *SubtaskSummary

// OnFinished is used to handle the subtask when it is finished.
// The subtask meta can be updated in place.
// The subtask meta can be updated in place. A subtask can be marked 'success'
// only when OnFinished returns no error; if it returns an error, the subtask
// might be rerun from scratch, so don't put error-prone code in it.
OnFinished(ctx context.Context, subtask *proto.Subtask) error
// Cleanup is used to clean up the environment.
// Cleanup is used to clean up the environment for this step.
// The returned error does not affect task/subtask state; it is only logged,
// so don't put error-prone code in it.
Cleanup(context.Context) error
}
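
Taken together, the comments above pin down the step lifecycle contract: Init errors are retried only when retryable, OnFinished gates the 'success' state and should stay nearly infallible, and Cleanup failures are merely logged. A hedged sketch of an implementation honoring that contract, with the framework types stubbed out (subtask below stands in for proto.Subtask):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// subtask is a simplified stand-in for proto.Subtask.
type subtask struct{ meta []byte }

type exampleStepExec struct{ initialized bool }

// Init prepares the environment; only retryable errors (as judged by the
// Extension's IsRetryableError) get retried, anything else fails the task.
func (e *exampleStepExec) Init(ctx context.Context) error {
	e.initialized = true
	return nil
}

// RunSubtask does the per-subtask work.
func (e *exampleStepExec) RunSubtask(ctx context.Context, st *subtask) error {
	if !e.initialized {
		return errors.New("step executor not initialized")
	}
	return nil
}

// OnFinished only mutates the meta in place and stays nearly infallible,
// since an error here can cause the whole subtask to rerun.
func (e *exampleStepExec) OnFinished(ctx context.Context, st *subtask) error {
	st.meta = []byte(`{"done":true}`)
	return nil
}

// Cleanup is best-effort: its error is only logged and never changes state.
func (e *exampleStepExec) Cleanup(ctx context.Context) error {
	fmt.Println("step resources released")
	return nil
}

func main() {
	ctx := context.Background()
	e := &exampleStepExec{}
	st := &subtask{}
	if err := e.Init(ctx); err != nil {
		return // the framework would retry or fail the task here
	}
	defer func() { _ = e.Cleanup(ctx) }() // error would only be logged
	if err := e.RunSubtask(ctx, st); err == nil {
		_ = e.OnFinished(ctx, st) // subtask becomes 'success' only on nil
	}
	fmt.Printf("meta after finish: %s\n", st.meta)
}
```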

4 changes: 1 addition & 3 deletions pkg/disttask/framework/taskexecutor/interface.go
@@ -115,9 +115,7 @@ type Extension interface {
// the Executor will mark the subtask as failed.
IsIdempotent(subtask *proto.Subtask) bool
// GetStepExecutor returns the subtask executor for the subtask.
// Note:
// 1. summary is the summary manager of all subtask of the same type now.
// 2. should not retry the error from it.
// Note: the returned error is fatal; the framework will fail the task directly.
GetStepExecutor(task *proto.Task) (execute.StepExecutor, error)
// IsRetryableError returns whether the error is transient.
// When error is transient, the framework won't mark subtasks as failed,
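
The Extension hunk above tightens two contracts: an error from GetStepExecutor is fatal and fails the task directly, while IsRetryableError decides whether a subtask error is transient. A sketch of one plausible classification under those documented semantics; the cancellation and network cases below are illustrative choices, not TiDB's actual policy:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
)

// isRetryableError sketches an IsRetryableError implementation: cancellation
// is final, network timeouts are transient, everything else fails the subtask.
func isRetryableError(err error) bool {
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return false // the task was cancelled or timed out; do not retry
	}
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		return true // transient network hiccup: let the framework retry
	}
	return false
}

func main() {
	fmt.Println(isRetryableError(context.Canceled))                 // false
	fmt.Println(isRetryableError(os.ErrDeadlineExceeded))           // true, a net.Error timeout
	fmt.Println(isRetryableError(errors.New("corrupt input data"))) // false
}
```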