[SPARK-18911][SQL] Define CatalogStatistics to interact with metastore and convert it to Statistics in relations

## What changes were proposed in this pull request?

Statistics in LogicalPlan should use attributes rather than column names to refer to columns, because two columns from different relations can share the same name. The catalog, however, has no notion of attributes or of the broadcast hint carried by Statistics, so putting Statistics in CatalogTable is confusing.

We define a separate statistics structure, CatalogStatistics, in CatalogTable. It is responsible only for interacting with the metastore, and is converted to Statistics in LogicalPlan at the point where the relation's statistics are needed.
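
A condensed sketch of the conversion path, distilled from the code added in this patch (`ColumnStat`, `Attribute`, and `AttributeMap` are existing Catalyst types):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}

// Metastore-facing statistics: column stats are keyed by column name,
// since the catalog knows nothing about attributes.
case class CatalogStatistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
    colStats: Map[String, ColumnStat] = Map.empty) {

  // Rebind the name-keyed column stats to the relation's output attributes,
  // producing the plan-side Statistics consumed by the optimizer.
  def toPlanStats(planOutput: Seq[Attribute]): Statistics = {
    val matched = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
    Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
      attributeStats = AttributeMap(matched))
  }
}
```

Relations such as `LogicalRelation` and `MetastoreRelation` then build their plan-side statistics with `catalogTable.stats.map(_.toPlanStats(output))` instead of reusing the catalog statistics object directly.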

## How was this patch tested?

Added test cases.

Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>

Closes #16323 from wzhfy/nameToAttr.
Authored by wangzhenhua on 2016-12-24 15:34:44 +08:00, committed by Wenchen Fan
parent a848f0ba84
commit 3cff816157
9 changed files with 95 additions and 23 deletions

@@ -21,8 +21,8 @@ import java.util.Date
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -171,7 +171,7 @@ case class CatalogTable(
     createTime: Long = System.currentTimeMillis,
     lastAccessTime: Long = -1,
     properties: Map[String, String] = Map.empty,
-    stats: Option[Statistics] = None,
+    stats: Option[CatalogStatistics] = None,
     viewOriginalText: Option[String] = None,
     viewText: Option[String] = None,
     comment: Option[String] = None,
@@ -247,6 +247,34 @@ case class CatalogTable(
 }
+
+/**
+ * This class of statistics is used in [[CatalogTable]] to interact with metastore.
+ * We define this new class instead of directly using [[Statistics]] here because there are no
+ * concepts of attributes or broadcast hint in catalog.
+ */
+case class CatalogStatistics(
+    sizeInBytes: BigInt,
+    rowCount: Option[BigInt] = None,
+    colStats: Map[String, ColumnStat] = Map.empty) {
+
+  /**
+   * Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
+   * on column names.
+   */
+  def toPlanStats(planOutput: Seq[Attribute]): Statistics = {
+    val matched = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
+    Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
+      attributeStats = AttributeMap(matched))
+  }
+
+  /** Readable string representation for the CatalogStatistics. */
+  def simpleString: String = {
+    val rowCountString = if (rowCount.isDefined) s", ${rowCount.get} rows" else ""
+    s"$sizeInBytes bytes$rowCountString"
+  }
+}
+
 case class CatalogTableType private(name: String)
 object CatalogTableType {
   val EXTERNAL = new CatalogTableType("EXTERNAL")

@@ -41,13 +41,13 @@ import org.apache.spark.sql.types._
  * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
  *                    defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
- * @param colStats Column-level statistics.
+ * @param attributeStats Statistics for Attributes.
  * @param isBroadcastable If true, output is small enough to be used in a broadcast join.
  */
 case class Statistics(
     sizeInBytes: BigInt,
     rowCount: Option[BigInt] = None,
-    colStats: Map[String, ColumnStat] = Map.empty,
+    attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil),
     isBroadcastable: Boolean = false) {
   override def toString: String = "Statistics(" + simpleString + ")"

@@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -64,7 +64,7 @@ case class AnalyzeColumnCommand(
       AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames)
     // We also update table-level stats in order to keep them consistent with column-level stats.
-    val statistics = Statistics(
+    val statistics = CatalogStatistics(
       sizeInBytes = sizeInBytes,
       rowCount = Some(rowCount),
       // Newly computed column stats should override the existing ones.

@@ -25,8 +25,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SessionState
@@ -62,9 +61,9 @@ case class AnalyzeTableCommand(
   def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
     val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
     val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
-    var newStats: Option[Statistics] = None
+    var newStats: Option[CatalogStatistics] = None
     if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-      newStats = Some(Statistics(sizeInBytes = newTotalSize))
+      newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
     }
     // We only set rowCount when noscan is false, because otherwise:
     // 1. when total size is not changed, we don't need to alter the table;
@@ -76,7 +75,8 @@ case class AnalyzeTableCommand(
       newStats = if (newStats.isDefined) {
         newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
       } else {
-        Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
+        Some(CatalogStatistics(
+          sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
       }
     }

@@ -73,7 +73,7 @@ case class LogicalRelation(
   override lazy val cleanArgs: Seq[Any] = Seq(relation)
   @transient override lazy val statistics: Statistics = {
-    catalogTable.flatMap(_.stats.map(_.copy(sizeInBytes = relation.sizeInBytes))).getOrElse(
+    catalogTable.flatMap(_.stats.map(_.toPlanStats(output))).getOrElse(
       Statistics(sizeInBytes = relation.sizeInBytes))
   }

@@ -24,6 +24,7 @@ import scala.collection.mutable
 import scala.util.Random
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.StaticSQLConf
@@ -39,7 +40,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
   import testImplicits._
   private def checkTableStats(tableName: String, expectedRowCount: Option[Int])
-    : Option[Statistics] = {
+    : Option[CatalogStatistics] = {
     val df = spark.table(tableName)
     val stats = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
       assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
@@ -260,4 +261,46 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
       }
     }
   }
+
+  // This test will be run twice: with and without Hive support
+  test("conversion from CatalogStatistics to Statistics") {
+    withTable("ds_tbl", "hive_tbl") {
+      // Test data source table
+      checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
+      // Test hive serde table
+      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+        checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
+      }
+    }
+  }
+
+  private def checkStatsConversion(tableName: String, isDatasourceTable: Boolean): Unit = {
+    // Create an empty table and run analyze command on it.
+    val createTableSql = if (isDatasourceTable) {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING) USING PARQUET"
+    } else {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING)"
+    }
+    sql(createTableSql)
+    // Analyze only one column.
+    sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
+    val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
+      case catalogRel: CatalogRelation => (catalogRel, catalogRel.catalogTable)
+      case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
+    }.head
+    val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
+    // Check catalog statistics
+    assert(catalogTable.stats.isDefined)
+    assert(catalogTable.stats.get.sizeInBytes == 0)
+    assert(catalogTable.stats.get.rowCount == Some(0))
+    assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
+    // Check relation statistics
+    assert(relation.statistics.sizeInBytes == 0)
+    assert(relation.statistics.rowCount == Some(0))
+    assert(relation.statistics.attributeStats.size == 1)
+    val (attribute, colStat) = relation.statistics.attributeStats.head
+    assert(attribute.name == "c1")
+    assert(colStat == emptyColStat)
+  }
 }

@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -656,7 +656,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       }
       table = table.copy(
-        stats = Some(Statistics(
+        stats = Some(CatalogStatistics(
           sizeInBytes = BigInt(table.properties(STATISTICS_TOTAL_SIZE)),
           rowCount = table.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
           colStats = colStats.toMap)))

@@ -113,7 +113,7 @@ private[hive] case class MetastoreRelation(
   }
   @transient override lazy val statistics: Statistics = {
-    catalogTable.stats.getOrElse(Statistics(
+    catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
       sizeInBytes = {
         val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
         val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)

@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.joins._
@@ -152,7 +152,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   }
   private def checkTableStats(
-      stats: Option[Statistics],
+      stats: Option[CatalogStatistics],
       hasSizeInBytes: Boolean,
       expectedRowCounts: Option[Int]): Unit = {
     if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
@@ -168,7 +168,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       tableName: String,
       isDataSourceTable: Boolean,
       hasSizeInBytes: Boolean,
-      expectedRowCounts: Option[Int]): Option[Statistics] = {
+      expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
     val df = sql(s"SELECT * FROM $tableName")
     val stats = df.queryExecution.analyzed.collect {
       case rel: MetastoreRelation =>
@@ -435,10 +435,11 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   }
   /** Used to test refreshing cached metadata once table stats are updated. */
-  private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean): (Statistics, Statistics) = {
+  private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean)
+    : (CatalogStatistics, CatalogStatistics) = {
     val tableName = "tbl"
-    var statsBeforeUpdate: Statistics = null
-    var statsAfterUpdate: Statistics = null
+    var statsBeforeUpdate: CatalogStatistics = null
+    var statsAfterUpdate: CatalogStatistics = null
     withTable(tableName) {
       val tableIndent = TableIdentifier(tableName, Some("default"))
       val catalog = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]