Skip to content

Commit

Permalink
Address Patrick's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Apr 10, 2014
1 parent 69d1b41 commit b158d98
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils

/**
* A web server that renders SparkUIs of finished applications.
* A web server that renders SparkUIs of completed applications.
*
* For the standalone mode, MasterWebUI already achieves this functionality. Thus, the
* main use case of the HistoryServer is in other deploy modes (e.g. Yarn or Mesos).
Expand Down Expand Up @@ -61,8 +61,8 @@ class HistoryServer(
// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTime = -1L

// Number of complete applications found in this directory
private var numApplicationsTotal = 0
// Number of completed applications found in this directory
private var numCompletedApplications = 0

@volatile private var stopped = false

Expand Down Expand Up @@ -125,11 +125,11 @@ class HistoryServer(
* Check for any updates to event logs in the base directory. This is only effective once
* the server has been bound.
*
* If a new finished application is found, the server renders the associated SparkUI
* If a new completed application is found, the server renders the associated SparkUI
* from the application's event logs, attaches this UI to itself, and stores metadata
* information for this application.
*
* If the logs for an existing finished application are no longer found, the server
* If the logs for an existing completed application are no longer found, the server
* removes all associated information and detaches the SparkUI.
*/
def checkForLogs() = synchronized {
Expand Down Expand Up @@ -164,8 +164,8 @@ class HistoryServer(
}
}

// Track the total number of complete applications observed this round
numApplicationsTotal = logInfos.size
// Track the total number of completed applications observed this round
numCompletedApplications = logInfos.size

} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
Expand All @@ -176,10 +176,10 @@ class HistoryServer(
}

/**
* Render a new SparkUI from the event logs if the associated application is finished.
* Render a new SparkUI from the event logs if the associated application is completed.
*
* HistoryServer looks for a special file that indicates application completion in the given
* directory. If this file exists, the associated application is regarded to be complete, in
* directory. If this file exists, the associated application is regarded to be completed, in
* which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
*/
private def renderSparkUI(logDir: FileStatus, logInfo: EventLoggingInfo) {
Expand All @@ -200,7 +200,7 @@ class HistoryServer(
val startTime = appListener.startTime
val endTime = appListener.endTime
val lastUpdated = getModificationTime(logDir)
ui.setAppName(appName + " (finished)")
ui.setAppName(appName + " (completed)")
appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime,
lastUpdated, sparkUser, path, ui)
}
Expand All @@ -216,14 +216,14 @@ class HistoryServer(
/** Return the address of this server. */
def getAddress: String = "http://" + publicHost + ":" + boundPort

/** Return the total number of application logs found, whether or not the UI is retained. */
def getNumApplications: Int = numApplicationsTotal
/** Return the number of completed applications found, whether or not the UI is rendered. */
def getNumApplications: Int = numCompletedApplications

/** Return when this directory was last modified. */
private def getModificationTime(dir: FileStatus): Long = {
try {
val logFiles = fileSystem.listStatus(dir.getPath)
if (logFiles != null) {
if (logFiles != null && !logFiles.isEmpty) {
logFiles.map(_.getModificationTime).max
} else {
dir.getModificationTime
Expand Down Expand Up @@ -283,5 +283,5 @@ private[spark] case class ApplicationHistoryInfo(
logDirPath: Path,
ui: SparkUI) {
def started = startTime != -1
def finished = endTime != -1
def completed = endTime != -1
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ private[spark] class HistoryServerArguments(args: Array[String]) {
}
val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
val path = new Path(logDir)
if (!fileSystem.exists(path) || !fileSystem.getFileStatus(path).isDir) {
System.err.println("Logging directory specified is invalid: %s".format(logDir))
if (!fileSystem.exists(path)) {
System.err.println("Logging directory specified does not exist: %s".format(logDir))
printUsageAndExit(1)
}
if (!fileSystem.getFileStatus(path).isDir) {
System.err.println("Logging directory specified is not a directory: %s".format(logDir))
printUsageAndExit(1)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ private[spark] class IndexPage(parent: HistoryServer) {
val appName = if (info.started) info.name else info.logDirPath.getName
val uiAddress = parent.getAddress + info.ui.basePath
val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started"
val endTime = if (info.finished) WebUI.formatDate(info.endTime) else "Not finished"
val difference = if (info.started && info.finished) info.endTime - info.startTime else -1L
val endTime = if (info.completed) WebUI.formatDate(info.endTime) else "Not finished"
val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
val duration = if (difference > 0) WebUI.formatDuration(difference) else "---"
val sparkUser = if (info.started) info.sparkUser else "Unknown user"
val logDirectory = info.logDirPath.getName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ private[spark] class Master(
if (!eventLogPaths.isEmpty) {
try {
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
val ui = new SparkUI(replayBus, appName + " (finished)", "/history/" + app.id)
val ui = new SparkUI(replayBus, appName + " (completed)", "/history/" + app.id)
ui.start()
replayBus.replay()
app.desc.appUiUrl = ui.basePath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ package org.apache.spark.scheduler
/**
* A simple listener for application events.
*
* This listener assumes at most one of each of SparkListenerApplicationStart and
* SparkListenerApplicationEnd will be received. Otherwise, only the latest event
* of each type will take effect.
* This listener expects to hear events from a single application only. If events
* from multiple applications are seen, the behavior is unspecified.
*/
private[spark] class ApplicationEventListener extends SparkListener {
var appName = "<Not Started>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import scala.collection.mutable

import org.apache.hadoop.fs.{FileSystem, Path}
import org.json4s.jackson.JsonMethods._

Expand Down Expand Up @@ -118,6 +120,9 @@ private[spark] object EventLoggingListener extends Logging {
val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"

// A cache for compression codecs to avoid creating the same codec many times
private val codecMap = new mutable.HashMap[String, CompressionCodec]

def isEventLogFile(fileName: String): Boolean = {
fileName.startsWith(LOG_PREFIX)
}
Expand Down Expand Up @@ -174,11 +179,11 @@ private[spark] object EventLoggingListener extends Logging {
compressionCodec = filePaths
.find { path => isCompressionCodecFile(path.getName) }
.map { path =>
val codec = EventLoggingListener.parseCompressionCodec(path.getName)
val conf = new SparkConf
conf.set("spark.io.compression.codec", codec)
CompressionCodec.createCodec(conf)
},
val codec = EventLoggingListener.parseCompressionCodec(path.getName)
val conf = new SparkConf
conf.set("spark.io.compression.codec", codec)
codecMap.getOrElseUpdate(codec, CompressionCodec.createCodec(conf))
},
applicationComplete = filePaths.exists { path => isApplicationCompleteFile(path.getName) }
)
} catch {
Expand Down

0 comments on commit b158d98

Please # to comment.