Example RangePublisher.java should call onComplete #458
Comments
No. Rule §1.8 requires the producer to stop producing signals when cancellation is requested.
The Do you want to implement some Publisher that has to deal with item lifecycle and/or resource utilization? |
No, not Since there is no One part of the spec requires
So, if |
When cancelling, the Subscriber is shutting down, i.e. it can transition into its terminal state. Further signals from the Publisher may arrive, but there is no expectation that these will be serviced. There is deliberately no synchronization regarding cancellation since this upstream signal is sent after the downstream has stopped caring. |
Why do you want to establish this type of signal pattern? There are techniques to serialize async cancellation with ongoing onNext servicing so that a thread unsafe resource can be released. For example, trampolining or queue-drain with or without thread confinement, actors, event loops, etc. |
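A minimal sketch of the queue-drain idea mentioned above, assuming that both item processing and the release of a thread-unsafe resource can be expressed as tasks. The class name `SerializedDrain` and its `submit` method are hypothetical and not part of Reactive Streams. Whichever thread moves the work-in-progress counter from 0 to 1 runs all queued tasks, so a release task submitted during (or concurrently with) an item task only runs after that item task has finished.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: runs submitted tasks one at a time, in submission order,
// on whichever thread happens to win the right to drain.
final class SerializedDrain {
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // tasks queued or running

    void submit(Runnable task) {
        tasks.offer(task);
        if (wip.getAndIncrement() != 0) {
            return; // another caller (or an outer frame on this thread) is draining
        }
        do {
            // Every increment is preceded by an offer, so poll() cannot return null here.
            tasks.poll().run();
        } while (wip.decrementAndGet() != 0);
    }
}
```

A Subscriber built on this sketch would route the body of onNext and the "cancel and release" step through the same `SerializedDrain`, so the two never overlap. Note that the drain loop also trampolines reentrant calls: a task that submits another task returns first, and the new task runs afterwards.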
No, the question is the other way around: why would you want to avoid notifying a terminal state in a process that knows when it terminated? :) I can live with underspecified behaviour. Just don't want to do things that aren't necessary. |
Since there is no requirement (and no mechanism) to serialize calls to Basically, the |
When the Subscriber calls |
You cannot demand a "Beginning" a cleanup is not a problem. The problem is it is required to be mutually exclusive with Such mutual exclusion is easy and cheap to achieve in a synchronous All because invocation of I am after a reason to not require the |
@olotenko I'm not sure I understand the problem. If the Subscriber is async then there already needs to exist a happens-before relationship between signalling onNext and processing that onNext. onComplete/onError is only there to signal End-of-data and terminal stream failure. Note that the only way that onNext might come in after cancel() only applies if the Subscriber cancels while there's still outstanding unmet demand signalled by it. Could you outline more in detail what you perceive the problem to be? |
Rule §3.1
The key phrase here is "The Subscriber is in control". If it needs to coordinate signals, then it has to do it on its own. The Publisher is responsible to send items as requested and stop when the Subscriber says so. The Publisher has no understanding or expectation about what and when the Subscriber wants to do those things and is limited to the protocol described by the spec. I don't understand why there is an issue here years after the spec has been finalized and adopted by many implementations without such problems.
Yes. Whenever asynchrony is involved in a Subscriber, one must account for the serialization requirement of RS as well as whatever external relationships have to be maintained concurrently with what happens through the RS protocol. If you are in control of both the Publisher and Subscriber, the spec allows onNext signals to be sent right after a cancellation has been issued (§2.8). Although the wording is limited to onNext, I'm sure implementations also need to expect that terminal events can still happen (i.e., cancelling just after the last item and before an onComplete that would have arrived next). I have extensive experience with implementing exotic Publishers and Subscribers that includes working out correct and performant serialization and coordination algorithms. If you have something concrete you need help with implementing, I'd be glad to help. |
@viktorklang The happens-befores between The distinction of A motivating scenario A
I posit that this problem should not exist, because |
@akarnokd What is a "context"? :)
They haven't thought of a motivating scenario and I am late to the party, that's all :) |
@olotenko |
Why is the Subscriber issuing a cancel and where does it originate? The philosophy behind having only 7 methods is that the fewer constraints and interactions each component has to deal with, the more efficient implementations can be. Requiring cancel to trigger an onComplete adds an extra call which is unnecessary in most cases. It would require logic in every Subscriber in the chain to defend against calling onComplete downstream multiple times in case there is a cancel propagating up, hitting each Subscriber.
When there are no more uses for it. Reference counting (possibly combined with some state machine) could work here.
The can know how many future onNext to expect as it issued the Perhaps the most relatable operator for this case is |
I am not sure why it matters what causes the
Reference counting does not work without
I take the discussions of the ways to try and solve the problem as the agreement that the spec does not allow the I appreciate the power of precedent, and that even if this change were adopted, I would still have to deal with old implementations that won't issue |
This indicates you need both the Publisher and Subscriber to be resource aware, which is not the aim of the RS specification and also wouldn't work between heterogeneous libraries unaware of those resources. I experimented with resource-aware flows a couple of years ago but it is quite tedious and there was no practical demand for such extensions. For example, a resource-aware iterable-wrapper must deal with the rest of the unconsumed items. An async consumer has to work out when to clean up when receiving a cancel at any time and also keep cleaning up for late items. |
Precisely why there should be two edges into clean-up ( I'll take a look at that resource-aware. |
I'm still not sure I understand the problem. The Subscriber cannot assume that the Publisher doesn't have other Subscribers. :( |
@viktorklang that's a separate issue. The issue at hand is that the spec allows (and judging by the discussion - encourages) the |
@olotenko You are posing two questions, let’s clearly separate them:
For the first question, it is crucial to consider the scope and purpose of Reactive Streams: it is a set of interfaces for interoperability of different stream processing libraries within the same JVM. As such, the notion of an open world is ingrained in these interfaces, they are meant to compose larger systems from small pieces that may be built on different foundations. As @akarnokd hinted at, building a resource-aware streaming framework is very challenging, I posit that applying such a framework is only possible in a closed world scenario where you are in control of the whole streaming pipeline—as a counter-example consider that someone might easily slip a The second question is different (as shown above such a confirmation mechanism does not solve the first problem). Here, one guiding design principle of Reactive Streams is that it does not handle confirmations at all, neither upstream nor downstream. We explicitly discussed whether |
I disagree with the second part of the statement. The "boundaries" between Non-termination is a fundamental problem. See, the chain of
From the point of view of what the API promises, asynchrony here is immaterial. In the end, However, return vs break is a fundamental difference. From the Not being able to distinguish natural end of stream from reaction to cancellation is not a fundamental problem - you can signal that distinction using a different terminating condition, even Not being able to tell that the |
@olotenko |
@viktorklang I don't buy the This issue is a grim reminder that async processing is a dual of sync processing. We waited for so long for Obviously, everyone has to deal with |
@olotenko There is nothing which prevents the Subscriber from invoking its own onComplete after issuing the cancel signal, assuming that it itself disambiguates. Otherwise there is no way to distinguish a true EOF from a cancel-induced one. |
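A rough sketch of what this could look like, assuming a Subscriber that takes a fixed number of items, cancels, and then runs its own terminal logic exactly once. `TakeThenFinish`, `cancelInduced` and `cleanups` are hypothetical names, and `java.util.concurrent.Flow` is used only to keep the example self-contained (its interfaces mirror the `org.reactivestreams` ones). The sketch relies on signals being delivered serially and on cancel being issued from within onNext, so plain fields are enough; `limit` is assumed to be positive.

```java
import java.util.concurrent.Flow;

// Hypothetical Subscriber: takes `limit` items, cancels, then completes itself.
final class TakeThenFinish<T> implements Flow.Subscriber<T> {
    private final long limit;
    private long received;
    private Flow.Subscription upstream;
    private boolean done;

    boolean cancelInduced; // true if this Subscriber terminated itself via cancel
    int cleanups;          // how many times the terminal logic actually ran

    TakeThenFinish(long limit) { this.limit = limit; }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        s.request(Long.MAX_VALUE);
    }

    @Override public void onNext(T item) {
        if (done) {
            return; // late item delivered after cancel: ignore it
        }
        if (++received == limit) {
            upstream.cancel();
            cancelInduced = true; // disambiguates from a true end-of-stream
            onComplete();         // self-issued terminal signal
        }
    }

    @Override public void onError(Throwable t) { finish(); }

    @Override public void onComplete() { finish(); }

    private void finish() {
        if (done) {
            return; // guards against a Publisher-issued terminal signal arriving later
        }
        done = true;
        cleanups++; // stand-in for releasing resources this Subscriber owns
    }
}
```

The `done` flag is what keeps a late onNext or a Publisher-issued onComplete from running the terminal logic a second time.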
@viktorklang It is not about invoking Of course, there is no way to distinguish between a true EOF and a cancel-induced one, but that is a problem only when the A The importance of knowing that the I thank everyone for offers of help with implementing a workaround for this design quirk, but this issue is really about the protocol quirk. |
@olotenko If you want to create this "guarantee" yourself, it's not a lot of work to wrap your Subscriber in another Subscriber which will drop all signals that are received after cancel is invoked. Changing the protocol to send onComplete after seeing cancel still means that elements could arrive in between, and the Subscriber would still need to ignore those. |
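One way such a wrapper might look, as a sketch only: `CancelGuardSubscriber` is a hypothetical name, and `java.util.concurrent.Flow` stands in for the `org.reactivestreams` interfaces so that the example is self-contained. It hands the wrapped Subscriber a Subscription that records cancellation, and drops every signal that arrives after that point.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper: forwards signals to `actual` until `actual` cancels.
final class CancelGuardSubscriber<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> actual;
    private final AtomicBoolean cancelled = new AtomicBoolean();

    CancelGuardSubscriber(Flow.Subscriber<? super T> actual) {
        this.actual = actual;
    }

    @Override public void onSubscribe(Flow.Subscription upstream) {
        // Give the wrapped Subscriber a Subscription that lets us observe cancel().
        actual.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) {
                if (!cancelled.get()) {
                    upstream.request(n);
                }
            }
            @Override public void cancel() {
                // Forward cancel upstream only once.
                if (cancelled.compareAndSet(false, true)) {
                    upstream.cancel();
                }
            }
        });
    }

    @Override public void onNext(T item) {
        if (!cancelled.get()) actual.onNext(item);
    }

    @Override public void onError(Throwable t) {
        if (!cancelled.get()) actual.onError(t);
    }

    @Override public void onComplete() {
        if (!cancelled.get()) actual.onComplete();
    }
}
```

This only drops signals that start after cancel has been recorded. It does not make cancel mutually exclusive with an onNext that is already in flight on another thread, which is the part of the problem the thread keeps returning to.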
@viktorklang It is amazing with which speed people can churn out "workarounds" for fundamental issues. It's not like I cannot come up with something like that. Only this workaround does not work with resource management use case - because the protocol does not implement the async dual of I am tired of explaining. No one is interested. I'll just close this issue. Reopen if you think it is worthwhile getting to the bottom of how to get the async |
What surprises me is how often people come into established projects and try to change them just because they have an idea or use case that doesn't fit into the established scheme, then get upset because the project leads refuse to change things in ways that would break the existing ecosystem. If the Reactive Streams protocol doesn't fit your requirements, write your own protocol, create your own ecosystem and create support libraries yourself. |
@olotenko This protocol is not intended to be used for async resource management as every component (Publisher, Processor, Subscriber) handles its own resources—especially since you can have fan-out and fan-in behavior. If onComplete would be issued as a response to observing a cancellation, it would still entail dealing with onNext being received before the onComplete coming in, and the Subscriber cannot assume that it is the sole Subscriber to the given Publisher, so it would only be able to relinquish resources it owns itself, and that it is free to do as soon as it sends the cancellation signal. |
@akarnokd I apologise unreservedly. I have no issue with whatever decision the project leads make about this - particularly because this is not my project, and because I am "late to the party", whether my suggestions are good or bad. I have closed the issue because the discussion is not about non-termination. |
@olotenko No worries, Oleksandr. I hope the reasoning in my previous message outlines why the proposed change wouldn't fix the problem you perceive. Cheers! |
@viktorklang I think you are mistaken on resource management. In a garbage-collected language it is easy to overlook the delegation of ownership of the message, because it is not concerning most of the time - when everyone drops a reference, GC collects it. But if you consider managed memory, like is the case with buffers that require pooling and releasing, then it is clear So it seems clear enough Also, when I say "fundamental problem", I don't mean the protocol is massively broken. I only mean that the problem of resource management cannot become decidable by bolting-on some predicates on the existing signals. I am not in a position to require anything of the protocol, I am only saying that resource management is not decidable now, and is decidable if |
@olotenko Since a Publisher can have many Subscribers at the same time, and can decide to both do fan-out/broadcast and do routing, the Subscriber cannot assume that it owns the elements that are passed to it. |
https://github.com/reactive-streams/reactive-streams-jvm/blob/7021f18/examples/src/main/java/org/reactivestreams/example/unicast/RangePublisher.java#L176-L178 should call downstream.onComplete(). Otherwise Subscriber will never know when it is ok to release its resources: the spec requires Subscriber to not assume anything upon a mere call to cancel(), and requires Publisher to indicate terminal state after all intended onNext have been called.