
observe could be simplified #2778

Closed
zainab-ali opened this issue Dec 28, 2021 · 4 comments

Comments

@zainab-ali
Contributor

zainab-ali commented Dec 28, 2021

This is a follow-up to #2765.

I think it's possible to simplify the implementation of observe to use a single channel instead of two, as demonstrated in this branch.
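For illustration, here is one way the single-channel idea could look. This is a sketch assuming fs2 3.x and cats-effect 3, not the code from the linked branch, and observeSingleChannel is a hypothetical name. The input is piped straight into the observer, and each chunk is forwarded to a single output channel after the observer has pulled past it.

```scala
import cats.effect.{Concurrent, IO}
import cats.syntax.all._
import fs2.{Chunk, Pipe, Stream}
import fs2.concurrent.Channel

object SingleChannelObserve {

  // Hypothetical single-channel observe: the input runs inside the observer
  // pipe, and only the output side goes through a channel.
  def observeSingleChannel[F[_]: Concurrent, O](in: Stream[F, O])(
      pipe: Pipe[F, O, Nothing]
  ): Stream[F, O] =
    Stream.eval(Channel.bounded[F, Chunk[O]](1)).flatMap { outChan =>
      // Emit each chunk to the observer, then forward it to the output channel.
      val sinkOut: Stream[F, O] =
        in.chunks.flatMap(ch => Stream.chunk(ch) ++ Stream.exec(outChan.send(ch).void))

      // Close the channel however the observer finishes, so the output terminates.
      val runner: Stream[F, Nothing] =
        sinkOut.through(pipe).onFinalize(outChan.close.void)

      outChan.stream.flatMap(ch => Stream.chunk(ch)).concurrently(runner)
    }

  // Usage: a draining observer lets every element through to the output.
  val demo: IO[List[Int]] =
    observeSingleChannel(Stream(1, 2, 3).covary[IO])(_.drain).compile.toList
}
```

Compared with the two-channel version there is no semaphore and no second channel; the trade-off is that the input now runs inside the observer's scope.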

@diesalbla also raised the question of how observe should behave for non-standard pipes, and whether the current implementation was intuitive.

A brief history of observe

From trawling through the code and issues, this is how I gather the current version came into being.

I'm not sure why two queues were used in the first place.

@diesalbla
Contributor

diesalbla commented Dec 29, 2021

So, a few points about observe that may help clarify things:

In the definition of the sinkOut stream, to which the pipe is applied, we have Pull.output(ch) >> Pull.eval(outChan.send(ch)) >> go(rest); so ch is sent to the output channel after it is sent to the pipe, and after the pipe is done processing ch. Thus, it is the observer pipe that can pull chunks ahead of the output (up to the given limit), but not the reverse. The output needs to wait on the observer.
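This ordering can be checked with a small experiment, assuming fs2 3.x and cats-effect 3 (the object and helper names are made up for illustration). Both the observer and the output log each element to a shared Ref. Since a chunk only reaches the output channel after the observer has processed it, the observer's entry should come first.

```scala
import cats.effect.{IO, Ref}
import fs2.Stream

object ObserveOrdering {

  // Hypothetical helper: runs a one-element stream through observe and returns
  // the "observer:n" / "output:n" events in the order they happened.
  val events: IO[List[String]] =
    Ref.of[IO, List[String]](Nil).flatMap { log =>
      def record(tag: String): Int => IO[Unit] =
        n => log.update(_ :+ s"$tag:$n")

      Stream(1).covary[IO]
        .observe(_.evalTap(record("observer")).drain) // observer side
        .evalTap(record("output"))                     // output side
        .compile
        .drain >> log.get
    }
}
```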

Regarding the change from ++ to onFinalize in the output stream (Stream.chunk(chunk).onFinalize(guard.release)): this only makes a difference if the stream fails or is interrupted. Since observe handles neither case, the whole stream would stop anyway. If so, it should perhaps not "release" an extra permit, since that would let it pull from the source a chunk that it is most likely not going to process.
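The difference between the two forms can be seen without observe. In this sketch (fs2 3.x and cats-effect 3 assumed, names hypothetical), a counter stands in for guard.release and take(1) cuts the stream short after the first element. With ++ the release is never reached; with onFinalize it runs when the scope closes.

```scala
import cats.effect.{IO, Ref}
import fs2.{Chunk, Stream}

object ReleaseOnTermination {

  // Builds a stream whose "release" action bumps a counter, cuts it short with
  // take(1), and returns how many times the release ran.
  private def releases(build: IO[Unit] => Stream[IO, Int]): IO[Int] =
    Ref.of[IO, Int](0).flatMap { counter =>
      build(counter.update(_ + 1)).take(1).compile.drain >> counter.get
    }

  // ++: the release only runs if downstream keeps pulling past the chunk.
  val withAppend: IO[Int] =
    releases(release => Stream.chunk(Chunk(1, 2, 3)).covary[IO] ++ Stream.exec(release))

  // onFinalize: the release runs when the scope closes, however the stream ends.
  val withFinalizer: IO[Int] =
    releases(release => Stream.chunk(Chunk(1, 2, 3)).covary[IO].onFinalize(release))
}
```

Here withAppend should report 0 releases and withFinalizer 1. In observe, that extra release would free a permit for a chunk the stream is about to abandon.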

The main point, to simplify the implementation to remove one queue, is very valuable. A Pull Request would be welcome :)

@zainab-ali
Contributor Author

Thanks for clarifying, @diesalbla. I'll leave out the onFinalize changes.

I'm a bit cautious about the refactoring, so I'm taking a short digression to model this in PlusCal and find the interesting edge cases. I'll let you know what I find.

@zainab-ali
Contributor Author

Apologies for the delay; I've had a lot of fun modeling this with TLA+ and PlusCal. It took some effort to model the different state changes, including interruptions and error states, but I think it was worthwhile.

It turns out that there's at least one difference in behaviour between the proposed and current code. The proposed code swallows errors if the observer handles them. In the following snippet, the error isn't propagated to the output stream:

```scala
val output = input.observe(_.handleErrorWith(handler))
```

There are also a few odd behaviours relating to interruption. The following code causes the input stream to be cancelled, but the output stream to succeed:

```scala
val output = input.observe(_.interruptAfter(1.second))
```

These behaviours both have the same cause: by removing the extra channel, we introduce a relationship between the scope of the input stream and that of the observer stream, so the observer can affect error handling, cancellation and resource cleanup in the input. By putting a channel between them and running them with the concurrently combinator instead, we keep the scopes separate, with well-defined error propagation and cancellation semantics.
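For comparison, the current two-channel observe can be checked directly. This sketch assumes fs2 3.x and cats-effect 3 (object names are hypothetical). The input emits one element and then fails, and the observer is set up to recover from errors. Because the input runs in its own scope behind the channel, the failure should still surface in the output, so attempt is expected to yield a Left.

```scala
import cats.effect.IO
import fs2.Stream

object ObserveErrorPropagation {

  // An input that emits one element and then fails.
  val input: Stream[IO, Int] =
    Stream(1).covary[IO] ++ Stream.raiseError[IO](new RuntimeException("boom"))

  // The observer recovers from any error raised inside it, but with the
  // two-channel observe the input's failure is raised outside the observer,
  // so it still reaches the output.
  val result: IO[Either[Throwable, List[Int]]] =
    input
      .observe(_.drain.handleErrorWith(_ => Stream.empty))
      .compile
      .toList
      .attempt
}
```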

With that in mind, I think it would be better to leave the code as is.

Other interesting behaviours

The model checker also picked up on a few incorrect assumptions I had. In particular, I assumed that if the output and observer streams terminated successfully, all elements pulled from the input would be received by the output. This is not the case for finite observers.

The following code succeeds, but never prints "Pulled through output" and results in an empty list:

```scala
import cats.effect.IO
import fs2.{Pipe, Stream}

val input: Stream[IO, Int] = Stream(1).covary[IO].debug(_ => "Pulled from input")
val observer: Pipe[IO, Int, Nothing] = _.take(1).debug(_ => "Pulled through observer").drain
input.observe(observer)
  .take(1).debug(_ => "Pulled through output")
  .compile.toList
```

There's a test in StreamObserverSuite to this effect.

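```scala
// In StreamObserverSuite, observer applies the combinator under test (observe or observeAsync).
group("handle finite observing sink") {
  test("2") {
    forAllF { (s: Stream[Pure, Int]) =>
      observer(Stream(1, 2) ++ s.covary[IO])(_.take(1).drain).compile.toList
        .assertEquals(Nil)
    }
  }
}
```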

Is this behaviour intended? If not, it would be possible to avoid it by sending the element to the output channel before it is output to the observer.
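A stripped-down model of that reordering, assuming fs2 3.x and cats-effect 3. It is not the real observe (there is no semaphore or second channel, and forwarded is a hypothetical helper): a take(1) observer reads each chunk, and the chunk is sent to an unbounded output channel either after the observer sees it, as sinkOut does today, or before.

```scala
import cats.effect.IO
import fs2.{Chunk, Stream}
import fs2.concurrent.Channel

object ForwardOrder {

  // Hypothetical model: a take(1) observer reads the chunks of Stream(1, 2), and
  // each chunk is forwarded to an output channel either after the observer has
  // seen it (sendFirst = false) or before (sendFirst = true). Returns what
  // reached the output channel.
  def forwarded(sendFirst: Boolean): IO[List[Int]] =
    Channel.unbounded[IO, Chunk[Int]].flatMap { outChan =>
      val perChunk: Chunk[Int] => Stream[IO, Int] = ch =>
        if (sendFirst) Stream.exec(outChan.send(ch).void) ++ Stream.chunk(ch).covary[IO]
        else Stream.chunk(ch).covary[IO] ++ Stream.exec(outChan.send(ch).void)

      // The finite observer stops pulling after one element.
      val observer: Stream[IO, Nothing] =
        Stream(1, 2).covary[IO].chunks.flatMap(perChunk).take(1).drain

      observer.compile.drain >> outChan.close >>
        outChan.stream.flatMap(ch => Stream.chunk(ch)).compile.toList
    }
}
```

With the current order the pulled chunk never reaches the output, so forwarded(sendFirst = false) should return an empty list; sending first should return List(1, 2), the whole chunk that was pulled from the input.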

Termination

Somewhat reassuringly, the model checker proved that the code always terminates. It also proved a number of other invariants, at least for the simplified system I coded up. If you're interested, you can have a look at the PlusCal code (I'm fairly new to PlusCal, so the code isn't as clean as it could be and likely has a few bugs).

@SystemFw
Collaborator

SystemFw commented Feb 3, 2022

@zainab-ali Agreed on leaving the code as is given your discoveries.
I'm going to close this for now, but thank you so much, and I'd love to watch a talk/read a blog about the exploration with PlusCal :)

@SystemFw SystemFw closed this as completed Feb 3, 2022