[SQL] SPARK-4700: Add HTTP protocol spark thrift server
Add HTTP protocol support and test cases to spark thrift server, so users can deploy thrift server in both TCP and http mode. Author: Judy Nash <judynash@microsoft.com> Author: judynash <judynash@microsoft.com> Closes #3672 from judynash/master and squashes the following commits: 526315d [Judy Nash] correct spacing on startThriftServer method 31a6520 [Judy Nash] fix code style issues and update sql programming guide format issue 47bf87e [Judy Nash] modify withJdbcStatement method definition to meet less than 100 line length 2e9c11c [Judy Nash] add thrift server in http mode documentation on sql programming guide 1cbd305 [Judy Nash] Merge remote-tracking branch 'upstream/master' 2b1d312 [Judy Nash] updated http thrift server support based on feedback 377532c [judynash] add HTTP protocol spark thrift server
This commit is contained in:
parent
d12c0711fa
commit
17688d1429
|
@ -938,6 +938,18 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
|
|||
|
||||
You may also use the beeline script that comes with Hive.
|
||||
|
||||
Thrift JDBC server also supports sending thrift RPC messages over HTTP transport.
|
||||
Use the following setting to enable HTTP mode as system property or in `hive-site.xml` file in `conf/`:
|
||||
|
||||
hive.server2.transport.mode - Set this to value: http
|
||||
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
|
||||
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
|
||||
|
||||
To test, use beeline to connect to the JDBC/ODBC server in http mode with:
|
||||
|
||||
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
|
||||
|
||||
|
||||
## Running the Spark SQL CLI
|
||||
|
||||
The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute
|
||||
|
|
|
@ -19,7 +19,8 @@ package org.apache.spark.sql.hive.thriftserver
|
|||
|
||||
import org.apache.commons.logging.LogFactory
|
||||
import org.apache.hadoop.hive.conf.HiveConf
|
||||
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
|
||||
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
|
||||
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
|
||||
import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
|
||||
|
||||
import org.apache.spark.Logging
|
||||
|
@ -85,10 +86,22 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext)
|
|||
setSuperField(this, "cliService", sparkSqlCliService)
|
||||
addService(sparkSqlCliService)
|
||||
|
||||
val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService)
|
||||
setSuperField(this, "thriftCLIService", thriftCliService)
|
||||
addService(thriftCliService)
|
||||
if (isHTTPTransportMode(hiveConf)) {
|
||||
val thriftCliService = new ThriftHttpCLIService(sparkSqlCliService)
|
||||
setSuperField(this, "thriftCLIService", thriftCliService)
|
||||
addService(thriftCliService)
|
||||
} else {
|
||||
val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService)
|
||||
setSuperField(this, "thriftCLIService", thriftCliService)
|
||||
addService(thriftCliService)
|
||||
}
|
||||
|
||||
initCompositeService(hiveConf)
|
||||
}
|
||||
|
||||
private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
|
||||
val transportMode: String = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
|
||||
transportMode.equalsIgnoreCase("http")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -70,11 +70,20 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
|
|||
port
|
||||
}
|
||||
|
||||
def withJdbcStatement(serverStartTimeout: FiniteDuration = 1.minute)(f: Statement => Unit) {
|
||||
def withJdbcStatement(
|
||||
serverStartTimeout: FiniteDuration = 1.minute,
|
||||
httpMode: Boolean = false)(
|
||||
f: Statement => Unit) {
|
||||
val port = randomListeningPort
|
||||
|
||||
startThriftServer(port, serverStartTimeout) {
|
||||
val jdbcUri = s"jdbc:hive2://${"localhost"}:$port/"
|
||||
startThriftServer(port, serverStartTimeout, httpMode) {
|
||||
val jdbcUri = if (httpMode) {
|
||||
s"jdbc:hive2://${"localhost"}:$port/" +
|
||||
"default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice"
|
||||
} else {
|
||||
s"jdbc:hive2://${"localhost"}:$port/"
|
||||
}
|
||||
|
||||
val user = System.getProperty("user.name")
|
||||
val connection = DriverManager.getConnection(jdbcUri, user, "")
|
||||
val statement = connection.createStatement()
|
||||
|
@ -113,7 +122,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
|
|||
|
||||
def startThriftServer(
|
||||
port: Int,
|
||||
serverStartTimeout: FiniteDuration = 1.minute)(
|
||||
serverStartTimeout: FiniteDuration = 1.minute,
|
||||
httpMode: Boolean = false)(
|
||||
f: => Unit) {
|
||||
val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
|
||||
val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
|
||||
|
@ -121,15 +131,28 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
|
|||
val warehousePath = getTempFilePath("warehouse")
|
||||
val metastorePath = getTempFilePath("metastore")
|
||||
val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
|
||||
|
||||
val command =
|
||||
s"""$startScript
|
||||
| --master local
|
||||
| --hiveconf hive.root.logger=INFO,console
|
||||
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
|
||||
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
|
||||
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=${"localhost"}
|
||||
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port
|
||||
""".stripMargin.split("\\s+").toSeq
|
||||
if (httpMode) {
|
||||
s"""$startScript
|
||||
| --master local
|
||||
| --hiveconf hive.root.logger=INFO,console
|
||||
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
|
||||
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
|
||||
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
|
||||
| --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=http
|
||||
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT}=$port
|
||||
""".stripMargin.split("\\s+").toSeq
|
||||
} else {
|
||||
s"""$startScript
|
||||
| --master local
|
||||
| --hiveconf hive.root.logger=INFO,console
|
||||
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
|
||||
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
|
||||
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
|
||||
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port
|
||||
""".stripMargin.split("\\s+").toSeq
|
||||
}
|
||||
|
||||
val serverRunning = Promise[Unit]()
|
||||
val buffer = new ArrayBuffer[String]()
|
||||
|
@ -140,7 +163,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
|
|||
|
||||
def captureLogOutput(line: String): Unit = {
|
||||
buffer += line
|
||||
if (line.contains("ThriftBinaryCLIService listening on")) {
|
||||
if (line.contains("ThriftBinaryCLIService listening on") ||
|
||||
line.contains("Started ThriftHttpCLIService in http")) {
|
||||
serverRunning.success(())
|
||||
}
|
||||
}
|
||||
|
@ -217,6 +241,25 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
|
|||
}
|
||||
}
|
||||
|
||||
test("Test JDBC query execution in Http Mode") {
|
||||
withJdbcStatement(httpMode = true) { statement =>
|
||||
val queries = Seq(
|
||||
"SET spark.sql.shuffle.partitions=3",
|
||||
"DROP TABLE IF EXISTS test",
|
||||
"CREATE TABLE test(key INT, val STRING)",
|
||||
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
|
||||
"CACHE TABLE test")
|
||||
|
||||
queries.foreach(statement.execute)
|
||||
|
||||
assertResult(5, "Row count mismatch") {
|
||||
val resultSet = statement.executeQuery("SELECT COUNT(*) FROM test")
|
||||
resultSet.next()
|
||||
resultSet.getInt(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-3004 regression: result set containing NULL") {
|
||||
withJdbcStatement() { statement =>
|
||||
val queries = Seq(
|
||||
|
@ -267,6 +310,14 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
|
|||
}
|
||||
}
|
||||
|
||||
test("Checks Hive version in Http Mode") {
|
||||
withJdbcStatement(httpMode = true) { statement =>
|
||||
val resultSet = statement.executeQuery("SET spark.sql.hive.version")
|
||||
resultSet.next()
|
||||
assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-4292 regression: result set iterator issue") {
|
||||
withJdbcStatement() { statement =>
|
||||
val queries = Seq(
|
||||
|
|
Loading…
Reference in a new issue