Skip to content

Commit a0cf321

Browse files
authored
feat: add ability to call a function after the job function runs (#125)
1 parent 0c677f4 commit a0cf321

File tree

2 files changed

+13
-0
lines changed

2 files changed

+13
-0
lines changed

options.go

+8
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,21 @@ func WithFn(fn func(context.Context, core.QueuedMessage) error) Option {
7373
})
7474
}
7575

76+
// WithAfterFn set callback function after job done
77+
func WithAfterFn(afterFn func()) Option {
78+
return OptionFunc(func(q *Options) {
79+
q.afterFn = afterFn
80+
})
81+
}
82+
7683
// Options for custom args in Queue
7784
type Options struct {
7885
workerCount int
7986
logger Logger
8087
queueSize int
8188
worker core.Worker
8289
fn func(context.Context, core.QueuedMessage) error
90+
afterFn func()
8391
metric Metric
8492
}
8593

queue.go

+5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type (
2929
worker core.Worker
3030
stopOnce sync.Once
3131
stopFlag int32
32+
afterFn func()
3233
}
3334
)
3435

@@ -46,6 +47,7 @@ func NewQueue(opts ...Option) (*Queue, error) {
4647
logger: o.logger,
4748
worker: o.worker,
4849
metric: &metric{},
50+
afterFn: o.afterFn,
4951
}
5052

5153
if q.worker == nil {
@@ -163,6 +165,9 @@ func (q *Queue) work(task core.QueuedMessage) {
163165
} else {
164166
q.metric.IncFailureTask()
165167
}
168+
if q.afterFn != nil {
169+
q.afterFn()
170+
}
166171
}()
167172

168173
if err = q.run(task); err != nil {

0 commit comments

Comments
 (0)