Skip to content

Commit

Permalink
api-test executor define add snyc.map to store running task
Browse files Browse the repository at this point in the history
  • Loading branch information
chengjoey committed Nov 1, 2021
1 parent be514e5 commit cecc649
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package apitest
import (
"context"
"fmt"
"sync"

"github.com/sirupsen/logrus"

Expand All @@ -30,9 +31,10 @@ import (
var Kind = types.Kind(spec.PipelineTaskExecutorKindAPITest)

type define struct {
name types.Name
options map[string]string
dbClient *dbclient.Client
name types.Name
options map[string]string
dbClient *dbclient.Client
runningApis sync.Map
}

func (d *define) Kind() types.Kind { return Kind }
Expand All @@ -46,7 +48,11 @@ func (d *define) Exist(ctx context.Context, task *spec.PipelineTask) (created bo
case status == apistructs.PipelineStatusCreated:
return true, false, nil
case status == apistructs.PipelineStatusQueue, status == apistructs.PipelineStatusRunning:
return true, true, nil
// if apitest task is not procesing, should make status-started false
if _, alreadyProcessing := d.runningApis.Load(d.makeRunningApiKey(task)); alreadyProcessing {
return true, true, nil
}
return true, false, nil
case status.IsEndStatus():
return true, true, nil
default:
Expand All @@ -61,6 +67,10 @@ func (d *define) Create(ctx context.Context, task *spec.PipelineTask) (interface
func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{}, error) {

go func(ctx context.Context, task *spec.PipelineTask) {
if _, alreadyProcessing := d.runningApis.LoadOrStore(d.makeRunningApiKey(task), task); alreadyProcessing {
logrus.Warnf("apitest: task: %d already processing", task.ID)
return
}
executorDoneCh, ok := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan interface{})
if !ok {
logrus.Warnf("apitest: failed to get executor channel, pipelineID: %d, taskID: %d", task.PipelineID, task.ID)
Expand All @@ -75,6 +85,7 @@ func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{
if executorDoneCh != nil {
executorDoneCh <- apistructs.PipelineStatusDesc{Status: status}
}
d.runningApis.Delete(d.makeRunningApiKey(task))
}()

logic.Do(ctx, task)
Expand Down Expand Up @@ -112,7 +123,7 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct
return apistructs.PipelineStatusDesc{Status: task.Status}, nil
}

created, _, err := d.Exist(ctx, task)
created, started, err := d.Exist(ctx, task)
if err != nil {
return apistructs.PipelineStatusDesc{}, err
}
Expand All @@ -121,16 +132,22 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusAnalyzed}, nil
}

if !started && len(latestTask.Result.Metadata) == 0 {
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusBorn}, nil
}

// status according to api success or not
meta := latestTask.Result.Metadata
var status = apistructs.PipelineStatusFailed
for _, metaField := range meta {
if metaField.Name == logic.MetaKeyResult {
if metaField.Value == logic.ResultSuccess {
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusSuccess}, nil
status = apistructs.PipelineStatusSuccess
}
if metaField.Value == logic.ResultFailed {
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusFailed}, nil
status = apistructs.PipelineStatusFailed
}
return apistructs.PipelineStatusDesc{Status: status}, nil
}
}

Expand All @@ -154,16 +171,21 @@ func (d *define) BatchDelete(ctx context.Context, actions []*spec.PipelineTask)
return nil, nil
}

func (d *define) makeRunningApiKey(task *spec.PipelineTask) string {
return fmt.Sprintf("%d-%d", task.PipelineID, task.ID)
}

func init() {
types.MustRegister(Kind, func(name types.Name, options map[string]string) (types.ActionExecutor, error) {
dbClient, err := dbclient.New()
if err != nil {
return nil, fmt.Errorf("failed to init dbclient, err: %v", err)
}
return &define{
name: name,
options: options,
dbClient: dbClient,
name: name,
options: options,
dbClient: dbClient,
runningApis: sync.Map{},
}, nil
})
}
32 changes: 22 additions & 10 deletions modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package taskop
import (
"context"
"errors"
"math"
"time"

"github.com/sirupsen/logrus"
Expand All @@ -30,6 +31,11 @@ import (

var err4EnableDeclineRatio = errors.New("enable decline ratio")

var (
declineRatio float64 = 1.5
declineLimit time.Duration = 10 * time.Second
)

type wait taskrun.TaskRun

func NewWait(tr *taskrun.TaskRun) *wait {
Expand All @@ -46,13 +52,12 @@ func (w *wait) TaskRun() *taskrun.TaskRun {

func (w *wait) Processing() (interface{}, error) {
var (
data interface{}
lastSleepTime float64 = 1
declineRatio = 1.5
declineLimit int = 10
data interface{}
loopedTimes uint64
)

timer := time.NewTimer(time.Duration(lastSleepTime) * time.Second)
timer := time.NewTimer(w.calculateNextLoopTimeDuration(loopedTimes))
defer timer.Stop()
for {
select {
case doneData := <-w.ExecutorDoneCh:
Expand All @@ -77,11 +82,9 @@ func (w *wait) Processing() (interface{}, error) {
if statusDesc.Status == apistructs.PipelineStatusUnknown {
logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status %q, retry", w.P.ID, w.Task.Name, apistructs.PipelineStatusUnknown)
}
lastSleepTime = lastSleepTime * declineRatio
if lastSleepTime > float64(declineLimit) {
lastSleepTime = float64(declineLimit)
}
timer.Reset(time.Duration(lastSleepTime) * time.Second)

loopedTimes++
timer.Reset(w.calculateNextLoopTimeDuration(loopedTimes))
}
}
}
Expand Down Expand Up @@ -174,3 +177,12 @@ func (w *wait) TuneTriggers() taskrun.TaskOpTuneTriggers {
AfterProcessing: aoptypes.TuneTriggerTaskAfterWait,
}
}

func (w *wait) calculateNextLoopTimeDuration(loopedTimes uint64) time.Duration {
lastSleepTime := time.Second
lastSleepTime = time.Duration(float64(lastSleepTime) * math.Pow(declineRatio, float64(loopedTimes)))
if lastSleepTime > declineLimit {
return declineLimit
}
return lastSleepTime
}
76 changes: 76 additions & 0 deletions modules/pipeline/pipengine/reconciler/taskrun/taskop/wait_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package taskop

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/taskrun"
)

func TestCalculateNextLoopTimeDuration(t *testing.T) {
tt := []struct {
loopedTimes uint64
want string
}{
{
loopedTimes: 0,
want: "1s",
},
{
loopedTimes: 1,
want: "1.5s",
},
{
loopedTimes: 2,
want: "2.25s",
},
{
loopedTimes: 3,
want: "3.375s",
},
{
loopedTimes: 4,
want: "5.0625s",
},
{
loopedTimes: 5,
want: "7.59375s",
},
{
loopedTimes: 6,
want: "10s",
},
{
loopedTimes: 7,
want: "10s",
},
{
loopedTimes: 8,
want: "10s",
},
{
loopedTimes: 9,
want: "10s",
},
}

w := NewWait(&taskrun.TaskRun{})
for i := range tt {
assert.Equal(t, tt[i].want, w.calculateNextLoopTimeDuration(tt[i].loopedTimes).String())
}
}

0 comments on commit cecc649

Please # to comment.