Skip to content

Commit bfe4374

Browse files
authored
feat(metric): add success, failure and submitted tasks (#48)
1 parent 8af744c commit bfe4374

8 files changed

+142
-75
lines changed

metric.go

+36-1
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,21 @@ type Metric interface {
77
IncBusyWorker()
88
DecBusyWorker()
99
BusyWorkers() uint64
10+
SuccessTasks() uint64
11+
FailureTasks() uint64
12+
SubmittedTasks() uint64
13+
IncSuccessTask()
14+
IncFailureTask()
15+
IncSubmittedTask()
1016
}
1117

18+
var _ Metric = (*metric)(nil)
19+
1220
type metric struct {
13-
busyWorkers uint64
21+
busyWorkers uint64
22+
successTasks uint64
23+
failureTasks uint64
24+
submittedTasks uint64
1425
}
1526

1627
// NewMetric for default metric structure
@@ -29,3 +40,27 @@ func (m *metric) DecBusyWorker() {
2940
func (m *metric) BusyWorkers() uint64 {
3041
return atomic.LoadUint64(&m.busyWorkers)
3142
}
43+
44+
func (m *metric) IncSuccessTask() {
45+
atomic.AddUint64(&m.successTasks, 1)
46+
}
47+
48+
func (m *metric) IncFailureTask() {
49+
atomic.AddUint64(&m.failureTasks, 1)
50+
}
51+
52+
func (m *metric) IncSubmittedTask() {
53+
atomic.AddUint64(&m.submittedTasks, 1)
54+
}
55+
56+
func (m *metric) SuccessTasks() uint64 {
57+
return atomic.LoadUint64(&m.successTasks)
58+
}
59+
60+
func (m *metric) FailureTasks() uint64 {
61+
return atomic.LoadUint64(&m.failureTasks)
62+
}
63+
64+
func (m *metric) SubmittedTasks() uint64 {
65+
return atomic.LoadUint64(&m.submittedTasks)
66+
}

metric_test.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package queue
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
func TestMetricData(t *testing.T) {
13+
w := NewConsumer(
14+
WithFn(func(ctx context.Context, m QueuedMessage) error {
15+
switch string(m.Bytes()) {
16+
case "foo1":
17+
panic("missing something")
18+
case "foo2":
19+
return errors.New("missing something")
20+
case "foo3":
21+
return nil
22+
}
23+
return nil
24+
}),
25+
)
26+
q, err := NewQueue(
27+
WithWorker(w),
28+
WithWorkerCount(4),
29+
)
30+
assert.NoError(t, err)
31+
assert.NoError(t, q.Queue(mockMessage{
32+
message: "foo1",
33+
}))
34+
assert.NoError(t, q.Queue(mockMessage{
35+
message: "foo2",
36+
}))
37+
assert.NoError(t, q.Queue(mockMessage{
38+
message: "foo3",
39+
}))
40+
assert.NoError(t, q.Queue(mockMessage{
41+
message: "foo4",
42+
}))
43+
q.Start()
44+
time.Sleep(50 * time.Millisecond)
45+
assert.Equal(t, 4, q.SubmittedTasks())
46+
assert.Equal(t, 2, q.SuccessTasks())
47+
assert.Equal(t, 2, q.FailureTasks())
48+
q.Release()
49+
}

queue.go

+56-30
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,6 @@ func NewQueue(opts ...Option) (*Queue, error) {
7979
return q, nil
8080
}
8181

82-
// Capacity for queue max size
83-
func (q *Queue) Capacity() int {
84-
return q.worker.Capacity()
85-
}
86-
87-
// Usage for count of queue usage
88-
func (q *Queue) Usage() int {
89-
return q.worker.Usage()
90-
}
91-
9282
// Start to enable all worker
9383
func (q *Queue) Start() {
9484
go q.start()
@@ -123,24 +113,24 @@ func (q *Queue) BusyWorkers() int {
123113
return int(q.metric.BusyWorkers())
124114
}
125115

126-
// Wait all process
127-
func (q *Queue) Wait() {
128-
q.routineGroup.Wait()
116+
// BusyWorkers returns the numbers of success tasks.
117+
func (q *Queue) SuccessTasks() int {
118+
return int(q.metric.SuccessTasks())
129119
}
130120

131-
func (q *Queue) handleQueue(timeout time.Duration, job QueuedMessage) error {
132-
if atomic.LoadInt32(&q.stopFlag) == 1 {
133-
return ErrQueueShutdown
134-
}
121+
// BusyWorkers returns the numbers of failure tasks.
122+
func (q *Queue) FailureTasks() int {
123+
return int(q.metric.FailureTasks())
124+
}
135125

136-
data := Job{
137-
Timeout: timeout,
138-
Payload: job.Bytes(),
139-
}
126+
// BusyWorkers returns the numbers of submitted tasks.
127+
func (q *Queue) SubmittedTasks() int {
128+
return int(q.metric.SubmittedTasks())
129+
}
140130

141-
return q.worker.Queue(Job{
142-
Payload: data.Encode(),
143-
})
131+
// Wait all process
132+
func (q *Queue) Wait() {
133+
q.routineGroup.Wait()
144134
}
145135

146136
// Queue to queue all job
@@ -153,19 +143,25 @@ func (q *Queue) QueueWithTimeout(timeout time.Duration, job QueuedMessage) error
153143
return q.handleQueue(timeout, job)
154144
}
155145

156-
func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error {
146+
func (q *Queue) handleQueue(timeout time.Duration, job QueuedMessage) error {
157147
if atomic.LoadInt32(&q.stopFlag) == 1 {
158148
return ErrQueueShutdown
159149
}
160150

161151
data := Job{
162152
Timeout: timeout,
153+
Payload: job.Bytes(),
163154
}
164155

165-
return q.worker.Queue(Job{
166-
Task: task,
156+
if err := q.worker.Queue(Job{
167157
Payload: data.Encode(),
168-
})
158+
}); err != nil {
159+
return err
160+
}
161+
162+
q.metric.IncSubmittedTask()
163+
164+
return nil
169165
}
170166

171167
// QueueTask to queue job task
@@ -178,18 +174,48 @@ func (q *Queue) QueueTaskWithTimeout(timeout time.Duration, task TaskFunc) error
178174
return q.handleQueueTask(timeout, task)
179175
}
180176

177+
func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error {
178+
if atomic.LoadInt32(&q.stopFlag) == 1 {
179+
return ErrQueueShutdown
180+
}
181+
182+
data := Job{
183+
Timeout: timeout,
184+
}
185+
186+
if err := q.worker.Queue(Job{
187+
Task: task,
188+
Payload: data.Encode(),
189+
}); err != nil {
190+
return err
191+
}
192+
193+
q.metric.IncSubmittedTask()
194+
195+
return nil
196+
}
197+
181198
func (q *Queue) work(task QueuedMessage) {
199+
var err error
182200
// to handle panic cases from inside the worker
183201
// in such case, we start a new goroutine
184202
defer func() {
185203
q.metric.DecBusyWorker()
186-
if err := recover(); err != nil {
204+
e := recover()
205+
if e != nil {
187206
q.logger.Errorf("panic error: %v", err)
188207
}
189208
q.schedule()
209+
210+
// increase success or failure number
211+
if err == nil && e == nil {
212+
q.metric.IncSuccessTask()
213+
} else {
214+
q.metric.IncFailureTask()
215+
}
190216
}()
191217

192-
if err := q.worker.Run(task); err != nil {
218+
if err = q.worker.Run(task); err != nil {
193219
q.logger.Errorf("runtime error: %s", err.Error())
194220
}
195221
}

queue_test.go

+1-27
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestNewQueue(t *testing.T) {
3434
assert.NotNil(t, q)
3535

3636
q.Start()
37-
assert.Equal(t, uint64(0), w.BusyWorkers())
37+
assert.Equal(t, 0, q.BusyWorkers())
3838
q.Shutdown()
3939
q.Wait()
4040
}
@@ -59,31 +59,6 @@ func TestShtdonwOnce(t *testing.T) {
5959
assert.Equal(t, 0, q.BusyWorkers())
6060
}
6161

62-
func TestWorkerStatus(t *testing.T) {
63-
m := mockMessage{
64-
message: "foobar",
65-
}
66-
w := &messageWorker{
67-
messages: make(chan QueuedMessage, 100),
68-
}
69-
q, err := NewQueue(
70-
WithWorker(w),
71-
WithWorkerCount(2),
72-
)
73-
assert.NoError(t, err)
74-
assert.NotNil(t, q)
75-
76-
assert.NoError(t, q.Queue(m))
77-
assert.NoError(t, q.Queue(m))
78-
assert.NoError(t, q.QueueWithTimeout(10*time.Millisecond, m))
79-
assert.NoError(t, q.QueueWithTimeout(10*time.Millisecond, m))
80-
assert.Equal(t, 100, q.Capacity())
81-
assert.Equal(t, 4, q.Usage())
82-
q.Start()
83-
time.Sleep(40 * time.Millisecond)
84-
q.Release()
85-
}
86-
8762
func TestCapacityReached(t *testing.T) {
8863
w := &messageWorker{
8964
messages: make(chan QueuedMessage, 1),
@@ -160,7 +135,6 @@ func TestQueueTaskJob(t *testing.T) {
160135
return nil
161136
}))
162137
time.Sleep(50 * time.Millisecond)
163-
assert.Equal(t, uint64(0), w.BusyWorkers())
164138
q.Shutdown()
165139
assert.Equal(t, ErrQueueShutdown, q.QueueTask(func(ctx context.Context) error {
166140
return nil

worker.go

-6
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,6 @@ type Worker interface {
1010
Queue(task QueuedMessage) error
1111
// Request to get message from Queue
1212
Request() (QueuedMessage, error)
13-
// Capacity queue capacity = cap(channel name)
14-
Capacity() int
15-
// Usage is how many message in queue
16-
Usage() int
17-
// BusyWorkers return count of busy worker currently
18-
BusyWorkers() uint64
1913
}
2014

2115
// QueuedMessage ...

worker_empty.go

-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,3 @@ func (w *emptyWorker) Run(task QueuedMessage) error { return nil }
99
func (w *emptyWorker) Shutdown() error { return nil }
1010
func (w *emptyWorker) Queue(task QueuedMessage) error { return nil }
1111
func (w *emptyWorker) Request() (QueuedMessage, error) { return nil, nil }
12-
func (w *emptyWorker) Capacity() int { return 0 }
13-
func (w *emptyWorker) Usage() int { return 0 }
14-
func (w *emptyWorker) BusyWorkers() uint64 { return uint64(0) }

worker_message.go

-4
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,3 @@ func (w *messageWorker) Request() (QueuedMessage, error) {
4242
return nil, errors.New("no message in queue")
4343
}
4444
}
45-
46-
func (w *messageWorker) Capacity() int { return cap(w.messages) }
47-
func (w *messageWorker) Usage() int { return len(w.messages) }
48-
func (w *messageWorker) BusyWorkers() uint64 { return uint64(0) }

worker_task.go

-4
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,3 @@ func (w *taskWorker) Request() (QueuedMessage, error) {
4343
return nil, errors.New("no message in queue")
4444
}
4545
}
46-
47-
func (w *taskWorker) Capacity() int { return cap(w.messages) }
48-
func (w *taskWorker) Usage() int { return len(w.messages) }
49-
func (w *taskWorker) BusyWorkers() uint64 { return uint64(0) }

0 commit comments

Comments
 (0)