-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathharness.go
107 lines (91 loc) · 2 KB
/
harness.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package harness
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"time"
)
func New(options ...Option) Handler {
o := defaultOptions()
for _, opt := range options {
opt(o)
}
return &handler{
options: o,
}
}
type Handler interface {
Start(ctx context.Context)
}
type handler struct {
options *options
}
func (h *handler) Start(ctx context.Context) {
exitTypeCh := make(chan ExitType, 1)
ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
// Use a wait group to track the completion of all runners
var wg sync.WaitGroup
wg.Add(len(h.options.runners))
// Start each runner in a separate goroutine
go func() {
for _, runner := range h.options.runners {
go func(r Runner, fnCancel context.CancelFunc) {
defer wg.Done()
defer func(r Runner, fnCancel context.CancelFunc) {
if rec := recover(); rec != nil {
err := fmt.Errorf("recovered %+v", rec)
h.options.onError(err)
r.OnError(err)
fnCancel()
}
}(r, fnCancel)
err := r.Run(ctx)
if err != nil {
h.options.onError(err)
r.OnError(err)
fnCancel()
}
}(runner, cancelFunc)
}
wg.Wait()
time.Sleep(100 * time.Microsecond)
exitTypeCh <- ExitTypeNormal
}()
go func() {
// Wait for termination signal or cancellation
signalCh := make(chan os.Signal, 1)
// defer close(signalCh)
signal.Notify(signalCh, h.options.signals...)
// Wait for all runners to complete
select {
case <-signalCh:
exitTypeCh <- ExitTypeSignal
case <-ctx.Done():
exitTypeCh <- ExitTypeCancel
}
}()
exitType := <-exitTypeCh
h.gracefulShutdown(exitType)
}
func (h *handler) gracefulShutdown(exitType ExitType) {
var wg sync.WaitGroup
for _, runner := range h.options.runners {
wg.Add(1)
go func(r Runner) {
defer wg.Done()
defer func(r Runner) {
if rec := recover(); rec != nil {
err := fmt.Errorf("recovered %+v", rec)
h.options.onError(err)
r.OnError(err)
}
}(r)
r.Shutdown(exitType)
}(runner)
}
wg.Wait()
h.options.onCompleted()
}