Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Feature/online map only #269

Merged
merged 5 commits into from
Oct 7, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ object TestGraphs {
.name("My named flatmap")
.flatMap(postJoinFn)
.sumByKey(store)

/**
 * Builds a job with no summer: every element of `source` is expanded
 * through `mapOp` and the resulting items are written to `sink`.
 *
 * @param source producer supplying the input elements
 * @param sink   platform sink that receives each mapped item
 * @param mapOp  expansion applied to every input element
 * @return the producer obtained after attaching the sink write
 */
def mapOnlyJob[P <: Platform[P], T, U](
  source: Producer[P, T],
  sink: P#Sink[U]
)(mapOp: T => TraversableOnce[U]): Producer[P, U] = {
  val expanded = source.flatMap(mapOp)
  expanded.write(sink)
}
}

class TestGraphs[P <: Platform[P], T: Manifest: Arbitrary, K: Arbitrary, V: Arbitrary: Equiv: Monoid](platform: P)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import Constants._
class IntermediateFlatMapBolt[T](
@transient flatMapOp: FlatMapOperation[T, _],
metrics: FlatMapStormMetrics,
anchor: AnchorTuples) extends BaseBolt(metrics.metrics) {
anchor: AnchorTuples,
shouldEmit: Boolean) extends BaseBolt(metrics.metrics) {

val lockedOp = MeatLocker(flatMapOp)

Expand All @@ -45,12 +46,14 @@ class IntermediateFlatMapBolt[T](
val (time, t) = tuple.getValue(0).asInstanceOf[(Long, T)]

lockedOp.get(t).foreach { items =>
items.foreach { u =>
onCollector { col =>
val values = toValues(time, u)
if (anchor.anchor)
col.emit(tuple, values)
else col.emit(values)
if(shouldEmit) {
items.foreach { u =>
onCollector { col =>
val values = toValues(time, u)
if (anchor.anchor)
col.emit(tuple, values)
else col.emit(values)
}
}
}
ack(tuple)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config
getOrElse(stormDag, node, DEFAULT_FM_STORM_METRICS),
anchorTuples)(summerProducer.monoid.asInstanceOf[Monoid[Any]], summerProducer.store.batcher)
case None =>
new IntermediateFlatMapBolt(operation, metrics, anchorTuples)
new IntermediateFlatMapBolt(operation, metrics, anchorTuples, stormDag.dependenciesOf(node).size > 0)
}

val parallelism = getOrElse(stormDag, node, DEFAULT_FM_PARALLELISM)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.summingbird._
import com.twitter.summingbird.batch.{BatchID, Batcher}
import com.twitter.summingbird.storm.spout.TraversableSpout
import com.twitter.summingbird.memory._
import com.twitter.tormenta.spout.Spout
import com.twitter.util.Future
import java.util.{Collections, HashMap, Map => JMap, UUID}
Expand Down Expand Up @@ -144,6 +145,48 @@ object StormLaws extends Specification {
(testFn, globalState(id))
}

/**
 * Runs a summer-less job on the in-memory platform and returns everything
 * the job wrote to its sink, in the order the sink was invoked.
 *
 * @param original input elements fed to the job as its source
 * @param mkJob    builds the job from a source producer and a sink
 * @return the values the sink received
 */
def memoryPlanWithoutSummer(original: List[Int])(mkJob: (Producer[Memory, Int], Memory#Sink[Int]) => Producer[Memory, Int])
    : List[Int] = {
  // Local buffer: each call collects its own output.
  val collected = ArrayBuffer.empty[Int]
  val recordOutput: (Int) => Unit = { item: Int => collected += item }

  val platform = new Memory
  val plan = platform.plan(mkJob(Memory.toSource(original), recordOutput))
  platform.run(plan)

  collected.toList
}

// Buffer held on the StormLaws object that collects values passed to `append`;
// `runWithOutSummer` returns its contents. The SynchronizedBuffer mixin
// serializes mutations of the buffer.
// NOTE(review): presumably kept on the object (rather than local to the test)
// so the sink closure shipped to Storm can still reach it — confirm.
val outputList = new ArrayBuffer[Int] with SynchronizedBuffer[Int]

/** Records `x` in the shared `StormLaws.outputList` buffer. */
def append(x: Int): Unit =
  StormLaws.outputList += x

/**
 * Runs a summer-less job on a local Storm cluster and returns every value
 * the job wrote to its sink.
 *
 * The sink records values in the shared `StormLaws.outputList` buffer, so the
 * buffer is emptied before the run; otherwise values recorded by an earlier
 * call would be returned again by this one.
 *
 * @param original input elements fed to the job through a TraversableSpout
 * @param mkJob    builds the job from a source producer and a sink
 * @return the values recorded by the sink during this run
 */
def runWithOutSummer(original: List[Int])(mkJob: (Producer[Storm, Int], Storm#Sink[Int]) => Producer[Storm, Int])
    : List[Int] = {
  // Drop anything a previous invocation left in the shared buffer.
  StormLaws.outputList.clear()

  val cluster = new LocalCluster()
  try {
    val sink: () => ((Int) => Future[Unit]) = () => {x: Int =>
      append(x)
      Future.Unit
    }
    val job = mkJob(
      Storm.source(TraversableSpout(original)),
      sink
    )

    val topo = storm.plan(job)
    Testing.completeTopology(cluster, topo, completeTopologyParam)
    // Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667
    Thread.sleep(1000)
  } finally {
    // Always release the local cluster, even when planning or the run fails.
    cluster.shutdown
  }
  StormLaws.outputList.toList
}

"StormPlatform matches Scala for single step jobs" in {
val original = sample[List[Int]]
val (fn, returnedState) =
Expand Down Expand Up @@ -208,4 +251,33 @@ object StormLaws extends Specification {
.collect { case ((k, batchID), Some(v)) => (k, v) }
) must beTrue
}

"StormPlatform matches Scala for MapOnly/NoSummer" in {
val original = sample[List[Int]]
val doubler = {x: Int => List(x*2)}

val stormOutputList =
runWithOutSummer(original)(
TestGraphs.mapOnlyJob[Storm, Int, Int](_, _)(doubler)
).sorted
val memoryOutputList =
memoryPlanWithoutSummer(original) (TestGraphs.mapOnlyJob[Memory, Int, Int](_, _)(doubler)).sorted

stormOutputList must_==(memoryOutputList)
}

"StormPlatform matches Scala for MapOnly/NoSummer with dangling FM" in {
val original = sample[List[Int]]
val doubler = {x: Int => List(x*2)}

val stormOutputList =
runWithOutSummer(original)(
TestGraphs.mapOnlyJob[Storm, Int, Int](_, _)(doubler).map{x:Int => x*3}
).sorted
val memoryOutputList =
memoryPlanWithoutSummer(original) (TestGraphs.mapOnlyJob[Memory, Int, Int](_, _)(doubler)).sorted

stormOutputList must_==(memoryOutputList)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ object TopologyPlannerLaws extends Properties("StormDag") {
}
}

property("The the last producer in any StormNode prior to a summer must be a KeyedProducer") = forAll { (dag: StormDag) =>
property("The last producer in any StormNode prior to a summer must be a KeyedProducer") = forAll { (dag: StormDag) =>
dag.nodes.forall{n =>
val firstP = n.members.last
firstP match {
Expand Down