[SPARK-34060][SQL] Fix Hive table caching while updating stats by ALTER TABLE .. DROP PARTITION

### What changes were proposed in this pull request?
Fix canonicalization of `HiveTableRelation` by normalizing its `CatalogTable`, excluding table stats and temporary fields from the canonicalized plan.

### Why are the changes needed?
This fixes the issue demonstrated by the example below:
```scala
scala> spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", true)
scala> sql(s"CREATE TABLE tbl (id int, part int) USING hive PARTITIONED BY (part)")
scala> sql("INSERT INTO tbl PARTITION (part=0) SELECT 0")
scala> sql("INSERT INTO tbl PARTITION (part=1) SELECT 1")
scala> sql("CACHE TABLE tbl")
scala> sql("SELECT * FROM tbl").show(false)
+---+----+
|id |part|
+---+----+
|0  |0   |
|1  |1   |
+---+----+

scala> spark.catalog.isCached("tbl")
scala> sql("ALTER TABLE tbl DROP PARTITION (part=0)")
scala> spark.catalog.isCached("tbl")
res19: Boolean = false
```
`ALTER TABLE .. DROP PARTITION` must keep the table in the cache. The table falls out of the cache because, with `spark.sql.statistics.size.autoUpdate.enabled`, the command updates the table statistics, and the stats are part of the canonicalized `HiveTableRelation`, so the refreshed plan no longer matches the cached one.
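As a rough, hypothetical spark-shell sketch (not part of this patch; it reuses the `tbl` table from the example above with `spark.sql.statistics.size.autoUpdate.enabled` set), the cache is keyed on canonicalized plans, so any field that survives `doCanonicalize()` becomes part of the cache key:
```scala
// Illustrative only: compare the canonicalized plans before and after a
// stats-updating DDL. Before this fix the stats stayed in the canonical form,
// so the two plans differ and the cached entry can no longer be matched;
// after the fix, CatalogTable.normalize drops the stats and they stay equal.
val before = spark.table("tbl").queryExecution.analyzed.canonicalized
sql("ALTER TABLE tbl DROP PARTITION (part=1)")  // triggers a table stats update
val after = spark.table("tbl").queryExecution.analyzed.canonicalized
println(before == after)  // false before the fix, expected true after it
```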

### Does this PR introduce _any_ user-facing change?
Yes. After the changes, the drop partition command keeps the table in the cache while updating table stats:
```scala
scala> sql("ALTER TABLE tbl DROP PARTITION (part=0)")
scala> spark.catalog.isCached("tbl")
res19: Boolean = true
```

### How was this patch tested?
By running the new unit test in `AlterTableDropPartitionSuite`.

Closes #31112 from MaxGekk/fix-caching-hive-table-2.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
3 changed files with 58 additions and 28 deletions

```diff
@@ -510,6 +510,30 @@ object CatalogTable {
     propKey == originalKey || propKey == s"$originalKey.numParts" ||
       propKey.startsWith(s"$originalKey.part.")
   }
+
+  def normalize(table: CatalogTable): CatalogTable = {
+    val nondeterministicProps = Set(
+      "CreateTime",
+      "transient_lastDdlTime",
+      "grantTime",
+      "lastUpdateTime",
+      "last_modified_by",
+      "last_modified_time",
+      "Owner:",
+      // The following are hive specific schema parameters which we do not need to match exactly.
+      "totalNumberFiles",
+      "maxFileSize",
+      "minFileSize"
+    )
+
+    table.copy(
+      createTime = 0L,
+      lastAccessTime = 0L,
+      properties = table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap,
+      stats = None,
+      ignoredProperties = Map.empty
+    )
+  }
 }

 /**
@@ -781,10 +805,7 @@ case class HiveTableRelation(
   def isPartitioned: Boolean = partitionCols.nonEmpty

   override def doCanonicalize(): HiveTableRelation = copy(
-    tableMeta = tableMeta.copy(
-      storage = CatalogStorageFormat.empty,
-      createTime = -1
-    ),
+    tableMeta = CatalogTable.normalize(tableMeta),
     dataCols = dataCols.zipWithIndex.map {
       case (attr, index) => attr.withExprId(ExprId(index))
     },
```

```diff
@@ -223,29 +223,6 @@ abstract class ShowCreateTableSuite extends QueryTest with SQLTestUtils {
   }

   protected def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
-    def normalize(table: CatalogTable): CatalogTable = {
-      val nondeterministicProps = Set(
-        "CreateTime",
-        "transient_lastDdlTime",
-        "grantTime",
-        "lastUpdateTime",
-        "last_modified_by",
-        "last_modified_time",
-        "Owner:",
-        // The following are hive specific schema parameters which we do not need to match exactly.
-        "totalNumberFiles",
-        "maxFileSize",
-        "minFileSize"
-      )
-
-      table.copy(
-        createTime = 0L,
-        lastAccessTime = 0L,
-        properties = table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap,
-        stats = None,
-        ignoredProperties = Map.empty
-      )
-    }
-
-    assert(normalize(actual) == normalize(expected))
+    assert(CatalogTable.normalize(actual) == CatalogTable.normalize(expected))
   }
 }
```

```diff
@@ -17,7 +17,9 @@

 package org.apache.spark.sql.hive.execution.command

+import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.command.v1
+import org.apache.spark.sql.internal.SQLConf

 /**
  * The class contains tests for the `ALTER TABLE .. DROP PARTITION` command to check
@@ -42,4 +44,34 @@ class AlterTableDropPartitionSuite
       }
     }
   }
+
+  test("SPARK-34060: update stats of cached table") {
+    withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true") {
+      withNamespaceAndTable("ns", "tbl") { t =>
+        def checkTableSize(expected: String): Unit = {
+          val stats =
+            sql(s"DESCRIBE TABLE EXTENDED $t")
+              .select("data_type")
+              .where("col_name = 'Statistics'")
+              .first()
+              .getString(0)
+          assert(stats.contains(expected))
+        }
+
+        sql(s"CREATE TABLE $t (id int, part int) $defaultUsing PARTITIONED BY (part)")
+        sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
+        sql(s"INSERT INTO $t PARTITION (part=1) SELECT 1")
+        assert(!spark.catalog.isCached(t))
+        sql(s"CACHE TABLE $t")
+        assert(spark.catalog.isCached(t))
+        checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(0, 0), Row(1, 1)))
+        checkTableSize("4 bytes")
+
+        sql(s"ALTER TABLE $t DROP PARTITION (part=0)")
+        assert(spark.catalog.isCached(t))
+        checkTableSize("2 bytes")
+        checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 1)))
+      }
+    }
+  }
 }
```