Skip to content

zlib: do not coalesce multiple .flush() calls #28520

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions lib/zlib.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const {
} = require('buffer');
const { owner_symbol } = require('internal/async_hooks').symbols;

const kFlushFlag = Symbol('kFlushFlag');

const constants = internalBinding('constants').zlib;
const {
// Zlib flush levels
Expand Down Expand Up @@ -261,7 +263,6 @@ function ZlibBase(opts, mode, handle, { flush, finishFlush, fullFlush }) {
this._chunkSize = chunkSize;
this._defaultFlushFlag = flush;
this._finishFlushFlag = finishFlush;
this._nextFlush = -1;
this._defaultFullFlushFlag = fullFlush;
this.once('end', this.close);
this._info = opts && opts.info;
Expand Down Expand Up @@ -308,21 +309,35 @@ ZlibBase.prototype._flush = function(callback) {

// If a flush is scheduled while another flush is still pending, a way to figure
// out which one is the "stronger" flush is needed.
// This is currently only used to figure out which flush flag to use for the
// last chunk.
// Roughly, the following holds:
// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH <
// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH
const flushiness = [];
let i = 0;
for (const flushFlag of [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH,
Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH]) {
const kFlushFlagList = [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH,
Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH];
for (const flushFlag of kFlushFlagList) {
flushiness[flushFlag] = i++;
}

function maxFlush(a, b) {
return flushiness[a] > flushiness[b] ? a : b;
}

const flushBuffer = Buffer.alloc(0);
// Set up a list of 'special' buffers that can be written using .write()
// from the .flush() code as a way of introducing flushing operations into the
// write sequence.
const kFlushBuffers = [];
{
const dummyArrayBuffer = new ArrayBuffer();
for (const flushFlag of kFlushFlagList) {
kFlushBuffers[flushFlag] = Buffer.from(dummyArrayBuffer);
kFlushBuffers[flushFlag][kFlushFlag] = flushFlag;
}
}

ZlibBase.prototype.flush = function(kind, callback) {
const ws = this._writableState;

Expand All @@ -337,13 +352,8 @@ ZlibBase.prototype.flush = function(kind, callback) {
} else if (ws.ending) {
if (callback)
this.once('end', callback);
} else if (this._nextFlush !== -1) {
// This means that there is a flush currently in the write queue.
// We currently coalesce this flush into the pending one.
this._nextFlush = maxFlush(this._nextFlush, kind);
} else {
this._nextFlush = kind;
this.write(flushBuffer, '', callback);
this.write(kFlushBuffers[kind], '', callback);
}
};

Expand All @@ -361,9 +371,8 @@ ZlibBase.prototype._transform = function(chunk, encoding, cb) {
var flushFlag = this._defaultFlushFlag;
// We use a 'fake' zero-length chunk to carry information about flushes from
// the public API to the actual stream implementation.
if (chunk === flushBuffer) {
flushFlag = this._nextFlush;
this._nextFlush = -1;
if (typeof chunk[kFlushFlag] === 'number') {
flushFlag = chunk[kFlushFlag];
}

// For the last chunk, also apply `_finishFlushFlag`.
Expand Down
39 changes: 0 additions & 39 deletions test/parallel/test-zlib-flush-multiple-scheduled.js

This file was deleted.

57 changes: 57 additions & 0 deletions test/parallel/test-zlib-flush-write-sync-interleaved.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { createGzip, createGunzip, Z_PARTIAL_FLUSH } = require('zlib');

// Verify that .flush() behaves like .write() in terms of ordering, e.g. in
// a sequence like .write() + .flush() + .write() + .flush() each .flush() call
// only affects the data written before it.
// Refs: https://github.com/nodejs/node/issues/28478

const compress = createGzip();
const decompress = createGunzip();
decompress.setEncoding('utf8');

const events = [];
const compressedChunks = [];

for (const chunk of ['abc', 'def', 'ghi']) {
compress.write(chunk, common.mustCall(() => events.push({ written: chunk })));
compress.flush(Z_PARTIAL_FLUSH, common.mustCall(() => {
events.push('flushed');
const chunk = compress.read();
if (chunk !== null)
compressedChunks.push(chunk);
}));
}

compress.end(common.mustCall(() => {
events.push('compress end');
writeToDecompress();
}));

function writeToDecompress() {
// Write the compressed chunks to a decompressor, one by one, in order to
// verify that the flushes actually worked.
const chunk = compressedChunks.shift();
if (chunk === undefined) return decompress.end();
decompress.write(chunk, common.mustCall(() => {
events.push({ read: decompress.read() });
writeToDecompress();
}));
}

process.on('exit', () => {
assert.deepStrictEqual(events, [
{ written: 'abc' },
'flushed',
{ written: 'def' },
'flushed',
{ written: 'ghi' },
'flushed',
'compress end',
{ read: 'abc' },
{ read: 'def' },
{ read: 'ghi' }
]);
});
1 change: 0 additions & 1 deletion test/parallel/test-zlib-write-after-flush.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ for (const [ createCompress, createDecompress ] of [
gunz.on('data', (c) => output += c);
gunz.on('end', common.mustCall(() => {
assert.strictEqual(output, input);
assert.strictEqual(gzip._nextFlush, -1);
}));

// Make sure that flush/write doesn't trigger an assert failure
Expand Down