Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Created subscribe method for Observable spec compatibility #672

Closed
wants to merge 15 commits into from

Conversation

jaidetree
Copy link
Contributor

@jaidetree jaidetree commented Feb 27, 2019

Context

One of my favorite uses of Highland is with gulpjs:

gulp.src('*.scss')
  .pipe(_())
  .filter(f => f.filename.startsWith("_"))
  .flatMap(compileSCSS)
  .pipe(gulp.dest('./public/css'));

The combination eliminates like 12 common gulp plugin dependencies.

With the recent release V4 gulp responds to task functions that return an Observable. Looking through the source, it checks for a subscribe function https://github.com/gulpjs/async-done/blob/457ac2aa6fd04fc5620277a8095fa11e8ef61b65/index.js#L72.

This PR adds a subscribe method with the same signature so that gulp will be able to work with highland streams without having to transform into a promise, callback, or node stream.

Usage

 _([1, 2, 3, 4]).subscribe(
     function onNext (x) {
         // Called for each value that comes downstream
         console.log('Received onNext value', x);
     },
     function onError (err) {
         // Called one time with error or zero if no errors occur upstream
         console.error('Single highland stream error', err);
     },
     function onComplete () {
         // Receives no arguments
         // Called only once when stream is completed.
         console.log('Completed the page.');
     }
 );

Tasks

  • Implemented .subscribe method.
  • Add documentation
  • Created tests to ensure the subscribe method performs as expected
  • All tests are passing

Notes

This was done off the 3x master branch. I'm willing to add it to the 2x branch if desired as well.

@vqvu
Copy link
Collaborator

vqvu commented Feb 28, 2019

I'm not opposed to adding interop with RxJS, but I don't want to do it by pretending to be an Observable. Highland streams currently pretend to be a node Readable and (at times) Writable, and it's caused all kinds of problems (see #671 and #449) due to us not replicating the full observable behavior.

Even with just this subscribe method, what happens when Gulp decides to switch to using .subscribe(observer)? What happens if it decides to subscribe twice? Rx observables generally support multiple independent subscribers, while Highland doesn't.

I would much prefer to add a toRxObservable(Observable) method that takes as input the Rx Observable and calls create(). Would that work for you?

No need to implement it for the 2.x branch.

@jaidetree
Copy link
Contributor Author

jaidetree commented Feb 28, 2019

Ah hey @vqvu it’s been a few years since we last communicated, I am glad to see you’re still maintaining this project. I respect your dilligence and appreciate your time in thinking through this carefully.

This is a too naive an implementation. Additionally, I may have conflated the idea of adding a subscribe method, which I believe is a good interface for consuming values from the streams, with RxJS\Observable compatability.

I would like to add support for an observer like object just in case. In the docs we can either mention that it’s a similar approximation or not mention compatability at all like it’s just a regular consumption method as the others. As a happy side effect the subscribe method is compatible with Gulp tasks and if they do switch to the observer object the subscribe method should still support it. I feel this is important as it gives Highland a higher value proposition for Gulp users (I would like to see it become more common), without the code smell of calling toObservable and having to supply an Observable constructor requiring yet another npm dependency.

Then lets add a toObservable(Constructor) method as you suggested for official Observable spec compliance. That method should follow the spec tightly so that it works with both Rx and similar implementations.

https://tc39.github.io/proposal-observable/

Additionlly, part of the spec proposes a special tagged observable method it looks for when called like Rx.Observable.from(_([1, 2, 3])).

It’s implemeneted in Redux already:
https://github.com/reduxjs/redux/blob/792ac5ae541a7c0792908df8f4e2da334184e74f/src/createStore.js#L248

That should cover the bases of future proofing compatability with Observables and practical usage with today’s tools.

What are your thoughts?

@vqvu
Copy link
Collaborator

vqvu commented Feb 28, 2019

