Fix parEvalMap getting stuck on exception #1311

Merged
merged 2 commits into from
Nov 13, 2018

Conversation

@vlovgr
Contributor

vlovgr commented Nov 7, 2018

Fix #1297. Explanation is in #1297 (comment).

@SystemFw
Collaborator

SystemFw commented Nov 7, 2018

It would be good to have a test for this scenario, given that it's tricky, but if writing one is even trickier, never mind.

@vlovgr
Contributor Author

vlovgr commented Nov 7, 2018

@SystemFw I agree a test would be good, but I've been unable to produce one, since it relies on timing: the queue has to fill up after dequeuing the element with the error, and for the evalMap case, we have to attempt to enqueue another element before we reach rethrow. If you have any ideas, I'd be willing to try them.

@vlovgr
Contributor Author

vlovgr commented Nov 10, 2018

@SystemFw Looks like there might be more issues at play here.

While trying to get some tests together, I wrote this:

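import cats.effect.IO
import fs2.Stream

// Reproduction: the effect for element 5 fails, so the stream should terminate with that error.
class MapAsyncSpec extends Fs2Spec {
  "mapAsync should terminate on exceptions" in {
    Stream
      .iterate(0)(_ + 1)
      .covary[IO]
      .mapAsync(1) { n => // maxConcurrent = 1
        if (n == 5) IO.raiseError(new RuntimeException)
        else IO.unit
      }
      .compile
      .drain
      .unsafeRunSync()
  }
}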

which hangs forever, even with the fix in this pull request. It looks like neither finalizer ever gets invoked. If the maxConcurrent parameter for mapAsync / parEvalMap is increased enough, I can no longer reproduce it. From my short investigation, the issue seems to occur when queue.enqueue1(Some(value.get)) blocks while the error is raised; for some reason the finalizers are not invoked, so the blocked enqueue can't be cancelled. I'm not sure how stream errors are handled internally.
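Because the reproduction above blocks forever when the bug is present, it can be convenient to bound the wait while investigating. The following is only a sketch, assuming fs2 1.0.x on cats-effect 1.x (hence the ContextShift); the object name TimedRepro and the helper terminatesWithin are hypothetical and not part of fs2 or of this pull request.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object TimedRepro {
  // Assumption: cats-effect 1.x, where Concurrent[IO] is derived from a ContextShift[IO].
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Hypothetical helper: true if the stream finishes (normally or with an
  // error) within `limit`, false if it is still running when the limit expires.
  def terminatesWithin[A](s: Stream[IO, A], limit: FiniteDuration): Boolean =
    s.compile.drain.attempt.unsafeRunTimed(limit).isDefined

  // Same shape as the reproduction above: the effect for element 5 fails.
  val failing: Stream[IO, Unit] =
    Stream
      .iterate(0)(_ + 1)
      .covary[IO]
      .mapAsync(1) { n =>
        if (n == 5) IO.raiseError[Unit](new RuntimeException("boom"))
        else IO.unit
      }

  def main(args: Array[String]): Unit =
    println(s"terminated within 5s: ${terminatesWithin(failing, 5.seconds)}")
}
```

Note that unsafeRunTimed only bounds the wait at asynchronous boundaries, and a stream that is still stuck may keep running in the background after the limit expires, so this is a debugging aid rather than a substitute for a deterministic test.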

@SystemFw
Collaborator

Thanks for all the work on this Viktor :)
My entire time is on #1312 and #1317 for now, but this one is next on my priority list.

My next suggestion would be to check whether you can reproduce this with mapAsyncUnordered, which is basically parJoin, to see whether the problem is there or in the queue handling here. I suspect (and hope) the problem is here, but it's worth trying if you have some time.

@vlovgr
Contributor Author

vlovgr commented Nov 10, 2018

@SystemFw Thanks for the super quick reply! :) It looks like we're getting stuck in the finalizer from concurrently. Do you know why it's getting executed before the one in parEvalMap? I thought finalizers should always be called in reverse order, but that does not seem to be the case here.

Edit: there seems to be a test case for concurrently, which confirms this finalizer behaviour.

Also, this small example:

Stream.empty
  .covary[IO]
  .onFinalize(IO(println("1")))
  .onFinalize(IO(println("2")))
  .compile
  .drain
  .unsafeRunSync()

prints:

1
2

which seems incorrect to me, as finalizers should run in reverse order (or am I missing something?).

(And just to confirm, mapAsyncUnordered does not suffer from this issue.)

Edit 2: looking at the definition of onFinalize, it actually makes sense. So it just means the finalizer that completes dequeueDone is in the wrong place. I've updated the pull request to move it before concurrently, and this now works as expected. I'll see if I can add some test cases.
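To illustrate why the observed order follows from the definition, here is a small sketch. It assumes that onFinalize(f) behaves roughly like Stream.bracket(unit)(_ => f).flatMap(_ => this), so each later onFinalize wraps the earlier one in an outer scope and the inner finalizer runs first. The names FinalizerOrder, viaOnFinalize, viaBracket and order are hypothetical, and the code targets fs2 1.0.x with cats-effect 1.x (where Ref lives in cats.effect.concurrent).

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref
import fs2.Stream

object FinalizerOrder {
  private val empty: Stream[IO, Unit] = Stream.empty

  // Two chained onFinalize calls, as in the small example above.
  def viaOnFinalize(record: String => IO[Unit]): Stream[IO, Unit] =
    empty.onFinalize(record("1")).onFinalize(record("2"))

  // Assumed equivalent nesting: the later onFinalize becomes the OUTER
  // bracket, so its finalizer runs only after the inner scope has closed.
  def viaBracket(record: String => IO[Unit]): Stream[IO, Unit] =
    Stream.bracket(IO.unit)(_ => record("2")).flatMap { _ =>
      Stream.bracket(IO.unit)(_ => record("1")).flatMap(_ => empty)
    }

  // Runs a stream built with a recording function and returns the
  // order in which the finalizers were recorded.
  def order(build: (String => IO[Unit]) => Stream[IO, Unit]): IO[List[String]] =
    for {
      log <- Ref.of[IO, List[String]](Nil)
      _   <- build(s => log.update(_ :+ s)).compile.drain
      out <- log.get
    } yield out

  def main(args: Array[String]): Unit = {
    println(order(viaOnFinalize).unsafeRunSync())
    println(order(viaBracket).unsafeRunSync())
  }
}
```

Under that assumption both variants should record "1" before "2", matching the output reported above: the finalizer attached last belongs to the outermost scope and therefore runs last.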

@vlovgr
Contributor Author

vlovgr commented Nov 12, 2018

@SystemFw I've added a test case for the finalizer behaviour, which is what previously prevented stream interruption (and which prevented it again when I got the order of the finalizers wrong above). It does not test that the blocking enqueue in evalMap is cancelled; I've not been able to write a test for that, but I have manually verified that it can happen and that this change fixes it.

@mpilquist mpilquist merged commit b95a354 into typelevel:series/1.0 Nov 13, 2018
@vlovgr vlovgr deleted the par-eval-map-fix branch November 13, 2018 13:31
vpavkin added a commit to vpavkin/fs2 that referenced this pull request Dec 3, 2018
vpavkin added a commit to vpavkin/fs2 that referenced this pull request Dec 3, 2018
@mpilquist mpilquist added this to the 1.0.1 milestone Dec 3, 2018
mpilquist added a commit that referenced this pull request Dec 4, 2018
3 participants