Destroying parent after merge, zipAll, sequence et al #412

Open
svozza opened this issue Nov 28, 2015 · 15 comments

@svozza
Collaborator

svozza commented Nov 28, 2015

Starting a separate issue for this rather than resurrecting a closed thread. In #402 we realised that some of the non-consume-based functions need to be handled differently. The choices we have are:

  • Destroy the parent as well as all streams it's currently consuming from, but not any stream it hasn't started consuming yet.
  • Destroy parent and all streams it's consuming from regardless.
  • Only destroy the parent. This is what happens now.
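To make the three options concrete, here is a toy model in plain JavaScript. It is not Highland's API: `ToySource`, `ToyParent` and the policy strings are hypothetical. The parent has started consuming from `a` but not from `b` or `c`, and each policy decides which sources are destroyed along with it.

```javascript
// Toy model (hypothetical, not Highland's Stream): a source records whether
// the parent has started consuming it and whether it has been destroyed.
class ToySource {
  constructor(name) {
    this.name = name;
    this.started = false;
    this.destroyed = false;
  }
  destroy() {
    this.destroyed = true;
  }
}

// A parent (think merge/sequence) holding several sources.
class ToyParent {
  constructor(sources) {
    this.sources = sources;
    this.destroyed = false;
  }
  start(i) {
    this.sources[i].started = true;
  }
  destroy(policy) {
    this.destroyed = true;
    for (const src of this.sources) {
      // option1: only sources the parent is currently consuming from.
      if (policy === "option1" && src.started) src.destroy();
      // option2: every source, started or not.
      if (policy === "option2") src.destroy();
      // option3: only the parent itself (the current behaviour).
    }
  }
}

function destroyedNames(policy) {
  const parent = new ToyParent([new ToySource("a"), new ToySource("b"), new ToySource("c")]);
  parent.start(0); // the parent is consuming from "a" only
  parent.destroy(policy);
  return parent.sources.filter((s) => s.destroyed).map((s) => s.name);
}

console.log(destroyedNames("option1")); // [ 'a' ]
console.log(destroyedNames("option2")); // [ 'a', 'b', 'c' ]
console.log(destroyedNames("option3")); // []
```

Option 1 leaves `b` and `c` alive, which is why a `using`-style operator is needed to tie a resource's lifetime to a stream that may never be consumed.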

Option 1 seems to be the least bad approach and it could be supplemented by implementing an equivalent of Rx's using operator.

Do we want to make the using function one of the prerequisites of the PR that addresses this?

@vqvu
Collaborator

vqvu commented Nov 29, 2015

I think if we go with option 1, we need to have using as part of the same PR.

@svozza
Collaborator Author

svozza commented Nov 29, 2015

Yep, I agree. Is the name using OK given its similarity to use, which we use for a completely different purpose?

@vqvu
Collaborator

vqvu commented Nov 29, 2015

The benefit of using is matching the method name with Rx, but I see your point. We could call it withResource or usingResource to make its functionality explicit.

@svozza
Collaborator Author

svozza commented Mar 11, 2016

I realise this is one of the issues holding up 3.0.0. I have time to start tomorrow, but I'm not really sure where to start with the using function (it's been a while).

@vqvu
Collaborator

vqvu commented Mar 11, 2016

I think this is the signature we want.

/**
 * Creates a new `Stream` that depends on a particular resource. When it is first consumed, this
 * stream will construct a resource by calling `resourceFactory`, then pass the resource to
 * `streamFactory` to construct a delegate stream. It will then emit all values that are emitted
 * from the delegate stream. Finally, when it is destroyed, it will destroy the delegate stream
 * as well as the resource by calling the `destructor`.
 *
 * This is useful when constructing a stream that depends on a particular resource.
 *
 * @param {Function} resourceFactory - a function that returns some resource object.
 * @param {Function} streamFactory - a function that when passed a resource from
 * the `resourceFactory`, returns a Highland stream.
 * @param {Function} destructor - a function that will destroy the resource from `resourceFactory`.
 * @returns Stream
 */
_.using = function (resourceFactory, streamFactory, destructor) {
}
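A minimal sketch of the semantics the doc comment describes, assuming plain JavaScript iterators as stand-ins for Highland streams. The `pull`/`destroy` object shape is hypothetical and not the real Stream API. The resource is created only on the first pull, and `destroy` tears down the delegate before handing the resource to `destructor`.

```javascript
// Hypothetical sketch of `using` semantics; iterators stand in for streams.
function using(resourceFactory, streamFactory, destructor) {
  let resource = null;
  let delegate = null;
  let destroyed = false;
  return {
    // Pull one value; resource and delegate are built lazily on first pull.
    pull() {
      if (destroyed) return { done: true, value: undefined };
      if (delegate === null) {
        resource = resourceFactory();
        delegate = streamFactory(resource);
      }
      return delegate.next();
    },
    // Destroy the delegate (if one was built) and release the resource.
    destroy() {
      if (destroyed) return;
      destroyed = true;
      if (delegate !== null) {
        if (typeof delegate.return === "function") delegate.return();
        destructor(resource);
      }
    },
  };
}

// Usage: a fake "connection" resource that must be closed.
const log = [];
const s = using(
  () => { log.push("open"); return { rows: [1, 2, 3], closed: false }; },
  (conn) => conn.rows[Symbol.iterator](),
  (conn) => { conn.closed = true; log.push("close"); }
);
log.push("created");
log.push(s.pull().value);
s.destroy();
console.log(log); // [ 'created', 'open', 1, 'close' ]
```

One design choice worth noting: if the stream is destroyed before it is ever consumed, no resource exists, so `destructor` is never called.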

Alternatively, we can implement lazyCreate (name WIP) and make using a convenience function.

/**
 * Creates a new `Stream` that lazily constructs a *source* stream and then emits values from it.
 * The source stream will be destroyed when this stream is destroyed. This is useful when stream
 * creation is expensive and you are not sure if the stream will even be used. It is also useful
 * when the stream depends on a resource that must be released but is going to be used with a
 * [higher-level transform](#Higher-order%20Streams) that may not even consume from it.
 *
 * @param {Function} streamFactory - a function that returns a stream.
 * @returns Stream
 *
 * function makeStream() {
 *     console.log('makeStream');
 *     return _([1, 2, 3]);
 * }
 *
 * // makeStream will only be called once.
 * _([_.lazyCreate(makeStream), _.lazyCreate(makeStream)])
 *     .sequence()
 *     .take(3)
 *     .each(_.log);
 *
 * // => makeStream
 * // => 1
 * // => 2
 * // => 3
 */
_.lazyCreate = function (streamFactory) {
}

_.using = function (resourceFactory, streamFactory, destructor) {
    return _.lazyCreate(() => {
        var rsrc = resourceFactory();
        return streamFactory(rsrc)
            .onDestroy(() => destructor(rsrc));
    });
};
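The laziness that `lazyCreate` relies on can be sketched with generators. This is a toy that assumes generators behave like pull-based streams; `lazyCreate`, `sequence` and `take` below are hypothetical plain functions, not Highland's. Only the first factory is invoked, matching the single `// => makeStream` line in the doc comment example.

```javascript
// Hypothetical sketch: defer calling `streamFactory` until the first pull.
function lazyCreate(streamFactory) {
  let source = null;
  return {
    next() {
      if (source === null) source = streamFactory();
      return source.next();
    },
    [Symbol.iterator]() { return this; },
  };
}

// Stand-in for `sequence`: pull from the next iterator only after the
// current one is exhausted.
function* sequence(iters) {
  for (const it of iters) yield* it;
}

// Stand-in for `take`: stop without pulling an (n+1)th value.
function* take(n, it) {
  if (n <= 0) return;
  for (const x of it) {
    yield x;
    if (--n === 0) return;
  }
}

const calls = [];
function makeStream() {
  calls.push("makeStream");
  return [1, 2, 3][Symbol.iterator]();
}

const out = [...take(3, sequence([lazyCreate(makeStream), lazyCreate(makeStream)]))];
console.log(calls, out); // [ 'makeStream' ] [ 1, 2, 3 ]
```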

@svozza
Collaborator Author

svozza commented Mar 12, 2016

Thanks @vqvu . I'll probably try to keep it simple initially and implement the first one. I should get to this tonight.

@svozza
Collaborator Author

svozza commented Mar 13, 2016

Just so I'm sure I'm on the right track, when I add something like this onto, say, sequence then I get the expected behaviour:

.onDestroy(function() {
    self.destroy();
    if (self._consumer != null) {
        self._consumer.destroy();
    }
});

@vqvu
Copy link
Collaborator

vqvu commented Mar 13, 2016

You don't need to destroy the _consumer. It's the consumer's responsibility to destroy the source, not the other way around.

With sequence, you just need to destroy its sources, which are original and curr. No need for self.destroy() since original === self.
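A toy version of that rule, in plain JavaScript with hypothetical `Toy` and `ToySequence` classes standing in for Highland streams: destroying the sequence destroys `original` and the current inner stream `curr`, and nothing else. The exhausted inner stream `a` and the not-yet-started `c` are left alone, which lines up with option 1 from the top of the thread.

```javascript
// Toy stream with a destroy flag (hypothetical, not Highland's Stream).
// pull() returns undefined once the items run out.
class Toy {
  constructor(name, items) {
    this.name = name;
    this.items = items.slice();
    this.destroyed = false;
  }
  pull() {
    return this.items.length > 0 ? this.items.shift() : undefined;
  }
  destroy() {
    this.destroyed = true;
  }
}

// Sequence-like stream: `original` yields inner streams and `curr` is the
// inner stream currently being read.
class ToySequence {
  constructor(original) {
    this.original = original;
    this.curr = null;
  }
  pull() {
    for (;;) {
      if (this.curr === null) {
        this.curr = this.original.pull();
        if (this.curr === undefined) { this.curr = null; return undefined; }
      }
      const x = this.curr.pull();
      if (x !== undefined) return x;
      this.curr = null; // inner stream exhausted; move on to the next one
    }
  }
  // Destroy exactly the sources: `original` and the current inner stream.
  destroy() {
    this.original.destroy();
    if (this.curr !== null) this.curr.destroy();
  }
}

const a = new Toy("a", [1, 2]);
const b = new Toy("b", [3, 4]);
const c = new Toy("c", [5]);
const outer = new Toy("outer", [a, b, c]);
const seq = new ToySequence(outer);
const got = [seq.pull(), seq.pull(), seq.pull()];
seq.destroy();
console.log(got, [outer, a, b, c].map((s) => s.destroyed)); // [ 1, 2, 3 ] [ true, false, true, false ]
```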

@svozza
Copy link
Collaborator Author

svozza commented Mar 15, 2016

Yeah, I get you. I just threw that together as a test really.

So I've got an interesting case here with parallel. If I create a destructor function like this:

.onDestroy(function() {
    source.destroy();
    for (var i = 0; i < running.length; i++) {
        running[i].stream.destroy();
    }
});

If I try to consume from two infinite streams, it still consumes from the second stream even though I only want to take one item. I then get an error because obviously the second stream doesn't exist anymore:

/home/stefano/git/highland/lib/index.js:1152
if (gen.ConsumeGenerator) {
^

TypeError: Cannot read property 'ConsumeGenerator' of null
    at Stream._runGenerator (/home/stefano/git/highland/lib/index.js:1152:16)
    at Immediate._onImmediate (/home/stefano/git/highland/lib/index.js:698:26)
    at processImmediate [as _immediateCallback]

Apologies for all the questions but I've been away from this for so long that it's taking a while for things to click again.

@vqvu
Collaborator

vqvu commented Mar 15, 2016

Yeah, looks like the engine doesn't handle premature destroys very well. I think this fixes things. I'll need to go back and make sure the rest of the engine knows about premature destroys.

diff --git a/lib/index.js b/lib/index.js
index 9dd5b24..c69defc 100755
--- a/lib/index.js
+++ b/lib/index.js
@@ -662,6 +662,10 @@ function Stream(generator) {
             throw new Error('Can not write to stream after nil');
         }

+        if (self.ended) {
+            return;
+        }
+
         if (x === nil) {
             self._nil_seen = true;

@@ -682,6 +686,10 @@ function Stream(generator) {
             throw new Error('Can not call next after nil');
         }

+        if (self.ended) {
+            return;
+        }
+
         self._generator_requested = true;
         if (xs) {
             xs = self.create(xs);
@@ -695,7 +703,11 @@ function Stream(generator) {
             }
             else {
                 _.setImmediate(function () {
-                    self._runGenerator();
+                    // It's possible that we are destroyed while waiting to
+                    // execute this callback.
+                    if (!self.ended) {
+                        self._runGenerator();
+                    }
                 });
             }
         }
@@ -922,11 +934,22 @@ Stream.prototype._onEnd = function _onEnd() {
         this._destructors[i].call(this);
     }

-    this._generator = null;
-    this._request = null;
     this._outgoing.clear();
     this._observers = [];
     this._destructors = [];
+    this._send_events = false;
+
+    this._request = null;
+    this._multiplexer = null;
+    this._consumer = null;
+
+    this._generator = null;
+    this._generator_requested = false;
+    this._defer_run_generator = false;
+    this._run_generator_deferred = false;
+
+    this.readable = false;
+    this.writable = false;
 };

 /**
@@ -1100,8 +1123,6 @@ addMethod('destroy', function () {
         return;
     }

-    this.readable = this.writable = false;
-
     this.end();
     this._onEnd();
 });
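The core idea of the `setImmediate` guard in this diff can be modelled in isolation. This is a hypothetical sketch (`GuardedStream` and `scheduleRun` are not Highland names): work scheduled for a later tick must re-check `ended`, because `destroy()` may have nulled out the generator in the meantime. That is consistent with the stack trace above, where `_runGenerator` is reached from `Immediate._onImmediate` after the stream was destroyed.

```javascript
// Minimal model of the guard: deferred work re-checks `ended` before running.
class GuardedStream {
  constructor(generator) {
    this.ended = false;
    this._generator = generator;
    this.runs = 0;
  }
  scheduleRun() {
    setImmediate(() => {
      // Without this check, a destroy() between scheduling and running
      // would leave us calling a generator that has been set to null.
      if (!this.ended) {
        this.runs += 1;
        this._generator();
      }
    });
  }
  destroy() {
    if (this.ended) return;
    this.ended = true;
    this._generator = null; // mirrors `this._generator = null` in _onEnd
  }
}

const s = new GuardedStream(() => {});
s.scheduleRun();
s.destroy(); // destroyed before the scheduled callback fires
setImmediate(() => console.log("runs:", s.runs)); // runs: 0
```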

@svozza
Copy link
Collaborator Author

svozza commented Mar 15, 2016

Yep, that stopped the error, but the second generator is still getting consumed.

@vqvu
Copy link
Collaborator

vqvu commented Mar 15, 2016

Is your generator synchronous? Infinite synchronous generators don't really work with parallel at all at the moment. It seems to be non-trivial to fix.

If you mean the second generator is consumed once, I think that's normal. parallel doesn't lazily consume its sources.

var a = _(function (push, next) {
    setTimeout(function () {
        console.log('a');
        push(null, 1);
        next();
    }, 0);
});

var b = _(function (push, next) {
    setTimeout(function () {
        console.log('b');
        push(null, 2);
        next();
    }, 0);
});

var s = _([a, b]).parallel(2)
    .take(1)
    .each(a => console.log('abc: ' + a));

// => a
// => abc: 1
// => b
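A toy model of why the `parallel` destructor has to walk all of `running` (plain JavaScript; `Source` and `ToyParallel` are hypothetical, not Highland's internals). On the first pull it fills all `n` slots at once, so both `a` and `b` start even if downstream only takes one value, and destroying it has to stop both while leaving the unstarted `c` alone.

```javascript
// Hypothetical inner source with start/destroy flags.
class Source {
  constructor(name) {
    this.name = name;
    this.started = false;
    this.destroyed = false;
  }
  start() { this.started = true; }
  destroy() { this.destroyed = true; }
}

// Toy parallel(n): starts up to `n` sources eagerly rather than lazily.
class ToyParallel {
  constructor(sources, n) {
    this.source = { destroyed: false }; // stands in for the outer stream of streams
    this.pending = sources.slice();     // inner sources not yet started
    this.running = [];
    this.n = n;
  }
  // Called on the first pull: fill every free slot at once.
  fill() {
    while (this.running.length < this.n && this.pending.length > 0) {
      const src = this.pending.shift();
      src.start();
      this.running.push(src);
    }
  }
  // Mirrors the destructor sketched earlier: outer source plus all running.
  destroy() {
    this.source.destroyed = true;
    for (const src of this.running) src.destroy();
    this.running = [];
  }
}

const srcs = [new Source("a"), new Source("b"), new Source("c")];
const p = new ToyParallel(srcs, 2);
p.fill();    // "a" and "b" both start, even if only one value is wanted
p.destroy(); // e.g. once take(1) is satisfied
console.log(srcs.filter((x) => x.destroyed).map((x) => x.name)); // [ 'a', 'b' ]
```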

@svozza
Copy link
Collaborator Author

svozza commented Mar 15, 2016

Yeah, my generators are basically exactly the same as that except the setTimeout value is set to 10. I guess I'll need a different onDestroy test for parallel and merge because the current one explicitly checks that the generator functions only get called once.

@pavel

pavel commented Oct 3, 2016

Just wondering, why doesn't sequence use consume as the basis of its implementation?
Something like this:

function consumeStreams(stream) {
    return stream.consume(function (err, data, push, next) {
        if (err != null) {
            push(err, data);
            next();
        } else if (data === Stream.nil) {
            return push(null, data);
        } else if (!Stream.isStream(data)) {
            push(null, data);
            next();
        } else {
            console.log("async stream");
            data.each(function (val) {
                push(null, val);
            }).done(next);
        }
    });
}

It would be very convenient to have in the core. It just consumes a Stream of Streams, and it does propagate destroy through the consumers/forks graph. BTW, wouldn't it be better to have consume for Stream of Streams separate from Stream of Arrays?

@vqvu
Collaborator

vqvu commented Oct 3, 2016

The reason is laziness. We do not want to consume from a stream any more than absolutely necessary. This means we need to respect the internal backpressure protocol.

consume simplifies this protocol for the common case where you want to fetch an element, then convert it to some (or none) other elements eagerly. But if you want to do that conversion lazily, like in the case of sequence, we need to drop down to the lower-level pull. This is why all of the higher-level transforms use pull rather than consume.

Take your proposed code, for example. data.each is not lazy. You end up consuming all of data regardless of whether or not downstream needs it. If data is streaming in some large table from a database, this is clearly not what we want.

Even if we switched to using data.pull, there's still no way to say "notify me when you need more data". We can't call next because that will consume another element from stream!
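A plain-JavaScript illustration of the laziness point, assuming a generator as a stand-in for a large database-backed stream (`makeTable`, `eagerFirst` and `lazyFirst` are hypothetical helpers). Draining the inner stream eagerly, as `data.each(...).done(next)` does, reads every row even when only two are needed, while pulling on demand reads just two.

```javascript
// A "table" stream that counts how many rows have been read from it.
function makeTable(size, counter) {
  return (function* () {
    for (let i = 0; i < size; i++) {
      counter.reads += 1;
      yield i;
    }
  })();
}

// Eager: buffer the whole inner stream before anyone asks for values.
function eagerFirst(n, inner) {
  const buffer = [];
  for (const x of inner) buffer.push(x);
  return buffer.slice(0, n);
}

// Lazy: read from the inner stream only when downstream wants a value.
function lazyFirst(n, inner) {
  const out = [];
  while (out.length < n) {
    const r = inner.next();
    if (r.done) break;
    out.push(r.value);
  }
  return out;
}

const eagerCount = { reads: 0 };
const lazyCount = { reads: 0 };
eagerFirst(2, makeTable(1000, eagerCount));
lazyFirst(2, makeTable(1000, lazyCount));
console.log(eagerCount.reads, lazyCount.reads); // 1000 2
```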

BTW, wouldn't it be better to have consume for Stream of Streams separate from Stream of Arrays?

This is legacy behavior. sequence has just always worked with Streams of Arrays.

I agree with you that the API would be cleaner if it didn't special-case Arrays. It's just that no one has ever proposed changing it. I wouldn't mind changing it to only work with Stream of Streams in 3.x. There's probably not a lot of people depending on this behavior (though who knows).

3 participants