-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathgollback.go
145 lines (120 loc) · 2.66 KB
/
gollback.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package gollback
import (
"context"
"errors"
"sync"
)
var ErrNoCallbacks = errors.New("no callback to run")
// AsyncFunc represents asynchronous function
type AsyncFunc func(ctx context.Context) (interface{}, error)
type response struct {
res interface{}
err error
}
// Race method returns a response as soon as one of the callbacks executes without an error,
// otherwise last error is returned
// will panic if context is nil
func Race(ctx context.Context, fns ...AsyncFunc) (interface{}, error) {
if ctx == nil {
panic("nil context provided")
}
if len(fns) == 0 {
return nil, ErrNoCallbacks
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
out := make(chan *response, 1)
defer close(out)
responses := make(chan *response, len(fns))
defer close(responses)
var responded bool
var lock sync.Mutex
var errCount int
go func() {
for {
select {
case <-ctx.Done():
lock.Lock()
if !responded {
responded = true
out <- &response{
err: ctx.Err(),
}
lock.Unlock()
return
}
lock.Unlock()
case r, more := <-responses:
lock.Lock()
if !more || (!responded && r.err == nil) || (errCount == len(fns)) {
responded = true
out <- r
lock.Unlock()
return
}
lock.Unlock()
}
}
}()
for _, fn := range fns {
go func(f AsyncFunc) {
var r response
r.res, r.err = f(ctx)
lock.Lock()
defer lock.Unlock()
if r.err != nil {
errCount ++
}
if !responded {
responses <- &r
}
}(fn)
}
r := <-out
return r.res, r.err
}
// All method returns when all the callbacks passed as an iterable have finished,
// returned responses and errors are ordered according to callback order
// will panic if context is nil
func All(ctx context.Context, fns ...AsyncFunc) ([]interface{}, []error) {
if ctx == nil {
panic("nil context provided")
}
rs := make([]interface{}, len(fns))
errs := make([]error, len(fns))
var wg sync.WaitGroup
wg.Add(len(fns))
for i, fn := range fns {
go func(index int, f AsyncFunc) {
defer wg.Done()
var r response
r.res, r.err = f(ctx)
rs[index] = r.res
errs[index] = r.err
}(i, fn)
}
wg.Wait()
return rs, errs
}
// Retry method retries callback given amount of times until it executes without an error,
// when retries = 0 it will retry infinitely
// will panic if context is nil
func Retry(ctx context.Context, retires int, fn AsyncFunc) (interface{}, error) {
if ctx == nil {
panic("nil context provided")
}
i := 1
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
var r response
r.res, r.err = fn(ctx)
if r.err == nil || i == retires {
return r.res, r.err
}
i++
}
}
}