diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala index 8ae16fb8d..1fa081bdf 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala @@ -19,6 +19,7 @@ package com.twitter.summingbird.scalding import com.twitter.bijection.Conversion.asMethod import com.twitter.chill.ScalaKryoInstantiator import com.twitter.chill.java.IterableRegistrar +import com.twitter.chill.{Kryo, IKryoRegistrar, toRich } import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst, JavaMapConfig } import com.twitter.scalding.{ Tool => STool, _ } import com.twitter.summingbird.scalding.store.HDFSMetadata @@ -26,6 +27,7 @@ import com.twitter.summingbird.{ Env, Unzip2, Summer, Producer, TailProducer, Ab import com.twitter.summingbird.batch.{ BatchID, Batcher, Timestamp } import com.twitter.summingbird.builder.{ SourceBuilder, Reducers, CompletedBuilder } import com.twitter.summingbird.storm.Storm + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.util.ToolRunner import org.apache.hadoop.util.GenericOptionsParser @@ -105,6 +107,14 @@ case class ScaldingEnv(override val jobName: String, inargs: Array[String]) new ScalaKryoInstantiator() .withRegistrar(builder.registrar) .withRegistrar(new IterableRegistrar(ajob.registrars)) + .withRegistrar(new IKryoRegistrar { + def apply(k: Kryo) { + List(classOf[BatchID], classOf[Timestamp]) + .foreach { cls => + if(!k.alreadyRegistered(cls)) k.register(cls) + } + } + }) ) fromMap(ajob.transformConfig(jConf.as[Map[String, AnyRef]])) } diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala index 63ea679b4..6492c75b6 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala @@ -18,10 +18,11 @@ package com.twitter.summingbird.storm import backtype.storm.Config import com.twitter.scalding.Args -import com.twitter.chill.ScalaKryoInstantiator +import com.twitter.chill.{ScalaKryoInstantiator, Kryo, toRich, IKryoRegistrar} import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst, JavaMapConfig } import com.twitter.chill.java.IterableRegistrar import com.twitter.summingbird.{ Env, Unzip2, Producer, TailProducer } +import com.twitter.summingbird.batch.{ BatchID, Timestamp } import com.twitter.summingbird.scalding.Scalding import scala.collection.JavaConverters._ @@ -57,6 +58,14 @@ case class StormEnv(override val jobName: String, override val args: Args) new ScalaKryoInstantiator() .withRegistrar(builder.registrar) .withRegistrar(new IterableRegistrar(ajob.registrars)) + .withRegistrar(new IKryoRegistrar { + def apply(k: Kryo) { + List(classOf[BatchID], classOf[Timestamp]) + .foreach { cls => + if(!k.alreadyRegistered(cls)) k.register(cls) + } + } + }) ) transformed }.run(