Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #203 from twitter/feature/bijected_monoid
Browse files Browse the repository at this point in the history
Optimize for Bijected Monoids in the flow
  • Loading branch information
johnynek committed Sep 12, 2013
2 parents ea04b11 + dfd3415 commit 917aa41
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
3 changes: 2 additions & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ object SummingbirdBuild extends Build {
lazy val summingbirdClient = module("client").settings(
libraryDependencies ++= Seq(
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "algebird-util" % algebirdVersion,
withCross("com.twitter" %% "bijection-core" % bijectionVersion),
withCross("com.twitter" %% "storehaus-core" % storehausVersion),
withCross("com.twitter" %% "storehaus-algebra" % storehausVersion)
Expand Down Expand Up @@ -184,7 +185,7 @@ object SummingbirdBuild extends Build {
"com.backtype" % "dfs-datastores" % dfsDatastoresVersion,
"com.backtype" % "dfs-datastores-cascading" % dfsDatastoresVersion,
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "algebird-util" % algebirdVersion,
"com.twitter" %% "algebird-bijection" % algebirdVersion,
withCross("com.twitter" %% "bijection-json" % bijectionVersion),
withCross("com.twitter" %% "chill" % chillVersion),
"com.twitter" %% "scalding-core" % scaldingVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package com.twitter.summingbird.scalding

import com.twitter.algebird.bijection.BijectedSemigroup
import com.twitter.algebird.{Monoid, Semigroup}
import com.twitter.algebird.{ Universe, Empty, Interval, Intersection, InclusiveLower, ExclusiveUpper, InclusiveUpper }
import com.twitter.algebird.monad.{StateWithError, Reader}
import com.twitter.bijection.Bijection
import com.twitter.scalding.{Dsl, Mode, TypedPipe, Grouped, IterableSource, TupleSetter, TupleConverter}
import com.twitter.summingbird._
import com.twitter.summingbird.option._
Expand Down Expand Up @@ -125,21 +127,44 @@ trait BatchedScaldingStore[K, V] extends ScaldingStore[K, V] { self =>
case NonCommutative => sorted
}

val redFn: (((Long, V), (Long, V)) => (Long, V)) = { (left, right) =>
/**
* If the supplied Semigroup[V] (`sg`) is an instance of
* BijectedSemigroup[U, V], returns the backing Bijection[V, U]
* paired with the underlying Semigroup[U]; otherwise returns None.
*
* NOTE(review): U is a free type parameter and the type arguments in
* the pattern below are erased at runtime, so the match can only
* verify that `sg` is some BijectedSemigroup. The caller is trusted
* to use U consistently with whatever the instance actually wraps.
*
* @tparam U the type on the other side of the bijection from V
* @return Some((bijection, inner semigroup)) when `sg` is bijected,
*         None for any other Semigroup implementation
*/
def unpackBijectedSemigroup[U]: Option[(Bijection[V, U], Semigroup[U])] =
sg match {
case innerSg: BijectedSemigroup[U, V] => Some(
innerSg.bijection -> innerSg.sg
)
case _ => None
}

/**
* Builds the reducer used to combine two (Long, T) pairs: the result
* keeps the larger of the two Longs and the sum of the two T values.
* The Longs are presumably timestamps (the code that follows strips
* "the times") -- confirm against the caller.
*
* @tparam T value type; its Semigroup is taken from the context bound,
*           so callers can pass either `sg` or an unpacked inner semigroup
*/
def redFn[T: Semigroup]
: (((Long, T), (Long, T)) => (Long, T)) = { (left, right) =>
val (tl, vl) = left
val (tr, vr) = right
// NOTE(review): this diff view shows two result lines. The one directly
// below uses the outer `sg` rather than the Semigroup[T] context bound and
// looks like the pre-change version -- confirm that only the
// `Semigroup.plus` line belongs in the file.
(tl max tr, sg.plus(vl, vr))
// Combine the values with the Semigroup[T] supplied via the context bound.
(tl max tr, Semigroup.plus(vl, vr))
}

// We strip the times
val nextSummed = toKVPipe(maybeSorted.reduce(redFn)).values
// Reduces `maybeSorted` down to one (Long, V) per key, choosing between
// two paths depending on what `unpackBijectedSemigroup` finds:
//  - bijected: convert every value to U once, reduce with the inner
//    Semigroup[U], then convert the reduced value back to V;
//  - otherwise: reduce directly with `sg`.
// Presumably the bijected path exists to avoid converting back and forth
// on every individual plus -- TODO confirm against the commit rationale.
// NOTE(review): U is not constrained by any argument, so it is fixed only
// by what the call site supplies or infers -- verify this is intended.
def sumNext[U] =
unpackBijectedSemigroup[U].map { case (bij, semi) =>
// Move values into U-space before reducing; the Long is left untouched.
maybeSorted.mapValues { case (l, v) => (l, bij(v)) }
.reduce(redFn[U](semi))
// Map the reduced value back into V-space.
.map { case (k, (l, v)) => (k, (l, (bij.invert(v)))) }
}.getOrElse {
// Not a bijected semigroup: reduce with the supplied semigroup as-is.
maybeSorted.reduce(redFn(sg))
}
val nextSummed = toKVPipe(sumNext).values
// could be an empty method, in which case scalding will do nothing here
writeLast(head, nextSummed)

// Make the incremental stream
val stream = toKVPipe(sorted.scanLeft(None: Option[(Long, V)]) { (old, item) =>
old match {
case None => Some(item)
case Some(prev) => Some(redFn(prev, item))
case Some(prev) => Some(redFn(sg)(prev, item))
}
}
.mapValueStream { _.flatten /* unbox the option */ }
Expand Down

0 comments on commit 917aa41

Please sign in to comment.