[SPARK-15934] [SQL] Return binary mode in ThriftServer
This change restores binary transport mode in the Thrift server for backward compatibility, while still supporting HTTP mode. Tested with SQuirreL SQL and Tableau. Author: Egor Pakhomov <egor@anchorfree.com> Closes #13667 from epahomov/SPARK-15095-2.0.
This commit is contained in:
parent
09925735b5
commit
049e639fc2
|
@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
|
|||
import org.apache.commons.logging.LogFactory
|
||||
import org.apache.hadoop.hive.conf.HiveConf
|
||||
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
|
||||
import org.apache.hive.service.cli.thrift.ThriftHttpCLIService
|
||||
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
|
||||
import org.apache.hive.service.server.HiveServer2
|
||||
|
||||
import org.apache.spark.SparkContext
|
||||
|
@ -34,7 +34,7 @@ import org.apache.spark.annotation.DeveloperApi
|
|||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.hive.HiveUtils
|
||||
import org.apache.spark.sql.hive.{HiveSharedState, HiveUtils}
|
||||
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
|
||||
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
@ -271,7 +271,7 @@ object HiveThriftServer2 extends Logging {
|
|||
|
||||
private[hive] class HiveThriftServer2(sqlContext: SQLContext)
|
||||
extends HiveServer2
|
||||
with ReflectedCompositeService with Logging {
|
||||
with ReflectedCompositeService {
|
||||
// state is tracked internally so that the server only attempts to shut down if it successfully
|
||||
// started, and then once only.
|
||||
private val started = new AtomicBoolean(false)
|
||||
|
@ -281,18 +281,20 @@ private[hive] class HiveThriftServer2(sqlContext: SQLContext)
|
|||
setSuperField(this, "cliService", sparkSqlCliService)
|
||||
addService(sparkSqlCliService)
|
||||
|
||||
if (isBinaryTransportMode(hiveConf)) {
|
||||
logWarning("Binary mode is not supported, use HTTP mode instead")
|
||||
val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
|
||||
new ThriftHttpCLIService(sparkSqlCliService)
|
||||
} else {
|
||||
new ThriftBinaryCLIService(sparkSqlCliService)
|
||||
}
|
||||
val thriftCliService = new ThriftHttpCLIService(sparkSqlCliService)
|
||||
|
||||
setSuperField(this, "thriftCLIService", thriftCliService)
|
||||
addService(thriftCliService)
|
||||
initCompositeService(hiveConf)
|
||||
}
|
||||
|
||||
private def isBinaryTransportMode(hiveConf: HiveConf): Boolean = {
|
||||
private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
|
||||
val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
|
||||
transportMode.toLowerCase(Locale.ENGLISH).equals("binary")
|
||||
transportMode.toLowerCase(Locale.ENGLISH).equals("http")
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -55,8 +55,8 @@ object TestData {
|
|||
val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
|
||||
}
|
||||
|
||||
class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
|
||||
override def mode: ServerMode.Value = ServerMode.http
|
||||
class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
|
||||
override def mode: ServerMode.Value = ServerMode.binary
|
||||
|
||||
private def withCLIServiceClient(f: ThriftCLIServiceClient => Unit): Unit = {
|
||||
// Transport creation logic below mimics HiveConnection.createBinaryTransport
|
||||
|
@ -70,8 +70,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
|
|||
try f(client) finally transport.close()
|
||||
}
|
||||
|
||||
// TODO: update this test to work in HTTP mode
|
||||
ignore("GetInfo Thrift API") {
|
||||
test("GetInfo Thrift API") {
|
||||
withCLIServiceClient { client =>
|
||||
val user = System.getProperty("user.name")
|
||||
val sessionHandle = client.openSession(user, "")
|
||||
|
@ -567,7 +566,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
|
|||
}
|
||||
|
||||
class SingleSessionSuite extends HiveThriftJdbcTest {
|
||||
override def mode: ServerMode.Value = ServerMode.http
|
||||
override def mode: ServerMode.Value = ServerMode.binary
|
||||
|
||||
override protected def extraConf: Seq[String] =
|
||||
"--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil
|
||||
|
@ -617,6 +616,38 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
|
|||
}
|
||||
}
|
||||
|
||||
class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
|
||||
override def mode: ServerMode.Value = ServerMode.http
|
||||
|
||||
test("JDBC query execution") {
|
||||
withJdbcStatement { statement =>
|
||||
val queries = Seq(
|
||||
"SET spark.sql.shuffle.partitions=3",
|
||||
"DROP TABLE IF EXISTS test",
|
||||
"CREATE TABLE test(key INT, val STRING)",
|
||||
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
|
||||
"CACHE TABLE test")
|
||||
|
||||
queries.foreach(statement.execute)
|
||||
|
||||
assertResult(5, "Row count mismatch") {
|
||||
val resultSet = statement.executeQuery("SELECT COUNT(*) FROM test")
|
||||
resultSet.next()
|
||||
resultSet.getInt(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("Checks Hive version") {
|
||||
withJdbcStatement { statement =>
|
||||
val resultSet = statement.executeQuery("SET spark.sql.hive.version")
|
||||
resultSet.next()
|
||||
assert(resultSet.getString(1) === "spark.sql.hive.version")
|
||||
assert(resultSet.getString(2) === HiveUtils.hiveExecutionVersion)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object ServerMode extends Enumeration {
|
||||
val binary, http = Value
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ class UISeleniumSuite
|
|||
implicit var webDriver: WebDriver = _
|
||||
var server: HiveThriftServer2 = _
|
||||
val uiPort = 20000 + Random.nextInt(10000)
|
||||
override def mode: ServerMode.Value = ServerMode.http
|
||||
override def mode: ServerMode.Value = ServerMode.binary
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
webDriver = new HtmlUnitDriver {
|
||||
|
|
Loading…
Reference in a new issue