Skip to content
This repository was archived by the owner on Jan 20, 2022. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ sealed trait StormService[-K, +V]
case class StoreWrapper[K, V](store: StoreFactory[K, V]) extends StormService[K, V]

sealed trait StormSource[+T]
case class SpoutSource[+T](spout: Spout[(Long, T)]) extends StormSource[T]
case class SpoutSource[+T](spout: Spout[(Long, T)], parallelism: Option[option.SpoutParallelism]) extends StormSource[T]

object Storm {
def local(options: Map[String, Options] = Map.empty): LocalStorm =
Expand All @@ -79,11 +79,15 @@ object Storm {
def store[K, V](store: => MergeableStore[(K, BatchID), V])(implicit batcher: Batcher): MergeableStoreSupplier[K, V] =
MergeableStoreSupplier.from(store)

implicit def toStormSource[T](spout: Spout[T])(implicit timeOf: TimeExtractor[T]) =
SpoutSource(spout.map(t => (timeOf(t), t)))
def toStormSource[T](spout: Spout[T], defaultSourcePar: Option[Int] = None)(implicit timeOf: TimeExtractor[T]) =
SpoutSource(spout.map(t => (timeOf(t), t)), defaultSourcePar.map(option.SpoutParallelism(_)))

implicit def source[T](spout: Spout[T])(implicit timeOf: TimeExtractor[T]) =
Producer.source[Storm, T](toStormSource(spout))
implicit def spoutAsStormSource[T](spout: Spout[T])(implicit timeOf: TimeExtractor[T]): StormSource[T] = toStormSource(spout, None)(timeOf)

def source[T](spout: Spout[T], defaultSourcePar: Option[Int] = None)(implicit timeOf: TimeExtractor[T]) =
Producer.source[Storm, T](toStormSource(spout, defaultSourcePar))

implicit def spoutAsSource[T](spout: Spout[T])(implicit timeOf: TimeExtractor[T]): Producer[Storm, T] = source(spout, None)(timeOf)
}

abstract class Storm(options: Map[String, Options], updateConf: Config => Config) extends Platform[Storm] {
Expand Down Expand Up @@ -159,7 +163,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config
}

private def scheduleSpout[K](stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = {
val spout = node.members.collect { case Source(SpoutSource(s)) => s }.head
val (spout, parOpt) = node.members.collect { case Source(SpoutSource(s, parOpt)) => (s, parOpt) }.head
val nodeName = stormDag.getNodeName(node)

val stormSpout = node.members.reverse.foldLeft(spout.asInstanceOf[Spout[(Long, Any)]]) { (spout, p) =>
Expand All @@ -173,7 +177,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config
}
}.getSpout

val parallelism = getOrElse(stormDag, node, DEFAULT_SPOUT_PARALLELISM).parHint
val parallelism = getOrElse(stormDag, node, parOpt.getOrElse(DEFAULT_SPOUT_PARALLELISM)).parHint
topologyBuilder.setSpout(nodeName, stormSpout, parallelism)
}

Expand Down