
Queue (consistently) misses an element on certain usage #1293

Closed
svalaskevicius opened this issue Oct 16, 2018 · 10 comments
Comments

@svalaskevicius
Contributor

svalaskevicius commented Oct 16, 2018

When a dequeue stream is manipulated (merged with another stream, and split into two via pull), dequeuing misses an element (always the first element on a subsequent pull). An example showing this can be seen here: https://scastie.scala-lang.org/LBTK7fSnQIC0kLEkvk9cYA

Not only is the element (4) missing from the second printed list, it also never appears in the dequeue's evalTap logging: there is no "deq: 4" line.

This behaviour is also consistent: every repeated run results in the same error.

run1 in the example is included as a passing case: if the stream is split by amount, dequeuing works OK.

The related conversation on gitter (in case there are more replies) starts here: https://gitter.im/functional-streams-for-scala/fs2?at=5bc642ed82893a2f3bf0cef9

fs2 version: 1.0.0
The same function (splitByTimeout) worked OK with an earlier fs2 release (0.10.x).

@mpilquist
Member

mpilquist commented Oct 17, 2018

Here's a minimization:

Queue.unbounded[IO, Int].flatMap { q =>
  q.dequeue.evalMap(i => IO(println("deq1: " + i))).either(Stream.sleep(1.second)).takeThrough(_.isLeft).compile.drain *>
    q.enqueue1(1) *>
    q.enqueue1(2) *>
    q.dequeue1.flatMap(i => IO(println("deq2: " + i)))
}.unsafeRunSync
// Prints deq2: 2

The issue isn't related to cancelation of dequeue1, as that works ok:

Queue.unbounded[IO, Int].flatMap { q =>
    q.dequeue1.flatMap(i => IO(println("deq1: " + i))).timeout(1.second).attempt *>
      q.enqueue1(1) *>
      q.enqueue1(2) *>
      q.dequeue1.flatMap(i => IO(println("deq2: " + i)))
  }.unsafeRunSync
// Prints deq2: 1

The issue does occur with parJoinUnbounded in place of merge:

Queue.unbounded[IO, Int].flatMap { q =>
    Stream(
      q.dequeue.evalMap(i => IO(println("deq1: " + i))).map(Left(_)),
      Stream.sleep(1.second).map(Right(_))
    ).covary[IO].parJoinUnbounded.takeThrough(_.isLeft).compile.drain *>
      q.enqueue1(1) *>
      q.enqueue1(2) *>
      q.dequeue1.flatMap(i => IO(println("deq2: " + i)))
  }.unsafeRunSync
// Prints deq2: 2

The issue also occurs with mergeHaltBoth in place of either + takeThrough:

Queue.unbounded[IO, Int].flatMap { q =>
    q.dequeue.evalMap(i => IO(println("deq1: " + i))).mergeHaltBoth(Stream.sleep(1.second).drain).compile.drain *>
      q.enqueue1(1) *>
      q.enqueue1(2) *>
      q.dequeue1.flatMap(i => IO(println("deq2: " + i)))
  }.unsafeRunSync
// Prints deq2: 2

The issue also occurs with interruptAfter:

Queue.unbounded[IO, Int].flatMap { q =>
    q.dequeue.evalMap(i => IO(println("deq1: " + i))).interruptAfter(1.second).compile.drain *>
      q.enqueue1(1) *>
      q.enqueue1(2) *>
      q.dequeue1.flatMap(i => IO(println("deq2: " + i)))
  }.unsafeRunSync
// Prints deq2: 2

@mpilquist
Member

mpilquist commented Oct 17, 2018

@pchlupacek I think this is a bug in dequeueChunk, which is defined as:

def dequeueChunk(maxSize: Int): Stream[F, A] =
  Stream.evalUnChunk(pubSub.get(maxSize)).repeat

There's no bracketing occurring here, so when the stream is canceled, the outstanding call to get isn't finalized. I confirmed the following change fixes it:

def dequeueChunk(maxSize: Int): Stream[F, A] =
  Stream.bracket(pubSub.get(maxSize).start)(_.cancel).
    flatMap(f => Stream.evalUnChunk(f.join)).repeat

Or using our alias for eval + cancel:

def dequeueChunk(maxSize: Int): Stream[F, A] =
  Stream.supervise(pubSub.get(maxSize)).flatMap(f => Stream.evalUnChunk(f.join)).repeat

There are probably similar issues with other uses of PubSub#get coupled with Stream.eval or its variants.
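The failure mode can be modeled without fs2 at all. The following is a purely illustrative sketch in plain script-style Scala (the ToyQueue name and its API are invented for this example, not part of fs2): a queue that registers waiter callbacks when empty. If a waiter is abandoned without being deregistered, i.e. the cancellation action is never run, the next offered element is handed to the dead waiter and the live consumer never sees it, which mirrors the minimizations above printing deq2: 2.

```scala
import scala.collection.mutable

// Toy, single-threaded model of a pub/sub-style queue (illustrative only).
final class ToyQueue[A] {
  private val items   = mutable.Queue.empty[A]
  private val waiters = mutable.Queue.empty[A => Unit]

  def offer(a: A): Unit =
    if (waiters.nonEmpty) waiters.dequeue().apply(a) // delivered to oldest waiter
    else items.enqueue(a)

  // Registers a callback when no element is available; returns the
  // cancellation action that an interrupted caller is supposed to run.
  def get(cb: A => Unit): () => Unit =
    if (items.nonEmpty) { cb(items.dequeue()); () => () }
    else {
      waiters.enqueue(cb)
      () => { waiters.dequeueFirst(_ eq cb); () }
    }
}

val q = new ToyQueue[Int]
var lost = List.empty[Int]
var seen = List.empty[Int]

// Consumer 1 starts a get on the empty queue, then is "interrupted"
// WITHOUT running the returned cancellation action (the missing bracket).
val cancel = q.get(i => lost ::= i) // never invoked, mirroring the bug

q.offer(1) // swallowed by the stale waiter: goes to `lost`
q.offer(2)
q.get(i => seen ::= i)

println(s"lost=$lost seen=$seen") // lost=List(1) seen=List(2)
```

Running `cancel()` before the offers would deregister the stale waiter, and the second consumer would receive 1, which is what the bracket/supervise fix above achieves in stream terms.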

@pchlupacek
Contributor

@mpilquist Yeah, it seems so. I will take a look at this.

@pchlupacek
Contributor

@mpilquist It's strange. The PubSub get actually performs cleanup of subscribers on every get, so there should be no need for that bracket, IMHO.

@pchlupacek
Contributor

Ah, I see the issue. The get is not cancelled, as it is not treated as a resource in the stream context. I think I will come up with a generic fix for that :)

@mpilquist
Member

mpilquist commented Oct 17, 2018 via email

@mpilquist
Member

@pchlupacek Any luck? I'm fine with the simple fiber-based solution and can PR that if you are busy.

@pchlupacek
Contributor

@mpilquist Could you give me a day or two? I have a solution in mind: essentially, I want to add subscribe: Stream[F, A] to PubSub for these situations. It's quite simple, actually; you just bracket the whole stream instead of every get.
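A rough sketch of what that could look like, as pseudocode only: the subscribe/unsubscribe names and the token parameter are assumed for illustration and may not match the actual PubSub signatures in fs2. The point is that one bracket around the whole stream replaces a bracket around each individual get.

```scala
// Pseudocode sketch (hypothetical API, not the actual fs2 fix):
// acquire a subscription once, release it on interruption, and let
// every inner `get` run without its own bracket.
def subscribe(maxSize: Int): Stream[F, A] =
  Stream
    .bracket(pubSub.subscribe(maxSize))(token => pubSub.unsubscribe(token))
    .flatMap(token => Stream.evalUnChunk(pubSub.get(token)).repeat)
```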

@mpilquist
Member

👍 no problem at all

@guersam
Contributor

guersam commented Oct 26, 2018

I'm having trouble with a queue as well.

Program

import cats.effect.{ContextShift, IO, Timer}
import cats.implicits._
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._


implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

val prog: IO[Unit] = for {
  queue <- fs2.concurrent.InspectableQueue.unbounded[IO, Char]
  
  sizeWatch = queue.size
   .evalTap(s => IO(println(s"Queue size: $s")))
   .filterWithPrevious { (prev, curr) =>
     println(s"Filtering with Prev: $prev, Current: $curr")
     prev > curr
   }
   .evalMap(s => IO(println(s"Filtered queue size: $s")))

  queueOps =
    Stream('a', 'b', 'c').to[IO](queue.enqueue) ++ 
     Stream.repeatEval[IO, Unit] {
       IO.sleep(500.millis) >>
       queue.dequeue1.flatMap(c => IO(println(s"dequeued: $c")))
     }

  _ <- (sizeWatch concurrently queueOps).compile.drain
} yield ()

prog.unsafeRunSync()

Expected output

Queue size: 0
Filtered queue size: 0
Queue size: 3
Filtering with Prev: 0, Current: 3
dequeued: a
Queue size: 2
Filtering with Prev: 3, Current: 2
Filtered queue size: 2
dequeued: b
Queue size: 1
Filtering with Prev: 2, Current: 1
Filtered queue size: 1
dequeued: c
Queue size: 0
Filtering with Prev: 1, Current: 0
Filtered queue size: 0

Actual output

Queue size: 0
Filtered queue size: 0
Queue size: 3
Filtering with Prev: 0, Current: 3
Filtering with Prev: 0, Current: 3
dequeued: a
Queue size: 2
Filtering with Prev: 0, Current: 2
Filtering with Prev: 0, Current: 2
dequeued: b
Queue size: 1
Filtering with Prev: 0, Current: 1
Filtering with Prev: 0, Current: 1
dequeued: c
Queue size: 0
Filtering with Prev: 0, Current: 0
Filtering with Prev: 0, Current: 0

Is this related to this issue, or am I just doing something wrong?

EDIT

Never mind, filterWithPrevious compares the current element with the previously emitted value... Sorry for the noise.
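For reference, these semantics can be sketched with a small pure-list model in plain Scala (the name filterWithPreviousModel is made up for illustration): "previous" is the last element that passed the predicate, not the preceding input element. With prev > curr as the predicate, the first emitted size is 0, and 0 > curr never holds for later sizes, so nothing further is emitted and prev never advances past 0, exactly as the actual output shows.

```scala
// Pure-list model (illustrative) of filterWithPrevious-style semantics:
// the first element is always emitted, and "previous" is the last
// element that PASSED the predicate.
def filterWithPreviousModel[A](in: List[A])(f: (A, A) => Boolean): List[A] =
  in match {
    case Nil => Nil
    case head :: tail =>
      val (_, emitted) = tail.foldLeft((head, List(head))) {
        case ((prev, acc), curr) =>
          if (f(prev, curr)) (curr, curr :: acc) else (prev, acc)
      }
      emitted.reverse
  }

val sizes = List(0, 3, 2, 1, 0) // the queue sizes observed above

println(filterWithPreviousModel(sizes)(_ > _)) // List(0): prev is stuck at 0
```

On a strictly decreasing input such as List(3, 2, 1), every element passes and prev does advance, which is why the behaviour only surprises when an early element blocks all later ones.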
