diff --git a/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala b/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala index be0ec03bc..03e10c48d 100644 --- a/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala +++ b/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala @@ -177,11 +177,16 @@ sealed trait TailProducer[P <: Platform[P], +T] extends Producer[P, T] { * This can be used to combine two independent Producers in a way that ensures * that the Platform will plan both into a single Plan. */ + + def also[R](that: TailProducer[P, R])(implicit ev: DummyImplicit): TailProducer[P, R] = + new AlsoTailProducer(this, that) + def also[R](that: Producer[P, R]): Producer[P, R] = AlsoProducer(this, that) override def name(id: String): TailProducer[P, T] = new TPNamedProducer[P, T](this, id) } +class AlsoTailProducer[P <: Platform[P], T, R](ensure: TailProducer[P, T], result: TailProducer[P, R]) extends AlsoProducer[P, T, R](ensure, result) with TailProducer[P, R] /** * This is a special node that ensures that the first argument is planned, but produces values diff --git a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphs.scala b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphs.scala index bfddbfafc..3ad3e7c64 100644 --- a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphs.scala +++ b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphs.scala @@ -182,7 +182,8 @@ object TestGraphs { (fnR: T1 => TraversableOnce[T2], fnA: T2 => TraversableOnce[(K1, V1)], fnB: T2 => TraversableOnce[(K2, V2)]): TailProducer[P, (K2, (Option[V2], V2))] = { val combined = source.flatMap(fnR) - combined.flatMap(fnA).sumByKey(store1).also(combined).flatMap(fnB).sumByKey(store2) + val calculated = combined.flatMap(fnB).sumByKey(store2) + combined.flatMap(fnA).sumByKey(store1).also(calculated) } def mapOnlyJob[P <: Platform[P], T, U]( diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/planner/StripNamedNodes.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/planner/StripNamedNodes.scala index 0916be8e7..a69e8a9a2 100644 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/planner/StripNamedNodes.scala +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/planner/StripNamedNodes.scala @@ -51,6 +51,14 @@ object StripNamedNode { def functionize[P <: Platform[P]](node: Producer[P, Any]): ProducerF[P] = { node match { + // This case is special/different since AlsoTailProducer needs the full class maintained(unlike TailNamedProducer), + // but it is not a case class. It inherits from TailProducer so cannot be one. + case p: AlsoTailProducer[_, _ , _] => + ProducerF( + List(p.result.asInstanceOf[Producer[P, Any]], p.ensure.asInstanceOf[Producer[P, Any]]), + p, + {(newEntries): List[Producer[P, Any]] => new AlsoTailProducer[P, Any, Any](castTail(newEntries(1)), castTail(newEntries(0)))} + ) case p@AlsoProducer(_, _) => ProducerF( List(p.result, p.ensure), p,