diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala index 8241672d5107b..48492bac62344 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala @@ -29,7 +29,7 @@ import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} import scala.util.control.NonFatal import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{DURATION, NEW_VALUE, OLD_VALUE, QUERY_CACHE_VALUE, QUERY_ID, SESSION_ID} +import org.apache.spark.internal.LogKeys.{DURATION, NEW_VALUE, OLD_VALUE, QUERY_CACHE_VALUE, QUERY_ID, QUERY_RUN_ID, SESSION_ID} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} @@ -158,7 +158,8 @@ private[connect] class SparkConnectStreamingQueryCache( if (v.userId.equals(sessionHolder.userId) && v.sessionId.equals(sessionHolder.sessionId)) { if (v.query.isActive && Option(v.session.streams.get(k.queryId)).nonEmpty) { logInfo( - log"Stopping the query with id ${MDC(QUERY_ID, k.queryId)} " + + log"Stopping the query with id: ${MDC(QUERY_ID, k.queryId)} " + + log"runId: ${MDC(QUERY_RUN_ID, k.runId)} " + log"since the session has timed out") try { if (blocking) { @@ -170,7 +171,8 @@ private[connect] class SparkConnectStreamingQueryCache( } catch { case NonFatal(ex) => logWarning( - log"Failed to stop the query ${MDC(QUERY_ID, k.queryId)}. 
" + + log"Failed to stop the query with id: ${MDC(QUERY_ID, k.queryId)} " + + log"runId: ${MDC(QUERY_RUN_ID, k.runId)} " + log"Error is ignored.", ex) } @@ -238,17 +240,20 @@ private[connect] class SparkConnectStreamingQueryCache( for ((k, v) <- queryCache) { val id = k.queryId + val runId = k.runId v.expiresAtMs match { case Some(ts) if nowMs >= ts => // Expired. Drop references. logInfo( - log"Removing references for ${MDC(QUERY_ID, id)} in " + + log"Removing references for id: ${MDC(QUERY_ID, id)} " + + log"runId: ${MDC(QUERY_RUN_ID, runId)} in " + log"session ${MDC(SESSION_ID, v.sessionId)} after expiry period") queryCache.remove(k) case Some(_) => // Inactive query waiting for expiration. Do nothing. logInfo( - log"Waiting for the expiration for ${MDC(QUERY_ID, id)} in " + + log"Waiting for the expiration for id: ${MDC(QUERY_ID, id)} " + + log"runId: ${MDC(QUERY_RUN_ID, runId)} in " + log"session ${MDC(SESSION_ID, v.sessionId)}") case None => // Active query, check if it is stopped. Enable timeout if it is stopped. @@ -256,7 +261,8 @@ private[connect] class SparkConnectStreamingQueryCache( if (!isActive) { logInfo( - log"Marking query ${MDC(QUERY_ID, id)} in " + + log"Marking query id: ${MDC(QUERY_ID, id)} " + + log"runId: ${MDC(QUERY_RUN_ID, runId)} in " + log"session ${MDC(SESSION_ID, v.sessionId)} inactive.") val expiresAtMs = nowMs + stoppedQueryInactivityTimeout.toMillis queryCache.put(k, v.copy(expiresAtMs = Some(expiresAtMs)))