From cecc649ce0221f1b4cb9e08e6ddace4cfc969d05 Mon Sep 17 00:00:00 2001 From: joey Date: Fri, 29 Oct 2021 13:56:11 +0800 Subject: [PATCH] api-test executor define add snyc.map to store running task --- .../actionexecutor/plugins/apitest/apitest.go | 42 +++++++--- .../reconciler/taskrun/taskop/wait.go | 32 +++++--- .../reconciler/taskrun/taskop/wait_test.go | 76 +++++++++++++++++++ 3 files changed, 130 insertions(+), 20 deletions(-) create mode 100644 modules/pipeline/pipengine/reconciler/taskrun/taskop/wait_test.go diff --git a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go index a7a2bcbf17e1..0927559d6045 100644 --- a/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go +++ b/modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go @@ -17,6 +17,7 @@ package apitest import ( "context" "fmt" + "sync" "github.com/sirupsen/logrus" @@ -30,9 +31,10 @@ import ( var Kind = types.Kind(spec.PipelineTaskExecutorKindAPITest) type define struct { - name types.Name - options map[string]string - dbClient *dbclient.Client + name types.Name + options map[string]string + dbClient *dbclient.Client + runningApis sync.Map } func (d *define) Kind() types.Kind { return Kind } @@ -46,7 +48,11 @@ func (d *define) Exist(ctx context.Context, task *spec.PipelineTask) (created bo case status == apistructs.PipelineStatusCreated: return true, false, nil case status == apistructs.PipelineStatusQueue, status == apistructs.PipelineStatusRunning: - return true, true, nil + // if apitest task is not procesing, should make status-started false + if _, alreadyProcessing := d.runningApis.Load(d.makeRunningApiKey(task)); alreadyProcessing { + return true, true, nil + } + return true, false, nil case status.IsEndStatus(): return true, true, nil default: @@ -61,6 +67,10 @@ func (d *define) Create(ctx context.Context, task *spec.PipelineTask) (interface func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{}, error) { go func(ctx context.Context, task *spec.PipelineTask) { + if _, alreadyProcessing := d.runningApis.LoadOrStore(d.makeRunningApiKey(task), task); alreadyProcessing { + logrus.Warnf("apitest: task: %d already processing", task.ID) + return + } executorDoneCh, ok := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan interface{}) if !ok { logrus.Warnf("apitest: failed to get executor channel, pipelineID: %d, taskID: %d", task.PipelineID, task.ID) @@ -75,6 +85,7 @@ func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{ if executorDoneCh != nil { executorDoneCh <- apistructs.PipelineStatusDesc{Status: status} } + d.runningApis.Delete(d.makeRunningApiKey(task)) }() logic.Do(ctx, task) @@ -112,7 +123,7 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct return apistructs.PipelineStatusDesc{Status: task.Status}, nil } - created, _, err := d.Exist(ctx, task) + created, started, err := d.Exist(ctx, task) if err != nil { return apistructs.PipelineStatusDesc{}, err } @@ -121,16 +132,22 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusAnalyzed}, nil } + if !started && len(latestTask.Result.Metadata) == 0 { + return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusBorn}, nil + } + // status according to api success or not meta := latestTask.Result.Metadata + var status = apistructs.PipelineStatusFailed for _, metaField := range meta { if metaField.Name == logic.MetaKeyResult { if metaField.Value == logic.ResultSuccess { - return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusSuccess}, nil + status = apistructs.PipelineStatusSuccess } if metaField.Value == logic.ResultFailed { - return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusFailed}, nil + status = apistructs.PipelineStatusFailed } + return apistructs.PipelineStatusDesc{Status: status}, nil } } @@ -154,6 +171,10 @@ func (d *define) BatchDelete(ctx context.Context, actions []*spec.PipelineTask) return nil, nil } +func (d *define) makeRunningApiKey(task *spec.PipelineTask) string { + return fmt.Sprintf("%d-%d", task.PipelineID, task.ID) +} + func init() { types.MustRegister(Kind, func(name types.Name, options map[string]string) (types.ActionExecutor, error) { dbClient, err := dbclient.New() @@ -161,9 +182,10 @@ func init() { return nil, fmt.Errorf("failed to init dbclient, err: %v", err) } return &define{ - name: name, - options: options, - dbClient: dbClient, + name: name, + options: options, + dbClient: dbClient, + runningApis: sync.Map{}, }, nil }) } diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go index 8c93259ee41b..af3917708500 100644 --- a/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go @@ -17,6 +17,7 @@ package taskop import ( "context" "errors" + "math" "time" "github.com/sirupsen/logrus" @@ -30,6 +31,11 @@ import ( var err4EnableDeclineRatio = errors.New("enable decline ratio") +var ( + declineRatio float64 = 1.5 + declineLimit time.Duration = 10 * time.Second +) + type wait taskrun.TaskRun func NewWait(tr *taskrun.TaskRun) *wait { @@ -46,13 +52,12 @@ func (w *wait) TaskRun() *taskrun.TaskRun { func (w *wait) Processing() (interface{}, error) { var ( - data interface{} - lastSleepTime float64 = 1 - declineRatio = 1.5 - declineLimit int = 10 + data interface{} + loopedTimes uint64 ) - timer := time.NewTimer(time.Duration(lastSleepTime) * time.Second) + timer := time.NewTimer(w.calculateNextLoopTimeDuration(loopedTimes)) + defer timer.Stop() for { select { case doneData := <-w.ExecutorDoneCh: @@ -77,11 +82,9 @@ func (w *wait) Processing() (interface{}, error) { if statusDesc.Status == apistructs.PipelineStatusUnknown { logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status %q, retry", w.P.ID, w.Task.Name, apistructs.PipelineStatusUnknown) } - lastSleepTime = lastSleepTime * declineRatio - if lastSleepTime > float64(declineLimit) { - lastSleepTime = float64(declineLimit) - } - timer.Reset(time.Duration(lastSleepTime) * time.Second) + + loopedTimes++ + timer.Reset(w.calculateNextLoopTimeDuration(loopedTimes)) } } } @@ -174,3 +177,12 @@ func (w *wait) TuneTriggers() taskrun.TaskOpTuneTriggers { AfterProcessing: aoptypes.TuneTriggerTaskAfterWait, } } + +func (w *wait) calculateNextLoopTimeDuration(loopedTimes uint64) time.Duration { + lastSleepTime := time.Second + lastSleepTime = time.Duration(float64(lastSleepTime) * math.Pow(declineRatio, float64(loopedTimes))) + if lastSleepTime > declineLimit { + return declineLimit + } + return lastSleepTime +} diff --git a/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait_test.go b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait_test.go new file mode 100644 index 000000000000..069eaf47088f --- /dev/null +++ b/modules/pipeline/pipengine/reconciler/taskrun/taskop/wait_test.go @@ -0,0 +1,76 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package taskop + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/taskrun" +) + +func TestCalculateNextLoopTimeDuration(t *testing.T) { + tt := []struct { + loopedTimes uint64 + want string + }{ + { + loopedTimes: 0, + want: "1s", + }, + { + loopedTimes: 1, + want: "1.5s", + }, + { + loopedTimes: 2, + want: "2.25s", + }, + { + loopedTimes: 3, + want: "3.375s", + }, + { + loopedTimes: 4, + want: "5.0625s", + }, + { + loopedTimes: 5, + want: "7.59375s", + }, + { + loopedTimes: 6, + want: "10s", + }, + { + loopedTimes: 7, + want: "10s", + }, + { + loopedTimes: 8, + want: "10s", + }, + { + loopedTimes: 9, + want: "10s", + }, + } + + w := NewWait(&taskrun.TaskRun{}) + for i := range tt { + assert.Equal(t, tt[i].want, w.calculateNextLoopTimeDuration(tt[i].loopedTimes).String()) + } +}