Hi there! I remember you too. It has been a while. I'm still a maintainer. Unfortunately, I don't have the time to make improvements, only bug fixes. That's why 3.0 is still in beta 😅.

Thanks for the pointer to the Observable proposal. I wasn't aware of it. I don't have any objections to the subscribe method if we're advertising ECMAScript Observable compatibility instead of Rx Observable.

I'm happy to accept an implementation for part 3 of that spec. There's no need to implement of or from, since we don't officially expose the constructor. I mean, technically we do via highlandStream.constructor, but my guess is no one uses it. If someone ever requests it, we can figure out a more principled way of exposing the constructor.

There's also no need for toObservable, since Highland streams will officially be Observables.

For the actual implementation, there are a few considerations that I can think of

  • Observable.prototype[Symbol.observable]. From what I can tell, some libraries use symbol-observable, but RxJS doesn't. I generally agree with the RxJS folks when they say in feat(Symbol.observable): is no longer polyfilled ReactiveX/rxjs#3387 that a library shouldn't polyfill Symbol.observable. Given that symbol-observable does actually polyfill I think we should follow RxJS's lead here.
  • Multiple subscriptions. Implicit to the observable model seems to be a notion of reusabillity. You should be able to call subscribe multiple times. I think this means we need to fork the stream for each subscription. unsubscribe would only destroy the fork, and not the original stream. Fortunately, Observables are push-based, so we don't have to worry about backpressure.
  • Subscription after end. What happens if someone subscribes after the source stream has already ended? I don't think it's acceptable for the observer to never receive anything. The choice is whether to call complete or error. My gut feeling is to call fail-fast and call error, since it's unlikely that the caller intended to subscribe after the stream ends. What are your thoughts?

@jaidetree
Copy link
Contributor Author

jaidetree commented Feb 28, 2019

That’s unfortunate to hear highland 3 is stuck in beta, but I have been trying to change my work habits so I do about 20 min - 1.5 hours of sideprojects a day as opposed to larger sprints. I’ve got a couple of small projects lined up for the month but after that I may be able to take a look at what’s left and see if I can contribute.

How have you been? I’ve really taken to streams, FRP, and functional programming and have been transitioning into making Clojure my predominant language. Even launched a blog to cover those topics https://eccentric-j.com. Anyway, I was writing a ClojureScript monitoring service the other day and ended up using interop with Highland JS. It worked really well and may write some articles on using highlandjs with gulp and ClojureScript in the next month.

That overall sounds like a great plan to me.

  1. For the symbol observable that means we should resolve the symbol with a snippet like the following correct?
    export const observable = typeof Symbol === 'function' && Symbol.observable || '@@observable';
  2. Forking the stream as part of subscribing makes sense to me. I think that means it would behave like subscribing to an RxJS subject where new values are emitted to each subscriber before the next value is processed.
  3. Completely agree about throwing an error on subscribing to a completed stream. I believe that’s also how RxJS works, I specifically recall that behavior on Subjects, but I will write a quick codepen test when I get to the office. Even if it turns out not to work like that I’m onboard for the behavior as you wouldn’t get anything useful from doing that.

@jaidetree
Copy link
Contributor Author

  • Implemented .subscribe method.
  • Add documentation
  • Created tests to ensure the subscribe method performs as expected
  • All tests are passing
  • Write tests for updated subscriber behavior
  • Support observer objects
  • Support multiple subscriptions
  • Throw error when subscribing to completed stream
  • Write test for observable\Symbol.observable behavior
  • Implement observable\Symbol.observable behavior

Coming together so far. Ran out of time to setup the observable\symbol observable behavior 😞 but I should be able to get to that tonight or tomorrow morning.

@jaidetree
Copy link
Contributor Author

jaidetree commented Mar 1, 2019

  • Implemented .subscribe method.
  • Add documentation
  • Created tests to ensure the subscribe method performs as expected
  • All tests are passing
  • Write tests for updated subscriber behavior
  • Support observer objects
  • Support multiple subscriptions
  • Throw error when subscribing to completed stream
  • Write test for observable\Symbol.observable behavior
  • Implement observable\Symbol.observable behavior

