From 4b7f2c5b1ee5bd5666fdac1ac9dc5ca7e74e2474 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 13 Dec 2018 22:02:39 +0800
Subject: [PATCH 1/3] [KYUUBI-133] Fix #133 token expiration in HadoopRDD getPartitions

---
 .../scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala
index 3ce6f932567..08f753f07d5 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala
@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException
 import org.apache.hadoop.hive.ql.session.OperationLog
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hive.service.cli.thrift.TProtocolVersion
 import org.apache.spark.KyuubiConf._
 import org.apache.spark.KyuubiSparkUtil
@@ -40,6 +41,7 @@ import yaooqinn.kyuubi.cli.FetchOrientation
 import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder}
 import yaooqinn.kyuubi.session.KyuubiSession
 import yaooqinn.kyuubi.ui.KyuubiServerMonitor
+import yaooqinn.kyuubi.utils.ReflectUtils
 
 class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging {
@@ -313,6 +315,10 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
         statementId, session.getUserName)
     }
+    val hadoopConf = session.sparkSession.sparkContext.hadoopConfiguration
+    val jobConf = new JobConf(hadoopConf)
+    jobConf.setCredentials(session.ugi.getCredentials)
+    ReflectUtils.setFieldValue(session.sparkSession.sparkContext, "_hadoopConfiguration", jobConf)
     session.sparkSession.sparkContext.setJobGroup(statementId, statement)
     result = session.sparkSession.sql(statement)
     KyuubiServerMonitor.getListener(session.getUserName).foreach {

From 40ac044500edaf2ed8ccee22bf8c7d70286f8187 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 20 Dec 2018 15:56:00 +0800
Subject: [PATCH 2/3] Turn off UnionRDD parallel listing

---
 .../src/main/scala/org/apache/spark/KyuubiConf.scala      | 9 ++++++---
 .../main/scala/org/apache/spark/KyuubiSparkUtil.scala     | 9 +++++++++
 .../yaooqinn/kyuubi/operation/KyuubiOperation.scala       | 6 ------
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
index 0b70c706dec..71b1b8ff3e9 100644
--- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
+++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
@@ -26,6 +26,8 @@ import scala.language.implicitConversions
 
 import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}
 
+import yaooqinn.kyuubi.service.ServiceException
+
 /**
  * Kyuubi server level configuration which will be set when at the very beginning of server start.
  *
@@ -44,6 +46,9 @@ object KyuubiConf {
     def apply(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)
   }
 
+  val KYUUBI_HOME: String =
+    sys.env.getOrElse("KYUUBI_HOME", throw new ServiceException("KYUUBI_HOME is not set!"))
+
   /////////////////////////////////////////////////////////////////////////////////////////////////
   //                              High Availability by ZooKeeper                                 //
   /////////////////////////////////////////////////////////////////////////////////////////////////
@@ -288,9 +293,7 @@ object KyuubiConf {
     KyuubiConfigBuilder("spark.kyuubi.backend.session.local.dir")
       .doc("Default value to set spark.local.dir")
       .stringConf
-      .createWithDefault(
-        s"${sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))}"
-          + File.separator + "local")
+      .createWithDefault(KYUUBI_HOME + File.separator + "local")
 
   /////////////////////////////////////////////////////////////////////////////////////////////////
   //                                        Authentication                                       //
diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
index d110b4ff4bd..4ae67dbbad3 100644
--- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
+++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
@@ -105,6 +105,7 @@ object KyuubiSparkUtil extends Logging {
   val HDFS_CLIENT_CACHE_DEFAULT = "true"
   val FILE_CLIENT_CACHE: String = SPARK_HADOOP_PREFIX + "fs.file.impl.disable.cache"
   val FILE_CLIENT_CACHE_DEFAULT = "true"
+  val RDD_PAR_LISTING: String = SPARK_PREFIX + "rdd.parallelListingThreshold"
 
   // Runtime Spark Version
   val SPARK_VERSION: String = org.apache.spark.SPARK_VERSION
@@ -282,6 +283,14 @@ object KyuubiSparkUtil extends Logging {
     if (UserGroupInformation.isSecurityEnabled) {
       conf.setIfMissing(HDFS_CLIENT_CACHE, HDFS_CLIENT_CACHE_DEFAULT)
       conf.setIfMissing(FILE_CLIENT_CACHE, FILE_CLIENT_CACHE_DEFAULT)
+      // When running Kyuubi against kerberized HDFS, some SQL queries may fail with an
+      // HDFS_DELEGATION_TOKEN expiration. The exception is usually raised in
+      // HadoopRDD.getPartitions, where the JobConf carries no Credentials because it is built
+      // from a plain Configuration, and UGI.getCurrentUser holds only the oldest tokens, which
+      // are destined to expire. The root cause seems to be that UnionRDD lists its sub-RDDs in
+      // parallel on a ForkJoinPool whose threads run in a different calling context. Turning
+      // off parallel listing appears to work around this issue.
+      conf.setIfMissing(RDD_PAR_LISTING, Int.MaxValue.toString)
     }
   }
 
diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala
index 08f753f07d5..3ce6f932567 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala
@@ -28,7 +28,6 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException
 import org.apache.hadoop.hive.ql.session.OperationLog
-import org.apache.hadoop.mapred.JobConf
 import org.apache.hive.service.cli.thrift.TProtocolVersion
 import org.apache.spark.KyuubiConf._
 import org.apache.spark.KyuubiSparkUtil
@@ -41,7 +40,6 @@ import yaooqinn.kyuubi.cli.FetchOrientation
 import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder}
 import yaooqinn.kyuubi.session.KyuubiSession
 import yaooqinn.kyuubi.ui.KyuubiServerMonitor
-import yaooqinn.kyuubi.utils.ReflectUtils
 
 class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging {
@@ -315,10 +313,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
         statementId, session.getUserName)
     }
-    val hadoopConf = session.sparkSession.sparkContext.hadoopConfiguration
-    val jobConf = new JobConf(hadoopConf)
-    jobConf.setCredentials(session.ugi.getCredentials)
-    ReflectUtils.setFieldValue(session.sparkSession.sparkContext, "_hadoopConfiguration", jobConf)
     session.sparkSession.sparkContext.setJobGroup(statementId, statement)
     result = session.sparkSession.sql(statement)
     KyuubiServerMonitor.getListener(session.getUserName).foreach {

From 203f61589ebfc6d3caa73a612c6370de98a26fb5 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 20 Dec 2018 16:03:53 +0800
Subject: [PATCH 3/3] Remove unrelated change

---
 .../src/main/scala/org/apache/spark/KyuubiConf.scala | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
index 71b1b8ff3e9..0b70c706dec 100644
--- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
+++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
@@ -26,8 +26,6 @@ import scala.language.implicitConversions
 
 import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}
 
-import yaooqinn.kyuubi.service.ServiceException
-
 /**
  * Kyuubi server level configuration which will be set when at the very beginning of server start.
  *
@@ -46,9 +44,6 @@ object KyuubiConf {
     def apply(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)
   }
 
-  val KYUUBI_HOME: String =
-    sys.env.getOrElse("KYUUBI_HOME", throw new ServiceException("KYUUBI_HOME is not set!"))
-
   /////////////////////////////////////////////////////////////////////////////////////////////////
   //                              High Availability by ZooKeeper                                 //
   /////////////////////////////////////////////////////////////////////////////////////////////////
@@ -293,7 +288,9 @@ object KyuubiConf {
     KyuubiConfigBuilder("spark.kyuubi.backend.session.local.dir")
       .doc("Default value to set spark.local.dir")
       .stringConf
-      .createWithDefault(KYUUBI_HOME + File.separator + "local")
+      .createWithDefault(
+        s"${sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))}"
+          + File.separator + "local")
 
   /////////////////////////////////////////////////////////////////////////////////////////////////
   //                                        Authentication                                       //
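
---
Note (illustrative sketch, not part of the patch series): the workaround in
PATCH 2/3 works because Spark only fans a UnionRDD's sub-RDD listing out to a
ForkJoinPool once the sub-RDD count exceeds spark.rdd.parallelListingThreshold
(default 10). Raising the threshold to Int.MaxValue keeps
HadoopRDD.getPartitions on the calling thread, where the session UGI's fresh
delegation tokens are attached. The standalone Scala snippet below shows the
same setting applied to a plain Spark application; the object name, method
name, app name, and local master are illustrative, not Kyuubi APIs.

import org.apache.spark.{SparkConf, SparkContext}

object ParallelListingWorkaround {

  // Mirrors conf.setIfMissing(RDD_PAR_LISTING, Int.MaxValue.toString) from
  // PATCH 2/3: with the threshold above any realistic sub-RDD count, UnionRDD
  // never takes the parallel listing path.
  def disableParallelListing(conf: SparkConf): SparkConf =
    conf.setIfMissing("spark.rdd.parallelListingThreshold", Int.MaxValue.toString)

  def main(args: Array[String]): Unit = {
    // Illustrative scaffold: any way of building the SparkConf works, as long
    // as the threshold is set before the SparkContext is created.
    val conf = disableParallelListing(
      new SparkConf().setAppName("parallel-listing-sketch").setMaster("local[*]"))
    val sc = new SparkContext(conf)

    // A union of more than 10 RDDs would normally trigger parallel listing;
    // with the raised threshold, getPartitions runs on this thread instead of
    // a ForkJoinPool worker.
    val union = sc.union((1 to 20).map(i => sc.parallelize(Seq(i))))
    println(s"partitions: ${union.getNumPartitions}")
    sc.stop()
  }
}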