diff --git a/summingbird-core/src/test/scala/com/twitter/summingbird/DependantsTests.scala b/summingbird-core/src/test/scala/com/twitter/summingbird/DependantsTests.scala index dbdb0d276..696dfef3b 100644 --- a/summingbird-core/src/test/scala/com/twitter/summingbird/DependantsTests.scala +++ b/summingbird-core/src/test/scala/com/twitter/summingbird/DependantsTests.scala @@ -20,7 +20,7 @@ import com.twitter.summingbird.memory._ import org.scalacheck._ import Gen._ -import Arbitrary.arbitrary +import org.scalacheck.Arbitrary._ import org.scalacheck.Prop._ import scala.util.Random @@ -31,10 +31,8 @@ object DependantsTest extends Properties("Dependants") { implicit def testStore: Memory#Store[Int, Int] = MMap[Int, Int]() implicit def sink1: Memory#Sink[Int] = ((_) => Unit) implicit def sink2: Memory#Sink[(Int, Int)] = ((_) => Unit) - def genIntList = Seq.fill(50)(Random.nextInt).toList - def genPairIntList = Seq.fill(50)((Random.nextInt, Random.nextInt)).toList - implicit def genSource1: Gen[Producer[Memory, Int]] = Gen.choose(1,1000).map(x => Producer.source[Memory, Int](genIntList)) - implicit def genSource2: Gen[KeyedProducer[Memory, Int, Int]] = Gen.choose(1,1000).map(x => IdentityKeyedProducer(Producer.source[Memory, (Int, Int)](genPairIntList))) + implicit def arbSource1: Arbitrary[Producer[Memory, Int]] = Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[Int]).map(Producer.source[Memory,Int](_))) + implicit def arbSource2: Arbitrary[KeyedProducer[Memory, Int, Int]] = Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[(Int, Int)]).map(Producer.source[Memory,(Int, Int)](_))) implicit def genProducer: Arbitrary[Producer[Memory, _]] = Arbitrary(oneOf(genProd1, genProd2, summed)) diff --git a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala index a845dbe20..670ce7083 100644 --- a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala +++ b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala @@ -18,82 +18,83 @@ package com.twitter.summingbird import org.scalacheck._ import Gen._ +import Arbitrary.arbInt import Arbitrary.arbitrary object TestGraphGenerators { // Put the non-recursive calls first, otherwise you blow the stack - def genOptMap11[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def genOptMap11[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { fn <- arbitrary[(Int) => Option[Int]] in <- genProd1 } yield OptionMappedProducer(in, fn) - def genOptMap12[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def genOptMap12[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { fn <- arbitrary[(Int) => Option[(Int,Int)]] in <- genProd1 } yield IdentityKeyedProducer(OptionMappedProducer(in, fn)) - def genOptMap21[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def genOptMap21[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { fn <- arbitrary[((Int,Int)) => Option[Int]] in <- genProd2 } yield OptionMappedProducer(in, fn) - def genOptMap22[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def genOptMap22[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { fn <- arbitrary[((Int,Int)) => Option[(Int,Int)]] in <- genProd2 } yield IdentityKeyedProducer(OptionMappedProducer(in, fn)) - def aDependency[P <: Platform[P]](p: KeyedProducer[P, Int, Int])(implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[KeyedProducer[P, Int, Int]] = { + def aDependency[P <: Platform[P]](p: KeyedProducer[P, Int, Int])(implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[KeyedProducer[P, Int, Int]] = { val deps = Producer.transitiveDependenciesOf(p).collect{case x:KeyedProducer[_, _, _] => x.asInstanceOf[KeyedProducer[P,Int,Int]]} if(deps.size == 1) genProd2 else oneOf(deps) } - def genMerged2[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { - _ <- Gen.choose(0,1) + def genMerged2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + _ <- Gen.choose(0,1) p1 <- genProd2 p2 <- oneOf(genProd2, aDependency(p1)) } yield IdentityKeyedProducer(MergedProducer(p1, p2)) - def genFlatMap22[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def genFlatMap22[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { fn <- arbitrary[((Int, Int)) => List[(Int, Int)]] in <- genProd2 } yield IdentityKeyedProducer(FlatMappedProducer(in, fn)) - def genFlatMap21[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def genFlatMap21[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { fn <- arbitrary[((Int, Int)) => List[Int]] in <- genProd2 } yield FlatMappedProducer(in, fn) - def genFlatMap11[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def genFlatMap11[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { fn <- arbitrary[(Int) => List[Int]] in <- genProd1 } yield FlatMappedProducer(in, fn) - def genMerged1[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def genMerged1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { _ <- Gen.choose(0,1) p1 <- genProd1 p2 <- genProd1 } yield MergedProducer(p1, p2) - def genFlatMap12[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def genFlatMap12[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { fn <- arbitrary[(Int) => List[(Int, Int)]] in <- genProd1 } yield IdentityKeyedProducer(FlatMappedProducer(in, fn)) - def genWrite22[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], - genSource2 : Gen[KeyedProducer[P, Int, Int]], + def genWrite22[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], + genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[KeyedProducer[P, Int, Int]] = for { _ <- Gen.choose(0,1) p1 <- genProd2 } yield IdentityKeyedProducer(p1.write(sink2)) - def also1[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def also1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { _ <- Gen.choose(0, 1) // avoids blowup on self recursion out <- genProd1 ignored <- oneOf(genProd2, genProd1, oneOf(Producer.transitiveDependenciesOf(out))): Gen[Producer[P, _]] } yield ignored.also(out) - def also2[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + def also2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { _ <- Gen.choose(0, 1) // avoids blowup on self recursion out <- genProd2 ignored <- oneOf(genProd2, genProd1, oneOf(Producer.transitiveDependenciesOf(out))): Gen[Producer[P, _]] @@ -103,20 +104,20 @@ object TestGraphGenerators { // TODO (https://github.com/twitter/summingbird/issues/74): add more // nodes, abstract over Platform - def summed[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], genSource2 : Gen[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { - in <- genProd2 + def summed[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]) = for { + in <- genProd2 } yield in.sumByKey(testStore) - def genProd2[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], - genSource2 : Gen[KeyedProducer[P, Int, Int]], - testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[KeyedProducer[P, Int, Int]] = - frequency((25, genSource2), (3, genOptMap12), (3, genOptMap22), (4, genWrite22), (1, genMerged2), + def genProd2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], + genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], + testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[KeyedProducer[P, Int, Int]] = + frequency((25, genSource2.arbitrary), (3, genOptMap12), (3, genOptMap22), (4, genWrite22), (1, genMerged2), (0, also2), (3, genFlatMap22), (3, genFlatMap12)) - def genProd1[P <: Platform[P]](implicit genSource1 : Gen[Producer[P, Int]], - genSource2 : Gen[KeyedProducer[P, Int, Int]], - testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[Producer[P, Int]] = - frequency((25, genSource1), (3, genOptMap11), (3, genOptMap21), (1, genMerged1), (3, genFlatMap11), + def genProd1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], + genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], + testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[Producer[P, Int]] = + frequency((25, genSource1.arbitrary), (3, genOptMap11), (3, genOptMap21), (1, genMerged1), (3, genFlatMap11), (0, also1), (3, genFlatMap21)) } diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala index 3adc15aec..e87be96b9 100644 --- a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala +++ b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala @@ -16,36 +16,17 @@ package com.twitter.summingbird.storm - -import com.twitter.summingbird.batch.{ BatchID, Batcher } -import backtype.storm.testing.{ CompleteTopologyParam, MockedSources } -import com.twitter.algebird.{MapAlgebra, Monoid} -import com.twitter.storehaus.{ ReadableStore, JMapStore } +import com.twitter.storehaus.JMapStore import com.twitter.storehaus.algebra.MergeableStore import com.twitter.summingbird._ import com.twitter.summingbird.batch.{BatchID, Batcher} import com.twitter.summingbird.storm.spout.TraversableSpout -import com.twitter.tormenta.spout.Spout import com.twitter.util.Future -import java.util.{Collections, HashMap, Map => JMap, UUID} -import java.util.concurrent.atomic.AtomicInteger -import org.specs._ -import org.scalacheck._ -import org.scalacheck.Prop._ -import org.scalacheck.Properties - -import scala.collection.JavaConverters._ -import com.twitter.storehaus.{ ReadableStore, JMapStore } -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} -import com.twitter.summingbird.TestGraphs - import org.scalacheck._ import Gen._ -import Arbitrary.arbitrary +import Arbitrary._ import org.scalacheck.Prop._ -import scala.collection.mutable.{Map => MMap} - object TopologyPlannerLaws extends Properties("StormDag") { @@ -58,12 +39,10 @@ object TopologyPlannerLaws extends Properties("StormDag") { implicit def sink2: Storm#Sink[(Int, Int)] = (() => ((_) => Future.Unit)) implicit def testStore: Storm#Store[Int, Int] = MergeableStoreSupplier.from {MergeableStore.fromStore[(Int, BatchID), Int](new JMapStore[(Int, BatchID), Int]())} - def buildSource() = { - println("Creating source1") - - } - implicit def genSource1: Gen[Producer[Storm, Int]] = Gen.choose(1,100000000).map(x => Storm.source(TraversableSpout(List[Int]()))) - implicit def genSource2: Gen[KeyedProducer[Storm, Int, Int]] = Gen.choose(1,100000000).map(x => IdentityKeyedProducer(Storm.source(TraversableSpout(List[(Int, Int)]())))) + + implicit def arbSource1: Arbitrary[Producer[Storm, Int]] = Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[Int]).map{x: List[Int] => Storm.source(TraversableSpout(x))}) + implicit def arbSource2: Arbitrary[KeyedProducer[Storm, Int, Int]] = Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[(Int, Int)]).map{x: List[(Int, Int)] => IdentityKeyedProducer(Storm.source(TraversableSpout(x)))}) + lazy val genDag : Gen[StormDag]= for { tail <- summed