Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(TKC-3004): kill nested processes on timeout properly #6097

Merged
merged 3 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions cmd/testworkflow-init/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package main
import (
"encoding/json"
"errors"
"fmt"
"os"
"os/signal"
"slices"
"strconv"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -310,6 +312,7 @@ func main() {
}

// Configure timeout finalizer
var hasTimeout, hasOwnTimeout atomic.Bool
finalizeTimeout := func() {
// Check timed out steps in leaf
timedOut := orchestration.GetTimedOut(leaf...)
Expand All @@ -321,6 +324,10 @@ func main() {
for _, r := range timedOut {
r.SetStatus(constants.StepStatusTimeout)
sub := state.GetSubSteps(r.Ref)
hasTimeout.Store(true)
if step.Ref == r.Ref {
hasOwnTimeout.Store(true)
}
for i := range sub {
if sub[i].IsFinished() {
continue
Expand All @@ -331,7 +338,6 @@ func main() {
sub[i].SetStatus(constants.StepStatusSkipped)
}
}
stdoutUnsafe.Println("Timed out.")
}
_ = orchestration.Executions.Kill()

Expand All @@ -358,6 +364,8 @@ func main() {
}

// Register timeouts
hasTimeout.Store(false)
hasOwnTimeout.Store(false)
stopTimeoutWatcher := orchestration.WatchTimeout(finalizeTimeout, leaf...)

// Run the command
Expand All @@ -366,12 +374,17 @@ func main() {
// Stop timer listener
stopTimeoutWatcher()

// Handle timeout gracefully
if hasOwnTimeout.Load() {
orchestration.Executions.ClearAbortedStatus()
}

// Ensure there won't be any hanging processes after the command is executed
_ = orchestration.Executions.Kill()

// TODO: Handle retry policy in tree independently
// Verify if there may be any other iteration
if step.Iteration >= step.Retry.Count {
if step.Iteration >= step.Retry.Count || (!hasOwnTimeout.Load() && hasTimeout.Load()) {
break
}

Expand All @@ -393,7 +406,15 @@ func main() {
// Continue with the next iteration
step.Iteration++
stdout.HintDetails(step.Ref, constants.InstructionIteration, step.Iteration)
stdoutUnsafe.Printf("\nExit code: %d • Retrying: attempt #%d (of %d):\n", step.ExitCode, step.Iteration, step.Retry.Count)
message := fmt.Sprintf("Exit code: %d", step.ExitCode)
if hasOwnTimeout.Load() {
message = "Timed out"
}
stdoutUnsafe.Printf("\n%s • Retrying: attempt #%d (of %d):\n", message, step.Iteration, step.Retry.Count)

// Restart start time for the next iteration to allow retries
now := time.Now()
step.StartedAt = &now
}
}

Expand Down
8 changes: 7 additions & 1 deletion cmd/testworkflow-init/orchestration/executions.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type executionGroup struct {

paused atomic.Bool
pauseMu sync.Mutex

softKillProgress atomic.Bool
}

func newExecutionGroup(outStream io.Writer, errStream io.Writer) *executionGroup {
Expand Down Expand Up @@ -142,6 +144,10 @@ func (e *executionGroup) IsAborted() bool {
return e.aborted.Load()
}

// ClearAbortedStatus resets the group's aborted flag.
// Used after an intentional kill (the caller in main.go invokes it when the
// step's own timeout fired) so the deliberate termination is not later
// reported as an abort — presumably to let the timeout status win; confirm
// against the timeout finalizer in cmd/testworkflow-init/main.go.
func (e *executionGroup) ClearAbortedStatus() {
e.aborted.Store(false)
}

type execution struct {
cmd *exec.Cmd
cmdMu sync.Mutex
Expand Down Expand Up @@ -203,7 +209,7 @@ func (e *execution) Run() (*executionResult, error) {

// Mark the execution group as aborted when this process was aborted.
// In Kubernetes, when that child process is killed, it may mean OOM Kill.
if aborted && !e.group.aborted.Load() {
if aborted && !e.group.aborted.Load() && !e.group.softKillProgress.Load() {
e.group.Abort()
}

Expand Down
28 changes: 12 additions & 16 deletions cmd/testworkflow-init/orchestration/processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,14 @@ func (p *processNode) Find(pid int32) []*processNode {
}

func (p *processNode) VirtualizePath(pid int32) {
path := p.Find(pid)
if path == nil {
return
}

// Cannot virtualize itself
if len(path) == 1 {
return
}

// Virtualize recursively
for i := 1; i < len(path); i++ {
delete(path[0].nodes, path[i])
for node := range path[i].nodes {
path[0].nodes[node] = struct{}{}
for ps := range p.nodes {
if ps.pid == pid {
for sub := range ps.nodes {
p.nodes[sub] = struct{}{}
}
delete(p.nodes, ps)
} else {
ps.VirtualizePath(pid)
}
}
}
Expand Down Expand Up @@ -106,7 +99,10 @@ func (p *processNode) Resume() error {
func (p *processNode) Kill() error {
errs := make([]error, 0)
if p.pid != -1 {
return errors.Wrap((&gopsutil.Process{Pid: p.pid}).Kill(), "killing processes")
err := errors.Wrap((&gopsutil.Process{Pid: p.pid}).Kill(), "killing processes")
if err != nil {
return err
}
}
for node := range p.nodes {
err := node.Kill()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package testkube

// Finished reports whether the step status represents a terminal state.
// Nil, empty, queued, paused and running are non-terminal; any other status
// (including TIMEOUT, which this PR intentionally treats as finished so that
// nested processes get killed properly) counts as finished.
//
// The scrape carried both the pre- and post-merge return statements; only the
// merged version (without the TIMEOUT exclusion) is kept, since the duplicated
// old return would otherwise shadow the fix.
func (s *TestWorkflowStepStatus) Finished() bool {
	return s != nil && *s != "" && *s != QUEUED_TestWorkflowStepStatus && *s != PAUSED_TestWorkflowStepStatus && *s != RUNNING_TestWorkflowStepStatus
}

func (s *TestWorkflowStepStatus) Aborted() bool {
Expand Down
Loading