[SPARK-28293][SQL] Implement Spark's own GetTableTypesOperation
## What changes were proposed in this pull request? The table types are currently taken from Hive, which causes some issues. For example, Spark does not support `index_table`, and different Hive versions support different table types: Build with Hive 1.2.1: ![image](https://user-images.githubusercontent.com/5399861/60792689-be38b880-a198-11e9-82b8-868992a505e3.png) Build with Hive 2.3.5: ![image](https://user-images.githubusercontent.com/5399861/60792727-d4467900-a198-11e9-952c-210bb7bb3bed.png) This PR implements Spark's own `GetTableTypesOperation`. ## How was this patch tested? Unit tests and manual tests: ![image](https://user-images.githubusercontent.com/5399861/60793368-2a67ec00-a19a-11e9-9511-c67483dcc370.png) Closes #25073 from wangyum/SPARK-28293. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: gatorsmile <gatorsmile@gmail.com>
This commit is contained in:
parent
167fa0402d
commit
045191e610
|
@ -561,6 +561,8 @@ object CatalogTableType {
|
|||
val EXTERNAL = new CatalogTableType("EXTERNAL")
|
||||
val MANAGED = new CatalogTableType("MANAGED")
|
||||
val VIEW = new CatalogTableType("VIEW")
|
||||
|
||||
val tableTypes = Seq(EXTERNAL, MANAGED, VIEW)
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.hive.thriftserver
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
|
||||
import org.apache.hive.service.cli._
|
||||
import org.apache.hive.service.cli.operation.GetTableTypesOperation
|
||||
import org.apache.hive.service.cli.session.HiveSession
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
|
||||
import org.apache.spark.util.{Utils => SparkUtils}
|
||||
|
||||
/**
 * Spark's own GetTableTypesOperation.
 *
 * Fills the inherited `rowSet` with one row per distinct table-type string obtained by
 * mapping `CatalogTableType.tableTypes` through `tableTypeString`, and reports the
 * statement lifecycle (start, finish, error, close) to `HiveThriftServer2.listener`.
 *
 * @param sqlContext SQLContext to use
 * @param parentSession a HiveSession from SessionManager
 */
private[hive] class SparkGetTableTypesOperation(
    sqlContext: SQLContext,
    parentSession: HiveSession)
  extends GetTableTypesOperation(parentSession) with SparkMetadataOperationUtils with Logging {

  // Assigned at the start of runInternal(); used as the id in every listener callback.
  private var statementId: String = _

  /** Closes the underlying operation and notifies the listener that it was closed. */
  override def close(): Unit = {
    super.close()
    HiveThriftServer2.listener.onOperationClosed(statementId)
  }

  /**
   * Runs the operation: moves it to RUNNING, performs the authorization check when
   * auth V2 is enabled, adds the table-type rows and moves it to FINISHED.
   *
   * Any non-fatal failure while producing the rows moves the operation to ERROR,
   * is reported through `onStatementError` and is then rethrown unchanged.
   */
  override def runInternal(): Unit = {
    statementId = UUID.randomUUID().toString
    val logMsg = "Listing table types"
    logInfo(s"$logMsg with $statementId")
    setState(OperationState.RUNNING)
    // Always use the latest class loader provided by executionHive's state.
    val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
    Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

    if (isAuthV2Enabled) {
      authorizeMetaGets(HiveOperationType.GET_TABLETYPES, null)
    }

    HiveThriftServer2.listener.onStatementStart(
      statementId,
      parentSession.getSessionHandle.getSessionId.toString,
      logMsg,
      statementId,
      parentSession.getUsername)

    try {
      // `distinct` keeps first-occurrence order, so the order of the returned rows does
      // not depend on the iteration order of a Set.
      val tableTypes = CatalogTableType.tableTypes.map(tableTypeString).distinct
      tableTypes.foreach { tableType =>
        rowSet.addRow(Array[AnyRef](tableType))
      }
      setState(OperationState.FINISHED)
    } catch {
      // Handle every non-fatal error (not only HiveSQLException), otherwise the operation
      // would stay in RUNNING and the listener would never learn that the statement failed.
      case scala.util.control.NonFatal(e) =>
        setState(OperationState.ERROR)
        HiveThriftServer2.listener.onStatementError(
          statementId, e.getMessage, SparkUtils.exceptionString(e))
        throw e
    }
    HiveThriftServer2.listener.onStatementFinish(statementId)
  }
}
|
|
@ -53,7 +53,7 @@ private[hive] class SparkGetTablesOperation(
|
|||
tableName: String,
|
||||
tableTypes: JList[String])
|
||||
extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes)
|
||||
with Logging{
|
||||
with SparkMetadataOperationUtils with Logging {
|
||||
|
||||
private var statementId: String = _
|
||||
|
||||
|
@ -146,11 +146,4 @@ private[hive] class SparkGetTablesOperation(
|
|||
rowSet.addRow(rowData)
|
||||
}
|
||||
}
|
||||
|
||||
private def tableTypeString(tableType: CatalogTableType): String = tableType match {
|
||||
case EXTERNAL | MANAGED => "TABLE"
|
||||
case VIEW => "VIEW"
|
||||
case t =>
|
||||
throw new IllegalArgumentException(s"Unknown table type is found at showCreateHiveTable: $t")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.hive.thriftserver
|
||||
|
||||
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
|
||||
import org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, VIEW}
|
||||
|
||||
/**
 * Helpers shared by the Spark metadata operations of the thrift server.
 */
private[hive] trait SparkMetadataOperationUtils {

  /**
   * Converts a catalog table type into the table-type string reported to clients.
   *
   * @param tableType the catalog table type to convert
   * @return "VIEW" for `VIEW`, "TABLE" for `MANAGED` and `EXTERNAL`
   * @throws IllegalArgumentException if `tableType` is none of the types above
   */
  def tableTypeString(tableType: CatalogTableType): String = {
    tableType match {
      case VIEW =>
        "VIEW"
      case MANAGED | EXTERNAL =>
        "TABLE"
      case unknown =>
        throw new IllegalArgumentException(s"Unknown table type is found: $unknown")
    }
  }
}
|
|
@ -21,13 +21,13 @@ import java.util.{List => JList, Map => JMap}
|
|||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import org.apache.hive.service.cli._
|
||||
import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, GetColumnsOperation, GetSchemasOperation, MetadataOperation, Operation, OperationManager}
|
||||
import org.apache.hive.service.cli.operation._
|
||||
import org.apache.hive.service.cli.session.HiveSession
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.hive.HiveUtils
|
||||
import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation, SparkGetColumnsOperation, SparkGetSchemasOperation, SparkGetTablesOperation}
|
||||
import org.apache.spark.sql.hive.thriftserver._
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
||||
/**
|
||||
|
@ -100,7 +100,7 @@ private[thriftserver] class SparkSQLOperationManager()
|
|||
columnName: String): GetColumnsOperation = synchronized {
|
||||
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
|
||||
require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
|
||||
s" initialized or had already closed.")
|
||||
" initialized or had already closed.")
|
||||
val operation = new SparkGetColumnsOperation(sqlContext, parentSession,
|
||||
catalogName, schemaName, tableName, columnName)
|
||||
handleToOperation.put(operation.getHandle, operation)
|
||||
|
@ -108,6 +108,17 @@ private[thriftserver] class SparkSQLOperationManager()
|
|||
operation
|
||||
}
|
||||
|
||||
/**
 * Creates a [[SparkGetTableTypesOperation]] for the given session and registers it
 * in `handleToOperation` under its operation handle.
 *
 * @param parentSession the session on whose behalf the operation is created
 * @return the newly created and registered operation
 * @throws IllegalArgumentException if no SQLContext is registered for the session handle
 */