I think this is good to go 😄

Copy link
Collaborator

@vqvu vqvu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great. Thanks!

I just have a few minor comments.

lib/index.js Outdated Show resolved Hide resolved
lib/index.js Outdated Show resolved Hide resolved
lib/index.js Outdated Show resolved Hide resolved
lib/index.js Outdated Show resolved Hide resolved
@vqvu
Copy link
Collaborator

vqvu commented Mar 2, 2019

How have you been? I’ve really taken to streams, FRP, and functional programming and have been transitioning into making Clojure my predominant language. Anyway, I was writing a ClojureScript monitoring service the other day and ended up using interop with Highland JS. It worked really well and may write some articles on using highlandjs with gulp and ClojureScript in the next month.

I've been good. Can't complain. Good to hear that you're liking streams/FRP. It's such a powerful abstraction. I've never used Clojure seriously before, but I have a friend who does, and he loves it. I think most people use Highland in node, so it's pretty cool to hear that it works in other environments too.

I’ve got a couple of small projects lined up for the month but after that I may be able to take a look at what’s left and see if I can contribute.

I'd love to have your help if you have the free time. Off the top of my head, what's remaining is

lib/index.js Outdated Show resolved Hide resolved
lib/index.js Outdated Show resolved Hide resolved
lib/index.js Outdated Show resolved Hide resolved
lib/index.js Outdated Show resolved Hide resolved
@jaidetree
Copy link
Contributor Author

  • Implemented .subscribe method.
  • Add documentation
  • Created tests to ensure the subscribe method performs as expected
  • All tests are passing
  • Write tests for updated subscriber behavior
  • Support observer objects
  • Support multiple subscriptions
  • Throw error when subscribing to completed stream
  • Write test for observable\Symbol.observable behavior
  • Implement observable\Symbol.observable behavior
  • Write ObservableSubscription tests
  • Implement ObservableSubscription class
  • All tests are passing

Notes:

  • Your desire to split up the index.js file makes a lot of sense. Atom, VSCode, and Spacemacs really chug when editing the test.js and index.js files. Fortunately I'm well versed in vim 😆
  • This is my first time really implementing code to a spec document such as this and found myself a bit uncertain how closely I need to match its proposed implementation.
  • Because of the setup I found I had to set the SubscriptionObservable's source property after the consume call to make the linter happy. The spec details that the cleanup function gets called on error and complete.
  • Ideally we should move the stream consumption into the ObservableSubscription class to clean it up more but I would need access to the nil value. How do you feel about creating a nil.js file which does the resolution of a global nil and exports it for files like ObservableSubscription to import?

Anyway this implementation works and the tests pass so it's ready for a review.

lib/observableSubscription.js Outdated Show resolved Hide resolved
lib/observableSubscription.js Outdated Show resolved Hide resolved
lib/observableSubscription.js Outdated Show resolved Hide resolved
lib/index.js Outdated Show resolved Hide resolved
@vqvu
Copy link
Collaborator

vqvu commented Mar 3, 2019

This is my first time really implementing code to a spec document such as this and found myself a bit uncertain how closely I need to match its proposed implementation.

I don't have much experience implementing code to a spec document like this either. However, I think the intent of proposed implementation is to unambiguously specify the expected behavior of the classes. As long as the externally observable behavior is the same, we don't need to replicate it directly.

It now occurs to me that you probably created the static create and cleanup methods to match the CreateSubscription and CleanupSubscription specs. I don't think it's necessary to so closely follow the spec. CreateSubscription and CleanupSubscription are abstract operations, so they don't necessarily need to materialize as actual functions with the same signature to adhere to the spec.

