diff --git a/lib/env.js b/lib/env.js index 894ad52..9dbb9a2 100644 --- a/lib/env.js +++ b/lib/env.js @@ -1,5 +1,5 @@ const envalid = require('envalid') -const {str, url, json, bool} = envalid +const {str, url, json, bool, num} = envalid const urlArray = envalid.makeValidator(x => { const urls = json()._parse(x) @@ -16,5 +16,6 @@ module.exports = envalid.cleanEnv(process.env, { NODE_ENV: str({choices: ['development', 'staging', 'production'], devDefault: 'development'}), ROLLBAR_TOKEN_CHANGES: str({devDefault: ''}), STATSD_HOST: str({default: '172.17.0.1'}), + REGISTRY_CHANGE_DELAY: num({default: 1000 * 60, devDefault: 1000}), IS_ENTERPRISE: bool({default: false}) }) diff --git a/lib/follow.js b/lib/follow.js index 30a6972..e230fc8 100644 --- a/lib/follow.js +++ b/lib/follow.js @@ -9,6 +9,8 @@ const cleanDoc = require('normalize-registry-metadata') const env = require('./env') const rollbar = require('./rollbar') +const { asyncTimeout } = require('./util') + const statsdClient = new StatsD({ host: env.STATSD_HOST, prefix: 'changes.', @@ -75,7 +77,10 @@ async function handleChange ({channel, client, registry}, change) { } try { - await channel.sendToQueue(env.QUEUE_NAME, Buffer.from(JSON.stringify(payload)), {priority: 1}) + await asyncTimeout(env.REGISTRY_CHANGE_DELAY, async () => { + const payloadBuffer = Buffer.from(JSON.stringify(payload)) + await channel.sendToQueue(env.QUEUE_NAME, payloadBuffer, {priority: 1}) + }) } catch (err) { rollbar.error(err) } diff --git a/lib/util.js b/lib/util.js new file mode 100644 index 0000000..cd5f96f --- /dev/null +++ b/lib/util.js @@ -0,0 +1,12 @@ +// https://stackoverflow.com/questions/33289726/combination-of-async-function-await-settimeout#33292942 +function sleep (ms) { + return new Promise((resolve, reject) => setTimeout(resolve, ms)) +} + +async function asyncTimeout (ms, fn, ...args) { + await sleep(ms) + return fn(...args) +} + +module.exports.sleep = sleep +module.exports.asyncTimeout = asyncTimeout diff --git a/test/follow.js b/test/follow.js index 6b1e207..f6583b0 100644 --- a/test/follow.js +++ b/test/follow.js @@ -7,6 +7,7 @@ const {test, afterEach, tearDown} = require('tap') const proxyquire = require('proxyquire') const env = require('../lib/env') +const { sleep } = require('../lib/util') class ChangesStream extends EventEmitter { constructor (opts) { @@ -161,7 +162,18 @@ const { } } }) - const job = await channel.get(env.QUEUE_NAME) + + let job + do { + try { + job = await channel.get(env.QUEUE_NAME) + await sleep(env.REGISTRY_CHANGE_DELAY / 2) + } catch (e) { + t.error(e) + break + } + } while (job === false) + t.same(JSON.parse(job.content.toString()), { name: 'registry-change', dependency: 'package',