Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

less memory leak #9

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 23 additions & 50 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { Transform } = require('stream');
const { Transform } = require("stream");

/**
* The ReadlineTransform is reading String or Buffer content from a Readable stream
Expand All @@ -8,64 +8,37 @@ const { Transform } = require('stream');
* @param {Boolean} opts.ignoreEndOfBreak - if content ends with line break, ignore last empty line (default: true)
* @param {Boolean} opts.skipEmpty - if line is empty string, skip it (default: false)
*/
const NEWLINE = 0x0a;
class ReadlineTransform extends Transform {
constructor(options) {
const opts = options || {};
opts.objectMode = true;
super(opts);
this._brRe = opts.breakMatcher || /\r?\n/;
this._ignoreEndOfBreak = 'ignoreEndOfBreak' in opts ? Boolean(opts.ignoreEndOfBreak) : true;
this._skipEmpty = Boolean(opts.skipEmpty);
this._buf = null;
constructor() {
super();
this._buf = Buffer.alloc(1024);
this.wptr = 0;
}

_transform(chunk, encoding, cb) {
let str;
if (Buffer.isBuffer(chunk) || encoding === 'buffer') {
str = chunk.toString('utf8');
} else {
str = chunk;
_transform(chunk, enc, cb) {
let lb;
while ((lb = chunk.indexOf(NEWLINE)) >= 0 && chunk.length) {
this._emitData(chunk.slice(0, lb + 1));
chunk = chunk.slice(lb + 1);
}

try {
if (this._buf !== null) {
this._buf += str;
} else {
this._buf = str;
}

const lines = this._buf.split(this._brRe);
const lastIndex = lines.length - 1;
for (let i = 0; i < lastIndex; i++) {
this._writeItem(lines[i]);
}

const lastLine = lines[lastIndex];
if (lastLine.length) {
this._buf = lastLine;
} else if (!this._ignoreEndOfBreak) {
this._buf = '';
} else {
this._buf = null;
}
cb();
} catch(err) {
cb(err); // invalid data type;
if (chunk.length) {
chunk.copy(this._buf, this.wptr, 0, chunk.length);
this.wptr += chunk.length;
}
cb(null, null);
}

_flush(cb) {
if (this._buf !== null) {
this._writeItem(this._buf);
this._buf = null;
_emitData(slice) {
if (this.wptr) {
this.emit("data", Buffer.concat([this._buf.slice(0, this.wptr), slice]));
this.wptr = 0;
} else {
this.emit("data", slice);
}
cb();
}

_writeItem(line) {
if (line.length > 0 || !this._skipEmpty) {
this.push(line);
}
_flush(cb) {
cb(this._buf.slice(0, this.wptr));
}
}

Expand Down
178 changes: 42 additions & 136 deletions test/index.test.js
Original file line number Diff line number Diff line change
@@ -1,139 +1,45 @@
const assert = require('assert');
const { PassThrough } = require('stream');
const ReadlineTransform = require('../');
const MemoryWriteStream = require('./memory_write_stream');

describe('ReadlineTransform', () => {

context('data ends without line break', () => {
it('transforms all lines', (done) => {
const readStream = new PassThrough();
const transform = new ReadlineTransform();
const writeStream = new MemoryWriteStream();

writeStream.on('finish', () => {
assert.deepEqual(writeStream.data, ['foo', 'bar', 'baz']);
done();
});

readStream.pipe(transform).pipe(writeStream);

readStream.write(Buffer.from('foo\nba'));
readStream.write('r\r');
readStream.end(Buffer.from('\nbaz'));
});

context('data contains empty lines and skipEmpty option is true', () => {
it('transforms with dropping empty lines', (done) => {
const readStream = new PassThrough();
const transform = new ReadlineTransform({ skipEmpty: true });
const writeStream = new MemoryWriteStream();

writeStream.on('finish', () => {
assert.deepEqual(writeStream.data, ['foo', 'bar', 'baz']);
done();
});

readStream.pipe(transform).pipe(writeStream);

readStream.write('foo\nba');
readStream.write(Buffer.from('r\r\n\n\r'));
readStream.end(Buffer.from('\nbaz'));
});
})
})

context('data ends with line break', () => {
it('transforms all lines except last empty line', (done) => {
const readStream = new PassThrough();
const transform = new ReadlineTransform();
const writeStream = new MemoryWriteStream();

writeStream.on('finish', () => {
assert.deepEqual(writeStream.data, ['foo', 'bar', '', 'baz']);
done();
});

readStream.pipe(transform).pipe(writeStream);

readStream.write(Buffer.from('foo\r\nbar\n'));
readStream.end('\r\nbaz\r\n');
const assert = require("assert");
const { PassThrough } = require("stream");
const ReadlineTransform = require("../");
const MemoryWriteStream = require("./memory_write_stream");

// let describe = (str, cb) => console.log(str) && cb();
// let it = (str, cb) => console.log(str) && cb();

describe("ReadlineTransform", () => {
it("should work", () => {
const { Transform, Stream } = require("stream");
let t = new Stream.PassThrough();
const g = new ReadlineTransform();
const quote = Buffer.from('"');

const inputFormatter = new Transform({
transform: (buf, enc, cb) => {
cb(
null,
Buffer.from('\ninput: "' + buf.toString().replace("\n", "\\n") + '"')
);
},
});

context('ignoreEndOfBreak is false', () => {
it('transforms all lines', (done) => {
const readStream = new PassThrough();
const transform = new ReadlineTransform({ ignoreEndOfBreak: false });
const writeStream = new MemoryWriteStream();

writeStream.on('finish', () => {
assert.deepEqual(writeStream.data, ['foo', 'bar', '', 'baz', '']);
done();
});

readStream.pipe(transform).pipe(writeStream);

readStream.write(Buffer.from('foo\r\nbar\n'));
readStream.end('\r\nbaz\r\n');
});
})

context('skipEmpty option is true', () => {
it('transforms with dropping empty lines', (done) => {
const readStream = new PassThrough();
const transform = new ReadlineTransform({ skipEmpty: true });
const writeStream = new MemoryWriteStream();

writeStream.on('finish', () => {
assert.deepEqual(writeStream.data, ['foo', 'bar', 'baz']);
done();
});

readStream.pipe(transform).pipe(writeStream);

readStream.write('foo\r\nbar\n');
readStream.end(Buffer.from('\r\nbaz\r\n'));
});
})

context('ignoreEndOfBreak is false and skipEmpty option is true', () => {
it('works with dropping all empty lines', (done) => {
const readStream = new PassThrough();
const transform = new ReadlineTransform({ ignoreEndOfBreak: false, skipEmpty: true });
const writeStream = new MemoryWriteStream();

writeStream.on('finish', () => {
assert.deepEqual(writeStream.data, ['foo', ' ', 'bar']);
done();
});

readStream.pipe(transform).pipe(writeStream);

readStream.write(Buffer.from('foo\n \n'));
readStream.write('\n\n');
readStream.write(Buffer.from('bar\n'));
readStream.end();
});
})

})

context('line break is special', () => {
it('transforms with dropping last empty line', (done) => {
const readStream = new PassThrough();
const transform = new ReadlineTransform({ breakMatcher: '_\n' });
const writeStream = new MemoryWriteStream();

writeStream.on('finish', () => {
assert.deepEqual(writeStream.data, ['', 'foo', 'bar', 'baz', '']);
done();
});

readStream.pipe(transform).pipe(writeStream);

readStream.write(Buffer.from('_\nfoo_\nbar_\nbaz_\n_\n'));
readStream.end();
const outputFormatter = new Transform({
transform: (buf, enc, cb) => {
cb(
null,
Buffer.from('\nOutput: "' + buf.toString().replace("\n", "\\n") + '"')
);
},
});
})

})
t.pipe(inputFormatter).pipe(process.stdout);
t.pipe(g).pipe(outputFormatter).pipe(process.stdout);
t.write("line1 hello!\n");
console.log("buf:" + g._buf);
t.write("line2 \n line3 yadayada");
console.log("buf:" + g._buf);

t.write("yada \n line4");
console.log("buf:" + g._buf);
console.log("flushing");
g._flush(console.log);
});
});