Skip to content

Commit

Permalink
fix compile problem under go 1.3 version and add some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xiemalin committed Jun 30, 2020
1 parent fab4ed6 commit 4f93c6e
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 9 deletions.
26 changes: 21 additions & 5 deletions filequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
fileSuffix = ".dat"
)

// default options
var DefaultOptions = &Options{
DataPageSize: DefaultDataPageSize,
indexPageSize: defaultIndexPageSize,
Expand Down Expand Up @@ -68,7 +69,7 @@ type FileQueue struct {
// lock for enqueue state management
enqueueLock sync.Mutex

// locks for queue front write management S
// locks for queue front write management
queueFrontWriteLock sync.Mutex

path string
Expand All @@ -78,9 +79,11 @@ type FileQueue struct {
metaFile *DB
frontFile *DB

// queue options
options *Options
}

// Open the queue files
func (q *FileQueue) Open(dir string, queueName string, options *Options) error {
if len(dir) == 0 {
return errors.New("parameter 'dir' can not be blank.")
Expand All @@ -94,7 +97,7 @@ func (q *FileQueue) Open(dir string, queueName string, options *Options) error {
options = DefaultOptions
}
q.options = options
q.options.itemsPerPage = 1 << q.options.IndexItemsPerPage
q.options.itemsPerPage = 1 << uint(q.options.IndexItemsPerPage)
q.options.indexPageSize = defaultIndexItemLen * q.options.itemsPerPage

path := dir + "/" + queueName
Expand Down Expand Up @@ -147,10 +150,12 @@ func (q *FileQueue) Open(dir string, queueName string, options *Options) error {
return nil
}

// Determines whether a queue is empty
func (q *FileQueue) IsEmpty() bool {
return q.FrontIndex >= q.HeadIndex
}

// Total number of items available in the queue.
func (q *FileQueue) Size() int64 {
sz := q.HeadIndex - q.FrontIndex
if sz < 0 {
Expand All @@ -159,6 +164,8 @@ func (q *FileQueue) Size() int64 {
return int64(sz)
}

// Adds an item at the queue and HeadIndex will increase
// Asynchouous mode will call back with fn function
func (q *FileQueue) EnqueueAsync(data []byte, fn func(int64, error)) {
go q.doEnqueueAsync(data, fn)
}
Expand All @@ -168,6 +175,7 @@ func (q *FileQueue) doEnqueueAsync(data []byte, fn func(int64, error)) {
fn(index, err)
}

// Adds an item at the queue and HeadIndex will increase
func (q *FileQueue) Enqueue(data []byte) (int64, error) {
sz := len(data)
if sz == 0 {
Expand Down Expand Up @@ -200,7 +208,7 @@ func (q *FileQueue) Enqueue(data []byte) (int64, error) {
q.headDataItemOffset = q.headDataItemOffset + int64(sz)

toAppendArrayIndex := q.HeadIndex
toAppendIndexPageIndex := toAppendArrayIndex >> q.options.IndexItemsPerPage
toAppendIndexPageIndex := toAppendArrayIndex >> uint(q.options.IndexItemsPerPage)

indexDB, err := q.indexFile.acquireDB(toAppendIndexPageIndex)
if err != nil {
Expand Down Expand Up @@ -235,6 +243,7 @@ func (q *FileQueue) Enqueue(data []byte) (int64, error) {
return toAppendArrayIndex, nil
}

// Retrieves and removes the front of a queue
func (q *FileQueue) Dequeue() (int64, []byte, error) {

if q.IsEmpty() {
Expand All @@ -250,6 +259,8 @@ func (q *FileQueue) Dequeue() (int64, []byte, error) {
return index, bb, err
}

// Retrieves the item at the front of a queue
// if item exist return with index id, item data
func (q *FileQueue) Peek() (int64, []byte, error) {
if q.IsEmpty() {
return -1, nil, nil
Expand Down Expand Up @@ -407,7 +418,7 @@ func (q *FileQueue) initDataPageIndex() error {

func (q *FileQueue) getIndexItemArray(index int64) ([]byte, error) {
// calc index page no
previousIndexPageIndex := index >> q.options.IndexItemsPerPage
previousIndexPageIndex := index >> uint(q.options.IndexItemsPerPage)

indexDB, err := q.indexFile.acquireDB(previousIndexPageIndex)
if err != nil {
Expand Down Expand Up @@ -473,6 +484,11 @@ func (q *FileQueue) Close() error {
return nil
}

// Delete all used data files to free disk space.
//
// BigQueue will persist enqueued data in disk files, these data files will remain even after
// the data in them has been dequeued later, so your application is responsible to periodically call
// this method to delete all used data files and free disk space.
func (q *FileQueue) Gc() error {
frontIndex := q.FrontIndex

Expand All @@ -487,7 +503,7 @@ func (q *FileQueue) Gc() error {
return err
}

indexPageIndex := frontIndex >> q.options.IndexItemsPerPage
indexPageIndex := frontIndex >> uint(q.options.IndexItemsPerPage)
bb, err := q.getIndexItemArray(frontIndex)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion mmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (db *DB) Open(mode os.FileMode) error {
// Open data file and separate sync handler for metadata writes.
var err error

if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil {
if db.file, err = os.OpenFile(db.Path(), flag|os.O_CREATE, mode); err != nil {
_ = db.close()
return err
}
Expand Down
3 changes: 3 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ type Queue interface {
Skip(count int64) error

Close() error

// Delete all used data files to free disk space.
Gc() error
}
7 changes: 4 additions & 3 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func GetFileName(prefix string, suffix string, index int64) string {
return prefix + strconv.Itoa(int(index)) + suffix
}

//整形转换成字节
//int64 to byte array
func IntToBytes(n int64) []byte {
x := int64(n)
bytesBuffer := bytes.NewBuffer([]byte{})
binary.Write(bytesBuffer, binary.BigEndian, x)
return bytesBuffer.Bytes()
}

//字节转换成整形
//byte to int64
func BytesToInt(b []byte) int64 {
bytesBuffer := bytes.NewBuffer(b)

Expand All @@ -65,6 +65,7 @@ func BytesToInt(b []byte) int64 {
return int64(x)
}

// bytes to int32
func BytesToInt32(b []byte) int32 {
bytesBuffer := bytes.NewBuffer(b)

Expand All @@ -75,7 +76,7 @@ func BytesToInt32(b []byte) int32 {
}

func Mod(val int64, bits int) int64 {
return val - ((val >> bits) << bits)
return val - ((val >> uint(bits)) << uint(bits))
}

func GetFiles(pathname string) (*list.List, error) {
Expand Down

0 comments on commit 4f93c6e

Please # to comment.