override def newGetTableTypesOperation(
    parentSession: HiveSession): GetTableTypesOperation = synchronized {
  val handle = parentSession.getSessionHandle
  val sqlContext = sessionToContexts.get(handle)
  // A missing context means the session was never initialized or is already closed.
  require(sqlContext != null, s"Session handle: $handle has not been" +
    " initialized or had already closed.")

  val tableTypesOperation = new SparkGetTableTypesOperation(sqlContext, parentSession)
  handleToOperation.put(tableTypesOperation.getHandle, tableTypesOperation)
  logDebug(s"Created GetTableTypesOperation with session=$parentSession.")
  tableTypesOperation
}
|
||||
|
||||
def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = {
|
||||
val iterator = confMap.entrySet().iterator()
|
||||
while (iterator.hasNext) {
|
||||
|
|
|
@ -166,4 +166,20 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
|
|||
checkResult(metaData.getColumns(null, "%", "table_not_exist", null), Seq.empty)
|
||||
}
|
||||
}
|
||||
|
||||
test("Spark's own GetTableTypesOperation(SparkGetTableTypesOperation)") {
  // Walks the result set in lock-step with the expected values, then verifies that
  // the result set is exhausted.
  def checkResult(rs: ResultSet, tableTypes: Seq[String]): Unit = {
    tableTypes.foreach { expectedType =>
      assert(rs.next())
      assert(rs.getString("TABLE_TYPE") === expectedType)
    }
    // Make sure there are no more elements
    assert(!rs.next())
  }

  withJdbcStatement() { statement =>
    checkResult(statement.getConnection.getMetaData.getTableTypes, Seq("TABLE", "VIEW"))
  }
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@ public class GetTableTypesOperation extends MetadataOperation {
|
|||
protected static TableSchema RESULT_SET_SCHEMA = new TableSchema()
|
||||
.addStringColumn("TABLE_TYPE", "Table type name.");
|
||||
|
||||
private final RowSet rowSet;
|
||||
protected final RowSet rowSet;
|
||||
private final TableTypeMapping tableTypeMapping;
|
||||
|
||||
protected GetTableTypesOperation(HiveSession parentSession) {
|
||||
|
|
|
@ -39,7 +39,7 @@ public class GetTableTypesOperation extends MetadataOperation {
|
|||
protected static TableSchema RESULT_SET_SCHEMA = new TableSchema()
|
||||
.addStringColumn("TABLE_TYPE", "Table type name.");
|
||||
|
||||
private final RowSet rowSet;
|
||||
protected final RowSet rowSet;
|
||||
private final TableTypeMapping tableTypeMapping;
|
||||
|
||||
protected GetTableTypesOperation(HiveSession parentSession) {
|
||||
|
|
Loading…
Reference in a new issue