Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/pipeline apitest cost time #2650

Merged
merged 3 commits into from
Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ package apitest
import (
"context"
"fmt"
"sync"

"github.com/sirupsen/logrus"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/pipeline/dbclient"
Expand All @@ -28,9 +31,10 @@ import (
var Kind = types.Kind(spec.PipelineTaskExecutorKindAPITest)

// define is the pipeline task executor registered for API-test tasks.
type define struct {
	// name and options are the values handed to the registered factory.
	name    types.Name
	options map[string]string
	// dbClient is used to re-read the latest task record from the database.
	dbClient *dbclient.Client
	// runningAPIs tracks the tasks whose API test is currently executing,
	// keyed by makeRunningApiKey. Its zero value is ready to use.
	runningAPIs sync.Map
}

// Kind reports the executor kind, i.e. the package-level Kind value.
func (d *define) Kind() types.Kind {
	return Kind
}
Expand All @@ -44,7 +48,11 @@ func (d *define) Exist(ctx context.Context, task *spec.PipelineTask) (created bo
case status == apistructs.PipelineStatusCreated:
return true, false, nil
case status == apistructs.PipelineStatusQueue, status == apistructs.PipelineStatusRunning:
return true, true, nil
// if the apitest task is not being processed, report started as false
if _, alreadyProcessing := d.runningAPIs.Load(d.makeRunningApiKey(task)); alreadyProcessing {
return true, true, nil
}
return true, false, nil
case status.IsEndStatus():
return true, true, nil
default:
Expand All @@ -57,7 +65,46 @@ func (d *define) Create(ctx context.Context, task *spec.PipelineTask) (interface
}

func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{}, error) {
sfwn marked this conversation as resolved.
Show resolved Hide resolved
logic.Do(ctx, task)

go func(ctx context.Context, task *spec.PipelineTask) {
if _, alreadyProcessing := d.runningAPIs.LoadOrStore(d.makeRunningApiKey(task), task); alreadyProcessing {
logrus.Warnf("apitest: task: %d already processing", task.ID)
return
}
executorDoneCh, ok := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan interface{})
if !ok {
logrus.Warnf("apitest: failed to get executor channel, pipelineID: %d, taskID: %d", task.PipelineID, task.ID)
}

var status = apistructs.PipelineStatusFailed
defer func() {
if r := recover(); r != nil {
logrus.Errorf("api-test logic do panic recover:%s", r)
}
// if executor chan is nil, task framework can loop query meta get status
if executorDoneCh != nil {
executorDoneCh <- apistructs.PipelineStatusDesc{Status: status}
}
d.runningAPIs.Delete(d.makeRunningApiKey(task))
}()

logic.Do(ctx, task)

latestTask, err := d.dbClient.GetPipelineTask(task.ID)
if err != nil {
logrus.Errorf("failed to query latest task, err: %v \n", err)
return
}

meta := latestTask.Result.Metadata
for _, metaField := range meta {
if metaField.Name == logic.MetaKeyResult {
if metaField.Value == logic.ResultSuccess {
status = apistructs.PipelineStatusSuccess
}
}
}
}(ctx, task)
return nil, nil
}

Expand All @@ -70,13 +117,13 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct
if err != nil {
return apistructs.PipelineStatusDesc{}, fmt.Errorf("failed to query latest task, err: %v", err)
}
*task = latestTask
//*task = latestTask

if task.Status.IsEndStatus() {
return apistructs.PipelineStatusDesc{Status: task.Status}, nil
}

created, _, err := d.Exist(ctx, task)
created, started, err := d.Exist(ctx, task)
if err != nil {
return apistructs.PipelineStatusDesc{}, err
}
Expand All @@ -85,19 +132,27 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusAnalyzed}, nil
}

if !started && len(latestTask.Result.Metadata) == 0 {
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusBorn}, nil
}

// status according to api success or not
meta := latestTask.Result.Metadata
var status = apistructs.PipelineStatusFailed
for _, metaField := range meta {
if metaField.Name == logic.MetaKeyResult {
if metaField.Value == logic.ResultSuccess {
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusSuccess}, nil
status = apistructs.PipelineStatusSuccess
}
if metaField.Value == logic.ResultFailed {
status = apistructs.PipelineStatusFailed
}
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusFailed}, nil
return apistructs.PipelineStatusDesc{Status: status}, nil
}
}

// return created status to do start step
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusCreated}, nil
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusRunning}, nil
}

func (d *define) Inspect(ctx context.Context, task *spec.PipelineTask) (apistructs.TaskInspect, error) {
Expand All @@ -116,16 +171,21 @@ func (d *define) BatchDelete(ctx context.Context, actions []*spec.PipelineTask)
return nil, nil
}

