From dcacfc5da66e6fb3417f32534fe56fdff32764f0 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sun, 26 May 2019 08:24:41 -0700 Subject: [PATCH] [SPARK-27074][SQL][test-hadoop3.2][test-maven] Hive 3.1 metastore support HiveClientImpl.runHive ## What changes were proposed in this pull request? Hive 3.1.1's `CommandProcessor` has 2 changes: 1. [HIVE-17626](https://issues.apache.org/jira/browse/HIVE-17626)(Hive 3.0.0) adds ReExecDriver. So the current code path is: https://github.com/apache/spark/blob/02bbe977abaf7006b845a7e99d612b0235aa0025/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L736-L742 We can disable `hive.query.reexecution.enabled` to work around this change. 2. [HIVE-18238](http://issues.apache.org/jira/browse/HIVE-18238)(Hive 3.0.0) changed the `Driver.close()` function return type. We can work around it by ` driver.getClass.getMethod("close").invoke(driver)` So Hive 3.1 metastore could support `HiveClientImpl.runHive` after this PR. ## How was this patch tested? unit tests Closes #23992 from wangyum/SPARK-27074. Authored-by: Yuming Wang Signed-off-by: Dongjoon Hyun --- .../spark/sql/hive/client/HiveClientImpl.scala | 16 ++++++++++++---- .../spark/sql/hive/client/VersionsSuite.scala | 13 +++++++++---- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index eaca03c608..2f8ac825fc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -735,8 +735,18 @@ private[hive] class HiveClientImpl( * in the sequence is one row. * Since upgrading the built-in Hive to 2.3, hive-llap-client is needed when * running MapReduce jobs with `runHive`. + * Since HIVE-17626(Hive 3.0.0), need to set hive.query.reexecution.enabled=false.
*/ protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState { + def closeDriver(driver: Driver): Unit = { + // Since HIVE-18238(Hive 3.0.0), the Driver.close function's return type changed + // and the CommandProcessorFactory.clean function removed. + driver.getClass.getMethod("close").invoke(driver) + if (version != hive.v3_1) { + CommandProcessorFactory.clean(conf) + } + } + logDebug(s"Running hiveql '$cmd'") if (cmd.toLowerCase(Locale.ROOT).startsWith("set")) { logDebug(s"Changing config: $cmd") } try { @@ -750,15 +760,13 @@ private[hive] class HiveClientImpl( val response: CommandProcessorResponse = driver.run(cmd) // Throw an exception if there is an error in query processing. if (response.getResponseCode != 0) { - driver.close() - CommandProcessorFactory.clean(conf) + closeDriver(driver) throw new QueryExecutionException(response.getErrorMessage) } driver.setMaxRows(maxRows) val results = shim.getDriverResults(driver) - driver.close() - CommandProcessorFactory.clean(conf) + closeDriver(driver) results case _ => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 8f2365c436..328457948c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -123,9 +123,11 @@ class VersionsSuite extends SparkFunSuite with Logging { hadoopConf.set("datanucleus.schema.autoCreateAll", "true") hadoopConf.set("hive.metastore.schema.verification", "false") } - // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`. if (version == "3.1") { + // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`. hadoopConf.set("hive.in.test", "true") + // Since HIVE-17626(Hive 3.0.0), need to set hive.query.reexecution.enabled=false. 
+ hadoopConf.set("hive.query.reexecution.enabled", "false") } client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf)) if (versionSpark != null) versionSpark.reset() @@ -584,10 +586,13 @@ class VersionsSuite extends SparkFunSuite with Logging { test(s"$version: sql read hive materialized view") { // HIVE-14249 Since Hive 2.3.0, materialized view is supported. - // But skip Hive 3.1 because of SPARK-27074. - if (version == "2.3") { + if (version == "2.3" || version == "3.1") { + // Since HIVE-14498(Hive 3.0), Automatic rewriting for materialized view cannot be enabled + // if the materialized view uses non-transactional tables. + val disableRewrite = if (version == "2.3") "" else "DISABLE REWRITE" client.runSqlHive("CREATE TABLE materialized_view_tbl (c1 INT)") - client.runSqlHive("CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM materialized_view_tbl") + client.runSqlHive( + s"CREATE MATERIALIZED VIEW mv1 $disableRewrite AS SELECT * FROM materialized_view_tbl") val e = intercept[AnalysisException](versionSpark.table("mv1").collect()).getMessage assert(e.contains("Hive materialized view is not supported")) }