From d719074d53197453265888a928b474ae6f24f83d Mon Sep 17 00:00:00 2001 From: joey Date: Thu, 14 Oct 2021 16:53:51 +0800 Subject: [PATCH 1/3] modify task wait op, listen executor channel --- .../pipengine/reconciler/reconcile.go | 4 +- .../pipengine/reconciler/taskrun/framework.go | 12 +-- .../reconciler/taskrun/taskop/wait.go | 80 ++++++++----------- .../pipengine/reconciler/taskrun/taskrun.go | 20 ++--- 4 files changed, 49 insertions(+), 67 deletions(-) diff --git a/modules/pipeline/pipengine/reconciler/reconcile.go b/modules/pipeline/pipengine/reconciler/reconcile.go index c96997bee98..4986b920826 100644 --- a/modules/pipeline/pipengine/reconciler/reconcile.go +++ b/modules/pipeline/pipengine/reconciler/reconcile.go @@ -105,10 +105,12 @@ func (r *Reconciler) reconcile(ctx context.Context, pipelineID uint64) error { return } + executorChan := make(chan interface{}) + ctx = context.WithValue(ctx, spec.MakeTaskExecutorCtxKey(task), executorChan) tr := taskrun.New(ctx, task, ctx.Value(ctxKeyPipelineExitCh).(chan struct{}), ctx.Value(ctxKeyPipelineExitChCancelFunc).(context.CancelFunc), r.TaskThrottler, executor, &p, r.bdl, r.dbClient, r.js, - r.actionAgentSvc, r.extMarketSvc) + r.actionAgentSvc, r.extMarketSvc, executorChan) // tear down task defer func() { diff --git a/modules/pipeline/pipengine/reconciler/taskrun/framework.go b/modules/pipeline/pipengine/reconciler/taskrun/framework.go index 0db3fba6db4..12b0dcfd4f3 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/framework.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/framework.go @@ -27,7 +27,6 @@ import ( "github.com/erda-project/erda/modules/pipeline/conf" "github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/rlog" "github.com/erda-project/erda/modules/pipeline/pkg/errorsx" - "github.com/erda-project/erda/modules/pipeline/spec" "github.com/erda-project/erda/pkg/loop" "github.com/erda-project/erda/pkg/strutil" ) @@ -35,8 +34,7 @@ import ( func (tr *TaskRun) Do(itr TaskOp) error { logrus.Infof("reconciler: pipelineID: %d, task %q begin %s", tr.P.ID, tr.Task.Name, itr.Op()) - executorDoneCh := tr.Ctx.Value(spec.MakeTaskExecutorCtxKey(tr.Task)).(chan interface{}) - o := &Elem{ErrCh: make(chan error), DoneCh: make(chan interface{}), ExitCh: make(chan struct{}), ExecutorDoneCh: executorDoneCh} + o := &Elem{ErrCh: make(chan error), DoneCh: make(chan interface{}), ExitCh: make(chan struct{})} o.TimeoutCh, o.Cancel, o.Timeout = itr.TimeoutConfig() // define op handle func @@ -161,14 +159,6 @@ func (tr *TaskRun) waitOp(itr TaskOp, o *Elem) (result error) { // aop _ = aop.Handle(aop.NewContextForTask(*tr.Task, *tr.P, itr.TuneTriggers().AfterProcessing)) - case data := <-o.ExecutorDoneCh: - tr.LogStep(itr.Op(), fmt.Sprintf("framework accept signal from executor %s, begin do WhenDone", tr.Executor.Name())) - defer tr.LogStep(itr.Op(), fmt.Sprintf("framework accept signal from executor %s, end do WhenDone", tr.Executor.Name())) - if err := itr.WhenDone(data); err != nil { - errs = append(errs, err.Error()) - } - _ = aop.Handle(aop.NewContextForTask(*tr.Task, *tr.P, itr.TuneTriggers().AfterProcessing)) - case err := <-o.ErrCh: logrus.Errorf("reconciler: pipelineID: %d, task %q %s received error (%v)", tr.P.ID, tr.Task.Name, itr.Op(), err) if errorsx.IsNetworkError(err) { diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go index c3183c20de5..8c93259ee41 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go @@ -25,9 +25,7 @@ import ( "github.com/erda-project/erda/modules/pipeline/aop/aoptypes" "github.com/erda-project/erda/modules/pipeline/commonutil/costtimeutil" "github.com/erda-project/erda/modules/pipeline/conf" - "github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/rlog" "github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/taskrun" - "github.com/erda-project/erda/pkg/loop" ) var err4EnableDeclineRatio = errors.New("enable decline ratio") @@ -47,55 +45,45 @@ func (w *wait) TaskRun() *taskrun.TaskRun { } func (w *wait) Processing() (interface{}, error) { - stopWaitCh := make(chan struct{}) - defer func() { - stopWaitCh <- struct{}{} - }() - go func() { + var ( + data interface{} + lastSleepTime float64 = 1 + declineRatio = 1.5 + declineLimit int = 10 + ) + + timer := time.NewTimer(time.Duration(lastSleepTime) * time.Second) + for { select { + case doneData := <-w.ExecutorDoneCh: + logrus.Infof("%s: accept signal from executor %s, data: %v", w.Op(), w.Executor.Name(), doneData) + return doneData, nil case <-w.Ctx.Done(): - w.StopWaitLoop = true - return + return data, nil case <-w.PExitCh: logrus.Warnf("reconciler: pipeline exit, stop wait, pipelineID: %d, taskID: %d", w.P.ID, w.Task.ID) - return - case <-stopWaitCh: - rlog.TDebugf(w.P.ID, w.Task.ID, "stop wait") - close(stopWaitCh) - return - } - }() - - var ( - data interface{} - ) - - err := loop.New(loop.WithDeclineRatio(1.5), loop.WithDeclineLimit(time.Second*10)).Do(func() (abort bool, err error) { - if w.QuitWaitTimeout { - return true, nil - } - - statusDesc, err := w.Executor.Status(w.Ctx, w.Task) - if err != nil { - logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status failed, err: %v", - w.P.ID, w.Task.Name, err) - return true, err - } - - if statusDesc.Status == apistructs.PipelineStatusUnknown { - logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status %q, retry", w.P.ID, w.Task.Name, apistructs.PipelineStatusUnknown) - return false, err4EnableDeclineRatio - } - - if statusDesc.Status.IsEndStatus() { - data = statusDesc - return true, nil + return data, nil + case <-timer.C: + statusDesc, err := w.Executor.Status(w.Ctx, w.Task) + if err != nil { + logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status failed, err: %v", + w.P.ID, w.Task.Name, err) + return nil, err + } + if statusDesc.Status.IsEndStatus() { + data = statusDesc + return data, nil + } + if statusDesc.Status == apistructs.PipelineStatusUnknown { + logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status %q, retry", w.P.ID, w.Task.Name, apistructs.PipelineStatusUnknown) + } + lastSleepTime = lastSleepTime * declineRatio + if lastSleepTime > float64(declineLimit) { + lastSleepTime = float64(declineLimit) + } + timer.Reset(time.Duration(lastSleepTime) * time.Second) } - - return w.StopWaitLoop, err4EnableDeclineRatio - }) - - return data, err + } } func (w *wait) WhenDone(data interface{}) error { diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go b/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go index 6f947ae411c..4e7badb9934 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go @@ -50,9 +50,10 @@ type TaskRun struct { StopQueueLoop bool StopWaitLoop bool - PExitCh <-chan struct{} - PExitChCancel context.CancelFunc - PExit bool + PExitCh <-chan struct{} + PExitChCancel context.CancelFunc + PExit bool + ExecutorDoneCh chan interface{} // executorDoneCh allow action executor return directly // 轮训状态间隔期间可能任务已经是终态,FakeTimeout = true FakeTimeout bool @@ -69,9 +70,10 @@ func New(ctx context.Context, task *spec.PipelineTask, executor types.ActionExecutor, p *spec.Pipeline, bdl *bundle.Bundle, dbClient *dbclient.Client, js jsonstore.JsonStore, actionAgentSvc *actionagentsvc.ActionAgentSvc, extMarketSvc *extmarketsvc.ExtMarketSvc, + executorCh chan interface{}, ) *TaskRun { return &TaskRun{ - Ctx: context.WithValue(ctx, spec.MakeTaskExecutorCtxKey(task), make(chan interface{})), + Ctx: ctx, Task: task, Executor: executor, Throttler: throttler, @@ -87,8 +89,9 @@ func New(ctx context.Context, task *spec.PipelineTask, StopQueueLoop: false, StopWaitLoop: false, - PExitCh: pExitCh, - PExitChCancel: pExitChCancel, + PExitCh: pExitCh, + PExitChCancel: pExitChCancel, + ExecutorDoneCh: executorCh, ActionAgentSvc: actionAgentSvc, ExtMarketSvc: extMarketSvc, @@ -135,9 +138,8 @@ type Elem struct { Cancel context.CancelFunc Timeout time.Duration - ErrCh chan error - DoneCh chan interface{} - ExecutorDoneCh chan interface{} // executorDoneCh allow action executor return directly + ErrCh chan error + DoneCh chan interface{} ExitCh chan struct{} } From 68ae71e25876f8dafd8c8a23a7b039636956cc7d Mon Sep 17 00:00:00 2001 From: joey Date: Tue, 19 Oct 2021 15:27:17 +0800 Subject: [PATCH 2/3] apitest action don't set task pointers value --- .../actionexecutor/plugins/apitest/apitest.go | 46 +++++++++++++++++-- .../plugins/apitest/logic/meta.go | 18 +++++--- .../pipengine/reconciler/reconcile.go | 5 +- .../pipengine/reconciler/taskrun/taskrun.go | 9 ++-- 4 files changed, 60 insertions(+), 18 deletions(-) diff --git a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go index d69cb97839b..a7a2bcbf17e 100644 --- a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go +++ b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go @@ -18,6 +18,8 @@ import ( "context" "fmt" + "github.com/sirupsen/logrus" + "github.com/erda-project/erda/apistructs" "github.com/erda-project/erda/modules/pipeline/dbclient" "github.com/erda-project/erda/modules/pipeline/pipengine/actionexecutor/plugins/apitest/logic" @@ -57,7 +59,41 @@ func (d *define) Create(ctx context.Context, task *spec.PipelineTask) (interface } func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{}, error) { - logic.Do(ctx, task) + + go func(ctx context.Context, task *spec.PipelineTask) { + executorDoneCh, ok := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan interface{}) + if !ok { + logrus.Warnf("apitest: failed to get executor channel, pipelineID: %d, taskID: %d", task.PipelineID, task.ID) + } + + var status = apistructs.PipelineStatusFailed + defer func() { + if r := recover(); r != nil { + logrus.Errorf("api-test logic do panic recover:%s", r) + } + // if executor chan is nil, task framework can loop query meta get status + if executorDoneCh != nil { + executorDoneCh <- apistructs.PipelineStatusDesc{Status: status} + } + }() + + logic.Do(ctx, task) + + latestTask, err := d.dbClient.GetPipelineTask(task.ID) + if err != nil { + logrus.Errorf("failed to query latest task, err: %v \n", err) + return + } + + meta := latestTask.Result.Metadata + for _, metaField := range meta { + if metaField.Name == logic.MetaKeyResult { + if metaField.Value == logic.ResultSuccess { + status = apistructs.PipelineStatusSuccess + } + } + } + }(ctx, task) return nil, nil } @@ -70,7 +106,7 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct if err != nil { return apistructs.PipelineStatusDesc{}, fmt.Errorf("failed to query latest task, err: %v", err) } - *task = latestTask + //*task = latestTask if task.Status.IsEndStatus() { return apistructs.PipelineStatusDesc{Status: task.Status}, nil @@ -92,12 +128,14 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct if metaField.Value == logic.ResultSuccess { return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusSuccess}, nil } - return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusFailed}, nil + if metaField.Value == logic.ResultFailed { + return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusFailed}, nil + } } } // return created status to do start step - return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusCreated}, nil + return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusRunning}, nil } func (d *define) Inspect(ctx context.Context, task *spec.PipelineTask) (apistructs.TaskInspect, error) { diff --git a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/logic/meta.go b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/logic/meta.go index 16bdd5b0db2..489795099f4 100644 --- a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/logic/meta.go +++ b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/logic/meta.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "strconv" + "time" "github.com/erda-project/erda/apistructs" "github.com/erda-project/erda/modules/actionagent" @@ -26,6 +27,7 @@ import ( "github.com/erda-project/erda/pkg/apitestsv2" "github.com/erda-project/erda/pkg/apitestsv2/cookiejar" "github.com/erda-project/erda/pkg/encoding/jsonparse" + "github.com/erda-project/erda/pkg/loop" ) const ( @@ -112,11 +114,15 @@ func writeMetaFile(ctx context.Context, task *spec.PipelineTask, meta *Meta) { cb.PipelineTaskID = task.ID cbData, _ := json.Marshal(&cb) - err := pipelinefunc.CallbackActionFunc(cbData) - if err != nil { - log.Errorf("failed to callback, err: %v", err) - return - } - + // apitest should ensure that callback to pipeline after doing request + _ = loop.New(loop.WithDeclineRatio(2), loop.WithDeclineLimit(10*time.Second)). + Do(func() (bool, error) { + err := pipelinefunc.CallbackActionFunc(cbData) + if err != nil { + log.Errorf("failed to callback, err: %v", err) + return false, err + } + return true, nil + }) return } diff --git a/modules/pipeline/pipengine/reconciler/reconcile.go b/modules/pipeline/pipengine/reconciler/reconcile.go index 4986b920826..fe7971148ce 100644 --- a/modules/pipeline/pipengine/reconciler/reconcile.go +++ b/modules/pipeline/pipengine/reconciler/reconcile.go @@ -104,13 +104,10 @@ func (r *Reconciler) reconcile(ctx context.Context, pipelineID uint64) error { if err != nil { return } - - executorChan := make(chan interface{}) - ctx = context.WithValue(ctx, spec.MakeTaskExecutorCtxKey(task), executorChan) tr := taskrun.New(ctx, task, ctx.Value(ctxKeyPipelineExitCh).(chan struct{}), ctx.Value(ctxKeyPipelineExitChCancelFunc).(context.CancelFunc), r.TaskThrottler, executor, &p, r.bdl, r.dbClient, r.js, - r.actionAgentSvc, r.extMarketSvc, executorChan) + r.actionAgentSvc, r.extMarketSvc) // tear down task defer func() { diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go b/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go index 4e7badb9934..19c56ba4fd0 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go @@ -53,7 +53,7 @@ type TaskRun struct { PExitCh <-chan struct{} PExitChCancel context.CancelFunc PExit bool - ExecutorDoneCh chan interface{} // executorDoneCh allow action executor return directly + ExecutorDoneCh chan interface{} // 轮训状态间隔期间可能任务已经是终态,FakeTimeout = true FakeTimeout bool @@ -70,10 +70,11 @@ func New(ctx context.Context, task *spec.PipelineTask, executor types.ActionExecutor, p *spec.Pipeline, bdl *bundle.Bundle, dbClient *dbclient.Client, js jsonstore.JsonStore, actionAgentSvc *actionagentsvc.ActionAgentSvc, extMarketSvc *extmarketsvc.ExtMarketSvc, - executorCh chan interface{}, ) *TaskRun { + // make executor has buffer, don't block task framework + executorCH := make(chan interface{}, 1) return &TaskRun{ - Ctx: ctx, + Ctx: context.WithValue(ctx, spec.MakeTaskExecutorCtxKey(task), executorCH), Task: task, Executor: executor, Throttler: throttler, @@ -91,7 +92,7 @@ func New(ctx context.Context, task *spec.PipelineTask, PExitCh: pExitCh, PExitChCancel: pExitChCancel, - ExecutorDoneCh: executorCh, + ExecutorDoneCh: executorCH, ActionAgentSvc: actionAgentSvc, ExtMarketSvc: extMarketSvc, From 1ef17f8c82195838a480dd402fa3c9522fea87f6 Mon Sep 17 00:00:00 2001 From: joey Date: Fri, 29 Oct 2021 13:56:11 +0800 Subject: [PATCH 3/3] api-test executor define add snyc.map to store running task --- .../actionexecutor/plugins/apitest/apitest.go | 42 +++++++--- .../reconciler/taskrun/taskop/wait.go | 32 +++++--- .../reconciler/taskrun/taskop/wait_test.go | 76 +++++++++++++++++++ .../pipengine/reconciler/taskrun/taskrun.go | 6 +- 4 files changed, 133 insertions(+), 23 deletions(-) create mode 100644 modules/pipeline/pipengine/reconciler/taskrun/taskop/wait_test.go diff --git a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go index a7a2bcbf17e..e3b14d7983f 100644 --- a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go +++ b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go @@ -17,6 +17,7 @@ package apitest import ( "context" "fmt" + "sync" "github.com/sirupsen/logrus" @@ -30,9 +31,10 @@ import ( var Kind = types.Kind(spec.PipelineTaskExecutorKindAPITest) type define struct { - name types.Name - options map[string]string - dbClient *dbclient.Client + name types.Name + options map[string]string + dbClient *dbclient.Client + runningAPIs sync.Map } func (d *define) Kind() types.Kind { return Kind } @@ -46,7 +48,11 @@ func (d *define) Exist(ctx context.Context, task *spec.PipelineTask) (created bo case status == apistructs.PipelineStatusCreated: return true, false, nil case status == apistructs.PipelineStatusQueue, status == apistructs.PipelineStatusRunning: - return true, true, nil + // if apitest task is not procesing, should make status-started false + if _, alreadyProcessing := d.runningAPIs.Load(d.makeRunningApiKey(task)); alreadyProcessing { + return true, true, nil + } + return true, false, nil case status.IsEndStatus(): return true, true, nil default: @@ -61,6 +67,10 @@ func (d *define) Create(ctx context.Context, task *spec.PipelineTask) (interface func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{}, error) { go func(ctx context.Context, task *spec.PipelineTask) { + if _, alreadyProcessing := d.runningAPIs.LoadOrStore(d.makeRunningApiKey(task), task); alreadyProcessing { + logrus.Warnf("apitest: task: %d already processing", task.ID) + return + } executorDoneCh, ok := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan interface{}) if !ok { logrus.Warnf("apitest: failed to get executor channel, pipelineID: %d, taskID: %d", task.PipelineID, task.ID) @@ -75,6 +85,7 @@ func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{ if executorDoneCh != nil { executorDoneCh <- apistructs.PipelineStatusDesc{Status: status} } + d.runningAPIs.Delete(d.makeRunningApiKey(task)) }() logic.Do(ctx, task) @@ -112,7 +123,7 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct return apistructs.PipelineStatusDesc{Status: task.Status}, nil } - created, _, err := d.Exist(ctx, task) + created, started, err := d.Exist(ctx, task) if err != nil { return apistructs.PipelineStatusDesc{}, err } @@ -121,16 +132,22 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusAnalyzed}, nil } + if !started && len(latestTask.Result.Metadata) == 0 { + return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusBorn}, nil + } + // status according to api success or not meta := latestTask.Result.Metadata + var status = apistructs.PipelineStatusFailed for _, metaField := range meta { if metaField.Name == logic.MetaKeyResult { if metaField.Value == logic.ResultSuccess { - return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusSuccess}, nil + status = apistructs.PipelineStatusSuccess } if metaField.Value == logic.ResultFailed { - return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusFailed}, nil + status = apistructs.PipelineStatusFailed } + return apistructs.PipelineStatusDesc{Status: status}, nil } } @@ -154,6 +171,10 @@ func (d *define) BatchDelete(ctx context.Context, actions []*spec.PipelineTask) return nil, nil } +func (d *define) makeRunningApiKey(task *spec.PipelineTask) string { + return fmt.Sprintf("%d-%d", task.PipelineID, task.ID) +} + func init() { types.MustRegister(Kind, func(name types.Name, options map[string]string) (types.ActionExecutor, error) { dbClient, err := dbclient.New() @@ -161,9 +182,10 @@ func init() { return nil, fmt.Errorf("failed to init dbclient, err: %v", err) } return &define{ - name: name, - options: options, - dbClient: dbClient, + name: name, + options: options, + dbClient: dbClient, + runningAPIs: sync.Map{}, }, nil }) } diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go index 8c93259ee41..af391770850 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go @@ -17,6 +17,7 @@ package taskop import ( "context" "errors" + "math" "time" "github.com/sirupsen/logrus" @@ -30,6 +31,11 @@ import ( var err4EnableDeclineRatio = errors.New("enable decline ratio") +var ( + declineRatio float64 = 1.5 + declineLimit time.Duration = 10 * time.Second +) + type wait taskrun.TaskRun func NewWait(tr *taskrun.TaskRun) *wait { @@ -46,13 +52,12 @@ func (w *wait) TaskRun() *taskrun.TaskRun { func (w *wait) Processing() (interface{}, error) { var ( - data interface{} - lastSleepTime float64 = 1 - declineRatio = 1.5 - declineLimit int = 10 + data interface{} + loopedTimes uint64 ) - timer := time.NewTimer(time.Duration(lastSleepTime) * time.Second) + timer := time.NewTimer(w.calculateNextLoopTimeDuration(loopedTimes)) + defer timer.Stop() for { select { case doneData := <-w.ExecutorDoneCh: @@ -77,11 +82,9 @@ func (w *wait) Processing() (interface{}, error) { if statusDesc.Status == apistructs.PipelineStatusUnknown { logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status %q, retry", w.P.ID, w.Task.Name, apistructs.PipelineStatusUnknown) } - lastSleepTime = lastSleepTime * declineRatio - if lastSleepTime > float64(declineLimit) { - lastSleepTime = float64(declineLimit) - } - timer.Reset(time.Duration(lastSleepTime) * time.Second) + + loopedTimes++ + timer.Reset(w.calculateNextLoopTimeDuration(loopedTimes)) } } } @@ -174,3 +177,12 @@ func (w *wait) TuneTriggers() taskrun.TaskOpTuneTriggers { AfterProcessing: aoptypes.TuneTriggerTaskAfterWait, } } + +func (w *wait) calculateNextLoopTimeDuration(loopedTimes uint64) time.Duration { + lastSleepTime := time.Second + lastSleepTime = time.Duration(float64(lastSleepTime) * math.Pow(declineRatio, float64(loopedTimes))) + if lastSleepTime > declineLimit { + return declineLimit + } + return lastSleepTime +} diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait_test.go b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait_test.go new file mode 100644 index 00000000000..069eaf47088 --- /dev/null +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait_test.go @@ -0,0 +1,76 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package taskop + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/taskrun" +) + +func TestCalculateNextLoopTimeDuration(t *testing.T) { + tt := []struct { + loopedTimes uint64 + want string + }{ + { + loopedTimes: 0, + want: "1s", + }, + { + loopedTimes: 1, + want: "1.5s", + }, + { + loopedTimes: 2, + want: "2.25s", + }, + { + loopedTimes: 3, + want: "3.375s", + }, + { + loopedTimes: 4, + want: "5.0625s", + }, + { + loopedTimes: 5, + want: "7.59375s", + }, + { + loopedTimes: 6, + want: "10s", + }, + { + loopedTimes: 7, + want: "10s", + }, + { + loopedTimes: 8, + want: "10s", + }, + { + loopedTimes: 9, + want: "10s", + }, + } + + w := NewWait(&taskrun.TaskRun{}) + for i := range tt { + assert.Equal(t, tt[i].want, w.calculateNextLoopTimeDuration(tt[i].loopedTimes).String()) + } +} diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go b/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go index 19c56ba4fd0..55b8475b4b4 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskrun.go @@ -72,9 +72,9 @@ func New(ctx context.Context, task *spec.PipelineTask, extMarketSvc *extmarketsvc.ExtMarketSvc, ) *TaskRun { // make executor has buffer, don't block task framework - executorCH := make(chan interface{}, 1) + executorCh := make(chan interface{}, 1) return &TaskRun{ - Ctx: context.WithValue(ctx, spec.MakeTaskExecutorCtxKey(task), executorCH), + Ctx: context.WithValue(ctx, spec.MakeTaskExecutorCtxKey(task), executorCh), Task: task, Executor: executor, Throttler: throttler, @@ -92,7 +92,7 @@ func New(ctx context.Context, task *spec.PipelineTask, PExitCh: pExitCh, PExitChCancel: pExitChCancel, - ExecutorDoneCh: executorCH, + ExecutorDoneCh: executorCh, ActionAgentSvc: actionAgentSvc, ExtMarketSvc: extMarketSvc,