# BloQue, a Block Queue of stacked items

Wait, what? The name means that this data structure is a queue of blocks (blocks are FIFO, First In First Out) whose items are stacked (items are LIFO, Last In First Out). A block is just a group of items.

## Why is this useful?

A use case I had required adding items one by one to a data structure acting as a cache; once enough of them were inserted, or after a pre-specified time had passed, I needed to take the first X inserted elements and batch-process them (send them to a data pipeline, or bulk-write them to a DB). A normal queue would do, but then I would need to loop over the first X elements, popping them one at a time. Add in concurrency, and you need to hold a lock for all that looping time.

With this BloQue data structure you add elements one at a time, and once you are ready to consume them you simply pop the front block of the queue. That's a very fast operation (it just touches a few pointers), so the lock is held for very little time (or, if there's only one producer, no locking is needed at all), and you can keep adding elements while processing the popped front block elsewhere.

The other "extra" stack operations, such as Pop and Peek on items (from the back), are provided for convenience as well.

I haven't added a full deque API for blocks (from the back) and items (from the front) because it gets trickier and I don't see a use case for it, but please raise an issue if you need it :)

## Example usage

```go
package main

import (
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/antoniomo/bloque"
)

const (
	elementsPerBlock = 1024
	// Just to show the last block with fewer than elementsPerBlock items
	itemsToProcess = (elementsPerBlock * 10) - 3
)

func consumer(wg *sync.WaitGroup, readyBlocks <-chan bloque.BlockT) {
	defer wg.Done()

	// Let's keep counters of processed blocks and items
	processedBlocks := 0
	totalProcessedItems := 0

	for blk := range readyBlocks {
		fmt.Printf("Processing BloQue %d, items %d\n", processedBlocks, len(blk))
		processedBlocks++
		totalProcessedItems += len(blk)
		for _, item := range blk {
			// Is this what we expect?
			if _, ok := item.(int); !ok {
				log.Panic("oh no, wrong type assertion!")
			}
			// Simulate consumption/processing of the item
			time.Sleep(time.Nanosecond)
		}
	}

	fmt.Printf("Processed %d blocks, %d items in total\n", processedBlocks, totalProcessedItems)
}

func producer(wg *sync.WaitGroup, readyBlocks chan<- bloque.BlockT) {
	defer wg.Done()

	// Let's prepare a BloQue with a custom block size
	b := bloque.New(bloque.BlockSize(elementsPerBlock))

	for i := 0; i < itemsToProcess; i++ {
		if isCompleted := b.PushBackItem(i); isCompleted {
			// Block completed, send it to the processing channel
			frontBlock, _ := b.PopFrontBlock()
			readyBlocks <- frontBlock
		}
	}
	// The last block isn't complete, but let's say we want to process it
	// right away without waiting for more data
	frontBlock, ok := b.PopFrontBlock()
	if !ok {
		log.Panic("ouch, there should be items here")
	}
	readyBlocks <- frontBlock

	close(readyBlocks) // All done, close the channel
}

func main() {
	fmt.Printf("Processing %d items, block size %d\n", itemsToProcess, elementsPerBlock)

	// Channel on which completed blocks are handed off
	readyBlocks := make(chan bloque.BlockT)

	wg := &sync.WaitGroup{}
	wg.Add(2)
	go producer(wg, readyBlocks)
	go consumer(wg, readyBlocks)

	wg.Wait()
}
```
