diff --git a/project/Build.scala b/project/Build.scala index 732d3d44e..cab13096e 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -145,6 +145,7 @@ object SummingbirdBuild extends Build { lazy val summingbirdClient = module("client").settings( libraryDependencies ++= Seq( "com.twitter" %% "algebird-core" % algebirdVersion, + "com.twitter" %% "algebird-util" % algebirdVersion, withCross("com.twitter" %% "bijection-core" % bijectionVersion), withCross("com.twitter" %% "storehaus-core" % storehausVersion), withCross("com.twitter" %% "storehaus-algebra" % storehausVersion) @@ -184,7 +185,7 @@ object SummingbirdBuild extends Build { "com.backtype" % "dfs-datastores" % dfsDatastoresVersion, "com.backtype" % "dfs-datastores-cascading" % dfsDatastoresVersion, "com.twitter" %% "algebird-core" % algebirdVersion, - "com.twitter" %% "algebird-util" % algebirdVersion, + "com.twitter" %% "algebird-bijection" % algebirdVersion, withCross("com.twitter" %% "bijection-json" % bijectionVersion), withCross("com.twitter" %% "chill" % chillVersion), "com.twitter" %% "scalding-core" % scaldingVersion, diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingStore.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingStore.scala index 72b3ba71e..59a612b14 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingStore.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingStore.scala @@ -16,9 +16,11 @@ package com.twitter.summingbird.scalding +import com.twitter.algebird.bijection.BijectedSemigroup import com.twitter.algebird.{Monoid, Semigroup} import com.twitter.algebird.{ Universe, Empty, Interval, Intersection, InclusiveLower, ExclusiveUpper, InclusiveUpper } import com.twitter.algebird.monad.{StateWithError, Reader} +import com.twitter.bijection.Bijection import com.twitter.scalding.{Dsl, Mode, TypedPipe, Grouped, IterableSource, TupleSetter, 
TupleConverter} import com.twitter.summingbird._ import com.twitter.summingbird.option._ @@ -125,13 +127,36 @@ trait BatchedScaldingStore[K, V] extends ScaldingStore[K, V] { self => case NonCommutative => sorted } - val redFn: (((Long, V), (Long, V)) => (Long, V)) = { (left, right) => + /** + * If the supplied Semigroup[V] is an instance of + * BijectedSemigroup[U, V], returns the backing Bijection[V, U] + * and Semigroup[U]; else None. NOTE(review): U is erased at runtime, so this type test is unchecked. + */ + def unpackBijectedSemigroup[U]: Option[(Bijection[V, U], Semigroup[U])] = + sg match { + case innerSg: BijectedSemigroup[U, V] => Some( + innerSg.bijection -> innerSg.sg + ) + case _ => None + } + + def redFn[T: Semigroup] + : (((Long, T), (Long, T)) => (Long, T)) = { (left, right) => val (tl, vl) = left val (tr, vr) = right - (tl max tr, sg.plus(vl, vr)) + (tl max tr, Semigroup.plus(vl, vr)) } + // We strip the times - val nextSummed = toKVPipe(maybeSorted.reduce(redFn)).values + def sumNext[U] = + unpackBijectedSemigroup[U].map { case (bij, semi) => + maybeSorted.mapValues { case (l, v) => (l, bij(v)) } + .reduce(redFn[U](semi)) + .map { case (k, (l, v)) => (k, (l, (bij.invert(v)))) } + }.getOrElse { + maybeSorted.reduce(redFn(sg)) + } + val nextSummed = toKVPipe(sumNext).values // could be an empty method, in which case scalding will do nothing here writeLast(head, nextSummed) @@ -139,7 +164,7 @@ trait BatchedScaldingStore[K, V] extends ScaldingStore[K, V] { self => val stream = toKVPipe(sorted.scanLeft(None: Option[(Long, V)]) { (old, item) => old match { case None => Some(item) - case Some(prev) => Some(redFn(prev, item)) + case Some(prev) => Some(redFn(sg)(prev, item)) } } .mapValueStream { _.flatten /* unbox the option */ }