From a2460be9c30b67b9159fe339d115b84d53cc288a Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Mon, 22 May 2017 17:28:30 -0700
Subject: [PATCH] [SPARK-17410][SPARK-17284] Move Hive-generated Stats Info to HiveClientImpl

### What changes were proposed in this pull request?

After adding the new field `stats` to `CatalogTable`, we should not expose Hive-specific stats metadata to `MetastoreRelation`. Doing so complicates all the related code. It also introduces a bug in `SHOW CREATE TABLE`: the statistics-related table properties should be skipped by `SHOW CREATE TABLE`, since they could be incorrect for the newly created table. See the Hive JIRA: https://issues.apache.org/jira/browse/HIVE-13792

It also fixes the issue that Hive-generated row counts were not filled into our stats.

This PR moves the handling of Hive-specific stats metadata into `HiveClientImpl`.

### How was this patch tested?

Added a few test cases.

Author: Xiao Li

Closes #14971 from gatorsmile/showCreateTableNew.
---
 .../sql/catalyst/catalog/interface.scala      |   5 +-
 .../sql/catalyst/trees/TreeNodeSuite.scala    |   3 +-
 .../spark/sql/hive/HiveStrategies.scala       |  15 +-
 .../sql/hive/client/HiveClientImpl.scala      |  67 +++++-
 .../sql/hive/test/TestHiveSingleton.scala     |   6 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala  |   1 -
 ...nalCatalogBackwardCompatibilitySuite.scala |   4 -
 .../sql/hive/MetastoreDataSourcesSuite.scala  |   5 -
 .../spark/sql/hive/ShowCreateTableSuite.scala |  14 +-
 .../spark/sql/hive/StatisticsSuite.scala      | 215 +++++++++++++++++-
 .../spark/sql/hive/client/VersionsSuite.scala |   2 +-
 .../hive/execution/HiveComparisonTest.scala   |   5 -
 .../sql/hive/execution/HiveDDLSuite.scala     |   5 -
 .../sql/hive/execution/HiveSQLViewSuite.scala |   2 -
 .../spark/sql/hive/orc/OrcSourceSuite.scala   |   1 -
 15 files changed, 291 insertions(+), 59 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index cc0cbba275..2f328ccc49 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -203,6 +203,8 @@ case class BucketSpec(
 *                            sensitive schema was unable to be read from the table properties.
 *                            Used to trigger case-sensitive schema inference at query time, when
 *                            configured.
+ * @param ignoredProperties a map of table properties that are used by the underlying table
+ *                          but not yet recognized by Spark SQL.
*/ case class CatalogTable( identifier: TableIdentifier, @@ -221,7 +223,8 @@ case class CatalogTable( comment: Option[String] = None, unsupportedFeatures: Seq[String] = Seq.empty, tracksPartitionsInCatalog: Boolean = false, - schemaPreservesCase: Boolean = true) { + schemaPreservesCase: Boolean = true, + ignoredProperties: Map[String, String] = Map.empty) { import CatalogTable._ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index 37e3dfabd0..712841835a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala @@ -492,7 +492,8 @@ class TreeNodeSuite extends SparkFunSuite { "tracksPartitionsInCatalog" -> false, "properties" -> JNull, "unsupportedFeatures" -> List.empty[String], - "schemaPreservesCase" -> JBool(true))) + "schemaPreservesCase" -> JBool(true), + "ignoredProperties" -> JNull)) // For unknown case class, returns JNull. val bigValue = new Array[Int](10000) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 4f090d545c..9c60d22d35 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -119,20 +119,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => val table = relation.tableMeta - // TODO: check if this estimate is valid for tables after partition pruning. - // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be - // relatively cheap if parameters for the table are populated into the metastore. - // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys - // (see StatsSetupConst in Hive) that we can look at in the future. - // When table is external,`totalSize` is always zero, which will influence join strategy - // so when `totalSize` is zero, use `rawDataSize` instead. 
- val totalSize = table.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) - val rawDataSize = table.properties.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) - val sizeInBytes = if (totalSize.isDefined && totalSize.get > 0) { - totalSize.get - } else if (rawDataSize.isDefined && rawDataSize.get > 0) { - rawDataSize.get - } else if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { + val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { try { val hadoopConf = session.sessionState.newHadoopConf() val tablePath = new Path(table.location) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 04f2751e79..b970be740a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order} @@ -414,6 +415,47 @@ private[hive] class HiveClientImpl( val properties = Option(h.getParameters).map(_.asScala.toMap).orNull + // Hive-generated Statistics are also recorded in ignoredProperties + val ignoredProperties = scala.collection.mutable.Map.empty[String, String] + for (key <- HiveStatisticsProperties; value <- properties.get(key)) { + ignoredProperties += key -> value + } + + val excludedTableProperties = HiveStatisticsProperties ++ Set( + // The property value of "comment" is moved to the dedicated field "comment" + "comment", + // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added + // in the function toHiveTable. + "EXTERNAL" + ) + + val filteredProperties = properties.filterNot { + case (key, _) => excludedTableProperties.contains(key) + } + val comment = properties.get("comment") + + val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_)) + val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_)) + val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)).filter(_ >= 0) + // TODO: check if this estimate is valid for tables after partition pruning. + // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be + // relatively cheap if parameters for the table are populated into the metastore. + // Currently, only totalSize, rawDataSize, and rowCount are used to build the field `stats` + // TODO: stats should include all the other two fields (`numFiles` and `numPartitions`). + // (see StatsSetupConst in Hive) + val stats = + // When table is external, `totalSize` is always zero, which will influence join strategy + // so when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero, + // return None. Later, we will use the other ways to estimate the statistics. + if (totalSize.isDefined && totalSize.get > 0L) { + Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount)) + } else if (rawDataSize.isDefined && rawDataSize.get > 0) { + Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount)) + } else { + // TODO: still fill the rowCount even if sizeInBytes is empty. 
Might break anything? + None + } + CatalogTable( identifier = TableIdentifier(h.getTableName, Option(h.getDbName)), tableType = h.getTableType match { @@ -451,13 +493,15 @@ private[hive] class HiveClientImpl( ), // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added // in the function toHiveTable. - properties = properties.filter(kv => kv._1 != "comment" && kv._1 != "EXTERNAL"), - comment = properties.get("comment"), + properties = filteredProperties, + stats = stats, + comment = comment, // In older versions of Spark(before 2.2.0), we expand the view original text and store // that into `viewExpandedText`, and that should be used in view resolution. So we get // `viewExpandedText` instead of `viewOriginalText` for viewText here. viewText = Option(h.getViewExpandedText), - unsupportedFeatures = unsupportedFeatures) + unsupportedFeatures = unsupportedFeatures, + ignoredProperties = ignoredProperties.toMap) } } @@ -474,7 +518,12 @@ private[hive] class HiveClientImpl( } override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState { - val hiveTable = toHiveTable(table, Some(userName)) + // getTableOption removes all the Hive-specific properties. Here, we fill them back to ensure + // these properties are still available to the others that share the same Hive metastore. + // If users explicitly alter these Hive-specific properties through ALTER TABLE DDL, we respect + // these user-specified values. + val hiveTable = toHiveTable( + table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName)) // Do not use `table.qualifiedName` here because this may be a rename val qualifiedTableName = s"${table.database}.$tableName" shim.alterTable(client, qualifiedTableName, hiveTable) @@ -956,4 +1005,14 @@ private[hive] object HiveClientImpl { parameters = if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty) } + + // Below is the key of table properties for storing Hive-generated statistics + private val HiveStatisticsProperties = Set( + StatsSetupConst.COLUMN_STATS_ACCURATE, + StatsSetupConst.NUM_FILES, + StatsSetupConst.NUM_PARTITIONS, + StatsSetupConst.ROW_COUNT, + StatsSetupConst.RAW_DATA_SIZE, + StatsSetupConst.TOTAL_SIZE + ) } diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala index 9bf84ab1fb..df7988f542 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala @@ -19,13 +19,17 @@ package org.apache.spark.sql.hive.test import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.SparkSession import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.client.HiveClient trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll { protected val spark: SparkSession = TestHive.sparkSession protected val hiveContext: TestHiveContext = TestHive + protected val hiveClient: HiveClient = + spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client protected override def afterAll(): Unit = { try { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 7584f1741c..d97b11e447 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -646,7 +646,6 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle } test("SPARK-15887: hive-site.xml should be loaded") { - val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client assert(hiveClient.getConf("hive.in.test", "") == "true") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala index 705d43f1f3..3bd3d0d6db 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala @@ -35,10 +35,6 @@ import org.apache.spark.util.Utils class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach { - // To test `HiveExternalCatalog`, we need to read/write the raw table meta from/to hive client. - val hiveClient: HiveClient = - spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client - val tempDir = Utils.createTempDir().getCanonicalFile val tempDirUri = tempDir.toURI val tempDirStr = tempDir.getAbsolutePath diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index b554694815..c785aca985 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -52,11 +52,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile } - // To test `HiveExternalCatalog`, we need to read the raw table metadata(schema, partition - // columns and bucket specification are still in table properties) from hive client. - private def hiveClient: HiveClient = - sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client - test("persistent JSON table") { withTable("jsonTable") { sql( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala index 081153df8e..fad81c7e94 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala @@ -325,26 +325,20 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing "last_modified_by", "last_modified_time", "Owner:", - "COLUMN_STATS_ACCURATE", // The following are hive specific schema parameters which we do not need to match exactly. - "numFiles", - "numRows", - "rawDataSize", - "totalSize", "totalNumberFiles", "maxFileSize", - "minFileSize", - // EXTERNAL is not non-deterministic, but it is filtered out for external tables. 
- "EXTERNAL" + "minFileSize" ) table.copy( createTime = 0L, lastAccessTime = 0L, - properties = table.properties.filterKeys(!nondeterministicProps.contains(_)) + properties = table.properties.filterKeys(!nondeterministicProps.contains(_)), + stats = None, + ignoredProperties = Map.empty ) } - assert(normalize(actual) == normalize(expected)) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 3191b9975f..5d52f8baa3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -19,11 +19,14 @@ package org.apache.spark.sql.hive import java.io.{File, PrintWriter} +import org.apache.hadoop.hive.common.StatsSetupConst import scala.reflect.ClassTag +import scala.util.matching.Regex import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics} +import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.joins._ @@ -61,7 +64,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto val relation = spark.table("csv_table").queryExecution.analyzed.children.head .asInstanceOf[CatalogRelation] - val properties = relation.tableMeta.properties + val properties = relation.tableMeta.ignoredProperties assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0") assert(properties("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0") @@ -175,7 +178,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto sql(s"INSERT INTO TABLE $textTable SELECT * FROM src") checkTableStats( textTable, - hasSizeInBytes = false, + hasSizeInBytes = true, expectedRowCounts = None) // noscan won't count the number of rows @@ -215,6 +218,210 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + private def createNonPartitionedTable( + tabName: String, + analyzedBySpark: Boolean = true, + analyzedByHive: Boolean = true): Unit = { + sql( + s""" + |CREATE TABLE $tabName (key STRING, value STRING) + |STORED AS TEXTFILE + |TBLPROPERTIES ('prop1' = 'val1', 'prop2' = 'val2') + """.stripMargin) + sql(s"INSERT INTO TABLE $tabName SELECT * FROM src") + if (analyzedBySpark) sql(s"ANALYZE TABLE $tabName COMPUTE STATISTICS") + // This is to mimic the scenario in which Hive generates statistics before we read them + if (analyzedByHive) hiveClient.runSqlHive(s"ANALYZE TABLE $tabName COMPUTE STATISTICS") + val describeResult1 = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName") + + val tableMetadata = + spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)).properties + // Hive-generated statistics properties are not contained in the table metadata returned by Spark's catalog + assert(Seq(StatsSetupConst.COLUMN_STATS_ACCURATE, + StatsSetupConst.NUM_FILES, + StatsSetupConst.NUM_PARTITIONS, + StatsSetupConst.ROW_COUNT, + StatsSetupConst.RAW_DATA_SIZE, + StatsSetupConst.TOTAL_SIZE).forall(!tableMetadata.contains(_))) + + if (analyzedByHive) { + assert(StringUtils.filterPattern(describeResult1, "*numRows\\s+500*").nonEmpty) + } else { + assert(StringUtils.filterPattern(describeResult1, "*numRows\\s+500*").isEmpty) + } + } + + private def extractStatsPropValues(
descOutput: Seq[String], + propKey: String): Option[BigInt] = { + val str = descOutput + .filterNot(_.contains(HiveExternalCatalog.STATISTICS_PREFIX)) + .filter(_.contains(propKey)) + if (str.isEmpty) { + None + } else { + assert(str.length == 1, "found more than one matches") + val pattern = new Regex(s"""$propKey\\s+(-?\\d+)""") + val pattern(value) = str.head.trim + Option(BigInt(value)) + } + } + + test("get statistics when not analyzed in both Hive and Spark") { + val tabName = "tab1" + withTable(tabName) { + createNonPartitionedTable(tabName, analyzedByHive = false, analyzedBySpark = false) + checkTableStats( + tabName, hasSizeInBytes = true, expectedRowCounts = None) + + // ALTER TABLE SET TBLPROPERTIES invalidates some contents of Hive specific statistics + // This is triggered by the Hive alterTable API + val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName") + + val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize") + val numRows = extractStatsPropValues(describeResult, "numRows") + val totalSize = extractStatsPropValues(describeResult, "totalSize") + assert(rawDataSize.isEmpty, "rawDataSize should not be shown without table analysis") + assert(numRows.isEmpty, "numRows should not be shown without table analysis") + assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost") + } + } + + test("alter table rename after analyze table") { + Seq(true, false).foreach { analyzedBySpark => + val oldName = "tab1" + val newName = "tab2" + withTable(oldName, newName) { + createNonPartitionedTable(oldName, analyzedByHive = true, analyzedBySpark = analyzedBySpark) + val fetchedStats1 = checkTableStats( + oldName, hasSizeInBytes = true, expectedRowCounts = Some(500)) + sql(s"ALTER TABLE $oldName RENAME TO $newName") + val fetchedStats2 = checkTableStats( + newName, hasSizeInBytes = true, expectedRowCounts = Some(500)) + assert(fetchedStats1 == fetchedStats2) + + // ALTER TABLE RENAME does not affect the contents of Hive specific statistics + val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $newName") + + val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize") + val numRows = extractStatsPropValues(describeResult, "numRows") + val totalSize = extractStatsPropValues(describeResult, "totalSize") + assert(rawDataSize.isDefined && rawDataSize.get > 0, "rawDataSize is lost") + assert(numRows.isDefined && numRows.get == 500, "numRows is lost") + assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost") + } + } + } + + test("alter table SET TBLPROPERTIES after analyze table") { + Seq(true, false).foreach { analyzedBySpark => + val tabName = "tab1" + withTable(tabName) { + createNonPartitionedTable(tabName, analyzedByHive = true, analyzedBySpark = analyzedBySpark) + val fetchedStats1 = checkTableStats( + tabName, hasSizeInBytes = true, expectedRowCounts = Some(500)) + sql(s"ALTER TABLE $tabName SET TBLPROPERTIES ('foo' = 'a')") + val fetchedStats2 = checkTableStats( + tabName, hasSizeInBytes = true, expectedRowCounts = Some(500)) + assert(fetchedStats1 == fetchedStats2) + + val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName") + + val totalSize = extractStatsPropValues(describeResult, "totalSize") + assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost") + + // ALTER TABLE SET TBLPROPERTIES invalidates some Hive specific statistics + // This is triggered by the Hive alterTable API + val numRows = extractStatsPropValues(describeResult, "numRows") + assert(numRows.isDefined && 
numRows.get == -1, "numRows is lost") + val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize") + assert(rawDataSize.isDefined && rawDataSize.get == -1, "rawDataSize is lost") + } + } + } + + test("alter table UNSET TBLPROPERTIES after analyze table") { + Seq(true, false).foreach { analyzedBySpark => + val tabName = "tab1" + withTable(tabName) { + createNonPartitionedTable(tabName, analyzedByHive = true, analyzedBySpark = analyzedBySpark) + val fetchedStats1 = checkTableStats( + tabName, hasSizeInBytes = true, expectedRowCounts = Some(500)) + sql(s"ALTER TABLE $tabName UNSET TBLPROPERTIES ('prop1')") + val fetchedStats2 = checkTableStats( + tabName, hasSizeInBytes = true, expectedRowCounts = Some(500)) + assert(fetchedStats1 == fetchedStats2) + + val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName") + + val totalSize = extractStatsPropValues(describeResult, "totalSize") + assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost") + + // ALTER TABLE UNSET TBLPROPERTIES invalidates some Hive specific statistics + // This is triggered by the Hive alterTable API + val numRows = extractStatsPropValues(describeResult, "numRows") + assert(numRows.isDefined && numRows.get == -1, "numRows is lost") + val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize") + assert(rawDataSize.isDefined && rawDataSize.get == -1, "rawDataSize is lost") + } + } + } + + test("add/drop partitions - managed table") { + val catalog = spark.sessionState.catalog + val managedTable = "partitionedTable" + withTable(managedTable) { + sql( + s""" + |CREATE TABLE $managedTable (key INT, value STRING) + |PARTITIONED BY (ds STRING, hr STRING) + """.stripMargin) + + for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) { + sql( + s""" + |INSERT OVERWRITE TABLE $managedTable + |partition (ds='$ds',hr='$hr') + |SELECT 1, 'a' + """.stripMargin) + } + + checkTableStats( + managedTable, hasSizeInBytes = false, expectedRowCounts = None) + + sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS") + + val stats1 = checkTableStats( + managedTable, hasSizeInBytes = true, expectedRowCounts = Some(4)) + + sql( + s""" + |ALTER TABLE $managedTable DROP PARTITION (ds='2008-04-08'), + |PARTITION (hr='12') + """.stripMargin) + assert(catalog.listPartitions(TableIdentifier(managedTable)).map(_.spec).toSet == + Set(Map("ds" -> "2008-04-09", "hr" -> "11"))) + + val stats2 = checkTableStats( + managedTable, hasSizeInBytes = true, expectedRowCounts = Some(4)) + assert(stats1 == stats2) + + sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS") + + val stats3 = checkTableStats( + managedTable, hasSizeInBytes = true, expectedRowCounts = Some(1)) + assert(stats2.get.sizeInBytes > stats3.get.sizeInBytes) + + sql(s"ALTER TABLE $managedTable ADD PARTITION (ds='2008-04-08', hr='12')") + sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS") + val stats4 = checkTableStats( + managedTable, hasSizeInBytes = true, expectedRowCounts = Some(1)) + + assert(stats2.get.sizeInBytes > stats4.get.sizeInBytes) + assert(stats4.get.sizeInBytes == stats3.get.sizeInBytes) + } + } + test("test statistics of LogicalRelation converted from Hive serde tables") { val parquetTable = "parquetTable" val orcTable = "orcTable" @@ -232,7 +439,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = Some(500)) } withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "true") { - checkTableStats(orcTable, 
hasSizeInBytes = false, expectedRowCounts = None) + // We still can get tableSize from Hive before Analyze + checkTableStats(orcTable, hasSizeInBytes = true, expectedRowCounts = None) sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS") checkTableStats(orcTable, hasSizeInBytes = true, expectedRowCounts = Some(500)) } @@ -254,7 +462,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " + stats.keys.mkString(", ")) // Validate statistics - val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client val table = hiveClient.getTable("default", tableName) val props = table.properties.filterKeys(_.startsWith("spark.sql.statistics.colStats")) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 7aff49c0fc..f109843f5b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -576,7 +576,7 @@ class VersionsSuite extends SparkFunSuite with Logging { versionSpark.sql("CREATE TABLE tbl AS SELECT 1 AS a") assert(versionSpark.table("tbl").collect().toSeq == Seq(Row(1))) val tableMeta = versionSpark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl")) - val totalSize = tableMeta.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + val totalSize = tableMeta.stats.map(_.sizeInBytes) // Except 0.12, all the following versions will fill the Hive-generated statistics if (version == "0.12") { assert(totalSize.isEmpty) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index abe5d83571..98aa92a9bb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -192,12 +192,7 @@ abstract class HiveComparisonTest "last_modified_by", "last_modified_time", "Owner:", - "COLUMN_STATS_ACCURATE", // The following are hive specific schema parameters which we do not need to match exactly. - "numFiles", - "numRows", - "rawDataSize", - "totalSize", "totalNumberFiles", "maxFileSize", "minFileSize" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 13f5c5dd8e..9a682260e2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1199,11 +1199,6 @@ class HiveDDLSuite "last_modified_by", "last_modified_time", "Owner:", - "COLUMN_STATS_ACCURATE", - "numFiles", - "numRows", - "rawDataSize", - "totalSize", "totalNumberFiles", "maxFileSize", "minFileSize" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala index 5afb37b382..97e4c2b6b2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala @@ -28,8 +28,6 @@ import org.apache.spark.sql.types.StructType * A test suite for Hive view related functionality. 
*/ class HiveSQLViewSuite extends SQLViewSuite with TestHiveSingleton { - protected override val spark: SparkSession = TestHive.sparkSession - import testImplicits._ test("create a permanent/temp view using a hive, built-in, and permanent user function") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 6bfb88c0c1..52fa401d32 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -153,7 +153,6 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA } test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") { - val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client val location = Utils.createTempDir() val uri = location.toURI try {
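The core of the change is the stats derivation that `HiveClientImpl.getTableOption` now performs: Hive's statistics keys are pulled out of the raw table parameters, `totalSize` is preferred over `rawDataSize` (which matters for external tables, where `totalSize` is often zero), and a non-negative `numRows` is carried along as the row count. A minimal, self-contained sketch of that logic follows; `CatalogStatistics` here is a simplified stand-in for Spark's internal class, and the property keys mirror Hive's `StatsSetupConst` constants.

```scala
object HiveStatsSketch {
  // Simplified stand-in for org.apache.spark.sql.catalyst.catalog.CatalogStatistics.
  case class CatalogStatistics(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

  // Property keys written by Hive; they mirror StatsSetupConst.TOTAL_SIZE etc.
  val TotalSize = "totalSize"
  val RawDataSize = "rawDataSize"
  val RowCount = "numRows"

  def statsFromHiveProperties(properties: Map[String, String]): Option[CatalogStatistics] = {
    val totalSize = properties.get(TotalSize).map(BigInt(_))
    val rawDataSize = properties.get(RawDataSize).map(BigInt(_))
    // Hive reports -1 when the row count is unknown, so negative values are dropped.
    val rowCount = properties.get(RowCount).map(BigInt(_)).filter(_ >= 0)

    if (totalSize.exists(_ > 0)) {
      Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount))
    } else if (rawDataSize.exists(_ > 0)) {
      // External tables often report totalSize = 0, so fall back to rawDataSize.
      Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount))
    } else {
      // No usable size: leave stats empty so later estimation (e.g. file sizes) can kick in.
      None
    }
  }

  def main(args: Array[String]): Unit = {
    println(statsFromHiveProperties(Map("totalSize" -> "5812", "numRows" -> "500")))
    println(statsFromHiveProperties(Map("totalSize" -> "0", "rawDataSize" -> "4300", "numRows" -> "-1")))
    println(statsFromHiveProperties(Map.empty))
  }
}
```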
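The patch also changes `alterTable` so that the Hive-generated statistics moved into `ignoredProperties` are merged back into the table properties before writing to the metastore; otherwise other systems sharing the same Hive metastore would lose them. Properties a user sets explicitly take precedence on key collisions. Below is a small sketch of that merge, using a simplified stand-in for `CatalogTable`.

```scala
object AlterTableSketch {
  // Simplified stand-in for the relevant fields of CatalogTable.
  case class TableMeta(properties: Map[String, String], ignoredProperties: Map[String, String])

  // Mirrors `table.ignoredProperties ++ table.properties` in the patched alterTable:
  // `++` keeps the right-hand value on key collisions, so user-specified properties
  // override the preserved Hive-generated statistics.
  def propertiesForMetastore(table: TableMeta): Map[String, String] =
    table.ignoredProperties ++ table.properties

  def main(args: Array[String]): Unit = {
    val meta = TableMeta(
      properties = Map("prop1" -> "val1", "numRows" -> "42"), // user explicitly overrode numRows
      ignoredProperties = Map("numRows" -> "500", "totalSize" -> "5812"))
    // numRows -> 42 (user value wins), totalSize -> 5812 is preserved, prop1 is kept.
    println(propertiesForMetastore(meta))
  }
}
```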
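The new tests verify Hive's own view of the statistics by parsing `DESCRIBE FORMATTED` output with a regex, as in the `extractStatsPropValues` helper added above. The sketch below follows the same approach on hypothetical output lines; the sample strings are illustrative only, not actual Hive output.

```scala
import scala.util.matching.Regex

object DescribeOutputSketch {
  // Pulls a numeric statistic out of DESCRIBE FORMATTED output, in the spirit of the
  // extractStatsPropValues test helper added in this patch.
  def extractStatsPropValue(descOutput: Seq[String], propKey: String): Option[BigInt] = {
    val matching = descOutput.filter(_.contains(propKey))
    matching.headOption.flatMap { line =>
      // Hive prints "key<whitespace>value"; capture a possibly negative integer.
      val pattern = new Regex(s"""$propKey\\s+(-?\\d+)""")
      pattern.findFirstMatchIn(line.trim).map(m => BigInt(m.group(1)))
    }
  }

  def main(args: Array[String]): Unit = {
    // Illustrative lines only; real DESCRIBE FORMATTED output has more columns and tabs.
    val describe = Seq("numFiles      1", "numRows       500", "totalSize     5812")
    println(extractStatsPropValue(describe, "numRows"))     // Some(500)
    println(extractStatsPropValue(describe, "rawDataSize")) // None
  }
}
```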