[SPARK-18992][SQL] Move spark.sql.hive.thriftServer.singleSession to SQLConf

### What changes were proposed in this pull request?

Since `spark.sql.hive.thriftServer.singleSession` is a configuration of the SQL component, this conf can be moved from `SparkConf` to `StaticSQLConf`.

When we introduced `spark.sql.hive.thriftServer.singleSession`, all SQL configurations were session-specific; they could be modified in different sessions, so this conf had to live in `SparkConf`.

In Spark 2.1, static SQL configuration was added, which is a perfect fit for `spark.sql.hive.thriftServer.singleSession`. We previously made the same move for `spark.sql.warehouse.dir`, from `SparkConf` to `StaticSQLConf`.
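To illustrate the distinction, here is a minimal sketch (not part of this patch; the local-mode builder settings are assumptions): static confs such as `spark.sql.hive.thriftServer.singleSession` are fixed once the session starts, while session confs remain mutable via `SET`.

```scala
import org.apache.spark.sql.SparkSession

// Static confs must be supplied before the SparkSession is created.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.hive.thriftServer.singleSession", "true")
  .getOrCreate()

// Session confs can still be changed at runtime.
spark.sql("SET spark.sql.shuffle.partitions=3")

// Mutating a static conf at runtime fails with:
//   Cannot modify the value of a static config: spark.sql.hive.thriftServer.singleSession
// spark.sql("SET spark.sql.hive.thriftServer.singleSession=false")
```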

### How was this patch tested?
Added test cases in `HiveThriftServer2Suites.scala`.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16392 from gatorsmile/hiveThriftServerSingleSession.
Authored by gatorsmile on 2016-12-28 10:16:22 +08:00; committed by Wenchen Fan
Parent: 28ab0ec49f
Commit: 5ac62043cf
5 changed files with 87 additions and 51 deletions

SQLConf.scala

@@ -831,6 +831,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString
 
+  def hiveThriftServerSingleSession: Boolean =
+    getConf(StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION)
+
   override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
 
   override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
@@ -1008,4 +1011,11 @@ object StaticSQLConf {
     .doc("Only used for internal debugging. Not all functions are supported when it is enabled.")
     .booleanConf
     .createWithDefault(false)
+
+  val HIVE_THRIFT_SERVER_SINGLESESSION = buildConf("spark.sql.hive.thriftServer.singleSession")
+    .doc("When set to true, Hive Thrift server is running in a single session mode. " +
+      "All the JDBC/ODBC connections share the temporary views, function registries, " +
+      "SQL configuration and the current database.")
+    .booleanConf
+    .createWithDefault(false)
 }
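To make the new doc string concrete, here is a hedged JDBC sketch of what the new tests below verify (host, port, and credentials are assumptions): with the flag enabled, a temporary view created on one connection is visible from another.

```scala
import java.sql.DriverManager

// Assumes a Thrift server started with
// --conf spark.sql.hive.thriftServer.singleSession=true, listening on localhost:10000.
val uri = "jdbc:hive2://localhost:10000/"
val user = System.getProperty("user.name")
val conn1 = DriverManager.getConnection(uri, user, "")
val conn2 = DriverManager.getConnection(uri, user, "")
try {
  conn1.createStatement().execute("CREATE TEMP VIEW tempView AS SELECT 123")
  // Visible here only because both connections share one session;
  // in the default mode this would fail with "Table or view not found".
  val rs = conn2.createStatement().executeQuery("SELECT * FROM tempView")
  assert(rs.next() && rs.getString(1) == "123")
} finally {
  conn1.close()
  conn2.close()
}
```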

SparkSQLSessionManager.scala

@@ -28,7 +28,7 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion
 import org.apache.hive.service.server.HiveServer2
 
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
@@ -72,8 +72,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
     val session = super.getSession(sessionHandle)
     HiveThriftServer2.listener.onSessionCreated(
       session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
-    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-    val ctx = if (sessionState.hiveThriftServerSingleSession) {
+    val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
       sqlContext
     } else {
       sqlContext.newSession()
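For context on the branch above: `newSession()` shares the underlying `SparkContext` (and cached data) but gives each session its own conf, temporary views, and function registry. A hedged local sketch of that isolation, using `SparkSession` rather than the `SQLContext` shown in the diff:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val other = spark.newSession() // same SparkContext, isolated SQL state

spark.sql("CREATE TEMP VIEW v AS SELECT 1 AS id")
spark.sql("SELECT * FROM v").show() // resolves in the creating session
// other.sql("SELECT * FROM v")     // would fail: Table or view not found
```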

HiveThriftServer2Suites.scala

@@ -98,9 +98,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
     val user = System.getProperty("user.name")
     val sessionHandle = client.openSession(user, "")
 
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_16563") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_16563",
         "CREATE TABLE test_16563(key INT, val STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_16563")
@@ -134,16 +133,14 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         rows_first.numRows()
       }
-
-      statement.executeQuery("DROP TABLE IF EXISTS test_16563")
     }
   }
 
   test("JDBC query execution") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test") { statement =>
       val queries = Seq(
         "SET spark.sql.shuffle.partitions=3",
-        "DROP TABLE IF EXISTS test",
         "CREATE TABLE test(key INT, val STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
         "CACHE TABLE test")
@@ -159,7 +156,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("Checks Hive version") {
-    withJdbcStatement { statement =>
+    withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("SET spark.sql.hive.version")
       resultSet.next()
       assert(resultSet.getString(1) === "spark.sql.hive.version")
@@ -168,9 +165,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("SPARK-3004 regression: result set containing NULL") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_null") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_null",
         "CREATE TABLE test_null(key INT, val STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKvWithNull}' OVERWRITE INTO TABLE test_null")
@@ -189,9 +185,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("SPARK-4292 regression: result set iterator issue") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_4292") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_4292",
         "CREATE TABLE test_4292(key INT, val STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_4292")
@@ -203,15 +198,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         resultSet.next()
         assert(resultSet.getInt(1) === key)
       }
-
-      statement.executeQuery("DROP TABLE IF EXISTS test_4292")
     }
   }
 
   test("SPARK-4309 regression: Date type support") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_date") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_date",
         "CREATE TABLE test_date(key INT, value STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_date")
@@ -227,9 +219,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("SPARK-4407 regression: Complex type support") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_map") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_map",
         "CREATE TABLE test_map(key INT, value STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
@@ -251,9 +242,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("SPARK-12143 regression: Binary type support") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_binary") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_binary",
         "CREATE TABLE test_binary(key INT, value STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_binary")
@@ -262,7 +252,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       val expected: Array[Byte] = "val_238".getBytes
       assertResult(expected) {
         val resultSet = statement.executeQuery(
-          "SELECT CAST(value as BINARY) FROM test_date LIMIT 1")
+          "SELECT CAST(value as BINARY) FROM test_binary LIMIT 1")
         resultSet.next()
         resultSet.getObject(1)
       }
@@ -275,12 +265,11 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
     var defaultV2: String = null
     var data: ArrayBuffer[Int] = null
 
-    withMultipleConnectionJdbcStatement(
+    withMultipleConnectionJdbcStatement("test_map")(
       // create table
       { statement =>
         val queries = Seq(
-            "DROP TABLE IF EXISTS test_map",
            "CREATE TABLE test_map(key INT, value STRING)",
            s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map",
            "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC",
@@ -418,9 +407,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   // This test often hangs and then times out, leaving the hanging processes.
   // Let's ignore it and improve the test.
   ignore("test jdbc cancel") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_map") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_map",
         "CREATE TABLE test_map(key INT, value STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
@@ -478,7 +466,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("test add jar") {
-    withMultipleConnectionJdbcStatement(
+    withMultipleConnectionJdbcStatement("smallKV", "addJar")(
       {
         statement =>
           val jarFile =
@@ -492,10 +480,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       {
        statement =>
          val queries = Seq(
-            "DROP TABLE IF EXISTS smallKV",
            "CREATE TABLE smallKV(key INT, val STRING)",
            s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE smallKV",
-            "DROP TABLE IF EXISTS addJar",
            """CREATE TABLE addJar(key string)
              |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
            """.stripMargin)
@@ -524,15 +510,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
          expectedResult.close()
          assert(expectedResultBuffer === actualResultBuffer)
-
-          statement.executeQuery("DROP TABLE IF EXISTS addJar")
-          statement.executeQuery("DROP TABLE IF EXISTS smallKV")
      }
    )
   }
 
   test("Checks Hive version via SET -v") {
-    withJdbcStatement { statement =>
+    withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("SET -v")
       val conf = mutable.Map.empty[String, String]
@@ -545,7 +528,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("Checks Hive version via SET") {
-    withJdbcStatement { statement =>
+    withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("SET")
       val conf = mutable.Map.empty[String, String]
@@ -558,7 +541,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("SPARK-11595 ADD JAR with input path having URL scheme") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_udtf") { statement =>
       try {
         val jarPath = "../hive/src/test/resources/TestUDTF.jar"
         val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
@@ -586,7 +569,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       val dataPath = "../hive/src/test/resources/data/files/kv1.txt"
 
       Seq(
-        s"CREATE TABLE test_udtf(key INT, value STRING)",
+        "CREATE TABLE test_udtf(key INT, value STRING)",
         s"LOAD DATA LOCAL INPATH '$dataPath' OVERWRITE INTO TABLE test_udtf"
       ).foreach(statement.execute)
@@ -624,8 +607,8 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
   override protected def extraConf: Seq[String] =
     "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil
 
-  test("test single session") {
-    withMultipleConnectionJdbcStatement(
+  test("share the temporary functions across JDBC connections") {
+    withMultipleConnectionJdbcStatement()(
       { statement =>
         val jarPath = "../hive/src/test/resources/TestUDTF.jar"
         val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
@@ -667,16 +650,63 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
       }
     )
   }
+
+  test("unable to changing spark.sql.hive.thriftServer.singleSession using JDBC connections") {
+    withJdbcStatement() { statement =>
+      // JDBC connections are not able to set the conf spark.sql.hive.thriftServer.singleSession
+      val e = intercept[SQLException] {
+        statement.executeQuery("SET spark.sql.hive.thriftServer.singleSession=false")
+      }.getMessage
+      assert(e.contains(
+        "Cannot modify the value of a static config: spark.sql.hive.thriftServer.singleSession"))
+    }
+  }
+
+  test("share the current database and temporary tables across JDBC connections") {
+    withMultipleConnectionJdbcStatement()(
+      { statement =>
+        statement.execute("CREATE DATABASE IF NOT EXISTS db1")
+      },
+
+      { statement =>
+        val rs1 = statement.executeQuery("SELECT current_database()")
+        assert(rs1.next())
+        assert(rs1.getString(1) === "default")
+
+        statement.execute("USE db1")
+
+        val rs2 = statement.executeQuery("SELECT current_database()")
+        assert(rs2.next())
+        assert(rs2.getString(1) === "db1")
+
+        statement.execute("CREATE TEMP VIEW tempView AS SELECT 123")
+      },
+
+      { statement =>
+        // the current database is set to db1 by another JDBC connection.
+        val rs1 = statement.executeQuery("SELECT current_database()")
+        assert(rs1.next())
+        assert(rs1.getString(1) === "db1")
+
+        val rs2 = statement.executeQuery("SELECT * from tempView")
+        assert(rs2.next())
+        assert(rs2.getString(1) === "123")
+
+        statement.execute("USE default")
+        statement.execute("DROP VIEW tempView")
+        statement.execute("DROP DATABASE db1 CASCADE")
+      }
+    )
+  }
 }
 
 class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
   override def mode: ServerMode.Value = ServerMode.http
 
   test("JDBC query execution") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test") { statement =>
       val queries = Seq(
         "SET spark.sql.shuffle.partitions=3",
-        "DROP TABLE IF EXISTS test",
         "CREATE TABLE test(key INT, val STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
         "CACHE TABLE test")
@@ -692,7 +722,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
   }
 
   test("Checks Hive version") {
-    withJdbcStatement { statement =>
+    withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("SET spark.sql.hive.version")
       resultSet.next()
       assert(resultSet.getString(1) === "spark.sql.hive.version")
@@ -718,7 +748,7 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
     s"jdbc:hive2://localhost:$serverPort/"
   }
 
-  def withMultipleConnectionJdbcStatement(fs: (Statement => Unit)*) {
+  def withMultipleConnectionJdbcStatement(tableNames: String*)(fs: (Statement => Unit)*) {
     val user = System.getProperty("user.name")
     val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
     val statements = connections.map(_.createStatement())
@@ -726,13 +756,16 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
     try {
       statements.zip(fs).foreach { case (s, f) => f(s) }
     } finally {
+      tableNames.foreach { name =>
+        statements(0).execute(s"DROP TABLE IF EXISTS $name")
+      }
       statements.foreach(_.close())
       connections.foreach(_.close())
     }
   }
 
-  def withJdbcStatement(f: Statement => Unit) {
-    withMultipleConnectionJdbcStatement(f)
+  def withJdbcStatement(tableNames: String*)(f: Statement => Unit) {
+    withMultipleConnectionJdbcStatement(tableNames: _*)(f)
   }
 }
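A usage sketch of the refactored helper, written as it would appear inside one of the suites above (the table name is hypothetical): any table listed in the first parameter group is dropped in the `finally` block, which is what allows the boilerplate `DROP TABLE IF EXISTS` statements to be deleted throughout the tests.

```scala
withJdbcStatement("demo_table") { statement =>
  statement.execute("CREATE TABLE demo_table(key INT, val STRING)")
  val rs = statement.executeQuery("SELECT COUNT(*) FROM demo_table")
  assert(rs.next())
} // the helper now runs "DROP TABLE IF EXISTS demo_table" on the way out
```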

UISeleniumSuite.scala

@@ -74,7 +74,7 @@ class UISeleniumSuite
   }
 
   ignore("thrift server ui test") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_map") { statement =>
       val baseURL = s"http://localhost:$uiPort"
 
       val queries = Seq(
HiveSessionState.scala

@@ -141,10 +141,4 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
     conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
-  // TODO: why do we get this from SparkConf but not SQLConf?
-  def hiveThriftServerSingleSession: Boolean = {
-    sparkSession.sparkContext.conf.getBoolean(
-      "spark.sql.hive.thriftServer.singleSession", defaultValue = false)
-  }
 }