diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala index 6496ca3da..26a5e59a4 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala @@ -115,16 +115,6 @@ case class ScaldingEnv(override val jobName: String, inargs: Array[String]) val scald = Scalding(name, opts) .withRegistrars(ajob.registrars ++ builder.registrar.getRegistrars.asScala) - .withConfigUpdater { - // Set these before the user settings, so that the user - // can change them if needed - - // Make sure we use block compression from mappers to reducers - _.put("mapred.output.compression.type", "BLOCK") - .put("io.compression.codec.lzo.compression.level", "3") - .put("mapred.output.compress", "true") - .put("mapred.compress.map.output", "true") - } .withConfigUpdater{ c => c.updated(ajob.transformConfig(c.toMap)) } diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala index 9f356d90a..5cae2f9ba 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala @@ -559,7 +559,19 @@ class Scalding( } private def setIoSerializations(c: Configuration): Unit = - c.set("io.serializations", ioSerializations.map { _.getName }.mkString(",")) + c.set("io.serializations", ioSerializations.map { _.getName }.mkString(",")) + + private val HADOOP_DEFAULTS = Map( + ("mapred.output.compression.type", "BLOCK"), + ("io.compression.codec.lzo.compression.level", "3"), + ("mapred.output.compress", "true"), + ("mapred.compress.map.output", "true") + ) + + private def setHadoopConfigDefaults(c: Configuration): Unit = + HADOOP_DEFAULTS.foreach { case (k, v) => + c.set(k, v) + } // This is a side-effect-free computation that is called by run def toFlow(timeSpan: Interval[Timestamp], mode: Mode, pf: PipeFactory[_]): Try[(Interval[Timestamp], Flow[_])] = { @@ -588,6 +600,9 @@ class Scalding( mode match { case Hdfs(_, conf) => + // Set these before the user settings, so that the user + // can change them if needed + setHadoopConfigDefaults(conf) updateConfig(conf) setIoSerializations(conf) case _ =>