diff --git a/algebird-caliper/src/test/scala/com/twitter/algebird/caliper/CMSHashingBenchmark.scala b/algebird-caliper/src/test/scala/com/twitter/algebird/caliper/CMSHashingBenchmark.scala
new file mode 100644
index 000000000..c355a8ac8
--- /dev/null
+++ b/algebird-caliper/src/test/scala/com/twitter/algebird/caliper/CMSHashingBenchmark.scala
@@ -0,0 +1,119 @@
+package com.twitter.algebird.caliper
+
+import com.google.caliper.{Param, SimpleBenchmark}
+
+/**
+ * Benchmarks the hashing algorithms used by Count-Min sketch for CMS[BigInt].
+ *
+ * The input values are generated ahead of time to ensure that each trial uses the same input (and that the RNG is not
+ * influencing the runtime of the trials).
+ *
+ * More details available at https://github.com/twitter/algebird/issues/392.
+ */
+// Once we can convince cappi (https://github.com/softprops/cappi) -- the sbt plugin we use to run
+// caliper benchmarks -- to work with the latest caliper 1.0-beta-1, we would:
+// - Let `CMSHashingBenchmark` extend `Benchmark` (instead of `SimpleBenchmark`)
+// - Annotate the `time*` methods with `@MacroBenchmark`.
+class CMSHashingBenchmark extends SimpleBenchmark {
+
+  /**
+   * The `a` parameter for CMS' default ("legacy") hashing algorithm: `h_i(x) = a_i * x + b_i (mod p)`.
+   */
+  @Param(Array("5123456"))
+  val a: Int = 0
+
+  /**
+   * The `b` parameter for CMS' default ("legacy") hashing algorithm: `h_i(x) = a_i * x + b_i (mod p)`.
+   *
+   * Algebird's CMS implementation hard-codes `b` to `0`.
+   */
+  @Param(Array("0"))
+  val b: Int = 0
+
+  /**
+   * Width of the counting table.
+   */
+  @Param(Array("11" /* eps = 0.271 */ , "544" /* eps = 0.005 */ , "2719" /* eps = 1E-3 */ , "271829" /* eps = 1E-5 */))
+  val width: Int = 0
+
+  /**
+   * Number of operations per benchmark repetition.
+   */
+  @Param(Array("100000"))
+  val operations: Int = 0
+
+  /**
+   * Maximum number of bits for randomly generated BigInt instances.
+   */
+  @Param(Array("128", "1024", "2048"))
+  val maxBits: Int = 0
+
+  var random: scala.util.Random = _
+  var inputs: Seq[BigInt] = _
+
+  /**
+   * Draws the `operations` input values once, before any timed method runs.
+   */
+  override def setUp(): Unit = {
+    random = new scala.util.Random
+    // We draw numbers randomly from a 2^maxBits address space. The values are materialized eagerly into a `Vector`:
+    // a lazy `view` would re-run the RNG (and allocate fresh BigInts) on every traversal inside the timed loops, so
+    // the trials would neither share their inputs nor exclude the cost of the RNG.
+    inputs = Vector.fill(operations)(scala.math.BigInt(maxBits, random))
+  }
+
+  /**
+   * Murmur3-based hash: seeds Murmur3 with `a`, hashes the bytes of `x` and maps the result to `[0, width)`.
+   * `b` is accepted only for signature parity with `brokenCurrentHash`; it is not used.
+   */
+  private def murmurHashScala(a: Int, b: Int, width: Int)(x: BigInt): Int = {
+    val hash: Int = scala.util.hashing.MurmurHash3.arrayHash(x.toByteArray, a)
+    val h = {
+      // We only want non-negative integers for the subsequent modulo. This method mimics Java's Hashtable
+      // implementation. The Java code uses `0x7FFFFFFF` for the bit-wise AND, which is equal to Int.MaxValue.
+      val positiveHash = hash & Int.MaxValue
+      positiveHash % width
+    }
+    assert(h >= 0, "hash must not be negative")
+    h
+  }
+
+  private val PRIME_MODULUS = (1L << 31) - 1
+
+  /**
+   * The "legacy" `a * x + b` hash, computed with BigInt arithmetic; serves as the baseline for the comparison.
+   */
+  private def brokenCurrentHash(a: Int, b: Int, width: Int)(x: BigInt): Int = {
+    val unModded: BigInt = (x * a) + b
+    val modded: BigInt = (unModded + (unModded >> 32)) & PRIME_MODULUS
+    val h = modded.toInt % width
+    assert(h >= 0, "hash must not be negative")
+    h
+  }
+
+  /**
+   * Timed method: hashes every pre-generated input with the legacy hash, once per outer-loop iteration.
+   * Note that the `operations` parameter shadows the field of the same name and only bounds the outer loop.
+   */
+  def timeBrokenCurrentHashWithRandomMaxBitsNumbers(operations: Int): Int = {
+    var dummy = 0
+    while (dummy < operations) {
+      inputs.foreach { input => brokenCurrentHash(a, b, width)(input) }
+      dummy += 1
+    }
+    dummy
+  }
+
+  /**
+   * Timed method: same loop as the legacy variant, but using the Murmur3-based hash.
+   */
+  def timeMurmurHashScalaWithRandomMaxBitsNumbers(operations: Int): Int = {
+    var dummy = 0
+    while (dummy < operations) {
+      inputs.foreach { input => murmurHashScala(a, b, width)(input) }
+      dummy += 1
+    }
+    dummy
+  }
+
+}
\ No newline at end of file
diff --git a/algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala
index a2fdd0263..97519553d 100644
--- a/algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala
+++ b/algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala
@@ -479,7 +479,7 @@ case class
CMSInstance[K: Ordering](countsTable: CMSInstance.CountsTable[K], * Let X be a CMS, and let count_X[j, k] denote the value in X's 2-dimensional count table at row j and column k. * Then the Count-Min sketch estimate of the inner product between A and B is the minimum inner product between their * rows: - * estimatedInnerProduct = min_j (\sum_k count_A[j, k] * count_B[j, k]) + * estimatedInnerProduct = min_j (\sum_k count_A[j, k] * count_B[j, k]|) */ def innerProduct(other: CMS[K]): Approximate[Long] = { other match { @@ -491,7 +491,8 @@ case class CMSInstance[K: Ordering](countsTable: CMSInstance.CountsTable[K], }.sum val est = (0 to (depth - 1)).iterator.map { innerProductAtDepth }.min - Approximate(est - (eps * totalCount * other.totalCount).toLong, est, est, 1 - delta) + val minimum = math.max(est - (eps * totalCount * other.totalCount).toLong, 0) + Approximate(minimum, est, est, 1 - delta) case _ => other.innerProduct(this) } } @@ -663,14 +664,19 @@ case class TopCMSItem[K: Ordering](item: K, override val cms: CMS[K], params: To override val heavyHitters: Set[K] = Set(item) - override def +(x: K, count: Long): TopCMS[K] = TopCMSInstance(cms, params) + item + (x, count) + override def +(x: K, count: Long): TopCMS[K] = toCMSInstance + (x, count) override def ++(other: TopCMS[K]): TopCMS[K] = other match { case other: TopCMSZero[_] => this - case other: TopCMSItem[K] => TopCMSInstance[K](cms, params) + item + other.item + case other: TopCMSItem[K] => toCMSInstance + other.item case other: TopCMSInstance[K] => other + item } + private def toCMSInstance: TopCMSInstance[K] = { + val hhs = HeavyHitters.from(HeavyHitter(item, 1L)) + TopCMSInstance(cms, hhs, params) + } + } object TopCMSInstance { @@ -798,6 +804,8 @@ object HeavyHitters { def from[K: Ordering](hhs: Set[HeavyHitter[K]]): HeavyHitters[K] = HeavyHitters(hhs.foldLeft(emptyHhs)(_ + _)) + def from[K: Ordering](hh: HeavyHitter[K]): HeavyHitters[K] = HeavyHitters(emptyHhs + hh) + } case class 
HeavyHitter[K](item: K, count: Long) extends java.io.Serializable @@ -850,7 +858,7 @@ class TopPctCMSMonoid[K: Ordering](cms: CMS[K], heavyHittersPct: Double = 0.01) /** * Creates a sketch out of a single item. */ - def create(item: K): TopCMS[K] = TopCMSItem[K](item, cms, params) + def create(item: K): TopCMS[K] = TopCMSItem[K](item, cms + item, params) /** * Creates a sketch out of multiple items. @@ -964,7 +972,7 @@ class TopNCMSMonoid[K: Ordering](cms: CMS[K], heavyHittersN: Int = 100) extends /** * Creates a sketch out of a single item. */ - def create(item: K): TopCMS[K] = TopCMSItem[K](item, cms, params) + def create(item: K): TopCMS[K] = TopCMSItem[K](item, cms + item, params) /** * Creates a sketch out of multiple items. @@ -1022,7 +1030,7 @@ case class TopNCMSAggregator[K](cmsMonoid: TopNCMSMonoid[K]) */ trait CMSHasher[K] extends java.io.Serializable { - val PRIME_MODULUS = (1L << 31) - 1 + val PRIME_MODULUS = Int.MaxValue /** * Returns `a * x + b (mod p) (mod width)`. @@ -1047,7 +1055,7 @@ object CMSHasherImplicits { implicit object CMSHasherLong extends CMSHasher[Long] { - def hash(a: Int, b: Int, width: Int)(x: Long) = { + override def hash(a: Int, b: Int, width: Int)(x: Long): Int = { val unModded: Long = (x * a) + b // Apparently a super fast way of computing x mod 2^p-1 // See page 149 of http://www.cs.princeton.edu/courses/archive/fall09/cos521/Handouts/universalclasses.pdf @@ -1061,32 +1069,55 @@ object CMSHasherImplicits { implicit object CMSHasherShort extends CMSHasher[Short] { - def hash(a: Int, b: Int, width: Int)(x: Short) = CMSHasherInt.hash(a, b, width)(x) + override def hash(a: Int, b: Int, width: Int)(x: Short): Int = CMSHasherInt.hash(a, b, width)(x) } implicit object CMSHasherInt extends CMSHasher[Int] { - def hash(a: Int, b: Int, width: Int)(x: Int) = { + override def hash(a: Int, b: Int, width: Int)(x: Int): Int = { val unModded: Int = (x * a) + b val modded: Long = (unModded + (unModded >> 32)) & PRIME_MODULUS - val h = 
modded.toInt % width - assert(h >= 0, "hash must not be negative") - h + modded.toInt % width } } implicit object CMSHasherBigInt extends CMSHasher[BigInt] { - def hash(a: Int, b: Int, width: Int)(x: BigInt) = { - val unModded: BigInt = (x * a) + b - val modded: BigInt = (unModded + (unModded >> 32)) & PRIME_MODULUS - val h = modded.toInt % width - assert(h >= 0, "hash must not be negative") - h + /** + * =Implementation details= + * + * This hash function is based upon Murmur3. Note that the original CMS paper requires + * `d` (depth) pair-wise independent hash functions; in the specific case of Murmur3 we argue that it is sufficient + * to pass `d` different seed values to Murmur3 to achieve a similar effect. + * + * To seed Murmur3 we use only `a`, which is a randomly drawn `Int` via [[scala.util.Random]] in the CMS code. + * What is important to note is that we intentionally ignore `b`. Why? We need to ensure that we seed Murmur3 with + * a random value, notably one that is uniformly distributed. Somewhat surprisingly, combining two random values + * (such as `a` and `b` in our case) typically worsens the "randomness" of the combination, i.e. the combination is + * less uniformly distributed as either of its original inputs. Hence the combination of two random values is + * discouraged in this context, notably if the two random inputs were generated from the same source anyways, which + * is the case for us because we use Scala's PRNG only. + * + * For further details please refer to the discussion + * [[http://stackoverflow.com/questions/3956478/understanding-randomness Understanding Randomness]] on + * StackOverflow. + * + * @param a Must be a random value, typically created via [[scala.util.Random]]. + * @param b Ignored by this particular hash function, see the reasoning above for the justification. + * @param width Width of the CMS counting table, i.e. the width/size of each row in the counting table. + * @param x Item to be hashed. 
+ * @return Slot assigned to item `x` in the vector of size `width`, where `x in [0, width)`. + */ + override def hash(a: Int, b: Int, width: Int)(x: BigInt): Int = { + val hash: Int = scala.util.hashing.MurmurHash3.arrayHash(x.toByteArray, a) + // We only want positive integers for the subsequent modulo. This method mimics Java's Hashtable + // implementation. The Java code uses `0x7FFFFFFF` for the bit-wise AND, which is equal to Int.MaxValue. + val positiveHash = hash & Int.MaxValue + positiveHash % width } } -} +} \ No newline at end of file diff --git a/algebird-test/src/test/scala/com/twitter/algebird/CountMinSketchTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/CountMinSketchTest.scala index 742036839..367418534 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/CountMinSketchTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/CountMinSketchTest.scala @@ -1,11 +1,13 @@ package com.twitter.algebird import org.scalatest.{ PropSpec, Matchers, WordSpec } -import org.scalatest.prop.PropertyChecks +import org.scalatest.prop.{ GeneratorDrivenPropertyChecks, PropertyChecks } import org.scalacheck.{ Gen, Arbitrary } import CMSHasherImplicits._ +import scala.util.Random + class CmsLaws extends PropSpec with PropertyChecks with Matchers { import BaseProperties._ @@ -92,7 +94,9 @@ class CMSIntTest extends CMSTest[Int] class CMSLongTest extends CMSTest[Long] class CMSBigIntTest extends CMSTest[BigInt] -abstract class CMSTest[K: Ordering: CMSHasher: Numeric] extends WordSpec with Matchers { +abstract class CMSTest[K: Ordering: CMSHasher: Numeric] extends WordSpec with Matchers with GeneratorDrivenPropertyChecks { + + import TestImplicits._ val DELTA = 1E-10 val EPS = 0.001 @@ -108,21 +112,6 @@ abstract class CMSTest[K: Ordering: CMSHasher: Numeric] extends WordSpec with Ma val RAND = new scala.util.Random - // Convenience methods to convert from `Int` to the actual `K` type, and we prefer these conversions to be explicit - // (cf. 
JavaConverters vs. JavaConversions). We use the name `toK` to clarify the intent and to prevent name conflicts - // with the existing `to[Col]` method in Scala. - implicit class IntCast(x: Int) { - def toK[A: Numeric]: A = implicitly[Numeric[A]].fromInt(x) - } - - implicit class SeqCast(xs: Seq[Int]) { - def toK[A: Numeric]: Seq[A] = xs map { _.toK[A] } - } - - implicit class SetCast(xs: Set[Int]) { - def toK[A: Numeric]: Set[A] = xs map { _.toK[A] } - } - /** * Returns the exact frequency of {x} in {data}. */ @@ -136,7 +125,7 @@ abstract class CMSTest[K: Ordering: CMSHasher: Numeric] extends WordSpec with Ma val counts1 = data1.groupBy(x => x).mapValues(_.size) val counts2 = data2.groupBy(x => x).mapValues(_.size) - (counts1.keys.toSet & counts2.keys.toSet).map { k => counts1(k) * counts2(k) }.sum + (counts1.keys.toSet & counts2.keys.toSet).toSeq.map { k => counts1(k) * counts2(k) }.sum } /** @@ -148,12 +137,24 @@ abstract class CMSTest[K: Ordering: CMSHasher: Numeric] extends WordSpec with Ma counts.filter { _._2 >= heavyHittersPct * totalCount }.keys.toSet } + /** + * Creates a random data stream. + * + * @param size Number of stream elements. + * @param range Elements are randomly drawn from [0, range). 
+ * @return + */ + def createRandomStream(size: Int, range: Int, rnd: Random = RAND): Seq[K] = { + require(size > 0) + (1 to size).map { _ => rnd.nextInt(range) }.toK[K] + } + "A Count-Min sketch implementing CMSCounting" should { "count total number of elements in a stream" in { val totalCount = 1243 val range = 234 - val data = (0 to (totalCount - 1)).map { _ => RAND.nextInt(range) }.toK[K] + val data = createRandomStream(totalCount, range) val cms = COUNTING_CMS_MONOID.create(data) cms.totalCount should be(totalCount) @@ -162,7 +163,7 @@ abstract class CMSTest[K: Ordering: CMSHasher: Numeric] extends WordSpec with Ma "estimate frequencies" in { val totalCount = 5678 val range = 897 - val data = (0 to (totalCount - 1)).map { _ => RAND.nextInt(range) }.toK[K] + val data = createRandomStream(totalCount, range) val cms = COUNTING_CMS_MONOID.create(data) (0 to 100).foreach { _ => @@ -180,7 +181,11 @@ abstract class CMSTest[K: Ordering: CMSHasher: Numeric] extends WordSpec with Ma "exactly compute frequencies in a small stream" in { val one = COUNTING_CMS_MONOID.create(1.toK[K]) + one.frequency(1.toK[K]).estimate should be(1) + one.frequency(2.toK[K]).estimate should be(0) val two = COUNTING_CMS_MONOID.create(2.toK[K]) + two.frequency(1.toK[K]).estimate should be(0) + two.frequency(2.toK[K]).estimate should be(1) val cms = COUNTING_CMS_MONOID.plus(COUNTING_CMS_MONOID.plus(one, two), two) cms.frequency(0.toK[K]).estimate should be(0) @@ -196,23 +201,29 @@ abstract class CMSTest[K: Ordering: CMSHasher: Numeric] extends WordSpec with Ma } "estimate inner products" in { - val totalCount = 5234 - val range = 1390 - val data1 = (0 to (totalCount - 1)).map { _ => RAND.nextInt(range) }.toK[K] - val data2 = (0 to (totalCount - 1)).map { _ => RAND.nextInt(range) }.toK[K] - val cms1 = COUNTING_CMS_MONOID.create(data1) - val cms2 = COUNTING_CMS_MONOID.create(data1) + val totalCounts = Gen.choose(1, 10000) + val ranges = Gen.choose(100, 2000) + + forAll((totalCounts, 
"totalCount"), (ranges, "range"), minSuccessful(50)) { (totalCount: Int, range: Int) => + val data1 = createRandomStream(totalCount, range) + val data2 = createRandomStream(totalCount, range) + val cms1 = COUNTING_CMS_MONOID.create(data1) + val cms2 = COUNTING_CMS_MONOID.create(data2) + + val approxA = cms1.innerProduct(cms2) + val approx = approxA.estimate + val exact = exactInnerProduct(data1, data2) + val estimationError = approx - exact + val maxError = approx - approxA.min + val beWithinTolerance = be >= 0L and be <= maxError - val approxA = cms1.innerProduct(cms2) - val approx = approxA.estimate - val exact = exactInnerProduct(data1, data2) - val estimationError = approx - exact - val maxError = approx - approxA.min - val beWithinTolerance = be >= 0L and be <= maxError + // We do not support negative counts, hence the lower limit of a frequency is 0 but never negative. + approxA.min should be >= 0L - approx should be(cms2.innerProduct(cms1).estimate) - approx should be >= exact - estimationError should beWithinTolerance + approx should be(cms2.innerProduct(cms1).estimate) + approx should be >= exact + estimationError should beWithinTolerance + } } "exactly compute inner product of small streams" in { @@ -272,6 +283,44 @@ abstract class CMSTest[K: Ordering: CMSHasher: Numeric] extends WordSpec with Ma "A Top-% Count-Min sketch implementing CMSHeavyHitters" should { + "create correct sketches out of a single item" in { + forAll{ (x: Int) => + val data = x.toK[K] + val cmsMonoid = { + val anyHeavyHittersPct = 0.1 // exact setting not relevant for this test + TopPctCMS.monoid[K](EPS, DELTA, SEED, anyHeavyHittersPct) + } + val topCms = cmsMonoid.create(data) + topCms.totalCount should be(1) + topCms.cms.totalCount should be(1) + topCms.frequency(x.toK[K]).estimate should be(1) + // Poor man's way to come up with an item that is not x and that is very unlikely to hash to the same slot. 
+ val otherItem = x + 1 + topCms.frequency(otherItem.toK[K]).estimate should be(0) + // The following assert indirectly verifies whether the counting table is not all-zero (cf. GH-393). + topCms.innerProduct(topCms).estimate should be(1) + } + } + + "create correct sketches out of a single-item stream" in { + forAll{ (x: Int) => + val data = Seq(x).toK[K] + val cmsMonoid = { + val anyHeavyHittersPct = 0.1 // exact setting not relevant for this test + TopPctCMS.monoid[K](EPS, DELTA, SEED, anyHeavyHittersPct) + } + val topCms = cmsMonoid.create(data) + topCms.totalCount should be(1) + topCms.cms.totalCount should be(1) + topCms.frequency(x.toK[K]).estimate should be(1) + // Poor man's way to come up with an item that is not x and that is very unlikely to hash to the same slot. + val otherItem = x + 1 + topCms.frequency(otherItem.toK[K]).estimate should be(0) + // The following assert indirectly verifies whether the counting table is not all-zero (cf. GH-393). + topCms.innerProduct(topCms).estimate should be(1) + } + } + "estimate heavy hitters" in { // Simple way of making some elements appear much more often than others. val data1 = (1 to 3000).map { _ => RAND.nextInt(3) }.toK[K] @@ -405,6 +454,44 @@ abstract class CMSTest[K: Ordering: CMSHasher: Numeric] extends WordSpec with Ma // merge heavy hitters correctly. This is because merging top-N based heavy hitters is not an associative // operation. + "create correct sketches out of a single item" in { + forAll{ (x: Int) => + val data = x.toK[K] + val cmsMonoid = { + val anyHeavyHittersN = 2 // exact setting not relevant for this test + TopNCMS.monoid[K](EPS, DELTA, SEED, anyHeavyHittersN) + } + val topCms = cmsMonoid.create(data) + topCms.totalCount should be(1) + topCms.cms.totalCount should be(1) + topCms.frequency(x.toK[K]).estimate should be(1) + // Poor man's way to come up with an item that is not x and that is very unlikely to hash to the same slot. 
+ val otherItem = x + 1 + topCms.frequency(otherItem.toK[K]).estimate should be(0) + // The following assert indirectly verifies whether the counting table is not all-zero (cf. GH-393). + topCms.innerProduct(topCms).estimate should be(1) + } + } + + "create correct sketches out of a single-item stream" in { + forAll{ (x: Int) => + val data = Seq(x).toK[K] + val cmsMonoid = { + val anyHeavyHittersN = 2 // exact setting not relevant for this test + TopNCMS.monoid[K](EPS, DELTA, SEED, anyHeavyHittersN) + } + val topCms = cmsMonoid.create(data) + topCms.totalCount should be(1) + topCms.cms.totalCount should be(1) + topCms.frequency(x.toK[K]).estimate should be(1) + // Poor man's way to come up with an item that is not x and that is very unlikely to hash to the same slot. + val otherItem = x + 1 + topCms.frequency(otherItem.toK[K]).estimate should be(0) + // The following assert indirectly verifies whether the counting table is not all-zero (cf. GH-393). + topCms.innerProduct(topCms).estimate should be(1) + } + } + // This test involves merging of top-N CMS instances, which is not an associative operation. This means that the // success or failure of this test depends on the merging order and/or the test data characteristics. "(when adding CMS instances) drop old heavy hitters when new heavy hitters replace them, if merge order matches data" in { @@ -642,6 +729,26 @@ class CMSParamsSpec extends PropSpec with PropertyChecks with Matchers { } +class CMSHasherShortSpec extends CMSHasherSpec[Short] +class CMSHasherIntSpec extends CMSHasherSpec[Int] +class CMSHasherLongSpec extends CMSHasherSpec[Long] +class CMSHasherBigIntSpec extends CMSHasherSpec[BigInt] + +abstract class CMSHasherSpec[K: CMSHasher: Numeric] extends PropSpec with PropertyChecks with Matchers { + + import TestImplicits._ + + property("returns positive hashes (i.e. 
slots) only") { + forAll { (a: Int, b: Int, width: Int, x: Int) => + whenever (width > 0) { + val hash = CMSHash[K](a, b, width) + hash(x.toK[K]) should be >= 0 + } + } + } + +} + /** * This spec verifies that we provide legacy types for the CMS and CountMinSketchMonoid classes we had in Algebird * versions < 0.8.1. Note that this spec is not meant to verify their actual functionality. @@ -679,4 +786,23 @@ class LegacyCMSSpec extends WordSpec with Matchers { } +} + +object TestImplicits { + + // Convenience methods to convert from `Int` to the actual `K` type, and we prefer these conversions to be explicit + // (cf. JavaConverters vs. JavaConversions). We use the name `toK` to clarify the intent and to prevent name conflicts + // with the existing `to[Col]` method in Scala. + implicit class IntCast(x: Int) { + def toK[A: Numeric]: A = implicitly[Numeric[A]].fromInt(x) + } + + implicit class SeqCast(xs: Seq[Int]) { + def toK[A: Numeric]: Seq[A] = xs map { _.toK[A] } + } + + implicit class SetCast(xs: Set[Int]) { + def toK[A: Numeric]: Set[A] = xs map { _.toK[A] } + } + } \ No newline at end of file