[SPARK-28852][SQL] Implement SparkGetCatalogsOperation for Thrift Server
### What changes were proposed in this pull request? This PR implements `SparkGetCatalogsOperation` for Thrift Server metadata completeness. ### Why are the changes needed? Thrift Server metadata completeness. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit test Closes #25555 from wangyum/SPARK-28852. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Xiao Li <gatorsmile@gmail.com>
This commit is contained in:
parent
a3328cdc0a
commit
adb506afd7
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.hive.thriftserver
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
|
||||
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
|
||||
import org.apache.hive.service.cli.operation.GetCatalogsOperation
|
||||
import org.apache.hive.service.cli.session.HiveSession
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.util.{Utils => SparkUtils}
|
||||
|
||||
/**
|
||||
* Spark's own GetCatalogsOperation
|
||||
*
|
||||
* @param sqlContext SQLContext to use
|
||||
* @param parentSession a HiveSession from SessionManager
|
||||
*/
|
||||
private[hive] class SparkGetCatalogsOperation(
|
||||
sqlContext: SQLContext,
|
||||
parentSession: HiveSession)
|
||||
extends GetCatalogsOperation(parentSession) with Logging {
|
||||
|
||||
private var statementId: String = _
|
||||
|
||||
override def close(): Unit = {
|
||||
super.close()
|
||||
HiveThriftServer2.listener.onOperationClosed(statementId)
|
||||
}
|
||||
|
||||
override def runInternal(): Unit = {
|
||||
statementId = UUID.randomUUID().toString
|
||||
val logMsg = "Listing catalogs"
|
||||
logInfo(s"$logMsg with $statementId")
|
||||
setState(OperationState.RUNNING)
|
||||
// Always use the latest class loader provided by executionHive's state.
|
||||
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
|
||||
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
|
||||
|
||||
HiveThriftServer2.listener.onStatementStart(
|
||||
statementId,
|
||||
parentSession.getSessionHandle.getSessionId.toString,
|
||||
logMsg,
|
||||
statementId,
|
||||
parentSession.getUsername)
|
||||
|
||||
try {
|
||||
if (isAuthV2Enabled) {
|
||||
authorizeMetaGets(HiveOperationType.GET_CATALOGS, null)
|
||||
}
|
||||
setState(OperationState.FINISHED)
|
||||
} catch {
|
||||
case e: HiveSQLException =>
|
||||
setState(OperationState.ERROR)
|
||||
HiveThriftServer2.listener.onStatementError(
|
||||
statementId, e.getMessage, SparkUtils.exceptionString(e))
|
||||
throw e
|
||||
}
|
||||
HiveThriftServer2.listener.onStatementFinish(statementId)
|
||||
}
|
||||
}
|
|
@ -63,6 +63,17 @@ private[thriftserver] class SparkSQLOperationManager()
|
|||
operation
|
||||
}
|
||||
|
||||
override def newGetCatalogsOperation(
|
||||
parentSession: HiveSession): GetCatalogsOperation = synchronized {
|
||||
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
|
||||
require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
|
||||
" initialized or had already closed.")
|
||||
val operation = new SparkGetCatalogsOperation(sqlContext, parentSession)
|
||||
handleToOperation.put(operation.getHandle, operation)
|
||||
logDebug(s"Created GetCatalogsOperation with session=$parentSession.")
|
||||
operation
|
||||
}
|
||||
|
||||
override def newGetSchemasOperation(
|
||||
parentSession: HiveSession,
|
||||
catalogName: String,
|
||||
|
|
|
@ -223,4 +223,12 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
|
|||
assert(!rs.next())
|
||||
}
|
||||
}
|
||||
|
||||
test("Spark's own GetCatalogsOperation(SparkGetCatalogsOperation)") {
|
||||
withJdbcStatement() { statement =>
|
||||
val metaData = statement.getConnection.getMetaData
|
||||
val rs = metaData.getCatalogs
|
||||
assert(!rs.next())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ public class GetCatalogsOperation extends MetadataOperation {
|
|||
private static final TableSchema RESULT_SET_SCHEMA = new TableSchema()
|
||||
.addStringColumn("TABLE_CAT", "Catalog name. NULL if not applicable.");
|
||||
|
||||
private final RowSet rowSet;
|
||||
protected final RowSet rowSet;
|
||||
|
||||
protected GetCatalogsOperation(HiveSession parentSession) {
|
||||
super(parentSession, OperationType.GET_CATALOGS);
|
||||
|
|
|
@ -36,7 +36,7 @@ public class GetCatalogsOperation extends MetadataOperation {
|
|||
private static final TableSchema RESULT_SET_SCHEMA = new TableSchema()
|
||||
.addStringColumn("TABLE_CAT", "Catalog name. NULL if not applicable.");
|
||||
|
||||
private final RowSet rowSet;
|
||||
protected final RowSet rowSet;
|
||||
|
||||
protected GetCatalogsOperation(HiveSession parentSession) {
|
||||
super(parentSession, OperationType.GET_CATALOGS);
|
||||
|
|
Loading…
Reference in a new issue