diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala index 3bfd37c22..37b06711a 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala @@ -115,6 +115,16 @@ case class ScaldingEnv(override val jobName: String, inargs: Array[String]) val scald = Scalding(name, opts) .withRegistrars(ajob.registrars ++ builder.registrar.getRegistrars.asScala) + .withConfigUpdater { + // Set these before the user settings, so that the user + // can change them if needed + + // Make sure we use block compression from mappers to reducers + _.put("mapred.output.compression.type", "BLOCK") + .put("io.compression.codec.lzo.compression.level", "3") + .put("mapred.output.compress", "true") + .put("mapred.compress.map.output", "true") + } .withConfigUpdater{ c => c.updated(ajob.transformConfig(c.toMap)) } diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala index d8a738099..878c80689 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala @@ -534,6 +534,7 @@ class Scalding( Scalding.logger.debug("Removes: {}", scaldingConfig.removes) Scalding.logger.debug("Updates: {}", scaldingConfig.updates) + // now let the user do her thing: val transformedConfig = transformConfig(scaldingConfig) Scalding.logger.debug("User+Serialization config changes:") @@ -544,6 +545,7 @@ class Scalding( transformedConfig.updates.foreach(kv => conf.set(kv._1, kv._2.toString)) // Store the options used: conf.set("summingbird.options", options.toString) + conf.set("summingbird.jobname", jobName) def ifUnset(k: String, v: String) { if(null == conf.get(k)) { conf.set(k, v) } } // Set the mapside cache size, this is important to not be too small