Ideally we should move the stream consumption into the ObservableSubscription class to clean it up more but I would need access to the nil value. How do you feel about creating a nil.js file which does the resolution of a global nil and exports it for files like ObservableSubscription to import?

I don't think you need to move the stream consumption code into the ObservableSubscription class, but I don't feel that strongly about it. Feel free to do it if you'd like. A nil.js file is good regardless.

@jaidetree
Copy link
Contributor Author

jaidetree commented Mar 3, 2019

  • Implemented .subscribe method.
  • Add documentation
  • Created tests to ensure the subscribe method performs as expected
  • All tests are passing
  • Write tests for updated subscriber behavior
  • Support observer objects
  • Support multiple subscriptions
  • Throw error when subscribing to completed stream
  • Write test for observable\Symbol.observable behavior
  • Implement observable\Symbol.observable behavior
  • Write ObservableSubscription tests
  • Implement ObservableSubscription class
  • All tests are passing
  • Create nil.js to resolve and export nil value
  • Instantiate ObservableSubscription with the fork instead of the consume
  • Remove create method from ObservableSubscription
  • Move ObservableSubscription cleanup into private prototype _cleanup method
  • Add .closed check to unsubscribe function to ensure destroy is only called once
  • Experiment with moving consume logic into ObservableSubscription
  • Ensure passing tests

Will do! I was mentally wrestling with whether it is better to align the interface or both interface and implementation in accordance to the spec.

@jaidetree
Copy link
Contributor Author

  • Implemented .subscribe method.
  • Add documentation
  • Created tests to ensure the subscribe method performs as expected
  • All tests are passing
  • Write tests for updated subscriber behavior
  • Support observer objects
  • Support multiple subscriptions
  • Throw error when subscribing to completed stream
  • Write test for observable\Symbol.observable behavior
  • Implement observable\Symbol.observable behavior
  • Write ObservableSubscription tests
  • Implement ObservableSubscription class
  • All tests are passing
  • Instantiate ObservableSubscription with the fork instead of the consume
  • Remove create method from ObservableSubscription
  • Move ObservableSubscription cleanup into private prototype _cleanup method
  • Add .closed check to unsubscribe function to ensure destroy is only called once
  • Ensure passing tests
  • Create nil.js to resolve and export nil value
  • Experiment with moving consume logic into ObservableSubscription
  • Ensure passing tests

Making a push now since I'm not sure the outcome of the nil separation and moving the consume logic. Up to you if you would like to wait until those changes are pushed or discarded, or if you would like to review what should be a pretty stable state.

lib/index.js Outdated Show resolved Hide resolved
@jaidetree
Copy link
Contributor Author

  • Implemented .subscribe method.
  • Add documentation
  • Created tests to ensure the subscribe method performs as expected
  • All tests are passing
  • Write tests for updated subscriber behavior
  • Support observer objects
  • Support multiple subscriptions
  • Throw error when subscribing to completed stream
  • Write test for observable\Symbol.observable behavior
  • Implement observable\Symbol.observable behavior
  • Write ObservableSubscription tests
  • Implement ObservableSubscription class
  • All tests are passing
  • Instantiate ObservableSubscription with the fork instead of the consume
  • Remove create method from ObservableSubscription
  • Move ObservableSubscription cleanup into private prototype _cleanup method
  • Add .closed check to unsubscribe function to ensure destroy is only called once
  • Ensure passing tests
  • Return early on consumed subscribe error
  • Create nil.js to resolve and export nil value
  • Experiment with moving consume logic into ObservableSubscription
  • Ensure passing tests

Making a push now since I'm not sure the outcome of the nil separation and moving the consume logic. Up to you if you would like to wait until those changes are pushed or discarded, or if you would like to review what should be a pretty stable state.

@jaidetree
Copy link
Contributor Author

This build failed due to an inconsistent return type I introduced into subscribe. Fixed it with return this; when emitting the error to the observer.

@vqvu
Copy link
Collaborator

