[SPARK-3826][SQL] Enable hive-thriftserver to support hive-0.13.1

In #2241 hive-thriftserver is not enabled. This patch enables hive-thriftserver to support hive-0.13.1 by using a shim layer, following the approach of #2241.

 1 A light shim layer (code in sql/hive-thriftserver/hive-version) for each supported Hive version to handle API compatibility (see the sketch after this list)

 2 New pom profiles "hive-default" and "hive-versions" (copied from #2241) to activate the different Hive versions

 3 SBT commands for the different versions are as follows:
   hive-0.12.0 --- sbt/sbt -Phive,hadoop-2.3 -Phive-0.12.0 assembly
   hive-0.13.1 --- sbt/sbt -Phive,hadoop-2.3 -Phive-0.13.1 assembly

 4 Since hive-thriftserver depends on the hive subproject, this patch should be merged with #2241 to enable hive-0.13.1 for hive-thriftserver
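
A rough sketch of the shim pattern, condensed from the new Shim12.scala/Shim13.scala files in this diff (parameter names shortened, surrounding members omitted): each versioned source tree defines an object with the same name and signature, and the active profile decides which one is compiled in, so shared code never has to branch on the Hive version.

   import org.apache.hadoop.security.UserGroupInformation
   import org.apache.hadoop.hive.shims.ShimLoader
   import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._  // provides setSuperField

   // v0.12.0/src/main/scala -- Hive 0.12 keeps a plain user name on the CLI service
   private[thriftserver] object HiveThriftServerShim {
     val version = "0.12.0"
     def setServerUserName(ugi: UserGroupInformation, cli: SparkSQLCLIService): Unit =
       setSuperField(cli, "serverUserName", ShimLoader.getHadoopShims.getShortUserName(ugi))
   }

   // v0.13.1/src/main/scala -- Hive 0.13 stores the UGI itself, under a different field name
   private[thriftserver] object HiveThriftServerShim {
     val version = "0.13.1"
     def setServerUserName(ugi: UserGroupInformation, cli: SparkSQLCLIService): Unit =
       setSuperField(cli, "serviceUGI", ugi)
   }

   // Shared code (SparkSQLCLIService below) calls the shim the same way in both builds:
   HiveThriftServerShim.setServerUserName(sparkServiceUGI, this)

The build-helper-maven-plugin addition in sql/hive-thriftserver/pom.xml (below) is what adds v${hive.version.short}/src/main/scala to the compile path, so only one of the two objects ends up on the classpath.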

Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>

Closes #2685 from scwf/shim-thriftserver1 and squashes the following commits:

f26f3be [wangfei] remove clean to save time
f5cac74 [wangfei] remove local hivecontext test
578234d [wangfei] use new shaded hive
18fb1ff [wangfei] exclude kryo in hive pom
fa21d09 [wangfei] clean package assembly/assembly
8a4daf2 [wangfei] minor fix
0d7f6cf [wangfei] address comments
f7c93ae [wangfei] adding build with hive 0.13 before running tests
bcf943f [wangfei] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver1
c359822 [wangfei] reuse getCommandProcessor in hiveshim
52674a4 [scwf] sql/hive included since examples depend on it
3529e98 [scwf] move hive module to hive profile
f51ff4e [wangfei] update and fix conflicts
f48d3a5 [scwf] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver1
41f727b [scwf] revert pom changes
13afde0 [scwf] fix small bug
4b681f4 [scwf] enable thriftserver in profile hive-0.13.1
0bc53aa [scwf] fixed when result filed is null
dfd1c63 [scwf] update run-tests to run hive-0.12.0 default now
c6da3ce [scwf] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver
7c66b8e [scwf] update pom according spark-2706
ae47489 [scwf] update and fix conflicts
wangfei 2014-10-31 11:27:59 -07:00 committed by Michael Armbrust
parent adb6415c1d
commit 7c41d13570
12 changed files with 573 additions and 232 deletions

@@ -201,12 +201,6 @@
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
<id>hive-0.12.0</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>

@@ -142,17 +142,24 @@ CURRENT_BLOCK=$BLOCK_BUILD
# We always build with Hive because the PySpark Spark SQL tests need it.
BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0"
echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS"
# NOTE: echo "q" is needed because sbt on encountering a build file with failure
#+ (either resolution or compilation) prompts the user for input either q, r, etc
#+ to quit or retry. This echo is there to make it not block.
# NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a
#+ single argument!
# QUESTION: Why doesn't 'yes "q"' work?
# QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work?
# First build with 0.12 to ensure patches do not break the hive 12 build
echo "[info] Compile with hive 0.12"
echo -e "q\n" \
| sbt/sbt $BUILD_MVN_PROFILE_ARGS clean package assembly/assembly \
| sbt/sbt $BUILD_MVN_PROFILE_ARGS clean hive/compile hive-thriftserver/compile \
| grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
# Then build with default version(0.13.1) because tests are based on this version
echo "[info] Building Spark with these arguments: $SBT_MAVEN_PROFILES_ARGS -Phive"
echo -e "q\n" \
| sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive package assembly/assembly \
| grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
}

pom.xml

@@ -129,7 +129,7 @@
<flume.version>1.4.0</flume.version>
<zookeeper.version>3.4.5</zookeeper.version>
<!-- Version used in Maven Hive dependency -->
<hive.version>0.13.1</hive.version>
<hive.version>0.13.1a</hive.version>
<!-- Version used for internal directory structure -->
<hive.version.short>0.13.1</hive.version.short>
<derby.version>10.10.1.1</derby.version>
@@ -240,6 +240,18 @@
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<!-- This is temporarily included to fix issues with Hive 0.13 -->
<id>spark-staging-hive13</id>
<name>Spark Staging Repository Hive 13</name>
<url>https://oss.sonatype.org/content/repositories/orgspark-project-1089/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
@@ -908,9 +920,9 @@
by Spark SQL for code generation. -->
<compilerPlugins>
<compilerPlugin>
<groupId>org.scalamacros</groupId>
<artifactId>paradise_${scala.version}</artifactId>
<version>${scala.macros.version}</version>
<groupId>org.scalamacros</groupId>
<artifactId>paradise_${scala.version}</artifactId>
<version>${scala.macros.version}</version>
</compilerPlugin>
</compilerPlugins>
</configuration>
@@ -1313,15 +1325,20 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<modules>
<module>sql/hive-thriftserver</module>
</modules>
</profile>
<profile>
<id>hive-0.12.0</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
<modules>
<module>sql/hive-thriftserver</module>
</modules>
<properties>
<hive.version>0.12.0-protobuf-2.5</hive.version>
<hive.version.short>0.12.0</hive.version.short>

@@ -1400,33 +1400,6 @@ class HiveContext(SQLContext):
class LocalHiveContext(HiveContext):
"""Starts up an instance of hive where metadata is stored locally.
An in-process metadata data is created with data stored in ./metadata.
Warehouse data is stored in in ./warehouse.
>>> import os
>>> hiveCtx = LocalHiveContext(sc)
>>> try:
... supress = hiveCtx.sql("DROP TABLE src")
... except Exception:
... pass
>>> kv1 = os.path.join(os.environ["SPARK_HOME"],
... 'examples/src/main/resources/kv1.txt')
>>> supress = hiveCtx.sql(
... "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
>>> supress = hiveCtx.sql("LOAD DATA LOCAL INPATH '%s' INTO TABLE src"
... % kv1)
>>> results = hiveCtx.sql("FROM src SELECT value"
... ).map(lambda r: int(r.value.split('_')[1]))
>>> num = results.count()
>>> reduce_sum = results.reduce(lambda x, y: x + y)
>>> num
500
>>> reduce_sum
130091
"""
def __init__(self, sparkContext, sqlContext=None):
HiveContext.__init__(self, sparkContext, sqlContext)
warnings.warn("LocalHiveContext is deprecated. "

@@ -70,6 +70,24 @@
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-default-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>v${hive.version.short}/src/main/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>

@@ -29,11 +29,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.spark.Logging
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext)
extends Driver with Logging {
private[hive] abstract class AbstractSparkSQLDriver(
val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging {
private var tableSchema: Schema = _
private var hiveResponse: Seq[String] = _
private[hive] var tableSchema: Schema = _
private[hive] var hiveResponse: Seq[String] = _
override def init(): Unit = {
}
@@ -74,16 +74,6 @@ private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveCo
override def getSchema: Schema = tableSchema
override def getResults(res: JArrayList[String]): Boolean = {
if (hiveResponse == null) {
false
} else {
res.addAll(hiveResponse)
hiveResponse = null
true
}
}
override def destroy() {
super.destroy()
hiveResponse = null

@@ -38,6 +38,8 @@ import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.thrift.transport.TSocket
import org.apache.spark.Logging
import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.sql.hive.thriftserver.HiveThriftServerShim
private[hive] object SparkSQLCLIDriver {
private var prompt = "spark-sql"
@@ -116,7 +118,7 @@ private[hive] object SparkSQLCLIDriver {
}
}
if (!sessionState.isRemoteMode && !ShimLoader.getHadoopShims.usesJobShell()) {
if (!sessionState.isRemoteMode) {
// Hadoop-20 and above - we need to augment classpath using hiveconf
// components.
// See also: code in ExecDriver.java
@@ -258,7 +260,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
} else {
var ret = 0
val hconf = conf.asInstanceOf[HiveConf]
val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hconf)
val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hconf)
if (proc != null) {
if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor]) {

@@ -24,6 +24,7 @@ import java.util.{List => JList}
import javax.security.auth.login.LoginException
import org.apache.commons.logging.Log
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hive.service.Service.STATE
@@ -44,15 +45,17 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext)
setSuperField(this, "sessionManager", sparkSqlSessionManager)
addService(sparkSqlSessionManager)
var sparkServiceUGI: UserGroupInformation = null
try {
HiveAuthFactory.loginFromKeytab(hiveConf)
val serverUserName = ShimLoader.getHadoopShims
.getShortUserName(ShimLoader.getHadoopShims.getUGIForConf(hiveConf))
setSuperField(this, "serverUserName", serverUserName)
} catch {
case e @ (_: IOException | _: LoginException) =>
throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
try {
HiveAuthFactory.loginFromKeytab(hiveConf)
sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
HiveThriftServerShim.setServerUserName(sparkServiceUGI, this)
} catch {
case e @ (_: IOException | _: LoginException) =>
throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
}
}
initCompositeService(hiveConf)

@@ -17,24 +17,15 @@
package org.apache.spark.sql.hive.thriftserver.server
import java.sql.Timestamp
import java.util.{Map => JMap}
import scala.collection.mutable.Map
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, Map}
import scala.math.{random, round}
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.Logging
import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
import org.apache.spark.sql.catalyst.plans.logical.SetCommand
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.{SparkExecuteStatementOperation, ReflectionUtils}
/**
* Executes queries using Spark SQL, and maintains a list of handles to active queries.
@@ -54,158 +45,8 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
val operation = new ExecuteStatementOperation(parentSession, statement, confOverlay) {
private var result: SchemaRDD = _
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug("CLOSING")
}
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
if (!iter.hasNext) {
new RowSet()
} else {
// maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
val maxRows = maxRowsL.toInt
var curRow = 0
var rowSet = new ArrayBuffer[Row](maxRows.min(1024))
while (curRow < maxRows && iter.hasNext) {
val sparkRow = iter.next()
val row = new Row()
var curCol = 0
while (curCol < sparkRow.length) {
if (sparkRow.isNullAt(curCol)) {
addNullColumnValue(sparkRow, row, curCol)
} else {
addNonNullColumnValue(sparkRow, row, curCol)
}
curCol += 1
}
rowSet += row
curRow += 1
}
new RowSet(rowSet, 0)
}
}
def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
to.addString(from(ordinal).asInstanceOf[String])
case IntegerType =>
to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal)))
case BooleanType =>
to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal)))
case DoubleType =>
to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal)))
case FloatType =>
to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal)))
case DecimalType =>
val hiveDecimal = from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal)))
case LongType =>
to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal)))
case ByteType =>
to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
case ShortType =>
to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
case TimestampType =>
to.addColumnValue(
ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
val hiveString = result
.queryExecution
.asInstanceOf[HiveContext#QueryExecution]
.toHiveString((from.get(ordinal), dataTypes(ordinal)))
to.addColumnValue(ColumnValue.stringValue(hiveString))
}
}
def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
to.addString(null)
case IntegerType =>
to.addColumnValue(ColumnValue.intValue(null))
case BooleanType =>
to.addColumnValue(ColumnValue.booleanValue(null))
case DoubleType =>
to.addColumnValue(ColumnValue.doubleValue(null))
case FloatType =>
to.addColumnValue(ColumnValue.floatValue(null))
case DecimalType =>
to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal))
case LongType =>
to.addColumnValue(ColumnValue.longValue(null))
case ByteType =>
to.addColumnValue(ColumnValue.byteValue(null))
case ShortType =>
to.addColumnValue(ColumnValue.shortValue(null))
case TimestampType =>
to.addColumnValue(ColumnValue.timestampValue(null))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
to.addColumnValue(ColumnValue.stringValue(null: String))
}
}
def getResultSetSchema: TableSchema = {
logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
if (result.queryExecution.analyzed.output.size == 0) {
new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
} else {
val schema = result.queryExecution.analyzed.output.map { attr =>
new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
}
new TableSchema(schema)
}
}
def run(): Unit = {
logInfo(s"Running query '$statement'")
setState(OperationState.RUNNING)
try {
result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
sessionToActivePool(parentSession) = value
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
sessionToActivePool.get(parentSession).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
iter = {
val resultRdd = result.queryExecution.toRdd
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
resultRdd.toLocalIterator
} else {
resultRdd.collect().iterator
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
setHasResultSet(true)
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)
}
setState(OperationState.FINISHED)
}
}
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)(
hiveContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation)
operation
}

@@ -0,0 +1,225 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import java.sql.Timestamp
import java.util.{ArrayList => JArrayList, Map => JMap}
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, Map => SMap}
import scala.math._
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.plans.logical.SetCommand
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
/**
* A compatibility layer for interacting with Hive version 0.12.0.
*/
private[thriftserver] object HiveThriftServerShim {
val version = "0.12.0"
def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = {
val serverUserName = ShimLoader.getHadoopShims.getShortUserName(sparkServiceUGI)
setSuperField(sparkCliService, "serverUserName", serverUserName)
}
}
private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext)
extends AbstractSparkSQLDriver(_context) {
override def getResults(res: JArrayList[String]): Boolean = {
if (hiveResponse == null) {
false
} else {
res.addAll(hiveResponse)
hiveResponse = null
true
}
}
}
private[hive] class SparkExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String])(
hiveContext: HiveContext,
sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
parentSession, statement, confOverlay) with Logging {
private var result: SchemaRDD = _
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug("CLOSING")
}
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
if (!iter.hasNext) {
new RowSet()
} else {
// maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
val maxRows = maxRowsL.toInt
var curRow = 0
var rowSet = new ArrayBuffer[Row](maxRows.min(1024))
while (curRow < maxRows && iter.hasNext) {
val sparkRow = iter.next()
val row = new Row()
var curCol = 0
while (curCol < sparkRow.length) {
if (sparkRow.isNullAt(curCol)) {
addNullColumnValue(sparkRow, row, curCol)
} else {
addNonNullColumnValue(sparkRow, row, curCol)
}
curCol += 1
}
rowSet += row
curRow += 1
}
new RowSet(rowSet, 0)
}
}
def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
to.addString(from(ordinal).asInstanceOf[String])
case IntegerType =>
to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal)))
case BooleanType =>
to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal)))
case DoubleType =>
to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal)))
case FloatType =>
to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal)))
case DecimalType =>
val hiveDecimal = from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal)))
case LongType =>
to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal)))
case ByteType =>
to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
case ShortType =>
to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
case TimestampType =>
to.addColumnValue(
ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
val hiveString = result
.queryExecution
.asInstanceOf[HiveContext#QueryExecution]
.toHiveString((from.get(ordinal), dataTypes(ordinal)))
to.addColumnValue(ColumnValue.stringValue(hiveString))
}
}
def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
to.addString(null)
case IntegerType =>
to.addColumnValue(ColumnValue.intValue(null))
case BooleanType =>
to.addColumnValue(ColumnValue.booleanValue(null))
case DoubleType =>
to.addColumnValue(ColumnValue.doubleValue(null))
case FloatType =>
to.addColumnValue(ColumnValue.floatValue(null))
case DecimalType =>
to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal))
case LongType =>
to.addColumnValue(ColumnValue.longValue(null))
case ByteType =>
to.addColumnValue(ColumnValue.byteValue(null))
case ShortType =>
to.addColumnValue(ColumnValue.shortValue(null))
case TimestampType =>
to.addColumnValue(ColumnValue.timestampValue(null))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
to.addColumnValue(ColumnValue.stringValue(null: String))
}
}
def getResultSetSchema: TableSchema = {
logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
if (result.queryExecution.analyzed.output.size == 0) {
new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
} else {
val schema = result.queryExecution.analyzed.output.map { attr =>
new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
}
new TableSchema(schema)
}
}
def run(): Unit = {
logInfo(s"Running query '$statement'")
setState(OperationState.RUNNING)
try {
result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
sessionToActivePool(parentSession) = value
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
sessionToActivePool.get(parentSession).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
iter = {
val resultRdd = result.queryExecution.toRdd
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
resultRdd.toLocalIterator
} else {
resultRdd.collect().iterator
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
setHasResultSet(true)
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)
}
setState(OperationState.FINISHED)
}
}

@@ -0,0 +1,267 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import java.security.PrivilegedExceptionAction
import java.sql.Timestamp
import java.util.concurrent.Future
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, Map => SMap}
import scala.math._
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.{Row => SparkRow, SchemaRDD}
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
/**
* A compatibility layer for interacting with Hive version 0.13.1.
*/
private[thriftserver] object HiveThriftServerShim {
val version = "0.13.1"
def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = {
setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
}
}
private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext)
extends AbstractSparkSQLDriver(_context) {
override def getResults(res: JList[_]): Boolean = {
if (hiveResponse == null) {
false
} else {
res.asInstanceOf[JArrayList[String]].addAll(hiveResponse)
hiveResponse = null
true
}
}
}
private[hive] class SparkExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
runInBackground: Boolean = true)(
hiveContext: HiveContext,
sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
parentSession, statement, confOverlay, runInBackground) with Logging {
private var result: SchemaRDD = _
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
private def runInternal(cmd: String) = {
try {
result = hiveContext.sql(cmd)
logDebug(result.queryExecution.toString())
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
iter = {
val resultRdd = result.queryExecution.toRdd
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
resultRdd.toLocalIterator
} else {
resultRdd.collect().iterator
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)
}
}
def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug("CLOSING")
}
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
to += from.get(ordinal).asInstanceOf[String]
case IntegerType =>
to += from.getInt(ordinal)
case BooleanType =>
to += from.getBoolean(ordinal)
case DoubleType =>
to += from.getDouble(ordinal)
case FloatType =>
to += from.getFloat(ordinal)
case DecimalType =>
to += from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
case LongType =>
to += from.getLong(ordinal)
case ByteType =>
to += from.getByte(ordinal)
case ShortType =>
to += from.getShort(ordinal)
case TimestampType =>
to += from.get(ordinal).asInstanceOf[Timestamp]
case BinaryType =>
to += from.get(ordinal).asInstanceOf[String]
case _: ArrayType =>
to += from.get(ordinal).asInstanceOf[String]
case _: StructType =>
to += from.get(ordinal).asInstanceOf[String]
case _: MapType =>
to += from.get(ordinal).asInstanceOf[String]
}
}
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
if (!iter.hasNext) {
resultRowSet
} else {
// maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
val maxRows = maxRowsL.toInt
var curRow = 0
while (curRow < maxRows && iter.hasNext) {
val sparkRow = iter.next()
val row = ArrayBuffer[Any]()
var curCol = 0
while (curCol < sparkRow.length) {
if (sparkRow.isNullAt(curCol)) {
row += null
} else {
addNonNullColumnValue(sparkRow, row, curCol)
}
curCol += 1
}
resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
curRow += 1
}
resultRowSet
}
}
def getResultSetSchema: TableSchema = {
logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
if (result.queryExecution.analyzed.output.size == 0) {
new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
} else {
val schema = result.queryExecution.analyzed.output.map { attr =>
new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
}
new TableSchema(schema)
}
}
private def getConfigForOperation: HiveConf = {
var sqlOperationConf: HiveConf = getParentSession.getHiveConf
if (!getConfOverlay.isEmpty || shouldRunAsync) {
sqlOperationConf = new HiveConf(sqlOperationConf)
import scala.collection.JavaConversions._
for (confEntry <- getConfOverlay.entrySet) {
try {
sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
}
catch {
case e: IllegalArgumentException => {
throw new HiveSQLException("Error applying statement specific settings", e)
}
}
}
}
return sqlOperationConf
}
def run(): Unit = {
logInfo(s"Running query '$statement'")
val opConfig: HiveConf = getConfigForOperation
setState(OperationState.RUNNING)
setHasResultSet(true)
if (!shouldRunAsync) {
runInternal(statement)
setState(OperationState.FINISHED)
} else {
val parentSessionState = SessionState.get
val sessionHive: Hive = Hive.get
val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)
val backgroundOperation: Runnable = new Runnable {
def run {
val doAsAction: PrivilegedExceptionAction[AnyRef] =
new PrivilegedExceptionAction[AnyRef] {
def run: AnyRef = {
Hive.set(sessionHive)
SessionState.setCurrentSessionState(parentSessionState)
try {
runInternal(statement)
}
catch {
case e: HiveSQLException => {
setOperationException(e)
logError("Error running hive query: ", e)
}
}
return null
}
}
try {
ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
}
catch {
case e: Exception => {
setOperationException(new HiveSQLException(e))
logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
}
}
setState(OperationState.FINISHED)
}
}
try {
val backgroundHandle: Future[_] = getParentSession.getSessionManager.
submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)
}
}
}
}

@@ -65,6 +65,10 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>