-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.go
149 lines (126 loc) · 2.74 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Package gspooling provides an asynchronous, thread-safe,
// fixed-size, buffered and easy to use FIFO queue.
//
//	func main() {
//		queue, err := gspooling.NewQueue(2)
//		if err != nil {
//			panic(err)
//		}
//		queue.Put(10)
//		queue.Put(2)
//
//		v1, _ := queue.Get()
//		v2, _ := queue.Get()
//
//		fmt.Println("data 1: ", v1, " data 2: ", v2)
//	}
package gspooling
import "errors"
var (
// QueueAlreadyClosedErr is returned by Put, Get and Close when the queue
// has already been closed.
QueueAlreadyClosedErr = errors.New("Queue already closed.")
// NilDataErr is returned by Put when the supplied data is nil.
NilDataErr = errors.New("Data cannot be nil.")
)
// Queue is a fixed-size FIFO queue. Its buffer is serviced by a goroutine
// started in NewQueue; Put, Get and Close talk to that goroutine through the
// signal handler and the channels declared below.
type Queue struct {
	// closed is set to true by Close and reported by IsClosed.
	closed bool
	// buffer stores the queued items.
	buffer *stackBuffer
	// sh delivers put/get/close notifications to the servicing goroutine.
	sh *signalHandler
	// eput and eget carry the error result of each put and get operation
	// back to the caller.
	eput, eget chan error
	// put carries incoming values to the servicing goroutine; get carries
	// retrieved values back to the caller.
	put, get chan interface{}
}
// NewQueue builds a Queue backed by a buffer of the given size and starts
// the goroutine that services it.
//
// If newStackBuffer rejects the requested size, its error is returned and
// the returned Queue is nil.
func NewQueue(size int) (*Queue, error) {
	buffer, err := newStackBuffer(size)
	if err != nil {
		return nil, err
	}

	// The data channels and the error channels each buffer a single value.
	q := &Queue{
		buffer: buffer,
		put:    make(chan interface{}, 1),
		get:    make(chan interface{}, 1),
		eput:   make(chan error, 1),
		eget:   make(chan error, 1),
		sh:     newSignalHandler(),
	}
	q.parallelizeStackBuffer()
	return q, nil
}
// parallelizeStackBuffer starts the goroutine that owns the buffer. The
// goroutine repeatedly asks the signal handler for the next notification and
// serves it: a put notification stores the value received on sq.put, a get
// notification pulls one value from the buffer, and a close notification
// ends the goroutine.
func (sq *Queue) parallelizeStackBuffer() {
	go func() {
		for {
			sig := sq.sh.handleNotification()
			if sig == nil {
				// Nothing to serve; ask again.
				continue
			}

			switch {
			case sig.isPut():
				// Put sends the value right after notifying, so take it
				// from sq.put and report the outcome on sq.eput.
				sq.eput <- sq.buffer.put(<-sq.put)
			case sig.isGet():
				item, err := sq.buffer.get()
				// The error is published before the value.
				sq.eget <- err
				sq.get <- item
			case sig.isClose():
				// return rather than break: break would only leave the
				// switch and the loop would keep running.
				return
			}
		}
	}()
}
// Put stores data in the buffered queue and returns the outcome reported by
// the servicing goroutine.
//
// Possible errors:
//   - QueueAlreadyClosedErr ("Queue already closed."): the queue was closed.
//   - NilDataErr ("Data cannot be nil."): data is nil.
//   - the error returned by the underlying buffer, documented as
//     "Buffer is full." when the queue has no room left.
func (sq *Queue) Put(data interface{}) error {
	switch {
	case sq.IsClosed():
		return QueueAlreadyClosedErr
	case data == nil:
		return NilDataErr
	}

	// Notify the servicing goroutine, hand the value over, then wait for
	// the result of the put.
	sq.sh.notifyPut()
	sq.put <- data
	err := <-sq.eput
	return err
}
// Get retrieves the next value from the buffered queue together with the
// error reported by the servicing goroutine.
//
// Possible errors:
//   - QueueAlreadyClosedErr ("Queue already closed."): the queue was closed.
//   - the error returned by the underlying buffer, documented as
//     "Buffer is empty." when there is nothing to retrieve.
func (sq *Queue) Get() (interface{}, error) {
	if sq.IsClosed() {
		return nil, QueueAlreadyClosedErr
	}

	sq.sh.notifyGet()

	// Receive the value first and the error second, in that order.
	item := <-sq.get
	err := <-sq.eget
	return item, err
}
// Close shuts the queue down. It sends the close notification through the
// signal handler, closes the error and data channels as well as the signal
// handler's channel, and finally marks the queue as closed.
//
// It returns QueueAlreadyClosedErr if the queue was already closed, and nil
// otherwise.
func (sq *Queue) Close() error {
	if sq.IsClosed() {
		return QueueAlreadyClosedErr
	}

	// Ask the servicing goroutine to stop before the channels go away.
	sq.sh.notifyClose()

	close(sq.eget)
	close(sq.eput)
	close(sq.get)
	close(sq.put)
	close(sq.sh.sg)

	sq.closed = true
	return nil
}
// IsClosed reports whether the queue has been closed, i.e. whether Close has
// already set the closed flag.
func (sq *Queue) IsClosed() bool {
return sq.closed
}