Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Adjusted signature of fold and scan on Segment so the original result… #1040

Merged
merged 1 commit into from
Dec 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions core/shared/src/main/scala/fs2/Segment.scala
Original file line number Diff line number Diff line change
Expand Up @@ -323,16 +323,16 @@ abstract class Segment[+O,+R] { self =>
*
* @example {{{
* scala> Segment(1,2,3,4,5).fold(0)(_ + _).force.run
* res0: Int = 15
* res0: (Unit, Int) = ((),15)
* }}}
*/
final def fold[B](z: B)(f: (B,O) => B): Segment[Nothing,B] = new Segment[Nothing,B] {
def stage0(depth: Depth, defer: Defer, emit: Nothing => Unit, emits: Chunk[Nothing] => Unit, done: B => Unit) = {
final def fold[B](z: B)(f: (B,O) => B): Segment[Nothing,(R,B)] = new Segment[Nothing,(R,B)] {
def stage0(depth: Depth, defer: Defer, emit: Nothing => Unit, emits: Chunk[Nothing] => Unit, done: ((R,B)) => Unit) = {
var b = z
self.stage(depth.increment, defer,
o => b = f(b, o),
os => { var i = 0; while (i < os.size) { b = f(b, os(i)); i += 1 } },
r => done(b)).map(_.mapRemainder(_.fold(b)(f)))
r => done(r -> b)).map(_.mapRemainder(_.fold(b)(f)))
}
override def toString = s"($self).fold($z)(<f1>)"
}
Expand Down Expand Up @@ -499,13 +499,13 @@ abstract class Segment[+O,+R] { self =>
* res0: Vector[Int] = Vector(0, 1, 3, 6, 10, 15)
* }}}
*/
final def scan[B](z: B, emitFinal: Boolean = true)(f: (B,O) => B): Segment[B,B] = new Segment[B,B] {
def stage0(depth: Depth, defer: Defer, emit: B => Unit, emits: Chunk[B] => Unit, done: B => Unit) = {
final def scan[B](z: B, emitFinal: Boolean = true)(f: (B,O) => B): Segment[B,(R,B)] = new Segment[B,(R,B)] {
def stage0(depth: Depth, defer: Defer, emit: B => Unit, emits: Chunk[B] => Unit, done: ((R,B)) => Unit) = {
var b = z
self.stage(depth.increment, defer,
o => { emit(b); b = f(b, o) },
os => { var i = 0; while (i < os.size) { emit(b); b = f(b, os(i)); i += 1 } },
r => { if (emitFinal) emit(b); done(b) }).map(_.mapRemainder(_.scan(b, emitFinal)(f)))
r => { if (emitFinal) emit(b); done(r -> b) }).map(_.mapRemainder(_.scan(b, emitFinal)(f)))
}
override def toString = s"($self).scan($z)($f)"
}
Expand Down Expand Up @@ -1448,7 +1448,7 @@ object Segment {
Traverse[List].traverse(fa.force.toList)(f).map(Segment.seq)

def foldLeft[A, B](fa: Segment[A, Unit], b: B)(f: (B, A) => B): B =
fa.fold(b)(f).force.run
fa.fold(b)(f).force.run._2

def foldRight[A, B](fa: Segment[A, Unit], lb: Eval[B])(f: (A, Eval[B]) => Eval[B]): Eval[B] =
Foldable[List].foldRight(fa.force.toList, lb)(f)
Expand Down Expand Up @@ -1477,4 +1477,4 @@ object Segment {

def pure[A](x: A): Segment[T, A] = Segment.pure(x)
}
}
}
18 changes: 9 additions & 9 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -454,14 +454,14 @@ final class Stream[+F[_],+O] private(private val free: FreeC[Algebra[Nothing,Not
case None => Pull.pure(None)
case Some((hd, tl)) =>
// Check if we can emit this chunk unmodified
Pull.segment(hd.fold((true, last)) { case ((acc, last), o) => (acc && f(last, o), o) }).flatMap { case (allPass, newLast) =>
Pull.segment(hd.fold((true, last)) { case ((acc, last), o) => (acc && f(last, o), o) }.mapResult(_._2)).flatMap { case (allPass, newLast) =>
if (allPass) {
Pull.output(hd) >> go(newLast, tl)
} else {
Pull.segment(hd.fold((Vector.empty[O], last)) { case ((acc, last), o) =>
if (f(last, o)) (acc :+ o, o)
else (acc, last)
}).flatMap { case (acc, newLast) =>
}.mapResult(_._2)).flatMap { case (acc, newLast) =>
Pull.output(Segment.vector(acc)) >> go(newLast, tl)
}
}
Expand Down Expand Up @@ -774,7 +774,7 @@ final class Stream[+F[_],+O] private(private val free: FreeC[Algebra[Nothing,Not
this.pull.uncons.flatMap {
case None => Pull.done
case Some((hd,tl)) =>
hd.scan(z)(f).force.uncons1 match {
hd.scan(z)(f).mapResult(_._2).force.uncons1 match {
case Left(acc) => tl.scan_(acc)(f)
case Right((_, out)) => Pull.segment(out).flatMap{acc => tl.scan_(acc)(f)}
}
Expand Down Expand Up @@ -865,7 +865,7 @@ final class Stream[+F[_],+O] private(private val free: FreeC[Algebra[Nothing,Not
s.pull.uncons.flatMap {
case None => Pull.done
case Some((hd,tl)) =>
hd.scan(window)((w, i) => w.dequeue._2.enqueue(i)).force.drop(1) match {
hd.scan(window)((w, i) => w.dequeue._2.enqueue(i)).mapResult(_._2).force.drop(1) match {
case Left((w2,_)) => go(w2, tl)
case Right(out) => Pull.segment(out).flatMap { window => go(window, tl) }
}
Expand All @@ -874,7 +874,7 @@ final class Stream[+F[_],+O] private(private val free: FreeC[Algebra[Nothing,Not
this.pull.unconsN(n, true).flatMap {
case None => Pull.done
case Some((hd, tl)) =>
val window = hd.fold(collection.immutable.Queue.empty[O])(_.enqueue(_)).force.run
val window = hd.fold(collection.immutable.Queue.empty[O])(_.enqueue(_)).force.run._2
Pull.output1(window) >> go(window, tl)
}.stream
}
Expand Down Expand Up @@ -1996,7 +1996,7 @@ object Stream {
case Left(None) => Pull.pure(None)
case Right(None) => Pull.pure(None)
case Left(Some((s, controlStream))) =>
Pull.segment(s.fold(false)(_ || _)).flatMap { p =>
Pull.segment(s.fold(false)(_ || _).mapResult(_._2)).flatMap { p =>
if (p) paused(controlStream, srcFuture)
else controlStream.pull.unconsAsync.flatMap(unpaused(_, srcFuture))
}
Expand Down Expand Up @@ -2073,7 +2073,7 @@ object Stream {
if (partitions.isEmpty) Segment.chunk(partitions) -> None
else if (partitions.size == 1) Segment.empty -> partitions.last
else Segment.chunk(partitions.take(partitions.size - 1)) -> partitions.last
}.flatMap { case (out, carry) => out }.mapResult { case ((out, carry), unit) => carry }
}.mapResult(_._2).flatMap { case (out, carry) => out }.mapResult { case ((out, carry), unit) => carry }
}.flatMap { case Some(carry) => Pull.output1(carry); case None => Pull.done }.stream
}

Expand Down Expand Up @@ -2563,7 +2563,7 @@ object Stream {
def fold[O2](z: O2)(f: (O2, O) => O2): Pull[F,Nothing,O2] =
uncons.flatMap {
case None => Pull.pure(z)
case Some((hd,tl)) => Pull.segment(hd.fold(z)(f)).flatMap { z => tl.pull.fold(z)(f) }
case Some((hd,tl)) => Pull.segment(hd.fold(z)(f).mapResult(_._2)).flatMap { z => tl.pull.fold(z)(f) }
}

/**
Expand Down Expand Up @@ -2593,7 +2593,7 @@ object Stream {
def go(prev: Option[O], s: Stream[F,O]): Pull[F,Nothing,Option[O]] =
s.pull.uncons.flatMap {
case None => Pull.pure(prev)
case Some((hd,tl)) => Pull.segment(hd.fold(prev)((_,o) => Some(o))).flatMap(go(_,tl))
case Some((hd,tl)) => Pull.segment(hd.fold(prev)((_,o) => Some(o)).mapResult(_._2)).flatMap(go(_,tl))
}
go(None, self)
}
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/fs2/internal/Algebra.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private[fs2] object Algebra {
case None => F.pure(acc)
case Some((hd, tl)) =>
F.suspend {
try compileLoop[F,O,B](scope, hd.fold(acc)(g).force.run, g, uncons(tl).viewL)
try compileLoop[F,O,B](scope, hd.fold(acc)(g).force.run._2, g, uncons(tl).viewL)
catch { case NonFatal(e) => compileLoop[F,O,B](scope, acc, g, uncons(tl.asHandler(e)).viewL) }
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/test/scala/fs2/SegmentSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class SegmentSpec extends Fs2Spec {

"fold" in {
forAll { (s: Segment[Int,Unit], init: Int, f: (Int, Int) => Int) =>
s.fold(init)(f).force.run shouldBe s.force.toVector.foldLeft(init)(f)
s.fold(init)(f).mapResult(_._2).force.run shouldBe s.force.toVector.foldLeft(init)(f)
}
}

Expand Down