From 049e639fc22100af3cbe2ffcf3a2fd3ae2461373 Mon Sep 17 00:00:00 2001 From: Egor Pakhomov Date: Wed, 15 Jun 2016 14:25:55 -0700 Subject: [PATCH] [SPARK-15934] [SQL] Return binary mode in ThriftServer Returning binary mode to ThriftServer for backward compatibility. Tested with Squirrel and Tableau. Author: Egor Pakhomov Closes #13667 from epahomov/SPARK-15095-2.0. --- .../hive/thriftserver/HiveThriftServer2.scala | 18 ++++---- .../HiveThriftServer2Suites.scala | 41 ++++++++++++++++--- .../hive/thriftserver/UISeleniumSuite.scala | 2 +- 3 files changed, 47 insertions(+), 14 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index de70fdc14e..e3258d858f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.commons.logging.LogFactory import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hive.service.cli.thrift.ThriftHttpCLIService +import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} import org.apache.hive.service.server.HiveServer2 import org.apache.spark.SparkContext @@ -34,7 +34,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart} import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.HiveUtils +import org.apache.spark.sql.hive.{HiveSharedState, HiveUtils} import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab import org.apache.spark.sql.internal.SQLConf @@ -271,7 +271,7 @@ object HiveThriftServer2 extends Logging { private[hive] class HiveThriftServer2(sqlContext: SQLContext) extends HiveServer2 - with ReflectedCompositeService with Logging { + with ReflectedCompositeService { // state is tracked internally so that the server only attempts to shut down if it successfully // started, and then once only. private val started = new AtomicBoolean(false) @@ -281,18 +281,20 @@ private[hive] class HiveThriftServer2(sqlContext: SQLContext) setSuperField(this, "cliService", sparkSqlCliService) addService(sparkSqlCliService) - if (isBinaryTransportMode(hiveConf)) { - logWarning("Binary mode is not supported, use HTTP mode instead") + val thriftCliService = if (isHTTPTransportMode(hiveConf)) { + new ThriftHttpCLIService(sparkSqlCliService) + } else { + new ThriftBinaryCLIService(sparkSqlCliService) } - val thriftCliService = new ThriftHttpCLIService(sparkSqlCliService) + setSuperField(this, "thriftCLIService", thriftCliService) addService(thriftCliService) initCompositeService(hiveConf) } - private def isBinaryTransportMode(hiveConf: HiveConf): Boolean = { + private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = { val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE) - transportMode.toLowerCase(Locale.ENGLISH).equals("binary") + transportMode.toLowerCase(Locale.ENGLISH).equals("http") } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index b3f4944c91..e388c2a082 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -55,8 +55,8 @@ object TestData { val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt") } -class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { - override def mode: ServerMode.Value = ServerMode.http +class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { + override def mode: ServerMode.Value = ServerMode.binary private def withCLIServiceClient(f: ThriftCLIServiceClient => Unit): Unit = { // Transport creation logic below mimics HiveConnection.createBinaryTransport @@ -70,8 +70,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { try f(client) finally transport.close() } - // TODO: update this test to work in HTTP mode - ignore("GetInfo Thrift API") { + test("GetInfo Thrift API") { withCLIServiceClient { client => val user = System.getProperty("user.name") val sessionHandle = client.openSession(user, "") @@ -567,7 +566,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { } class SingleSessionSuite extends HiveThriftJdbcTest { - override def mode: ServerMode.Value = ServerMode.http + override def mode: ServerMode.Value = ServerMode.binary override protected def extraConf: Seq[String] = "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil @@ -617,6 +616,38 @@ class SingleSessionSuite extends HiveThriftJdbcTest { } } +class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { + override def mode: ServerMode.Value = ServerMode.http + + test("JDBC query execution") { + withJdbcStatement { statement => + val queries = Seq( + "SET spark.sql.shuffle.partitions=3", + "DROP TABLE IF EXISTS test", + "CREATE TABLE test(key INT, val STRING)", + s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test", + "CACHE TABLE test") + + queries.foreach(statement.execute) + + assertResult(5, "Row count mismatch") { + val resultSet = statement.executeQuery("SELECT COUNT(*) FROM test") + resultSet.next() + resultSet.getInt(1) + } + } + } + + test("Checks Hive version") { + withJdbcStatement { statement => + val resultSet = statement.executeQuery("SET spark.sql.hive.version") + resultSet.next() + assert(resultSet.getString(1) === "spark.sql.hive.version") + assert(resultSet.getString(2) === HiveUtils.hiveExecutionVersion) + } + } +} + object ServerMode extends Enumeration { val binary, http = Value } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala index b6b9de1ba6..bf431cd6b0 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala @@ -36,7 +36,7 @@ class UISeleniumSuite implicit var webDriver: WebDriver = _ var server: HiveThriftServer2 = _ val uiPort = 20000 + Random.nextInt(10000) - override def mode: ServerMode.Value = ServerMode.http + override def mode: ServerMode.Value = ServerMode.binary override def beforeAll(): Unit = { webDriver = new HtmlUnitDriver {