Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Registering summingbird counters with tormenta spouts #553

Merged
merged 2 commits into from
Feb 11, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import org.slf4j.LoggerFactory
import scala.collection.{ Map => CMap }

import Constants._
import com.twitter.summingbird.storm.StormMetric
import backtype.storm.metric.api.IMetric

/*
* Batchers are used for partial aggregation. We never aggregate past two items which are not in the same batch.
Expand Down Expand Up @@ -157,7 +159,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
}
}

private def scheduleSpout[K](stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = {
private def scheduleSpout[K](jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = {
val (spout, parOpt) = node.members.collect { case Source(SpoutSource(s, parOpt)) => (s, parOpt) }.head
val nodeName = stormDag.getNodeName(node)

Expand All @@ -172,9 +174,23 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
}
}

val countersForSpout: Seq[(Group, Name)] = JobCounters.getCountersForJob(jobID).getOrElse(Nil)

val metrics = getOrElse(stormDag, node, DEFAULT_SPOUT_STORM_METRICS)

val stormSpout = tormentaSpout.registerMetrics(metrics.toSpoutMetrics).getSpout
val registerAllMetrics = new Function1[TopologyContext, Unit] {
def apply(context: TopologyContext) = {
// Register metrics passed in SpoutStormMetrics option.
metrics.metrics().map {
x: StormMetric[IMetric] =>
context.registerMetric(x.name, x.metric, x.interval.inSeconds)
}
// Register summingbird counter metrics.
StormStatProvider.registerMetrics(jobID, context, countersForSpout)
SummingbirdRuntimeStats.addPlatformStatProvider(StormStatProvider)
}
}
val stormSpout = tormentaSpout.openHook(registerAllMetrics).getSpout
val parallelism = getOrElse(stormDag, node, parOpt.getOrElse(DEFAULT_SOURCE_PARALLELISM)).parHint
topologyBuilder.setSpout(nodeName, stormSpout, parallelism)
}
Expand Down Expand Up @@ -316,7 +332,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
node match {
case _: SummerNode[_] => scheduleSummerBolt(jobID, stormDag, node)
case _: FlatMapNode[_] => scheduleFlatMapper(jobID, stormDag, node)
case _: SourceNode[_] => scheduleSpout(stormDag, node)
case _: SourceNode[_] => scheduleSpout(jobID, stormDag, node)
}
}
PlannedTopology(config, topologyBuilder.createTopology)
Expand Down