// makeRunningApiKey builds the key under which task is tracked in
// runningAPIs, in the form "<PipelineID>-<ID>".
func (d *define) makeRunningApiKey(task *spec.PipelineTask) string {
	const keyLayout = "%d-%d"
	pipelineID, taskID := task.PipelineID, task.ID
	return fmt.Sprintf(keyLayout, pipelineID, taskID)
}

// init registers the factory for the API-test executor under Kind. The factory
// opens a database client and returns a *define built from the given name and
// options; it fails if the database client cannot be created.
func init() {
	types.MustRegister(Kind, func(name types.Name, options map[string]string) (types.ActionExecutor, error) {
		dbClient, err := dbclient.New()
		if err != nil {
			return nil, fmt.Errorf("failed to init dbclient, err: %v", err)
		}
		// Each field is set exactly once; repeating a key in a composite
		// literal does not compile.
		return &define{
			name:        name,
			options:     options,
			dbClient:    dbClient,
			runningAPIs: sync.Map{},
		}, nil
	})
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"strconv"
"time"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/actionagent"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/erda-project/erda/pkg/apitestsv2"
"github.com/erda-project/erda/pkg/apitestsv2/cookiejar"
"github.com/erda-project/erda/pkg/encoding/jsonparse"
"github.com/erda-project/erda/pkg/loop"
)

