diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 6ffc9b79fe..7b315e5a9b 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -654,10 +654,9 @@ object Pull extends PullLowPriority { def cont(r: Terminal[Unit]): Pull[Pure, INothing, Unit] = r } - private abstract class Bind[+F[_], +O, X, +R](misstep: Pull[F, O, X]) + private abstract class Bind[+F[_], +O, X, +R](val step: Pull[F, O, X]) extends Pull[F, O, R] with ContP[X, F, O, R] { - def step: Pull[F, O, X] = misstep def cont(r: Terminal[X]): Pull[F, O, R] def delegate: Bind[F, O, X, R] = this } @@ -694,12 +693,12 @@ object Pull extends PullLowPriority { // This class is not created by combinators in public Pull API, only during compilation private class BindBind[F[_], O, X, Y]( - var innerBind: Bind[F, O, X, Y], - var endBind: Bind[F, O, Y, Unit] - ) extends Bind[F, O, X, Unit](null) { - override def step: Pull[F, O, X] = innerBind.step - def cont(xterm: Terminal[X]): Pull[F, O, Unit] = - try bindBindAux(xterm, this) + step: Pull[F, O, X], + val bb: Bind[F, O, X, Y], + val del: Bind[F, O, Y, Unit] + ) extends Bind[F, O, X, Unit](step) { + def cont(tx: Terminal[X]): Pull[F, O, Unit] = + try bindBindAux(bb.cont(tx), del) catch { case NonFatal(e) => Fail(e) } } @@ -712,12 +711,7 @@ object Pull extends PullLowPriority { case ty: Terminal[_] => del match { case cici: BindBind[F, O, r, Y] => - val innerBind = cici.innerBind - val endBind = cici.endBind - cici.innerBind = null - cici.endBind = null - val nextStep = innerBind.cont(ty) - bindBindAux[F, O, r, Y](nextStep, endBind) + bindBindAux[F, O, r, Y](cici.bb.cont(ty), cici.del) case _ => del.cont(ty) } case x => new DelegateBind(x, del) @@ -871,7 +865,7 @@ object Pull extends PullLowPriority { case b: Bind[G, X, y, Unit] => b.step match { case c: Bind[G, X, x, _] => - viewL(new BindBind[G, X, x, y](c, b.delegate)) + viewL(new BindBind[G, X, x, y](c.step, c.delegate, b.delegate)) case e: Action[G, X, y2] => contP = b.delegate e @@ -910,6 +904,21 @@ object Pull extends PullLowPriority { def interrupted(inter: Interrupted): End def fail(e: Throwable): End } + type CallRun[+G[_], +X, End] = Run[G, X, End] => End + + object TheBuildR extends Run[Pure, INothing, F[CallRun[Pure, Nothing, F[INothing]]]] { + type TheRun = Run[Pure, INothing, F[INothing]] + def fail(e: Throwable) = F.raiseError(e) + def done(scope: Scope[F]) = + F.pure((cont: TheRun) => cont.done(scope)) + def out(head: Chunk[INothing], scope: Scope[F], tail: Pull[Pure, INothing, Unit]) = + F.pure((cont: TheRun) => cont.out(head, scope, tail)) + def interrupted(i: Interrupted) = + F.pure((cont: TheRun) => cont.interrupted(i)) + } + + def buildR[G[_], X, End]: Run[G, X, F[CallRun[G, X, F[End]]]] = + TheBuildR.asInstanceOf[Run[G, X, F[CallRun[G, X, F[End]]]]] def go[G[_], X, End]( scope: Scope[F], @@ -1205,13 +1214,17 @@ object Pull extends PullLowPriority { case u: Uncons[G, y] @unchecked => val v = getCont[Option[(Chunk[y], Pull[G, y, Unit])], G, X] // a Uncons is run on the same scope, without shifting. - F.unit >> go(scope, extendedTopLevelScope, translation, new UnconsRunR(v), u.stream) + val runr = buildR[G, y, End] + F.unit >> go(scope, extendedTopLevelScope, translation, runr, u.stream).attempt + .flatMap(_.fold(goErr(_, v), _.apply(new UnconsRunR(v)))) case s: StepLeg[G, y] @unchecked => val v = getCont[Option[Stream.StepLeg[G, y]], G, X] + val runr = buildR[G, y, End] scope .shiftScope(s.scope, s.toString) - .flatMap(go(_, extendedTopLevelScope, translation, new StepLegRunR(v), s.stream)) + .flatMap(go(_, extendedTopLevelScope, translation, runr, s.stream).attempt) + .flatMap(_.fold(goErr(_, v), _.apply(new StepLegRunR(v)))) case _: GetScope[_] => go(scope, extendedTopLevelScope, translation, runner, getCont(Succeeded(scope)))