@@ -15,9 +15,13 @@ var errMaxCapacity = errors.New("max capacity reached")
15
15
16
16
// Consumer for simple queue using buffer channel
17
17
type Consumer struct {
18
- taskQueue chan core.QueuedMessage
18
+ sync.Mutex
19
+ taskQueue []core.QueuedMessage
19
20
runFunc func (context.Context , core.QueuedMessage ) error
20
- stop chan struct {}
21
+ capacity int
22
+ count int
23
+ head int
24
+ tail int
21
25
exit chan struct {}
22
26
logger Logger
23
27
stopOnce sync.Once
@@ -36,52 +40,80 @@ func (s *Consumer) Shutdown() error {
36
40
}
37
41
38
42
s .stopOnce .Do (func () {
39
- close (s .stop )
40
- close (s .taskQueue )
41
- if len (s .taskQueue ) > 0 {
43
+ if s .count > 0 {
42
44
<- s .exit
43
45
}
44
46
})
45
47
return nil
46
48
}
47
49
48
50
// Queue send task to the buffer channel
49
- func (s * Consumer ) Queue (task core.QueuedMessage ) error {
51
+ func (s * Consumer ) Queue (task core.QueuedMessage ) error { //nolint:stylecheck
50
52
if atomic .LoadInt32 (& s .stopFlag ) == 1 {
51
53
return ErrQueueShutdown
52
54
}
53
-
54
- select {
55
- case s .taskQueue <- task :
56
- return nil
57
- default :
55
+ if s .count >= s .capacity {
58
56
return errMaxCapacity
59
57
}
58
+
59
+ s .Lock ()
60
+ if s .count == len (s .taskQueue ) {
61
+ s .resize (s .count * 2 )
62
+ }
63
+ s .taskQueue [s .tail ] = task
64
+ s .tail = (s .tail + 1 ) % len (s .taskQueue )
65
+ s .count ++
66
+ s .Unlock ()
67
+
68
+ return nil
60
69
}
61
70
62
71
// Request a new task from channel
63
72
func (s * Consumer ) Request () (core.QueuedMessage , error ) {
64
- select {
65
- case task , ok := <- s .taskQueue :
66
- if ! ok {
67
- select {
68
- case s .exit <- struct {}{}:
69
- default :
70
- }
71
- return nil , ErrQueueHasBeenClosed
73
+ if atomic .LoadInt32 (& s .stopFlag ) == 1 && s .count == 0 {
74
+ select {
75
+ case s .exit <- struct {}{}:
76
+ default :
72
77
}
73
- return task , nil
74
- default :
78
+ return nil , ErrQueueHasBeenClosed
79
+ }
80
+
81
+ if s .count == 0 {
75
82
return nil , ErrNoTaskInQueue
76
83
}
84
+ s .Lock ()
85
+ data := s .taskQueue [s .head ]
86
+ s .head = (s .head + 1 ) % len (s .taskQueue )
87
+ s .count --
88
+
89
+ if n := len (s .taskQueue ) / 2 ; n > 2 && s .count <= n {
90
+ s .resize (n )
91
+ }
92
+ s .Unlock ()
93
+
94
+ return data , nil
95
+ }
96
+
97
+ func (q * Consumer ) resize (n int ) {
98
+ nodes := make ([]core.QueuedMessage , n )
99
+ if q .head < q .tail {
100
+ copy (nodes , q .taskQueue [q .head :q .tail ])
101
+ } else {
102
+ copy (nodes , q .taskQueue [q .head :])
103
+ copy (nodes [len (q .taskQueue )- q .head :], q .taskQueue [:q .tail ])
104
+ }
105
+
106
+ q .tail = q .count % n
107
+ q .head = 0
108
+ q .taskQueue = nodes
77
109
}
78
110
79
- // NewConsumer for create new consumer instance
111
+ // NewConsumer for create new Consumer instance
80
112
func NewConsumer (opts ... Option ) * Consumer {
81
113
o := NewOptions (opts ... )
82
114
w := & Consumer {
83
- taskQueue : make (chan core.QueuedMessage , o . queueSize ),
84
- stop : make ( chan struct {}) ,
115
+ taskQueue : make ([] core.QueuedMessage , 2 ),
116
+ capacity : o . queueSize ,
85
117
exit : make (chan struct {}),
86
118
logger : o .logger ,
87
119
runFunc : o .fn ,
0 commit comments