Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

chore(consumer): move run model to queue main package #91

Merged
merged 2 commits into from
Dec 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 2 additions & 79 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
)

var _ core.Worker = (*Consumer)(nil)
Expand All @@ -29,84 +27,9 @@ type Consumer struct {
requestTimeout time.Duration
}

func (s *Consumer) handle(m *job.Message) error {
// create channel with buffer size 1 to avoid goroutine leak
done := make(chan error, 1)
panicChan := make(chan interface{}, 1)
startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), m.Timeout)
defer func() {
cancel()
}()

// run the job
go func() {
// handle panic issue
defer func() {
if p := recover(); p != nil {
panicChan <- p
}
}()

// run custom process function
var err error
loop:
for {
if m.Task != nil {
err = m.Task(ctx)
} else {
err = s.runFunc(ctx, m)
}

// check error and retry count
if err == nil || m.RetryCount == 0 {
break
}
m.RetryCount--

select {
case <-time.After(m.RetryDelay): // retry delay time
case <-ctx.Done(): // timeout reached
err = ctx.Err()
break loop
}
}

done <- err
}()

select {
case p := <-panicChan:
panic(p)
case <-ctx.Done(): // timeout reached
return ctx.Err()
case <-s.stop: // shutdown service
// cancel job
cancel()

leftTime := m.Timeout - time.Since(startTime)
// wait job
select {
case <-time.After(leftTime):
return context.DeadlineExceeded
case err := <-done: // job finish
return err
case p := <-panicChan:
panic(p)
}
case err := <-done: // job finish
return err
}
}

// Run to execute new task
func (s *Consumer) Run(task core.QueuedMessage) error {
data := task.(*job.Message)
if data.Task == nil {
_ = json.Unmarshal(task.Bytes(), data)
}

return s.handle(data)
func (s *Consumer) Run(ctx context.Context, task core.QueuedMessage) error {
return s.runFunc(ctx, task)
}

// Shutdown the worker
Expand Down
115 changes: 0 additions & 115 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,121 +207,6 @@ func TestGoroutinePanic(t *testing.T) {
q.Release()
}

func TestHandleTimeout(t *testing.T) {
m := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
w := NewConsumer(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(200 * time.Millisecond)
return nil
}),
)

err := w.handle(m)
assert.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)

m = &job.Message{
Timeout: 150 * time.Millisecond,
Payload: []byte("foo"),
}

w = NewConsumer(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(200 * time.Millisecond)
return nil
}),
)

done := make(chan error)
go func() {
done <- w.handle(m)
}()

err = <-done
assert.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)
}

func TestJobComplete(t *testing.T) {
m := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
w := NewConsumer(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
return errors.New("job completed")
}),
)

err := w.handle(m)
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)

m = &job.Message{
Timeout: 250 * time.Millisecond,
Payload: []byte("foo"),
}

w = NewConsumer(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
time.Sleep(200 * time.Millisecond)
return errors.New("job completed")
}),
)

done := make(chan error)
go func() {
done <- w.handle(m)
}()

err = <-done
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)
}

func TestTaskJobComplete(t *testing.T) {
m := &job.Message{
Timeout: 100 * time.Millisecond,
Task: func(ctx context.Context) error {
return errors.New("job completed")
},
}
w := NewConsumer()

err := w.handle(m)
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)

m = &job.Message{
Timeout: 250 * time.Millisecond,
Task: func(ctx context.Context) error {
return nil
},
}

w = NewConsumer()
done := make(chan error)
go func() {
done <- w.handle(m)
}()

err = <-done
assert.NoError(t, err)

// job timeout
m = &job.Message{
Timeout: 50 * time.Millisecond,
Task: func(ctx context.Context) error {
time.Sleep(60 * time.Millisecond)
return nil
},
}
assert.Equal(t, context.DeadlineExceeded, w.handle(m))
}

func TestIncreaseWorkerCount(t *testing.T) {
w := NewConsumer(
WithLogger(NewEmptyLogger()),
Expand Down
4 changes: 3 additions & 1 deletion core/worker.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package core

import "context"

// Worker interface
type Worker interface {
// Run is called to start the worker
Run(task QueuedMessage) error
Run(ctx context.Context, task QueuedMessage) error
// Shutdown is called if stop all worker
Shutdown() error
// Queue to send message in Queue
Expand Down
9 changes: 5 additions & 4 deletions mocks/mock_worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 82 additions & 1 deletion queue.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package queue

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
)
Expand Down Expand Up @@ -167,11 +169,90 @@ func (q *Queue) work(task core.QueuedMessage) {
}
}()

if err = q.worker.Run(task); err != nil {
if err = q.run(task); err != nil {
q.logger.Errorf("runtime error: %s", err.Error())
}
}

func (q *Queue) run(task core.QueuedMessage) error {
data := task.(*job.Message)
if data.Task == nil {
_ = json.Unmarshal(task.Bytes(), data)
}

return q.handle(data)
}

func (q *Queue) handle(m *job.Message) error {
// create channel with buffer size 1 to avoid goroutine leak
done := make(chan error, 1)
panicChan := make(chan interface{}, 1)
startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), m.Timeout)
defer func() {
cancel()
}()

// run the job
go func() {
// handle panic issue
defer func() {
if p := recover(); p != nil {
panicChan <- p
}
}()

// run custom process function
var err error
loop:
for {
if m.Task != nil {
err = m.Task(ctx)
} else {
err = q.worker.Run(ctx, m)
}

// check error and retry count
if err == nil || m.RetryCount == 0 {
break
}
m.RetryCount--

select {
case <-time.After(m.RetryDelay): // retry delay time
case <-ctx.Done(): // timeout reached
err = ctx.Err()
break loop
}
}

done <- err
}()

select {
case p := <-panicChan:
panic(p)
case <-ctx.Done(): // timeout reached
return ctx.Err()
case <-q.quit: // shutdown service
// cancel job
cancel()

leftTime := m.Timeout - time.Since(startTime)
// wait job
select {
case <-time.After(leftTime):
return context.DeadlineExceeded
case err := <-done: // job finish
return err
case p := <-panicChan:
panic(p)
}
case err := <-done: // job finish
return err
}
}

// UpdateWorkerCount to update worker number dynamically.
func (q *Queue) UpdateWorkerCount(num int) {
q.workerCount = num
Expand Down
Loading