Merge pull request twitter#321 from twitter/feature/autoFormatterRan
Feature/auto formatter ran
johnynek committed Jun 19, 2014
2 parents aab9b0c + 93b397e commit a3452bf
Showing 76 changed files with 2,595 additions and 2,614 deletions.
@@ -18,36 +18,36 @@ package com.twitter.algebird

import scala.annotation.tailrec

/** Some functions to create or convert AdaptiveVectors
/**
* Some functions to create or convert AdaptiveVectors
*/
object AdaptiveVector {
/** When density >= this value * size, we switch to dense vectors
/**
* When density >= this value * size, we switch to dense vectors
*/
val THRESHOLD = 0.25
def fill[V](size: Int)(sparse: V): AdaptiveVector[V] = SparseVector(Map.empty[Int,V], sparse, size)
def fill[V](size: Int)(sparse: V): AdaptiveVector[V] = SparseVector(Map.empty[Int, V], sparse, size)

def fromVector[V](v: Vector[V], sparseVal: V): AdaptiveVector[V] = {
if(v.size == 0) {
if (v.size == 0) {
fill[V](0)(sparseVal)
}
else {
} else {
val denseCount = v.count { _ != sparseVal }
val sz = v.size
if(denseCount < sz * THRESHOLD)
if (denseCount < sz * THRESHOLD)
SparseVector(toMap(v, sparseVal), sparseVal, sz)
else
DenseVector(v, sparseVal, denseCount)
}
}
def fromMap[V](m: Map[Int, V], sparseVal: V, sizeOfDense: Int): AdaptiveVector[V] = {
if(m.size == 0) {
if (m.size == 0) {
fill[V](sizeOfDense)(sparseVal)
}
else {
} else {
val maxIdx = m.keys.max
require(maxIdx < sizeOfDense, "Max key (" + maxIdx +") exceeds valid for size (" + sizeOfDense +")")
require(maxIdx < sizeOfDense, "Max key (" + maxIdx + ") exceeds valid for size (" + sizeOfDense + ")")
val denseCount = m.count { _._2 != sparseVal }
if(denseCount < sizeOfDense * THRESHOLD)
if (denseCount < sizeOfDense * THRESHOLD)
SparseVector(m, sparseVal, sizeOfDense)
else
DenseVector(toVector(m, sparseVal, sizeOfDense), sparseVal, denseCount)
@@ -63,12 +63,12 @@ object AdaptiveVector {
def toMap[V](iseq: IndexedSeq[V], sparse: V): Map[Int, V] =
iseq.view.zipWithIndex.filter { _._1 != sparse }.map { _.swap }.toMap

def toVector[V](m: Map[Int,V], sparse: V, size: Int): Vector[V] = {
def toVector[V](m: Map[Int, V], sparse: V, size: Int): Vector[V] = {
// Mutable local variable to optimize performance
import scala.collection.mutable.Buffer
val buf = Buffer.fill[V](size)(sparse)
m.foreach { case (idx, v) => buf(idx) = v }
Vector( buf :_* )
Vector(buf: _*)
}

def toVector[V](v: AdaptiveVector[V]): Vector[V] =
@@ -78,34 +78,33 @@ object AdaptiveVector {
}

private def withSparse[V](v: AdaptiveVector[V], sv: V): AdaptiveVector[V] =
if(v.sparseValue == sv) v
if (v.sparseValue == sv) v
else fromVector(toVector(v), sv)

private class AVSemigroup[V:Semigroup] extends Semigroup[AdaptiveVector[V]] {
private class AVSemigroup[V: Semigroup] extends Semigroup[AdaptiveVector[V]] {
private def valueIsNonZero(v: V): Boolean = implicitly[Semigroup[V]] match {
case m: Monoid[_] => m.isNonZero(v)
case _ => true
}

def plus(left: AdaptiveVector[V], right: AdaptiveVector[V]) = {
if(left.sparseValue != right.sparseValue) {
if(left.denseCount > right.denseCount) plus(withSparse(left, right.sparseValue), right)
if (left.sparseValue != right.sparseValue) {
if (left.denseCount > right.denseCount) plus(withSparse(left, right.sparseValue), right)
else plus(left, withSparse(right, left.sparseValue))
}
else {
} else {
// they have the same sparse value
val maxSize = Ordering[Int].max(left.size, right.size)
(left, right) match {
case (DenseVector(lv, ls, ld), DenseVector(rv, rs, rd)) =>
val vec = Semigroup.plus[IndexedSeq[V]](lv, rv) match {
case v: Vector[_] => v.asInstanceOf[Vector[V]]
case notV => Vector(notV : _*)
case notV => Vector(notV: _*)
}
fromVector(vec, ls)

case _ if valueIsNonZero(left.sparseValue) =>
fromVector(Vector(Semigroup.plus(toVector(left):IndexedSeq[V],
toVector(right):IndexedSeq[V]):_*),
fromVector(Vector(Semigroup.plus(toVector(left): IndexedSeq[V],
toVector(right): IndexedSeq[V]): _*),
left.sparseValue)
case _ => // sparse is zero:
fromMap(Semigroup.plus(toMap(left), toMap(right)),
@@ -115,36 +114,36 @@ object AdaptiveVector {
}
}
}
private class AVMonoid[V:Monoid] extends AVSemigroup[V] with Monoid[AdaptiveVector[V]] {
private class AVMonoid[V: Monoid] extends AVSemigroup[V] with Monoid[AdaptiveVector[V]] {
val zero = AdaptiveVector.fill[V](0)(Monoid.zero[V])
override def isNonZero(v: AdaptiveVector[V]) = !isZero(v)

def isZero(v: AdaptiveVector[V]) = (v.size == 0) || {
val sparseAreZero = if(Monoid.isNonZero(v.sparseValue)) (v.denseCount == v.size) else true
val sparseAreZero = if (Monoid.isNonZero(v.sparseValue)) (v.denseCount == v.size) else true
sparseAreZero &&
v.denseIterator.forall { idxv => !Monoid.isNonZero(idxv._2) }
}
}
private class AVGroup[V:Group] extends AVMonoid[V] with Group[AdaptiveVector[V]] {
private class AVGroup[V: Group] extends AVMonoid[V] with Group[AdaptiveVector[V]] {
override def negate(v: AdaptiveVector[V]) =
fromVector(toVector(v).map(Group.negate(_)), Group.negate(v.sparseValue))
}

implicit def semigroup[V:Semigroup]: Semigroup[AdaptiveVector[V]] = new AVSemigroup[V]
implicit def monoid[V:Monoid]: Monoid[AdaptiveVector[V]] = new AVMonoid[V]
implicit def group[V:Group]: Group[AdaptiveVector[V]] = new AVGroup[V]
implicit def semigroup[V: Semigroup]: Semigroup[AdaptiveVector[V]] = new AVSemigroup[V]
implicit def monoid[V: Monoid]: Monoid[AdaptiveVector[V]] = new AVMonoid[V]
implicit def group[V: Group]: Group[AdaptiveVector[V]] = new AVGroup[V]
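
A minimal usage sketch of the algebra instances above; the element values are illustrative and it assumes algebird's implicit Semigroup[Int] is in scope (not part of this diff):

import com.twitter.algebird._

val left  = AdaptiveVector.fromVector(Vector(1, 0, 0, 2), 0) // sparseValue = 0
val right = AdaptiveVector.fromVector(Vector(0, 3, 0, 0), 0)
val sum   = Semigroup.plus(left, right)                      // element-wise sum: 1, 3, 0, 2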

/*
* Equality when considering only the dense values (so size doesn't matter)
*/
def denseEquiv[V:Equiv]: Equiv[AdaptiveVector[V]] = Equiv.fromFunction[AdaptiveVector[V]] { (l, r) =>
def denseEquiv[V: Equiv]: Equiv[AdaptiveVector[V]] = Equiv.fromFunction[AdaptiveVector[V]] { (l, r) =>
val (lit, rit) = (l.denseIterator, r.denseIterator)
@tailrec
def iteq: Boolean =
(lit.hasNext, rit.hasNext) match {
case (true, true) =>
val (lnext, rnext) = (lit.next, rit.next)
if(lnext._1 == rnext._1 && Equiv[V].equiv(lnext._2, rnext._2))
if (lnext._1 == rnext._1 && Equiv[V].equiv(lnext._2, rnext._2))
iteq
else
false
@@ -154,14 +153,15 @@ object AdaptiveVector {
Equiv[V].equiv(l.sparseValue, r.sparseValue) && iteq
}

implicit def equiv[V:Equiv]: Equiv[AdaptiveVector[V]] =
implicit def equiv[V: Equiv]: Equiv[AdaptiveVector[V]] =
Equiv.fromFunction[AdaptiveVector[V]] { (l, r) =>
(l.size == r.size) && (denseEquiv[V].equiv(l, r) ||
toVector(l).view.zip(toVector(r)).forall { case (lv, rv) => Equiv[V].equiv(lv, rv) })
}
}
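
A hedged sketch contrasting denseEquiv with the default equiv (values illustrative; assumes an implicit Equiv[Int] such as Scala's universal equivalence):

val a = AdaptiveVector.fromMap(Map(0 -> 5), 0, 3)   // logically [5, 0, 0]
val b = AdaptiveVector.fromMap(Map(0 -> 5), 0, 10)  // same dense entries, larger size
AdaptiveVector.denseEquiv[Int].equiv(a, b) // true: same sparse value and dense entries
AdaptiveVector.equiv[Int].equiv(a, b)      // false: sizes differ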

/** An IndexedSeq that automatically switches representation between dense and sparse depending on sparsity
/**
* An IndexedSeq that automatically switches representation between dense and sparse depending on sparsity
* Should be an efficient representation for all sizes, and it should not be necessary to special case
* immutable algebras based on the sparsity of the vectors.
*/
@@ -176,7 +176,7 @@ sealed trait AdaptiveVector[V] extends IndexedSeq[V] {
/** Grow by adding count sparse values to the end */
def extend(count: Int): AdaptiveVector[V]
/** Iterator of indices and values of all non-sparse values */
def denseIterator: Iterator[(Int,V)]
def denseIterator: Iterator[(Int, V)]
/*
* Note that IndexedSeq provides hashCode and equals that
* work correctly based on length and apply.
@@ -189,24 +189,22 @@ case class DenseVector[V](iseq: Vector[V], override val sparseValue: V, override
override def size = iseq.size
def apply(idx: Int) = iseq(idx)
def updated(idx: Int, v: V): AdaptiveVector[V] = {
val oldIsSparse = if(iseq(idx) == sparseValue) 1 else 0
val newIsSparse = if(v == sparseValue) 1 else 0
val oldIsSparse = if (iseq(idx) == sparseValue) 1 else 0
val newIsSparse = if (v == sparseValue) 1 else 0
val newCount = denseCount - newIsSparse + oldIsSparse
if( denseCount < size * AdaptiveVector.THRESHOLD ) {
if (denseCount < size * AdaptiveVector.THRESHOLD) {
// Go sparse
SparseVector(AdaptiveVector.toMap(iseq, sparseValue), sparseValue, size)
}
else {
} else {
DenseVector(iseq.updated(idx, v), sparseValue, newCount)
}
}
def extend(cnt: Int) = {
val newSize = size + cnt
if(denseCount < newSize * AdaptiveVector.THRESHOLD) {
if (denseCount < newSize * AdaptiveVector.THRESHOLD) {
// Go sparse
SparseVector(AdaptiveVector.toMap(iseq, sparseValue), sparseValue, newSize)
}
else {
} else {
// Stay dense
DenseVector(iseq ++ Vector.fill(cnt)(sparseValue), sparseValue, denseCount)
}
@@ -216,25 +214,22 @@ case class SparseVector[V](map: Map[Int, V], override val sparseValue: V, override
iseq.view.zipWithIndex.filter { _._1 != sparseValue }.map { _.swap }.iterator
}

case class SparseVector[V](map: Map[Int, V], override val sparseValue: V, override val size: Int) extends
AdaptiveVector[V] {
case class SparseVector[V](map: Map[Int, V], override val sparseValue: V, override val size: Int) extends AdaptiveVector[V] {

def denseCount: Int = map.size
def apply(idx: Int) = {
require(idx >= 0 && idx < size, "Index out of range")
map.getOrElse(idx, sparseValue)
}
def updated(idx: Int, v: V): AdaptiveVector[V] = {
if(v == sparseValue) {
if (v == sparseValue) {
SparseVector(map - idx, sparseValue, size)
}
else {
} else {
val newM = map + (idx -> v)
if(newM.size >= size * AdaptiveVector.THRESHOLD) {
if (newM.size >= size * AdaptiveVector.THRESHOLD) {
// Go dense:
DenseVector(AdaptiveVector.toVector(newM, sparseValue, size), sparseValue, newM.size)
}
else {
} else {
SparseVector(newM, sparseValue, size)
}
}
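The updated and extend methods above switch representation when the dense fraction crosses THRESHOLD; a hedged sketch (sizes and values illustrative):

val empty = AdaptiveVector.fill[Int](100)(0)  // SparseVector backed by an empty Map
val one   = empty.updated(0, 7)               // 1 dense entry out of 100: stays sparse
val many  = (0 until 30).foldLeft(empty) { (v, i) => v.updated(i, i + 1) }
// 30 dense entries out of 100 is at least 25%, so the result has switched to DenseVector
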
@@ -21,8 +21,8 @@ package com.twitter.algebird
* f(x) = slope * x + intercept
*/
case class AffineFunction[R](slope: R, intercept: R) extends java.io.Serializable {
def toFn(implicit ring: Ring[R]): Function1[R,R] = {x => this.apply(x)(ring) }
def apply(x:R)(implicit ring: Ring[R]) = ring.plus(ring.times(slope, x), intercept)
def toFn(implicit ring: Ring[R]): Function1[R, R] = { x => this.apply(x)(ring) }
def apply(x: R)(implicit ring: Ring[R]) = ring.plus(ring.times(slope, x), intercept)
}

/**
@@ -46,5 +46,5 @@ class AffineFunctionMonoid[R](implicit ring: Ring[R]) extends Monoid[AffineFunct
}

object AffineFunction extends java.io.Serializable {
implicit def monoid[R:Ring]: Monoid[AffineFunction[R]] = new AffineFunctionMonoid[R]
implicit def monoid[R: Ring]: Monoid[AffineFunction[R]] = new AffineFunctionMonoid[R]
}
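
A minimal sketch of evaluating an AffineFunction (assumes algebird's implicit Ring[Int]; values illustrative):

val f = AffineFunction(slope = 3, intercept = 2)
f(10) // ring.plus(ring.times(3, 10), 2) == 32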
89 changes: 45 additions & 44 deletions algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala
@@ -8,79 +8,80 @@ package com.twitter.algebird
* GeneratedTupleAggregator.from2((agg1, agg2))
*/
object Aggregator extends java.io.Serializable {
/** Using Aggregator.prepare,present you can add to this aggregator
/**
* Using Aggregator.prepare,present you can add to this aggregator
*/
def fromReduce[T](red: (T,T) => T): Aggregator[T,T,T] = new Aggregator[T,T,T] {
def prepare(input : T) = input
def reduce(l : T, r : T) = red(l, r)
def present(reduction : T) = reduction
def fromReduce[T](red: (T, T) => T): Aggregator[T, T, T] = new Aggregator[T, T, T] {
def prepare(input: T) = input
def reduce(l: T, r: T) = red(l, r)
def present(reduction: T) = reduction
}
def fromSemigroup[T](implicit sg: Semigroup[T]): Aggregator[T,T,T] = new Aggregator[T,T,T] {
def prepare(input : T) = input
def reduce(l : T, r : T) = sg.plus(l, r)
def present(reduction : T) = reduction
def fromSemigroup[T](implicit sg: Semigroup[T]): Aggregator[T, T, T] = new Aggregator[T, T, T] {
def prepare(input: T) = input
def reduce(l: T, r: T) = sg.plus(l, r)
def present(reduction: T) = reduction
}
def fromMonoid[T](implicit mon: Monoid[T]): MonoidAggregator[T,T,T] = fromMonoid[T,T](mon,identity[T])
def fromMonoid[T](implicit mon: Monoid[T]): MonoidAggregator[T, T, T] = fromMonoid[T, T](mon, identity[T])
// Uses the product from the ring
def fromRing[T](implicit rng: Ring[T]): RingAggregator[T,T,T] = fromRing[T,T](rng,identity[T])
def fromRing[T](implicit rng: Ring[T]): RingAggregator[T, T, T] = fromRing[T, T](rng, identity[T])

def fromMonoid[F,T](implicit mon: Monoid[T],prep: F=>T): MonoidAggregator[F,T,T] = new MonoidAggregator[F,T,T] {
def prepare(input : F) = prep(input)
def fromMonoid[F, T](implicit mon: Monoid[T], prep: F => T): MonoidAggregator[F, T, T] = new MonoidAggregator[F, T, T] {
def prepare(input: F) = prep(input)
def monoid = mon
def present(reduction : T) = reduction
def present(reduction: T) = reduction
}
// Uses the product from the ring
def fromRing[F,T](implicit rng: Ring[T],prep: F=>T): RingAggregator[F,T,T] = new RingAggregator[F,T,T] {
def prepare(input : F) = prep(input)
def fromRing[F, T](implicit rng: Ring[T], prep: F => T): RingAggregator[F, T, T] = new RingAggregator[F, T, T] {
def prepare(input: F) = prep(input)
def ring = rng
def present(reduction : T) = reduction
def present(reduction: T) = reduction
}
}
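
A hedged sketch using the factory methods above (values illustrative):

val maxAgg = Aggregator.fromReduce[Int](_ max _)
maxAgg(List(3, 1, 4, 1, 5)) // prepare and present are identity, reduce is max: 5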

trait Aggregator[-A,B,+C] extends java.io.Serializable { self =>
def prepare(input : A) : B
def reduce(l : B, r : B) : B
def present(reduction : B) : C
trait Aggregator[-A, B, +C] extends java.io.Serializable { self =>
def prepare(input: A): B
def reduce(l: B, r: B): B
def present(reduction: B): C

def reduce(items : TraversableOnce[B]) : B = items.reduce{reduce(_,_)}
def apply(inputs : TraversableOnce[A]) : C = present(reduce(inputs.map{prepare(_)}))
def reduce(items: TraversableOnce[B]): B = items.reduce{ reduce(_, _) }
def apply(inputs: TraversableOnce[A]): C = present(reduce(inputs.map{ prepare(_) }))

def append(l: B,r: A): B=reduce(l,prepare(r))
def append(l: B, r: A): B = reduce(l, prepare(r))

def appendAll(old: B, items: TraversableOnce[A]): B =
if (items.isEmpty) old else reduce(old, reduce(items.map(prepare)))

/** Like calling andThen on the present function */
def andThenPresent[D](present2: C => D): Aggregator[A,B,D] =
new Aggregator[A,B,D] {
def prepare(input : A) = self.prepare(input)
def reduce(l : B, r : B) = self.reduce(l, r)
def present(reduction : B) = present2(self.present(reduction))
def andThenPresent[D](present2: C => D): Aggregator[A, B, D] =
new Aggregator[A, B, D] {
def prepare(input: A) = self.prepare(input)
def reduce(l: B, r: B) = self.reduce(l, r)
def present(reduction: B) = present2(self.present(reduction))
}
/** Like calling compose on the prepare function */
def composePrepare[A1](prepare2: A1 => A): Aggregator[A1,B,C] =
new Aggregator[A1,B,C] {
def prepare(input : A1) = self.prepare(prepare2(input))
def reduce(l : B, r : B) = self.reduce(l, r)
def present(reduction : B) = self.present(reduction)
def composePrepare[A1](prepare2: A1 => A): Aggregator[A1, B, C] =
new Aggregator[A1, B, C] {
def prepare(input: A1) = self.prepare(prepare2(input))
def reduce(l: B, r: B) = self.reduce(l, r)
def present(reduction: B) = self.present(reduction)
}
}

trait MonoidAggregator[-A,B,+C] extends Aggregator[A,B,C] {
def monoid : Monoid[B]
final def reduce(l : B, r : B) : B = monoid.plus(l, r)
final override def reduce(items : TraversableOnce[B]) : B =
trait MonoidAggregator[-A, B, +C] extends Aggregator[A, B, C] {
def monoid: Monoid[B]
final def reduce(l: B, r: B): B = monoid.plus(l, r)
final override def reduce(items: TraversableOnce[B]): B =
monoid.sum(items)

def appendAll(items:TraversableOnce[A]): B=appendAll(monoid.zero,items)
def appendAll(items: TraversableOnce[A]): B = appendAll(monoid.zero, items)
}

trait RingAggregator[-A,B,+C] extends Aggregator[A,B,C] {
trait RingAggregator[-A, B, +C] extends Aggregator[A, B, C] {
def ring: Ring[B]
final def reduce(l: B, r: B): B = ring.times(l,r)
final override def reduce(items : TraversableOnce[B]) : B =
if(items.isEmpty) ring.one // There are several pseudo-rings, so avoid one if you can
final def reduce(l: B, r: B): B = ring.times(l, r)
final override def reduce(items: TraversableOnce[B]): B =
if (items.isEmpty) ring.one // There are several pseudo-rings, so avoid one if you can
else items.reduceLeft(reduce _)

def appendAll(items:TraversableOnce[A]): B=appendAll(ring.one,items)
def appendAll(items: TraversableOnce[A]): B = appendAll(ring.one, items)
}
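
A hedged sketch of composing and reusing an aggregator via andThenPresent and appendAll (assumes algebird's implicit Semigroup[Int]; values illustrative):

val sumAgg   = Aggregator.fromSemigroup[Int]     // reduce is sg.plus
val asDouble = sumAgg.andThenPresent(_.toDouble) // post-process the final reduction
asDouble(List(1, 2, 3))             // 6.0
sumAgg.appendAll(10, List(1, 2, 3)) // 16: fold new inputs into an existing reduction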