vqvu commented Mar 4, 2019

Fixed it with return this; when emitting the error to the observer.

Better to return a subscriber that is already closed. The caller will not be expecting a Highland stream.

lib/observableSubscription.js Outdated Show resolved Hide resolved
Copy link
Contributor Author

@jaidetree jaidetree left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Implemented .subscribe method.
  • Add documentation
  • Created tests to ensure the subscribe method performs as expected
  • All tests are passing
  • Write tests for updated subscriber behavior
  • Support observer objects
  • Support multiple subscriptions
  • Throw error when subscribing to completed stream
  • Write test for observable\Symbol.observable behavior
  • Implement observable\Symbol.observable behavior
  • Write ObservableSubscription tests
  • Implement ObservableSubscription class
  • All tests are passing
  • Instantiate ObservableSubscription with the fork instead of the consume
  • Remove create method from ObservableSubscription
  • Move ObservableSubscription cleanup into private prototype _cleanup method
  • Add .closed check to unsubscribe function to ensure destroy is only called once
  • Ensure passing tests
  • Return early on consumed subscribe error
  • Rename ObservableSubscription.source to _source
  • Return closed ObservableSubscription on consumed subscribe error
  • Ensure passing tests
  • Create nil.js to resolve and export nil value
  • Experiment with moving consume logic into ObservableSubscription
  • Ensure passing tests

Keep them coming 😄

@jaidetree
Copy link
Contributor Author

Also, do you prefer it if I draft solutions and have you review it or would you prefer I ask more questions and discuss options first before implementing?

I've been drafting solutions first as I figure it's more concrete but I realized that may be wasting more of your time.

@vqvu
Copy link
Collaborator

vqvu commented Mar 4, 2019

Whichever one works better for you. It doesn't take me that long to respond to your proposed solution, and having code to comment one is usually easier for me.

lib/index.js Outdated Show resolved Hide resolved
lib/index.js Outdated Show resolved Hide resolved
lib/index.js Outdated Show resolved Hide resolved
@jaidetree
Copy link
Contributor Author

jaidetree commented Mar 4, 2019

  • Implemented .subscribe method.
  • Add documentation
  • Created tests to ensure the subscribe method performs as expected
  • All tests are passing
  • Write tests for updated subscriber behavior
  • Support observer objects
  • Support multiple subscriptions
  • Throw error when subscribing to completed stream
  • Write test for observable\Symbol.observable behavior
  • Implement observable\Symbol.observable behavior
  • Write ObservableSubscription tests
  • Implement ObservableSubscription class
  • All tests are passing
  • Instantiate ObservableSubscription with the fork instead of the consume
  • Remove create method from ObservableSubscription
  • Move ObservableSubscription cleanup into private prototype _cleanup method
  • Add .closed check to unsubscribe function to ensure destroy is only called once
  • Ensure passing tests
  • Return early on consumed subscribe error
  • Rename ObservableSubscription.source to _source
  • Return closed ObservableSubscription on consumed subscribe error
  • Ensure passing tests
  • Fix typo in subscribe jsdocs
  • Subscribe early return should call ._cleanup to reduce duplication
  • Fix subscribe ended logic to not depend on observer.error being present 😵
  • Confirm passing tests
  • Create nil.js to resolve and export nil value
  • Experiment with moving consume logic into ObservableSubscription
  • Ensure passing tests

@jaidetree
Copy link
Contributor Author

That process works for me, though there's been a few sloppy mistakes I've let through the cracks and solutions that seemed obvious in your code review I missed in implementation. Perhaps practice will steer my intuition?

@jaidetree jaidetree changed the title Created subscribe method for RxJS\Observable compatibility Created subscribe method for Observable spec compatibility Mar 4, 2019
@vqvu
Copy link
Collaborator

vqvu commented Mar 8, 2019

Closing this in favor of #672.

@vqvu vqvu closed this Mar 8, 2019
# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants