Skip to content

Commit 73c8a83

Browse files
committed
chore(job): support retry count and delay between retry
Signed-off-by: Bo-Yi.Wu <appleboy.tw@gmail.com>
1 parent a819ea1 commit 73c8a83

File tree

4 files changed

+96
-8
lines changed

4 files changed

+96
-8
lines changed

consumer.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,21 @@ func (s *Consumer) handle(m *job.Message) error {
4747
}()
4848

4949
// run custom process function
50-
if m.Task != nil {
51-
done <- m.Task(ctx)
52-
} else {
53-
done <- s.runFunc(ctx, m)
50+
var err error
51+
for i := 0; i < (int(m.RetryCount) + 1); i++ {
52+
if i != 0 {
53+
time.Sleep(m.RetryDelay)
54+
}
55+
if m.Task != nil {
56+
err = m.Task(ctx)
57+
} else {
58+
err = s.runFunc(ctx, m)
59+
}
60+
if err == nil {
61+
break
62+
}
5463
}
64+
done <- err
5565
}()
5666

5767
select {

consumer_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,3 +449,73 @@ func TestHandleAllJobBeforeShutdownConsumerInQueue(t *testing.T) {
449449
q.Release()
450450
assert.Len(t, messages, 2)
451451
}
452+
453+
func TestRetryCountWithNewMessage(t *testing.T) {
454+
controller := gomock.NewController(t)
455+
defer controller.Finish()
456+
457+
m := mocks.NewMockQueuedMessage(controller)
458+
m.EXPECT().Bytes().Return([]byte("test")).AnyTimes()
459+
460+
messages := make(chan string, 10)
461+
count := 1
462+
463+
w := NewConsumer(
464+
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
465+
if count%3 != 0 {
466+
count++
467+
return errors.New("count not correct")
468+
}
469+
messages <- string(m.Bytes())
470+
return nil
471+
}),
472+
)
473+
474+
q, err := NewQueue(
475+
WithLogger(NewLogger()),
476+
WithWorker(w),
477+
WithWorkerCount(1),
478+
)
479+
assert.NoError(t, err)
480+
481+
assert.NoError(t, q.Queue(
482+
m,
483+
job.WithRetryCount(3),
484+
job.WithRetryDelay(50*time.Millisecond),
485+
))
486+
assert.Len(t, messages, 0)
487+
q.Start()
488+
q.Release()
489+
assert.Len(t, messages, 1)
490+
}
491+
492+
func TestRetryCountWithNewTask(t *testing.T) {
493+
messages := make(chan string, 10)
494+
count := 1
495+
496+
w := NewConsumer()
497+
498+
q, err := NewQueue(
499+
WithLogger(NewLogger()),
500+
WithWorker(w),
501+
WithWorkerCount(1),
502+
)
503+
assert.NoError(t, err)
504+
505+
assert.NoError(t, q.QueueTask(
506+
func(ctx context.Context) error {
507+
if count%3 != 0 {
508+
count++
509+
return errors.New("count not correct")
510+
}
511+
messages <- "foobar"
512+
return nil
513+
},
514+
job.WithRetryCount(3),
515+
job.WithRetryDelay(50*time.Millisecond),
516+
))
517+
assert.Len(t, messages, 0)
518+
q.Start()
519+
q.Release()
520+
assert.Len(t, messages, 1)
521+
}

job/job.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@ type Message struct {
1717

1818
// Timeout is the duration the task can be processed by Handler.
1919
// zero if not specified
20+
// default is 60 time.Minute
2021
Timeout time.Duration `json:"timeout"`
2122

2223
// Payload is the payload data of the task.
2324
Payload []byte `json:"body"`
2425

25-
// RetryCount retry count if failure
26+
// RetryCount set count of retry
27+
// default is 10
2628
RetryCount int64 `json:"retry_count"`
2729

28-
// RetryCount retry count if failure
30+
// RetryDelay set delay between retry
31+
// default is 100ms
2932
RetryDelay time.Duration `json:"retry_delay"`
3033
}
3134

job/option.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,17 @@ func (f OptionFunc) apply(option *Options) {
2121
f(option)
2222
}
2323

24-
func NewOptions(opts ...Option) *Options {
25-
o := &Options{
24+
func newDefaultOptions() *Options {
25+
return &Options{
2626
retryCount: 0,
2727
retryDelay: 100 * time.Millisecond,
2828
timeout: 60 * time.Minute,
2929
}
30+
}
31+
32+
// NewOptions with custom parameter
33+
func NewOptions(opts ...Option) *Options {
34+
o := newDefaultOptions()
3035

3136
// Loop through each option
3237
for _, opt := range opts {

0 commit comments

Comments
 (0)