-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandler.go
141 lines (123 loc) · 3.01 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package op
import (
	"context"
	"fmt"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
)
// HandlerError is the error type returned from a Handler's Wait method when
// at least one operation failed.
type HandlerError struct {
	// ContextError is the value of the handler context's Err() at the time
	// Wait collected the results; it is nil if the context had no error.
	ContextError error
	// OperationErrors maps the key of each failed operation to the error its
	// Wait method returned.
	OperationErrors map[string]error
}
// Error implements the error interface. The message has the form
// "<n> operations failed: <key>, <key>, ...", where the keys are those of
// OperationErrors listed in ascending lexical order.
func (err HandlerError) Error() string {
	names := make([]string, 0, len(err.OperationErrors))
	for name := range err.OperationErrors {
		names = append(names, name)
	}
	// Map iteration order is randomized; sort so that the same error always
	// renders the same message (stable logs, comparable in tests).
	sort.Strings(names)
	return fmt.Sprintf("%d operations failed: %v", len(names), strings.Join(names, ", "))
}
// Unwrap returns the handler's context error, if any, which makes it
// reachable through errors.Is and errors.As. The entries of OperationErrors
// are not part of the unwrap chain.
func (err HandlerError) Unwrap() error {
	return err.ContextError
}
// Handler allows grouping and naming operations.
type Handler struct {
	// ctx is the context passed to NewHandler; Start derives each operation's
	// context from it and Wait reports its Err().
	ctx context.Context
	// wg counts started operations: Start adds one per operation and the
	// middleware installed by Start marks it done when the operation returns.
	wg sync.WaitGroup
	// mi holds the common middleware registered through Use, in call order.
	mi []MiddlewareFunc
	// mu is held by Start and Wait while they access reg.
	mu sync.Mutex
	// reg maps each unique operation key to the operation started under it.
	reg map[string]*Operation
}
// NewHandler returns a handler whose operations are started with contexts
// derived from ctx. At most one sizeHint may be given; it pre-sizes the
// internal operation registry and defaults to 64 when omitted. Passing more
// than one sizeHint panics.
func NewHandler(ctx context.Context, sizeHint ...int) *Handler {
	if len(sizeHint) > 1 {
		panic("can only provide one sizeHint")
	}

	capacity := 64
	if len(sizeHint) == 1 {
		capacity = sizeHint[0]
	}

	h := &Handler{ctx: ctx}
	h.reg = make(map[string]*Operation, capacity)
	return h
}
// Use registers a common middleware that Start applies to every operation it
// starts. It can be called multiple times to add multiple middleware
// functions. Middleware are applied in the order they are added, meaning the
// last middleware to be added will be the "outer" middleware. A nil
// middleware is ignored.
func (h *Handler) Use(m MiddlewareFunc) {
	if m == nil {
		return
	}
	h.mi = append(h.mi, m)
}
// Start registers the passed-in operation with the handler under key and
// starts it. If key is already registered with this handler, uniqueKey picks
// an alternative carrying an incremental suffix. The operation is started
// with the context returned by contextWithKey for the handler's context and
// the final key.
func (h *Handler) Start(key string, op *Operation) {
	// Pick the key and register the operation under a single lock so that
	// concurrent callers cannot claim the same key.
	h.mu.Lock()
	key = uniqueKey(key, h.reg)
	h.reg[key] = op
	h.mu.Unlock()

	opCtx := contextWithKey(h.ctx, key)

	// Apply common middleware, in the order it was added to the handler.
	common := h.mi
	for i := 0; i < len(common); i++ {
		op.Use(common[i])
	}

	// Added last: marks the operation as done on the handler's WaitGroup once
	// the wrapped function returns.
	markDone := func(next Func) Func {
		return func(ctx context.Context) error {
			defer h.wg.Done()
			return next(ctx)
		}
	}
	op.Use(markDone)

	// Start operation. The WaitGroup counter is raised before the operation
	// gets a chance to run and call Done.
	h.wg.Add(1)
	op.Start(opCtx)
}
// reIncrString matches a trailing incremental suffix of the form "_#<digits>".
var reIncrString = regexp.MustCompile(`_#(\d+)$`)

// uniqueKey returns key unchanged when it is empty or not yet present in m.
// Otherwise it returns the first "<base>_#<n>" that is not present in m,
// where base is key without any trailing "_#<digits>" suffix and n counts up
// from one past that suffix's number (or from 1 when key has no suffix).
func uniqueKey(key string, m map[string]*Operation) string {
	// The empty key is never made unique.
	if key == "" {
		return key
	}
	if _, taken := m[key]; !taken {
		return key
	}

	base, n := key, 0
	if sub := reIncrString.FindStringSubmatchIndex(key); sub != nil {
		base = key[:sub[0]]
		// The captured group is digits only, so Atoi can fail only on
		// overflow; the error is deliberately ignored, as before.
		n, _ = strconv.Atoi(key[sub[2]:sub[3]])
	}

	for {
		n++
		candidate := fmt.Sprintf("%s_#%d", base, n)
		if _, taken := m[candidate]; !taken {
			return candidate
		}
	}
}
// Wait waits for all started operations to complete, and returns an error if
// at least one of them failed. The returned error is a HandlerError holding
// the failed operations keyed by name together with the handler context's
// error, if any. A context error on its own does not make Wait return an
// error.
func (h *Handler) Wait() error {
	h.wg.Wait()

	// Acquire the lock before the first access to h.reg (including len), so
	// that no read of the map happens outside the mutex that Start uses when
	// writing to it.
	h.mu.Lock()
	defer h.mu.Unlock()

	hErr := HandlerError{
		ContextError:    h.ctx.Err(),
		OperationErrors: make(map[string]error, len(h.reg)),
	}
	for k, op := range h.reg {
		if err := op.Wait(); err != nil {
			hErr.OperationErrors[k] = err
		}
	}

	if len(hErr.OperationErrors) > 0 {
		return hErr
	}
	return nil
}