Skip to content

Commit

Permalink
message-store: postgres: read: support sql condition
Browse files Browse the repository at this point in the history
  • Loading branch information
mpareja committed Jul 30, 2020
1 parent 1140d2b commit a47a536
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 2 deletions.
33 changes: 33 additions & 0 deletions message-store/postgres/test/message-store-postgres.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const createTestLog = require('../../../test/test-log')
const { asyncIterableToArray } = require('../../../test/async-iterable-to-array')
const { generateGetCategorySuite } = require('../../test/get-category-test-suite')
const { generateGetLastSuite } = require('../../test/get-last-test-suite')
const { generateGetStreamSuite } = require('../../test/get-stream-test-suite')
Expand Down Expand Up @@ -131,6 +132,38 @@ describe('message-store-postgres', () => {
})
})

describe('read', () => {
describe('sql condition', () => {
describe('when not provided', () => {
it('returns all results', async () => {
const log = createTestLog()
const messageStore = createMessageStore({ log })
const { category } = await examplePutCategory(messageStore, { count: 3 })

const results = await asyncIterableToArray(messageStore.read(category))

expect(results.length).toBe(3)
})
})

describe('when provided', () => {
it('limits the results based on given sql condition', async () => {
const log = createTestLog()
const messageStore = createMessageStore({ log })
const { category, messages } = await examplePutCategory(messageStore, { count: 3, trackMessages: true })

const expectedMessage = messages[2]

const CONDITION_SQL = `messages.id = '${expectedMessage.id}'`
const results = await asyncIterableToArray(messageStore.read(category, { condition: CONDITION_SQL }))

expect(results.length).toBe(1)
expect(results[0]).toMatchObject(expectedMessage)
})
})
})
})

describe('put', () => {
describe('connection error', () => {
it('propagates error', async () => {
Expand Down
8 changes: 6 additions & 2 deletions message-store/read.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
const { StreamName } = require('./stream-name')

// read implementation offered to message-store implementations that
// retrieve messages in batches
module.exports = ({ batchSize, get }) => {
const read = async function * (streamName, { position } = {}) {
const read = async function * (streamName, options = {}) {
let position = options.position

let results
do {
results = await get(streamName, { position })
results = await get(streamName, { ...options, position })
for (const result of results) {
position = StreamName.isCategory(streamName)
? result.globalPosition + 1
Expand Down
34 changes: 34 additions & 0 deletions message-store/test/read.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const createRead = require('../read')
const { asyncIterableToArray } = require('../../test/async-iterable-to-array')
const { exampleReadMessageData, exampleStreamName } = require('../examples')

describe('read', () => {
describe('given messages in stream', () => {
it('yields results', async () => {
const streamName = exampleStreamName()
const expected = [
exampleReadMessageData(),
exampleReadMessageData()
]

const get = jest.fn().mockResolvedValue(expected)
const { read } = createRead({ batchSize: 10, get })
const results = await asyncIterableToArray(read(streamName)) // process all records

expect(results).toEqual(expected)
})
})

describe('given a custom option', () => {
it('custom option is forwarded to get implementation', async () => {
const streamName = exampleStreamName()
const customOption = 'customValue'
const get = jest.fn().mockResolvedValue([])

const { read } = createRead({ batchSize: 10, get })
await asyncIterableToArray(read(streamName, { customOption })) // process all records

expect(get).toHaveBeenCalledWith(streamName, { customOption })
})
})
})

0 comments on commit a47a536

Please # to comment.