Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
openHook to do all metric registration in one function
Browse files Browse the repository at this point in the history
  • Loading branch information
Katya Gonina committed Oct 17, 2014
1 parent dda66a7 commit b2a7f82
Showing 1 changed file with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import org.slf4j.LoggerFactory
import scala.collection.{ Map => CMap }

import Constants._
import com.twitter.summingbird.storm.StormMetric
import backtype.storm.metric.api.IMetric

/*
* Batchers are used for partial aggregation. We never aggregate past two items which are not in the same batch.
Expand Down Expand Up @@ -176,14 +178,19 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird

val metrics = getOrElse(stormDag, node, DEFAULT_SPOUT_STORM_METRICS)

val registerMetricProvider = new Function1[TopologyContext, Unit] {
val registerAllMetrics = new Function1[TopologyContext, Unit] {
def apply(context: TopologyContext) = {
// Register metrics passed in SpoutStormMetrics option.
metrics.metrics().map {
x: StormMetric[IMetric] =>
context.registerMetric(x.name, x.metric, x.interval.inSeconds)
}
// Register summingbird counter metrics.
StormStatProvider.registerMetrics(jobID, context, countersForSpout)
SummingbirdRuntimeStats.addPlatformStatProvider(StormStatProvider)
}
}

val stormSpout = tormentaSpout.registerMetricHandlers(metrics.toSpoutMetrics, registerMetricProvider).getSpout
val stormSpout = tormentaSpout.openHook(registerAllMetrics).getSpout
val parallelism = getOrElse(stormDag, node, parOpt.getOrElse(DEFAULT_SOURCE_PARALLELISM)).parHint
topologyBuilder.setSpout(nodeName, stormSpout, parallelism)
}
Expand Down

0 comments on commit b2a7f82

Please # to comment.