From 37dc3f9ea724379d7d183caddc19b47bf9c7f4c2 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Fri, 16 Jul 2021 13:32:25 -0700 Subject: [PATCH] [SPARK-36128][SQL] Apply spark.sql.hive.metastorePartitionPruning for non-Hive tables that uses Hive metastore for partition management ### What changes were proposed in this pull request? In `CatalogFileIndex.filterPartitions`, check the config `spark.sql.hive.metastorePartitionPruning` and don't pushdown predicates to remote HMS if it is false. Instead, fallback to the `listPartitions` API and do the filtering on the client side. ### Why are the changes needed? Currently the config `spark.sql.hive.metastorePartitionPruning` is only effective for Hive tables, and for non-Hive tables we'd always use the `listPartitionsByFilter` API from HMS client. On the other hand, by default all data source tables also manage their partitions through HMS, when the config `spark.sql.hive.manageFilesourcePartitions` is turned on. Therefore, it seems reasonable to extend the above config for non-Hive tables as well. In certain cases the remote HMS service could throw exceptions when using the `listPartitionsByFilter` API, which, on the Spark side, is unrecoverable at the current state. Therefore it would be better to allow users to disable the API by using the above config. For instance, HMS only allow pushdown date column when direct SQL is used instead of JDO for interacting with the underlying RDBMS, and will throw exception otherwise. Even though the Spark Hive client will attempt to recover itself when the exception happens, it only does so when the config `hive.metastore.try.direct.sql` from remote HMS is `false`. There could be cases where the value of `hive.metastore.try.direct.sql` is true but remote HMS still throws exception. ### Does this PR introduce _any_ user-facing change? Yes now the config `spark.sql.hive.metastorePartitionPruning` is extended for non-Hive tables which use HMS to manage their partition metadata. ### How was this patch tested? Added a new unit test: ``` build/sbt "hive/testOnly *PruneFileSourcePartitionsSuite -- -z SPARK-36128" ``` Closes #33348 from sunchao/SPARK-36128-by-filter. Authored-by: Chao Sun Signed-off-by: Liang-Chi Hsieh --- .../catalog/ExternalCatalogUtils.scala | 14 ++++++++++++++ .../apache/spark/sql/internal/SQLConf.scala | 7 +++---- .../datasources/CatalogFileIndex.scala | 6 +++--- .../execution/PruneHiveTablePartitions.scala | 19 ++----------------- .../PruneFileSourcePartitionsSuite.scala | 19 ++++++++++++++++++- 5 files changed, 40 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala index 72771e0aad..1cec7ed3a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, Predicate} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf object ExternalCatalogUtils { // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since catalyst doesn't @@ -132,6 +133,19 @@ object ExternalCatalogUtils { escapePathName(col) + "=" + partitionString } + def listPartitionsByFilter( + conf: SQLConf, + catalog: SessionCatalog, + table: CatalogTable, + partitionFilters: Seq[Expression]): Seq[CatalogTablePartition] = { + if (conf.metastorePartitionPruning) { + catalog.listPartitionsByFilter(table.identifier, partitionFilters) + } else { + ExternalCatalogUtils.prunePartitionsByFilter(table, catalog.listPartitions(table.identifier), + partitionFilters, conf.sessionLocalTimeZone) + } + } + def prunePartitionsByFilter( catalogTable: CatalogTable, inputPartitions: Seq[CatalogTablePartition], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a1c344a25d..b9663bb380 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -979,9 +979,7 @@ object SQLConf { val HIVE_METASTORE_PARTITION_PRUNING = buildConf("spark.sql.hive.metastorePartitionPruning") .doc("When true, some predicates will be pushed down into the Hive metastore so that " + - "unmatching partitions can be eliminated earlier. This only affects Hive tables " + - "not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and " + - "HiveUtils.CONVERT_METASTORE_ORC for more information).") + "unmatching partitions can be eliminated earlier.") .version("1.5.0") .booleanConf .createWithDefault(true) @@ -1005,7 +1003,8 @@ object SQLConf { .doc("When true, enable metastore partition management for file source tables as well. " + "This includes both datasource and converted Hive tables. When partition management " + "is enabled, datasource tables store partition in the Hive metastore, and use the " + - "metastore to prune partitions during query planning.") + s"metastore to prune partitions during query planning when " + + s"$HIVE_METASTORE_PARTITION_PRUNING is set to true.") .version("2.1.1") .booleanConf .createWithDefault(true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index 91313f33a7..727b33018f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalogUtils} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.StructType @@ -70,8 +70,8 @@ class CatalogFileIndex( def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = { if (table.partitionColumnNames.nonEmpty) { val startTime = System.nanoTime() - val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter( - table.identifier, filters) + val selectedPartitions = ExternalCatalogUtils.listPartitionsByFilter( + sparkSession.sessionState.conf, sparkSession.sessionState.catalog, table, filters) val partitions = selectedPartitions.map { p => val path = new Path(p.location) val fs = path.getFileSystem(hadoopConf) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index a831e8b898..1bd47d7d7a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -56,22 +56,6 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) normalizedFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionColumnSet))) } - /** - * Prune the hive table using filters on the partitions of the table. - */ - private def prunePartitions( - relation: HiveTableRelation, - partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { - if (conf.metastorePartitionPruning) { - session.sessionState.catalog.listPartitionsByFilter( - relation.tableMeta.identifier, partitionFilters.toSeq) - } else { - ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, - session.sessionState.catalog.listPartitions(relation.tableMeta.identifier), - partitionFilters.toSeq, conf.sessionLocalTimeZone) - } - } - /** * Update the statistics of the table. */ @@ -111,7 +95,8 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => val partitionKeyFilters = getPartitionKeyFilters(filters, relation) if (partitionKeyFilters.nonEmpty) { - val newPartitions = prunePartitions(relation, partitionKeyFilters) + val newPartitions = ExternalCatalogUtils.listPartitionsByFilter(conf, + session.sessionState.catalog, relation.tableMeta, partitionKeyFilters.toSeq) val newTableMeta = updateTableMeta(relation, newPartitions, partitionKeyFilters) val newRelation = relation.copy( tableMeta = newTableMeta, prunedPartitions = Some(newPartitions)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index a16545a742..a669b803f0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.execution import org.scalatest.matchers.should.Matchers._ +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ @@ -115,7 +117,7 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase { withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, "")) { withTempPath { dir => spark.range(10).selectExpr("id", "id % 3 as p") - .write.partitionBy("p").parquet(dir.getCanonicalPath) + .write.partitionBy("p").parquet(dir.getCanonicalPath) withTempView("tmp") { spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp"); assertPrunedPartitions("SELECT COUNT(*) FROM tmp WHERE p = 0", 1, "(tmp.p = 0)") @@ -125,6 +127,21 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase { } } + test("SPARK-36128: spark.sql.hive.metastorePartitionPruning should work for file data sources") { + Seq(true, false).foreach { enablePruning => + withTable("tbl") { + withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> enablePruning.toString) { + spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl") + HiveCatalogMetrics.reset() + QueryTest.checkAnswer(sql("SELECT id FROM tbl WHERE p = 1"), + Seq(1, 4, 7).map(Row.apply(_)), checkToRDD = false) // avoid analyzing the query twice + val expectedCount = if (enablePruning) 1 else 3 + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount == expectedCount) + } + } + } + } + override def getScanExecPartitionSize(plan: SparkPlan): Long = { plan.collectFirst { case p: FileSourceScanExec => p.selectedPartitions.length