const (
Expand Down Expand Up @@ -112,11 +114,15 @@ func writeMetaFile(ctx context.Context, task *spec.PipelineTask, meta *Meta) {
cb.PipelineTaskID = task.ID

cbData, _ := json.Marshal(&cb)
err := pipelinefunc.CallbackActionFunc(cbData)
if err != nil {
log.Errorf("failed to callback, err: %v", err)
return
}

// apitest should ensure that callback to pipeline after doing request
_ = loop.New(loop.WithDeclineRatio(2), loop.WithDeclineLimit(10*time.Second)).
Do(func() (bool, error) {
err := pipelinefunc.CallbackActionFunc(cbData)
if err != nil {
log.Errorf("failed to callback, err: %v", err)
return false, err
}
return true, nil
})
return
}
1 change: 0 additions & 1 deletion modules/pipeline/pipengine/reconciler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func (r *Reconciler) reconcile(ctx context.Context, pipelineID uint64) error {
if err != nil {
return
}

tr := taskrun.New(ctx, task,
ctx.Value(ctxKeyPipelineExitCh).(chan struct{}), ctx.Value(ctxKeyPipelineExitChCancelFunc).(context.CancelFunc),
r.TaskThrottler, executor, &p, r.bdl, r.dbClient, r.js,
Expand Down
12 changes: 1 addition & 11 deletions modules/pipeline/pipengine/reconciler/taskrun/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@ import (
"github.com/erda-project/erda/modules/pipeline/conf"
"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/rlog"
"github.com/erda-project/erda/modules/pipeline/pkg/errorsx"
"github.com/erda-project/erda/modules/pipeline/spec"
"github.com/erda-project/erda/pkg/loop"
"github.com/erda-project/erda/pkg/strutil"
)

func (tr *TaskRun) Do(itr TaskOp) error {
logrus.Infof("reconciler: pipelineID: %d, task %q begin %s", tr.P.ID, tr.Task.Name, itr.Op())

executorDoneCh := tr.Ctx.Value(spec.MakeTaskExecutorCtxKey(tr.Task)).(chan interface{})
o := &Elem{ErrCh: make(chan error), DoneCh: make(chan interface{}), ExitCh: make(chan struct{}), ExecutorDoneCh: executorDoneCh}
o := &Elem{ErrCh: make(chan error), DoneCh: make(chan interface{}), ExitCh: make(chan struct{})}
o.TimeoutCh, o.Cancel, o.Timeout = itr.TimeoutConfig()

// define op handle func
Expand Down Expand Up @@ -161,14 +159,6 @@ func (tr *TaskRun) waitOp(itr TaskOp, o *Elem) (result error) {
// aop
_ = aop.Handle(aop.NewContextForTask(*tr.Task, *tr.P, itr.TuneTriggers().AfterProcessing))

case data := <-o.ExecutorDoneCh:
tr.LogStep(itr.Op(), fmt.Sprintf("framework accept signal from executor %s, begin do WhenDone", tr.Executor.Name()))
defer tr.LogStep(itr.Op(), fmt.Sprintf("framework accept signal from executor %s, end do WhenDone", tr.Executor.Name()))
if err := itr.WhenDone(data); err != nil {
errs = append(errs, err.Error())
}
_ = aop.Handle(aop.NewContextForTask(*tr.Task, *tr.P, itr.TuneTriggers().AfterProcessing))

case err := <-o.ErrCh:
logrus.Errorf("reconciler: pipelineID: %d, task %q %s received error (%v)", tr.P.ID, tr.Task.Name, itr.Op(), err)
if errorsx.IsNetworkError(err) {
Expand Down
90 changes: 45 additions & 45 deletions modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package taskop
import (
"context"
"errors"
"math"
"time"

"github.com/sirupsen/logrus"
Expand All @@ -25,13 +26,16 @@ import (
"github.com/erda-project/erda/modules/pipeline/aop/aoptypes"
"github.com/erda-project/erda/modules/pipeline/commonutil/costtimeutil"
"github.com/erda-project/erda/modules/pipeline/conf"
"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/rlog"
"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/taskrun"
"github.com/erda-project/erda/pkg/loop"
)

var err4EnableDeclineRatio = errors.New("enable decline ratio")

var (
declineRatio float64 = 1.5
declineLimit time.Duration = 10 * time.Second
)

type wait taskrun.TaskRun

func NewWait(tr *taskrun.TaskRun) *wait {
Expand All @@ -47,55 +51,42 @@ func (w *wait) TaskRun() *taskrun.TaskRun {
}

func (w *wait) Processing() (interface{}, error) {
stopWaitCh := make(chan struct{})
defer func() {
stopWaitCh <- struct{}{}
}()
go func() {
var (
data interface{}
loopedTimes uint64
)

timer := time.NewTimer(w.calculateNextLoopTimeDuration(loopedTimes))
defer timer.Stop()
for {
select {
case doneData := <-w.ExecutorDoneCh:
logrus.Infof("%s: accept signal from executor %s, data: %v", w.Op(), w.Executor.Name(), doneData)
return doneData, nil
case <-w.Ctx.Done():
w.StopWaitLoop = true
return
return data, nil
case <-w.PExitCh:
logrus.Warnf("reconciler: pipeline exit, stop wait, pipelineID: %d, taskID: %d", w.P.ID, w.Task.ID)
return
case <-stopWaitCh:
rlog.TDebugf(w.P.ID, w.Task.ID, "stop wait")
close(stopWaitCh)
return
}
}()

var (
data interface{}
)

err := loop.New(loop.WithDeclineRatio(1.5), loop.WithDeclineLimit(time.Second*10)).Do(func() (abort bool, err error) {
if w.QuitWaitTimeout {
return true, nil
}

statusDesc, err := w.Executor.Status(w.Ctx, w.Task)
if err != nil {
logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status failed, err: %v",
w.P.ID, w.Task.Name, err)
return true, err
}

if statusDesc.Status == apistructs.PipelineStatusUnknown {
logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status %q, retry", w.P.ID, w.Task.Name, apistructs.PipelineStatusUnknown)
return false, err4EnableDeclineRatio
}
return data, nil
case <-timer.C:
statusDesc, err := w.Executor.Status(w.Ctx, w.Task)
if err != nil {
logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status failed, err: %v",
w.P.ID, w.Task.Name, err)
return nil, err
}
if statusDesc.Status.IsEndStatus() {
data = statusDesc
return data, nil
}
if statusDesc.Status == apistructs.PipelineStatusUnknown {
logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status %q, retry", w.P.ID, w.Task.Name, apistructs.PipelineStatusUnknown)
}

if statusDesc.Status.IsEndStatus() {
data = statusDesc
return true, nil
loopedTimes++
timer.Reset(w.calculateNextLoopTimeDuration(loopedTimes))
}

return w.StopWaitLoop, err4EnableDeclineRatio
})

return data, err
}
}

func (w *wait) WhenDone(data interface{}) error {
Expand Down Expand Up @@ -186,3 +177,12 @@ func (w *wait) TuneTriggers() taskrun.TaskOpTuneTriggers {
AfterProcessing: aoptypes.TuneTriggerTaskAfterWait,
}
}

// calculateNextLoopTimeDuration returns the delay to wait before the next
// status poll: one second multiplied by declineRatio^loopedTimes, capped at
// declineLimit.
//
// The cap is applied on the float64 value before converting to time.Duration.
// For large loopedTimes the product exceeds the int64 range (or becomes +Inf),
// and converting such a value to an integer is implementation-dependent in Go;
// it can come out negative, which would bypass a cap applied after the
// conversion and make the caller's timer fire immediately on every iteration.
func (w *wait) calculateNextLoopTimeDuration(loopedTimes uint64) time.Duration {
	next := float64(time.Second) * math.Pow(declineRatio, float64(loopedTimes))
	// The negated comparison also routes NaN to the cap.
	if !(next <= float64(declineLimit)) {
		return declineLimit
	}
	return time.Duration(next)
}
Loading