Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[SNAP-3270] removing streaming query listener in finalize method #195

Merged
merged 6 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import org.apache.spark.util.Utils
* .getOrCreate()
* }}}
*/
// scalastyle:off no.finalize
@InterfaceStability.Stable
class SparkSession private(
@transient val sparkContext: SparkContext,
Expand Down Expand Up @@ -718,9 +719,9 @@ class SparkSession private(
* All session instances have their own SnappyStreamingQueryListener but shares same UI tab.
*/
protected def updateUIWithStructuredStreamingTab() = {
val ssqListener = new SnappyStreamingQueryListener(sparkContext)
this.streams.addListener(ssqListener)

val listener = new SnappyStreamingQueryListener()
this.streams.addListener(listener)
sessionState.registerStreamingQueryListener(listener)
if (sparkContext.ui.isDefined) {
logInfo("Updating Web UI to add structure streaming tab.")
sparkContext.ui.foreach(ui => {
Expand All @@ -740,14 +741,20 @@ class SparkSession private(
// Streaming web service
ui.attachHandler(SnappyStreamingApiRootResource.getServletHandler(ui))
// Streaming tab
new SnappyStreamingTab(ui, ssqListener)
new SnappyStreamingTab(ui, listener)
}
})
logInfo("Updating Web UI to add structure streaming tab is Done.")
}
}

// Doing this clean up in finalize method as lifecycle of the listener is aligned with session's
// lifecycle. After this the listener object will be eligible for GC in the next cycle.
// Also the memory footprint of the listener object is not much hence it should be ok if the
// listener object is remain alive for one extra GC cycle as compared to the session.
override def finalize(): Unit = sessionState.removeStreamingQueryListener()
}
// scalastyle:on no.finalize


@InterfaceStability.Stable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
Expand All @@ -32,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.AnalyzeTableCommand
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager}
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryListener, StreamingQueryManager}
import org.apache.spark.sql.util.ExecutionListenerManager


Expand Down Expand Up @@ -153,6 +152,19 @@ private[sql] class SessionState(sparkSession: SparkSession) {
new StreamingQueryManager(sparkSession)
}

/**
* Listener for streaming query UI
*/
private var streamingQueryListener: StreamingQueryListener = _

def registerStreamingQueryListener(streamingQueryListener: StreamingQueryListener): Unit = {
this.streamingQueryListener = streamingQueryListener
}

def removeStreamingQueryListener(): Unit = {
streamingQueryManager.removeListener(streamingQueryListener)
}

private val jarClassLoader: NonClosableMutableURLClassLoader =
sparkSession.sharedState.jarClassLoader

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

package org.apache.spark.sql.streaming

import org.apache.spark.SparkContext
class SnappyStreamingQueryListener extends StreamingQueryListener {

class SnappyStreamingQueryListener(sparkContext: SparkContext) extends StreamingQueryListener {

val streamingRepo = StreamingRepository.getInstance
private val streamingRepo = StreamingRepository.getInstance

override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
val queryName = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {

after {
spark.streams.active.foreach(_.stop())
// finalize method removes the StreamingQueryListener registered for structured streaming UI.
spark.finalize()
assert(spark.streams.active.isEmpty)
assert(addedListeners().isEmpty)
// Make sure we don't leak any events to the next test
Expand Down