-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathworkerpool.go
139 lines (119 loc) · 3.95 KB
/
workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package ipstack
import (
"fmt"
"strconv"
"sync"
)
var (
// ErrFeedbackExistsFailed is the log message used when the provided
// implementation of ipstack.WorkerFeedback.Exists returns an error.
// NOTE: like the other Err* values in this block it is a plain string that
// is handed to the configured logger, not a value of type error.
ErrFeedbackExistsFailed = "ipstack: WorkerPool: unable to check if ip already exists during feedback loop"
// ErrAPIRequestFailed is the log message used when the Client.Check call
// for an IP returns an error (presumably this includes request timeouts;
// confirm against the Client implementation).
ErrAPIRequestFailed = "ipstack: WorkerPool: error during api request"
// ErrFeedbackCreateResponseFailed is the log message used when the provided
// implementation of ipstack.WorkerFeedback.CreateResponse returns an error.
ErrFeedbackCreateResponseFailed = "ipstack: WorkerPool: unable to create response during feedback loop"
)
// WorkerFeedback is the callback interface through which an
// ipstack.WorkerPool asks the caller whether an IP address still needs to be
// looked up and hands back the lookup result. When the pool runs more than
// one worker, the methods are invoked from several goroutines, so
// implementations need to tolerate concurrent calls.
type WorkerFeedback interface {
// Exists reports whether ip has already been resolved. If exists is true the
// pool skips the API lookup for ip. If err is non-nil the pool logs it and
// drops ip without performing a lookup.
Exists(ip string) (exists bool, err error)
// CreateResponse receives the API result r obtained for ip. A non-nil err
// is logged by the pool; no retry is attempted.
CreateResponse(ip string, r *Response) (err error)
}
// WorkerPool performs ipstack IP checks on a set of worker goroutines. IPs are
// handed in through Queue, looked up via the Client and reported through the
// WorkerFeedback implementation, so the external API call does not run on the
// goroutine that queued the IP. Create instances with NewWorkerPool; the zero
// value has nil channels and is not usable.
type WorkerPool struct {
// unresolved holds the IPs that were queued but not yet picked up by a
// worker. NewWorkerPool creates it with capacity Config.QueueSize.
unresolved chan string
// shutdown is closed by Shutdown to tell every worker to stop.
shutdown chan struct{}
// wg counts the started worker goroutines; Shutdown waits on it.
wg sync.WaitGroup
// fb is consulted before each lookup and receives each lookup result.
fb WorkerFeedback
// c is the client used to query the external API.
c *Client
// Config is the configuration passed to NewWorkerPool. Its Log field is used
// for all log output of the pool.
Config *WorkerPoolConfig
}
// NewWorkerPool validates its arguments, allocates a WorkerPool and starts
// config.Workers worker goroutines that consume the IPs passed to Queue.
//
// A nil pool and a non-nil error are returned when config, config.Log, c or fb
// is nil, or when config.QueueSize is negative (which would otherwise make the
// channel allocation panic). On success the returned pool must eventually be
// stopped with Shutdown so that the worker goroutines terminate.
func NewWorkerPool(config *WorkerPoolConfig, c *Client, fb WorkerFeedback) (wp *WorkerPool, err error) {
	if config == nil {
		return nil, fmt.Errorf("ipstack: unable to create WorkerPool with nil config")
	}
	if config.Log == nil {
		return nil, fmt.Errorf("ipstack: unable to create WorkerPool with nil logger in config")
	}
	if c == nil {
		return nil, fmt.Errorf("ipstack: unable to create WorkerPool with nil client")
	}
	if fb == nil {
		return nil, fmt.Errorf("ipstack: unable to create WorkerPool with nil WorkerFeedback")
	}
	// make(chan T, n) panics for n < 0; report it as an error instead
	if config.QueueSize < 0 {
		return nil, fmt.Errorf("ipstack: unable to create WorkerPool with negative queue size")
	}

	// allocate WorkerPool
	wp = &WorkerPool{
		unresolved: make(chan string, config.QueueSize),
		shutdown:   make(chan struct{}),
		fb:         fb,
		c:          c,
		Config:     config,
	}

	// start worker goroutines; each one is registered on the WaitGroup before
	// it is launched so that Shutdown can wait for all of them. Worker ids are
	// 1-based and only used for log output.
	for i := 0; i < config.Workers; i++ {
		wp.wg.Add(1)
		go wp.worker(i + 1)
	}
	return wp, nil
}
// Queue hands ip to the pool by sending it on the internal channel of
// unresolved IPs, from which a worker goroutine will receive it. The send
// blocks while the channel has no free capacity. IPs that are still buffered
// when Shutdown is called are resolved synchronously by the Shutdown caller.
func (wp *WorkerPool) Queue(ip string) {
wp.unresolved <- ip
}
// Shutdown stops the pool. It signals every worker goroutine to finish, blocks
// until all of them are done and then resolves the IPs that are still sitting
// in the queue on the calling goroutine before returning.
//
// Shutdown closes the shutdown channel, so calling it a second time panics.
func (wp *WorkerPool) Shutdown() {
	// closing the channel broadcasts the stop signal to all workers at once
	close(wp.shutdown)
	wp.wg.Wait()

	// no worker is receiving any more: drain the buffered leftovers ourselves
	for {
		if len(wp.unresolved) == 0 {
			return
		}
		ip := <-wp.unresolved
		wp.resolveEntry(ip)
	}
}
// worker is the loop executed by each pool goroutine. It logs its start,
// resolves every IP it receives from the unresolved channel and returns as
// soon as the shutdown channel is closed. i is the 1-based worker id and is
// only used in log output.
//
// The goroutine was registered on wp.wg by its creator, so Done is deferred to
// run exactly once when the loop is left. Calling Done inside the loop without
// returning would re-select the closed shutdown channel on the next iteration,
// call Done again and panic with a negative WaitGroup counter.
func (wp *WorkerPool) worker(i int) {
	defer wp.wg.Done()

	id := strconv.Itoa(i)
	wp.Config.Log.Info("ipstack: WorkerPool: worker" + id + " started")
	for {
		select {
		case <-wp.shutdown:
			wp.Config.Log.Info("ipstack: WorkerPool: worker" + id + " received shutdown. Finishing")
			return
		case ip := <-wp.unresolved:
			wp.resolveEntry(ip)
		}
	}
}
// resolveEntry processes a single ip: it asks the feedback implementation
// whether the ip is already known, queries the API through the client if it is
// not, and passes the result back to the feedback implementation. Every
// failure is logged through Config.Log together with the matching Err*
// message and ends processing of this ip; nothing is returned to the caller.
func (wp *WorkerPool) resolveEntry(ip string) {
	known, err := wp.fb.Exists(ip)
	switch {
	case err != nil:
		// the lookup state is unknown, so the ip is skipped
		wp.Config.Log.Error(ErrFeedbackExistsFailed, err)
		return
	case known:
		// already resolved earlier, no API call required
		return
	}

	// unknown ip: ask the external API
	resp, err := wp.c.Check(ip)
	if err != nil {
		wp.Config.Log.Error(ErrAPIRequestFailed, err)
		return
	}

	// report the result back through the feedback loop
	if err := wp.fb.CreateResponse(ip, resp); err != nil {
		wp.Config.Log.Error(ErrFeedbackCreateResponseFailed, err)
	}
}