Skip to content

Commit

Permalink
feat!: Switch to streamx (#9)
Browse files Browse the repository at this point in the history
chore: Update README to reflect implementation
chore: Update example to show streamx

Co-authored-by: Blaine Bublitz <blaine.bublitz@gmail.com>
  • Loading branch information
coreyfarrell and phated authored Sep 7, 2022
1 parent 0d3eb42 commit cf656cf
Show file tree
Hide file tree
Showing 4 changed files with 465 additions and 135 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@

[![NPM version][npm-image]][npm-url] [![Downloads][downloads-image]][npm-url] [![Build Status][ci-image]][ci-url] [![Coveralls Status][coveralls-image]][coveralls-url]

Wrap a ReadableStream in a TransformStream.
Wrap a `Readable` stream in a `Transform` stream.

## Usage

```js
var from = require('from2');
var { Readable } = require('streamx');
var concat = require('concat-stream');
var toThrough = require('to-through');

var readable = from([' ', 'hello', ' ', 'world']);
var readable = Readable.from([' ', 'hello', ' ', 'world']);

// Can be used as a Readable or Transform
var maybeTransform = toThrough(readable);

from(['hi', ' ', 'there', ','])
Readable.from(['hi', ' ', 'there', ','])
.pipe(maybeTransform)
.pipe(
concat(function (result) {
// result.toString() === 'hi there, hello world'
// result === 'hi there, hello world'
})
);
```
Expand All @@ -35,7 +35,8 @@ from(['hi', ' ', 'there', ','])

### `toThrough(readableStream)`

Takes a `readableStream` as the only argument and returns a `through2` stream. If the returned stream is piped before `nextTick`, the wrapped `readableStream` will not flow until the upstream is flushed. If the stream is not piped before `nextTick`, it is ended and flushed (acting as a proper readable).
Takes a `Readable` stream as the only argument and returns a `Transform` stream wrapper. Any data
piped into the `Transform` stream is piped passed along before any data from the wrapped `Readable` is injected into the stream.

## License

Expand Down
111 changes: 93 additions & 18 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,118 @@
'use strict';

var through = require('through2');
var Transform = require('streamx').Transform;

function forward(chunk, enc, cb) {
cb(null, chunk);
}
// Based on help from @mafintosh via https://gist.github.com/mafintosh/92836a8d03df0ef41356e233e0f06382

function toThrough(readable) {
var opts = {
objectMode: readable._readableState.objectMode,
highWaterMark: readable._readableState.highWaterMark,
};
var highWaterMark = readable._readableState.highWaterMark;

// Streamx uses 16384 as the default highWaterMark for everything and then
// divides it by 1024 for objects
// However, node's objectMode streams the number of objects as highWaterMark, so we need to
// multiply the objectMode highWaterMark by 1024 to make it streamx compatible
if (readable._readableState.objectMode) {
highWaterMark = readable._readableState.highWaterMark * 1024;
}

var destroyedByError = false;
var readableClosed = false;
var readableEnded = false;

function flush(cb) {
var self = this;

readable.on('readable', onReadable);
readable.on('end', cb);
// Afer all writes have drained, we change the `_read` implementation
self._read = function (cb) {
readable.resume();
cb();
};

readable.on('data', onData);
readable.once('error', onError);
readable.once('end', onEnd);

function cleanup() {
readable.off('data', onData);
readable.off('error', onError);
readable.off('end', onEnd);
}

function onReadable() {
var chunk;
while ((chunk = readable.read())) {
self.push(chunk);
function onData(data) {
var drained = self.push(data);
// When the stream is not drained, we pause it because `_read` will be called later
if (!drained) {
readable.pause();
}
}

function onError(err) {
cleanup();
cb(err);
}

function onEnd() {
cleanup();
cb();
}
}

var wrapper = through(opts, forward, flush);
// Handle the case where a user destroyed the returned stream
function predestroy() {
// Only call destroy on the readable if this `predestroy` wasn't
// caused via the readable having an `error` or `close` event
if (destroyedByError) {
return;
}
if (readableClosed) {
return;
}
readable.destroy(new Error('Wrapper destroyed'));
}

var wrapper = new Transform({
highWaterMark: highWaterMark,
flush: flush,
predestroy: predestroy,
});

// Forward errors from the underlying stream
readable.once('error', onError);
readable.once('end', onEnd);
readable.once('close', onClose);

function onError(err) {
destroyedByError = true;
wrapper.destroy(err);
}

function onEnd() {
readableEnded = true;
}

function onClose() {
readableClosed = true;
// Only destroy the wrapper if the readable hasn't ended successfully
if (!readableEnded) {
wrapper.destroy();
}
}

var shouldFlow = true;
wrapper.once('pipe', onPipe);
wrapper.on('piping', onPiping);
wrapper.on('newListener', onListener);
readable.on('error', wrapper.emit.bind(wrapper, 'error'));

function onPiping() {
maybeFlow();
wrapper.off('piping', onPiping);
wrapper.off('newListener', onListener);
}

function onListener(event) {
// Once we've seen the data or readable event, check if we need to flow
if (event === 'data' || event === 'readable') {
maybeFlow();
this.removeListener('newListener', onListener);
onPiping();
}
}

Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "to-through",
"version": "2.0.0",
"description": "Wrap a ReadableStream in a TransformStream.",
"description": "Wrap a Readable stream in a Transform stream.",
"author": "Gulp Team <team@gulpjs.com> (https://gulpjs.com/)",
"contributors": [
"Blaine Bublitz <blaine.bublitz@gmail.com>"
Expand All @@ -22,16 +22,16 @@
"test": "nyc mocha --async-only"
},
"dependencies": {
"through2": "^2.0.3"
"streamx": "^2.12.5"
},
"devDependencies": {
"eslint": "^7.32.0",
"eslint-config-gulp": "^5.0.1",
"eslint-plugin-node": "^11.1.0",
"expect": "^27.4.2",
"mississippi": "^4.0.0",
"mocha": "^8.4.0",
"nyc": "^15.1.0"
"nyc": "^15.1.0",
"readable-stream": "^3.6.0"
},
"nyc": {
"reporter": [
Expand Down
Loading

0 comments on commit cf656cf

Please # to comment.