[SPARK-17410][SPARK-17284] Move Hive-generated Stats Info to HiveClientImpl
### What changes were proposed in this pull request?

After adding the new field `stats` to `CatalogTable`, we should not expose Hive-specific statistics metadata to `MetastoreRelation`. Doing so complicates all the related code and also introduces a bug in `SHOW CREATE TABLE`: statistics-related table properties should be skipped by `SHOW CREATE TABLE`, since they could be incorrect in the newly created table. See the Hive JIRA: https://issues.apache.org/jira/browse/HIVE-13792

This PR handles Hive-specific statistics metadata in `HiveClientImpl`, and also fixes the handling of Hive-generated row counts so that they are filled into Spark's stats.

### How was this patch tested?

Added a few test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #14971 from gatorsmile/showCreateTableNew.
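In short, `HiveClientImpl` now derives Spark's `CatalogStatistics` from the Hive-generated table parameters and records the raw Hive keys in `ignoredProperties`. A minimal sketch of that mapping is shown below; the helper name `hiveStatsToCatalogStats` is illustrative only, and the actual logic lives in `HiveClientImpl.getTableOption` (see the diff further down).

```scala
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

// Simplified sketch: turn Hive-generated table parameters into Spark's CatalogStatistics.
def hiveStatsToCatalogStats(properties: Map[String, String]): Option[CatalogStatistics] = {
  val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_))
  val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_))
  // Hive reports a negative row count once it invalidates the value, so keep only valid counts.
  val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)).filter(_ >= 0)
  if (totalSize.exists(_ > 0)) {
    Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount))
  } else if (rawDataSize.exists(_ > 0)) {
    // For external tables `totalSize` is typically zero, so fall back to `rawDataSize`.
    Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount))
  } else {
    None
  }
}
```

On the write path, `alterTable` merges `ignoredProperties` back into the table properties, so the statistics remain visible to other systems sharing the same Hive metastore.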
parent 9b09101938
commit a2460be9c3
@@ -203,6 +203,8 @@ case class BucketSpec(
  * sensitive schema was unable to be read from the table properties.
  * Used to trigger case-sensitive schema inference at query time, when
  * configured.
+ * @param ignoredProperties is a list of table properties that are used by the underlying table
+ *                          but ignored by Spark SQL yet.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -221,7 +223,8 @@ case class CatalogTable(
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty,
     tracksPartitionsInCatalog: Boolean = false,
-    schemaPreservesCase: Boolean = true) {
+    schemaPreservesCase: Boolean = true,
+    ignoredProperties: Map[String, String] = Map.empty) {

   import CatalogTable._

@@ -492,7 +492,8 @@ class TreeNodeSuite extends SparkFunSuite {
       "tracksPartitionsInCatalog" -> false,
       "properties" -> JNull,
       "unsupportedFeatures" -> List.empty[String],
-      "schemaPreservesCase" -> JBool(true)))
+      "schemaPreservesCase" -> JBool(true),
+      "ignoredProperties" -> JNull))

     // For unknown case class, returns JNull.
     val bigValue = new Array[Int](10000)
@@ -119,20 +119,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
     case relation: CatalogRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
-      // TODO: check if this estimate is valid for tables after partition pruning.
-      // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-      // relatively cheap if parameters for the table are populated into the metastore.
-      // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
-      // (see StatsSetupConst in Hive) that we can look at in the future.
-      // When table is external,`totalSize` is always zero, which will influence join strategy
-      // so when `totalSize` is zero, use `rawDataSize` instead.
-      val totalSize = table.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
-      val rawDataSize = table.properties.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
-      val sizeInBytes = if (totalSize.isDefined && totalSize.get > 0) {
-        totalSize.get
-      } else if (rawDataSize.isDefined && rawDataSize.get > 0) {
-        rawDataSize.get
-      } else if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
         try {
           val hadoopConf = session.sessionState.newHadoopConf()
           val tablePath = new Path(table.location)
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
@@ -414,6 +415,47 @@ private[hive] class HiveClientImpl(

     val properties = Option(h.getParameters).map(_.asScala.toMap).orNull

+    // Hive-generated Statistics are also recorded in ignoredProperties
+    val ignoredProperties = scala.collection.mutable.Map.empty[String, String]
+    for (key <- HiveStatisticsProperties; value <- properties.get(key)) {
+      ignoredProperties += key -> value
+    }
+
+    val excludedTableProperties = HiveStatisticsProperties ++ Set(
+      // The property value of "comment" is moved to the dedicated field "comment"
+      "comment",
+      // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
+      // in the function toHiveTable.
+      "EXTERNAL"
+    )
+
+    val filteredProperties = properties.filterNot {
+      case (key, _) => excludedTableProperties.contains(key)
+    }
+    val comment = properties.get("comment")
+
+    val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_))
+    val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_))
+    val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)).filter(_ >= 0)
+    // TODO: check if this estimate is valid for tables after partition pruning.
+    // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
+    // relatively cheap if parameters for the table are populated into the metastore.
+    // Currently, only totalSize, rawDataSize, and rowCount are used to build the field `stats`
+    // TODO: stats should include all the other two fields (`numFiles` and `numPartitions`).
+    // (see StatsSetupConst in Hive)
+    val stats =
+      // When table is external, `totalSize` is always zero, which will influence join strategy
+      // so when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero,
+      // return None. Later, we will use the other ways to estimate the statistics.
+      if (totalSize.isDefined && totalSize.get > 0L) {
+        Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount))
+      } else if (rawDataSize.isDefined && rawDataSize.get > 0) {
+        Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount))
+      } else {
+        // TODO: still fill the rowCount even if sizeInBytes is empty. Might break anything?
+        None
+      }
+
     CatalogTable(
       identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
       tableType = h.getTableType match {
@@ -451,13 +493,15 @@ private[hive] class HiveClientImpl(
       ),
       // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
       // in the function toHiveTable.
-      properties = properties.filter(kv => kv._1 != "comment" && kv._1 != "EXTERNAL"),
-      comment = properties.get("comment"),
+      properties = filteredProperties,
+      stats = stats,
+      comment = comment,
       // In older versions of Spark(before 2.2.0), we expand the view original text and store
       // that into `viewExpandedText`, and that should be used in view resolution. So we get
       // `viewExpandedText` instead of `viewOriginalText` for viewText here.
       viewText = Option(h.getViewExpandedText),
-      unsupportedFeatures = unsupportedFeatures)
+      unsupportedFeatures = unsupportedFeatures,
+      ignoredProperties = ignoredProperties.toMap)
     }
   }

@@ -474,7 +518,12 @@ private[hive] class HiveClientImpl(
   }

   override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
-    val hiveTable = toHiveTable(table, Some(userName))
+    // getTableOption removes all the Hive-specific properties. Here, we fill them back to ensure
+    // these properties are still available to the others that share the same Hive metastore.
+    // If users explicitly alter these Hive-specific properties through ALTER TABLE DDL, we respect
+    // these user-specified values.
+    val hiveTable = toHiveTable(
+      table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName))
     // Do not use `table.qualifiedName` here because this may be a rename
     val qualifiedTableName = s"${table.database}.$tableName"
     shim.alterTable(client, qualifiedTableName, hiveTable)
@@ -956,4 +1005,14 @@ private[hive] object HiveClientImpl {
     parameters =
       if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty)
   }
+
+  // Below is the key of table properties for storing Hive-generated statistics
+  private val HiveStatisticsProperties = Set(
+    StatsSetupConst.COLUMN_STATS_ACCURATE,
+    StatsSetupConst.NUM_FILES,
+    StatsSetupConst.NUM_PARTITIONS,
+    StatsSetupConst.ROW_COUNT,
+    StatsSetupConst.RAW_DATA_SIZE,
+    StatsSetupConst.TOTAL_SIZE
+  )
 }
@@ -19,13 +19,17 @@ package org.apache.spark.sql.hive.test

 import org.scalatest.BeforeAndAfterAll

-import org.apache.spark.sql.SparkSession
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.client.HiveClient


 trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {
   protected val spark: SparkSession = TestHive.sparkSession
   protected val hiveContext: TestHiveContext = TestHive
+  protected val hiveClient: HiveClient =
+    spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

   protected override def afterAll(): Unit = {
     try {
@@ -646,7 +646,6 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
   }

   test("SPARK-15887: hive-site.xml should be loaded") {
-    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
     assert(hiveClient.getConf("hive.in.test", "") == "true")
   }

@@ -35,10 +35,6 @@ import org.apache.spark.util.Utils
 class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
   with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {

-  // To test `HiveExternalCatalog`, we need to read/write the raw table meta from/to hive client.
-  val hiveClient: HiveClient =
-    spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-
   val tempDir = Utils.createTempDir().getCanonicalFile
   val tempDirUri = tempDir.toURI
   val tempDirStr = tempDir.getAbsolutePath
@@ -52,11 +52,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
   }

-  // To test `HiveExternalCatalog`, we need to read the raw table metadata(schema, partition
-  // columns and bucket specification are still in table properties) from hive client.
-  private def hiveClient: HiveClient =
-    sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-
   test("persistent JSON table") {
     withTable("jsonTable") {
       sql(
@@ -325,26 +325,20 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
       "last_modified_by",
       "last_modified_time",
       "Owner:",
-      "COLUMN_STATS_ACCURATE",
       // The following are hive specific schema parameters which we do not need to match exactly.
-      "numFiles",
-      "numRows",
-      "rawDataSize",
-      "totalSize",
       "totalNumberFiles",
       "maxFileSize",
-      "minFileSize",
-      // EXTERNAL is not non-deterministic, but it is filtered out for external tables.
-      "EXTERNAL"
+      "minFileSize"
     )

     table.copy(
       createTime = 0L,
       lastAccessTime = 0L,
-      properties = table.properties.filterKeys(!nondeterministicProps.contains(_))
+      properties = table.properties.filterKeys(!nondeterministicProps.contains(_)),
+      stats = None,
+      ignoredProperties = Map.empty
     )
   }

   assert(normalize(actual) == normalize(expected))
 }
 }
@@ -19,11 +19,14 @@ package org.apache.spark.sql.hive

 import java.io.{File, PrintWriter}

+import org.apache.hadoop.hive.common.StatsSetupConst
 import scala.reflect.ClassTag
+import scala.util.matching.Regex

 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
+import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.joins._
@@ -61,7 +64,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     val relation = spark.table("csv_table").queryExecution.analyzed.children.head
       .asInstanceOf[CatalogRelation]

-    val properties = relation.tableMeta.properties
+    val properties = relation.tableMeta.ignoredProperties
     assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0")
     assert(properties("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0")

|
@ -175,7 +178,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
|
||||||
sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
|
sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
|
||||||
checkTableStats(
|
checkTableStats(
|
||||||
textTable,
|
textTable,
|
||||||
hasSizeInBytes = false,
|
hasSizeInBytes = true,
|
||||||
expectedRowCounts = None)
|
expectedRowCounts = None)
|
||||||
|
|
||||||
// noscan won't count the number of rows
|
// noscan won't count the number of rows
|
||||||
|
@@ -215,6 +218,210 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }

+  private def createNonPartitionedTable(
+      tabName: String,
+      analyzedBySpark: Boolean = true,
+      analyzedByHive: Boolean = true): Unit = {
+    sql(
+      s"""
+         |CREATE TABLE $tabName (key STRING, value STRING)
+         |STORED AS TEXTFILE
+         |TBLPROPERTIES ('prop1' = 'val1', 'prop2' = 'val2')
+       """.stripMargin)
+    sql(s"INSERT INTO TABLE $tabName SELECT * FROM src")
+    if (analyzedBySpark) sql(s"ANALYZE TABLE $tabName COMPUTE STATISTICS")
+    // This is to mimic the scenario in which Hive genrates statistics before we reading it
+    if (analyzedByHive) hiveClient.runSqlHive(s"ANALYZE TABLE $tabName COMPUTE STATISTICS")
+    val describeResult1 = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")
+
+    val tableMetadata =
+      spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)).properties
+    // statistics info is not contained in the metadata of the original table
+    assert(Seq(StatsSetupConst.COLUMN_STATS_ACCURATE,
+      StatsSetupConst.NUM_FILES,
+      StatsSetupConst.NUM_PARTITIONS,
+      StatsSetupConst.ROW_COUNT,
+      StatsSetupConst.RAW_DATA_SIZE,
+      StatsSetupConst.TOTAL_SIZE).forall(!tableMetadata.contains(_)))
+
+    if (analyzedByHive) {
+      assert(StringUtils.filterPattern(describeResult1, "*numRows\\s+500*").nonEmpty)
+    } else {
+      assert(StringUtils.filterPattern(describeResult1, "*numRows\\s+500*").isEmpty)
+    }
+  }
+
+  private def extractStatsPropValues(
+      descOutput: Seq[String],
+      propKey: String): Option[BigInt] = {
+    val str = descOutput
+      .filterNot(_.contains(HiveExternalCatalog.STATISTICS_PREFIX))
+      .filter(_.contains(propKey))
+    if (str.isEmpty) {
+      None
+    } else {
+      assert(str.length == 1, "found more than one matches")
+      val pattern = new Regex(s"""$propKey\\s+(-?\\d+)""")
+      val pattern(value) = str.head.trim
+      Option(BigInt(value))
+    }
+  }
+
+  test("get statistics when not analyzed in both Hive and Spark") {
+    val tabName = "tab1"
+    withTable(tabName) {
+      createNonPartitionedTable(tabName, analyzedByHive = false, analyzedBySpark = false)
+      checkTableStats(
+        tabName, hasSizeInBytes = true, expectedRowCounts = None)
+
+      // ALTER TABLE SET TBLPROPERTIES invalidates some contents of Hive specific statistics
+      // This is triggered by the Hive alterTable API
+      val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")
+
+      val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize")
+      val numRows = extractStatsPropValues(describeResult, "numRows")
+      val totalSize = extractStatsPropValues(describeResult, "totalSize")
+      assert(rawDataSize.isEmpty, "rawDataSize should not be shown without table analysis")
+      assert(numRows.isEmpty, "numRows should not be shown without table analysis")
+      assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost")
+    }
+  }
+
+  test("alter table rename after analyze table") {
+    Seq(true, false).foreach { analyzedBySpark =>
+      val oldName = "tab1"
+      val newName = "tab2"
+      withTable(oldName, newName) {
+        createNonPartitionedTable(oldName, analyzedByHive = true, analyzedBySpark = analyzedBySpark)
+        val fetchedStats1 = checkTableStats(
+          oldName, hasSizeInBytes = true, expectedRowCounts = Some(500))
+        sql(s"ALTER TABLE $oldName RENAME TO $newName")
+        val fetchedStats2 = checkTableStats(
+          newName, hasSizeInBytes = true, expectedRowCounts = Some(500))
+        assert(fetchedStats1 == fetchedStats2)
+
+        // ALTER TABLE RENAME does not affect the contents of Hive specific statistics
+        val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $newName")
+
+        val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize")
+        val numRows = extractStatsPropValues(describeResult, "numRows")
+        val totalSize = extractStatsPropValues(describeResult, "totalSize")
+        assert(rawDataSize.isDefined && rawDataSize.get > 0, "rawDataSize is lost")
+        assert(numRows.isDefined && numRows.get == 500, "numRows is lost")
+        assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost")
+      }
+    }
+  }
+
+  test("alter table SET TBLPROPERTIES after analyze table") {
+    Seq(true, false).foreach { analyzedBySpark =>
+      val tabName = "tab1"
+      withTable(tabName) {
+        createNonPartitionedTable(tabName, analyzedByHive = true, analyzedBySpark = analyzedBySpark)
+        val fetchedStats1 = checkTableStats(
+          tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
+        sql(s"ALTER TABLE $tabName SET TBLPROPERTIES ('foo' = 'a')")
+        val fetchedStats2 = checkTableStats(
+          tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
+        assert(fetchedStats1 == fetchedStats2)
+
+        val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")
+
+        val totalSize = extractStatsPropValues(describeResult, "totalSize")
+        assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost")
+
+        // ALTER TABLE SET TBLPROPERTIES invalidates some Hive specific statistics
+        // This is triggered by the Hive alterTable API
+        val numRows = extractStatsPropValues(describeResult, "numRows")
+        assert(numRows.isDefined && numRows.get == -1, "numRows is lost")
+        val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize")
+        assert(rawDataSize.isDefined && rawDataSize.get == -1, "rawDataSize is lost")
+      }
+    }
+  }
+
+  test("alter table UNSET TBLPROPERTIES after analyze table") {
+    Seq(true, false).foreach { analyzedBySpark =>
+      val tabName = "tab1"
+      withTable(tabName) {
+        createNonPartitionedTable(tabName, analyzedByHive = true, analyzedBySpark = analyzedBySpark)
+        val fetchedStats1 = checkTableStats(
+          tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
+        sql(s"ALTER TABLE $tabName UNSET TBLPROPERTIES ('prop1')")
+        val fetchedStats2 = checkTableStats(
+          tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
+        assert(fetchedStats1 == fetchedStats2)
+
+        val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")
+
+        val totalSize = extractStatsPropValues(describeResult, "totalSize")
+        assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost")
+
+        // ALTER TABLE UNSET TBLPROPERTIES invalidates some Hive specific statistics
+        // This is triggered by the Hive alterTable API
+        val numRows = extractStatsPropValues(describeResult, "numRows")
+        assert(numRows.isDefined && numRows.get == -1, "numRows is lost")
+        val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize")
+        assert(rawDataSize.isDefined && rawDataSize.get == -1, "rawDataSize is lost")
+      }
+    }
+  }
+
+  test("add/drop partitions - managed table") {
+    val catalog = spark.sessionState.catalog
+    val managedTable = "partitionedTable"
+    withTable(managedTable) {
+      sql(
+        s"""
+           |CREATE TABLE $managedTable (key INT, value STRING)
+           |PARTITIONED BY (ds STRING, hr STRING)
+         """.stripMargin)
+
+      for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE $managedTable
+             |partition (ds='$ds',hr='$hr')
+             |SELECT 1, 'a'
+           """.stripMargin)
+      }
+
+      checkTableStats(
+        managedTable, hasSizeInBytes = false, expectedRowCounts = None)
+
+      sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS")
+
+      val stats1 = checkTableStats(
+        managedTable, hasSizeInBytes = true, expectedRowCounts = Some(4))
+
+      sql(
+        s"""
+           |ALTER TABLE $managedTable DROP PARTITION (ds='2008-04-08'),
+           |PARTITION (hr='12')
+         """.stripMargin)
+      assert(catalog.listPartitions(TableIdentifier(managedTable)).map(_.spec).toSet ==
+        Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
+
+      val stats2 = checkTableStats(
+        managedTable, hasSizeInBytes = true, expectedRowCounts = Some(4))
+      assert(stats1 == stats2)
+
+      sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS")
+
+      val stats3 = checkTableStats(
+        managedTable, hasSizeInBytes = true, expectedRowCounts = Some(1))
+      assert(stats2.get.sizeInBytes > stats3.get.sizeInBytes)
+
+      sql(s"ALTER TABLE $managedTable ADD PARTITION (ds='2008-04-08', hr='12')")
+      sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS")
+      val stats4 = checkTableStats(
+        managedTable, hasSizeInBytes = true, expectedRowCounts = Some(1))
+
+      assert(stats2.get.sizeInBytes > stats4.get.sizeInBytes)
+      assert(stats4.get.sizeInBytes == stats3.get.sizeInBytes)
+    }
+  }
+
   test("test statistics of LogicalRelation converted from Hive serde tables") {
     val parquetTable = "parquetTable"
     val orcTable = "orcTable"
@@ -232,7 +439,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
     }
     withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "true") {
-      checkTableStats(orcTable, hasSizeInBytes = false, expectedRowCounts = None)
+      // We still can get tableSize from Hive before Analyze
+      checkTableStats(orcTable, hasSizeInBytes = true, expectedRowCounts = None)
       sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS")
       checkTableStats(orcTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
     }
@@ -254,7 +462,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))

     // Validate statistics
-    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
     val table = hiveClient.getTable("default", tableName)

     val props = table.properties.filterKeys(_.startsWith("spark.sql.statistics.colStats"))
@@ -576,7 +576,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       versionSpark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
       assert(versionSpark.table("tbl").collect().toSeq == Seq(Row(1)))
       val tableMeta = versionSpark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl"))
-      val totalSize = tableMeta.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+      val totalSize = tableMeta.stats.map(_.sizeInBytes)
       // Except 0.12, all the following versions will fill the Hive-generated statistics
       if (version == "0.12") {
         assert(totalSize.isEmpty)
@@ -192,12 +192,7 @@ abstract class HiveComparisonTest
     "last_modified_by",
     "last_modified_time",
     "Owner:",
-    "COLUMN_STATS_ACCURATE",
     // The following are hive specific schema parameters which we do not need to match exactly.
-    "numFiles",
-    "numRows",
-    "rawDataSize",
-    "totalSize",
     "totalNumberFiles",
     "maxFileSize",
     "minFileSize"
@@ -1199,11 +1199,6 @@ class HiveDDLSuite
       "last_modified_by",
       "last_modified_time",
       "Owner:",
-      "COLUMN_STATS_ACCURATE",
-      "numFiles",
-      "numRows",
-      "rawDataSize",
-      "totalSize",
       "totalNumberFiles",
       "maxFileSize",
       "minFileSize"
@@ -28,8 +28,6 @@ import org.apache.spark.sql.types.StructType
  * A test suite for Hive view related functionality.
  */
 class HiveSQLViewSuite extends SQLViewSuite with TestHiveSingleton {
-  protected override val spark: SparkSession = TestHive.sparkSession
-
   import testImplicits._

   test("create a permanent/temp view using a hive, built-in, and permanent user function") {
@@ -153,7 +153,6 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
   }

   test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
-    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
     val location = Utils.createTempDir()
     val uri = location.toURI
     try {