[SPARK-35321][SQL] Don't register Hive permanent functions when creating Hive client

### What changes were proposed in this pull request?

Instantiate a new Hive client through `Hive.getWithoutRegisterFns(conf)` instead of `Hive.get(conf)` when the Hive version is >= 2.3.9 (the built-in version).

### Why are the changes needed?

[HIVE-10319](https://issues.apache.org/jira/browse/HIVE-10319) introduced a new API, `get_all_functions`, which is only supported in Hive 1.3.0/2.0.0 and up. As a result, when Spark 3.x talks to an HMS service of version 1.2 or lower, the following error occurs:
```
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.thrift.TApplicationException: Invalid method name: 'get_all_functions'
        at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3897)
        at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248)
        at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
        ... 96 more
Caused by: org.apache.thrift.TApplicationException: Invalid method name: 'get_all_functions'
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_all_functions(ThriftHiveMetastore.java:3845)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_all_functions(ThriftHiveMetastore.java:3833)
```

`get_all_functions` is called only when `doRegisterAllFns` is set to true:
```java
  private Hive(HiveConf c, boolean doRegisterAllFns) throws HiveException {
    conf = c;
    if (doRegisterAllFns) {
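      // Iterates over the get_all_functions results and registers each permanent UDF.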
      registerAllFunctionsOnce();
    }
  }
```

This registers all Hive permanent functions defined in the HMS in Hive's `FunctionRegistry` class, by iterating through the results of `get_all_functions`. For Spark this is unnecessary, since Spark loads Hive permanent (i.e., non-built-in) UDFs by calling the HMS API `get_function` directly. The `FunctionRegistry` is only used to load Hive built-in functions that Spark does not support, which at the moment applies only to `histogram_numeric`.
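
To illustrate the direct lookup path, here is a minimal sketch using Hive's public `Hive.getFunction(db, name)` API; the helper name and error handling are illustrative, not Spark's actual shim code:

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.Function
import org.apache.hadoop.hive.ql.metadata.Hive

// Resolve a single permanent UDF with one get_function thrift call,
// without registering every HMS function in Hive's FunctionRegistry first.
def lookupPermanentFunction(db: String, name: String): Option[Function] = {
  val hive = Hive.get(new HiveConf())
  Option(hive.getFunction(db, name))
}
```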

[HIVE-21563](https://issues.apache.org/jira/browse/HIVE-21563) introduced a new API, `getWithoutRegisterFns`, which skips the registration above and is available starting from Hive 2.3.9. Spark should therefore adopt it to avoid this cost.
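
Conceptually, the version gate looks as follows; this is a condensed sketch of the patch below, with the parsed version passed in directly rather than obtained through Spark's `VersionUtils` helper:

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.Hive

// Only Hive 2.3.9+ exposes getWithoutRegisterFns, which skips
// registerAllFunctionsOnce() and thus the get_all_functions call.
def newHiveClient(conf: HiveConf, version: (Int, Int, Int)): Hive = version match {
  case (2, 3, patch) if patch >= 9 => Hive.getWithoutRegisterFns(conf)
  case _ => Hive.get(conf)
}
```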

### Does this PR introduce _any_ user-facing change?

Yes. With this fix, Spark should now be able to talk to HMS servers running Hive 1.2.x and lower.

### How was this patch tested?

Manually started an HMS server of Hive version 1.2.2. Without this PR, Spark failed with the above exception; with it, the error disappeared and common operations such as creating tables, creating databases, and listing tables all succeeded.
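
For reference, a sketch of the kind of session configuration used to exercise this path against an external HMS; the metastore URI is a placeholder for the test setup:

```scala
import org.apache.spark.sql.SparkSession

// Point Spark at an older external metastore (client jars fetched from Maven).
val spark = SparkSession.builder()
  .config("spark.sql.hive.metastore.version", "1.2.2")
  .config("spark.sql.hive.metastore.jars", "maven")
  .config("hive.metastore.uris", "thrift://localhost:9083") // assumed HMS address
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS smoke_test")
spark.sql("SHOW TABLES IN smoke_test").show()
```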

Closes #32887 from sunchao/SPARK-35321-new.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>

The commit's changes (in `HiveClientImpl.scala`):

```diff
@@ -57,11 +57,11 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.util.{CircularBuffer, ShutdownHookManager, Utils}
+import org.apache.spark.util.{CircularBuffer, ShutdownHookManager, Utils, VersionUtils}
 
 /**
  * A class that wraps the HiveClient and converts its responses to externally visible classes.
@@ -219,6 +219,16 @@ private[hive] class HiveClientImpl(
     hiveConf
   }
 
+  private def getHive(conf: HiveConf): Hive = {
+    VersionUtils.majorMinorPatchVersion(version.fullVersion).map {
+      case (2, 3, v) if v >= 9 => Hive.getWithoutRegisterFns(conf)
+      case _ => Hive.get(conf)
+    }.getOrElse {
+      throw QueryExecutionErrors.unsupportedHiveMetastoreVersionError(
+        version.fullVersion, HiveUtils.HIVE_METASTORE_VERSION.key)
+    }
+  }
+
   override val userName = UserGroupInformation.getCurrentUser.getShortUserName
 
   override def getConf(key: String, defaultValue: String): String = {
@@ -273,7 +283,7 @@ private[hive] class HiveClientImpl(
     if (clientLoader.cachedHive != null) {
       clientLoader.cachedHive.asInstanceOf[Hive]
     } else {
-      val c = Hive.get(conf)
+      val c = getHive(conf)
       clientLoader.cachedHive = c
       c
     }
@@ -303,7 +313,7 @@ private[hive] class HiveClientImpl(
     // with the side-effect of Hive.get(conf) to avoid using out-of-date HiveConf.
     // See discussion in https://github.com/apache/spark/pull/16826/files#r104606859
     // for more details.
-    Hive.get(conf)
+    getHive(conf)
     // setCurrentSessionState will use the classLoader associated
     // with the HiveConf in `state` to override the context class loader of the current
     // thread.
```