diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index ad798822e..ce254ae89 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -45,6 +45,8 @@ import org.slf4j.LoggerFactory import scala.collection.{ Map => CMap } import Constants._ +import com.twitter.summingbird.storm.StormMetric +import backtype.storm.metric.api.IMetric /* * Batchers are used for partial aggregation. We never aggregate past two items which are not in the same batch. @@ -176,14 +178,19 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val metrics = getOrElse(stormDag, node, DEFAULT_SPOUT_STORM_METRICS) - val registerMetricProvider = new Function1[TopologyContext, Unit] { + val registerAllMetrics = new Function1[TopologyContext, Unit] { def apply(context: TopologyContext) = { + // Register metrics passed in SpoutStormMetrics option. + metrics.metrics().map { + x: StormMetric[IMetric] => + context.registerMetric(x.name, x.metric, x.interval.inSeconds) + } + // Register summingbird counter metrics. StormStatProvider.registerMetrics(jobID, context, countersForSpout) SummingbirdRuntimeStats.addPlatformStatProvider(StormStatProvider) } } - - val stormSpout = tormentaSpout.registerMetricHandlers(metrics.toSpoutMetrics, registerMetricProvider).getSpout + val stormSpout = tormentaSpout.openHook(registerAllMetrics).getSpout val parallelism = getOrElse(stormDag, node, parOpt.getOrElse(DEFAULT_SOURCE_PARALLELISM)).parHint topologyBuilder.setSpout(nodeName, stormSpout, parallelism) }