Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #452 from twitter/feature/hadoop_compression_defaults
Browse files Browse the repository at this point in the history
Move the hadoop defaults into scalding platform
  • Loading branch information
johnynek committed Feb 21, 2014
2 parents b9fcd52 + 201a14d commit 4c2e543
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,6 @@ case class ScaldingEnv(override val jobName: String, inargs: Array[String])

val scald = Scalding(name, opts)
.withRegistrars(ajob.registrars ++ builder.registrar.getRegistrars.asScala)
.withConfigUpdater {
// Set these before the user settings, so that the user
// can change them if needed

// Make sure we use block compression from mappers to reducers
_.put("mapred.output.compression.type", "BLOCK")
.put("io.compression.codec.lzo.compression.level", "3")
.put("mapred.output.compress", "true")
.put("mapred.compress.map.output", "true")
}
.withConfigUpdater{ c =>
c.updated(ajob.transformConfig(c.toMap))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,19 @@ class Scalding(
}

private def setIoSerializations(c: Configuration): Unit =
c.set("io.serializations", ioSerializations.map { _.getName }.mkString(","))
c.set("io.serializations", ioSerializations.map { _.getName }.mkString(","))

private val HADOOP_DEFAULTS = Map(
("mapred.output.compression.type", "BLOCK"),
("io.compression.codec.lzo.compression.level", "3"),
("mapred.output.compress", "true"),
("mapred.compress.map.output", "true")
)

private def setHadoopConfigDefaults(c: Configuration): Unit =
HADOOP_DEFAULTS.foreach { case (k, v) =>
c.set(k, v)
}

// This is a side-effect-free computation that is called by run
def toFlow(timeSpan: Interval[Timestamp], mode: Mode, pf: PipeFactory[_]): Try[(Interval[Timestamp], Flow[_])] = {
Expand Down Expand Up @@ -588,6 +600,9 @@ class Scalding(

mode match {
case Hdfs(_, conf) =>
// Set these before the user settings, so that the user
// can change them if needed
setHadoopConfigDefaults(conf)
updateConfig(conf)
setIoSerializations(conf)
case _ =>
Expand Down

0 comments on commit 4c2e543

Please # to comment.