[SPARK-16691][SQL] move BucketSpec to catalyst module and use it in CatalogTable

## What changes were proposed in this pull request?

It's weird that we have `BucketSpec` to abstract bucketing information but don't use it in `CatalogTable`. This PR moves `BucketSpec` into the catalyst module and uses it in `CatalogTable`, replacing the separate `numBuckets`, `bucketColumnNames`, and `sortColumnNames` fields.
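For illustration, a minimal sketch of what this means for callers (the table and column names here are invented): bucketing metadata becomes a single optional, self-validating value instead of three loose fields plus a `-1` sentinel for "no buckets".

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec

// Before: CatalogTable(..., numBuckets = 4, bucketColumnNames = Seq("user_id"),
//                      sortColumnNames = Seq("event_time"), ...)
// After:  CatalogTable(..., bucketSpec = Some(spec), ...)
val spec = BucketSpec(
  numBuckets = 4,
  bucketColumnNames = Seq("user_id"),
  sortColumnNames = Seq("event_time"))

// Call sites now read the info through the Option, e.g.
//   table.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil)
```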

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14331 from cloud-fan/check.
Wenchen Fan 2016-07-25 22:05:48 +08:00 committed by Cheng Lian
parent d27d362eba
commit 64529b186a
21 changed files with 78 additions and 78 deletions


@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
/**
@@ -109,6 +110,24 @@ case class CatalogTablePartition(
storage: CatalogStorageFormat)
/**
* A container for bucketing information.
* Bucketing is a technique for decomposing data sets into more manageable parts, and the number
* of buckets is fixed so it does not fluctuate with data.
*
* @param numBuckets number of buckets.
* @param bucketColumnNames the names of the columns used to generate the bucket id.
* @param sortColumnNames the names of the columns used to sort data in each bucket.
*/
case class BucketSpec(
numBuckets: Int,
bucketColumnNames: Seq[String],
sortColumnNames: Seq[String]) {
if (numBuckets <= 0) {
throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
}
}
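A small sketch of the validation above, assuming the constructor is called directly (the column name is made up):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.BucketSpec

// A non-positive bucket count fails eagerly, at construction time.
try {
  BucketSpec(0, Seq("user_id"), Nil)
} catch {
  case e: AnalysisException =>
    println(e.getMessage) // "Expected positive number of buckets, but got `0`."
}
```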
/**
* A table defined in the catalog.
*
@@ -124,9 +143,7 @@ case class CatalogTable(
storage: CatalogStorageFormat,
schema: Seq[CatalogColumn],
partitionColumnNames: Seq[String] = Seq.empty,
sortColumnNames: Seq[String] = Seq.empty,
bucketColumnNames: Seq[String] = Seq.empty,
numBuckets: Int = -1,
bucketSpec: Option[BucketSpec] = None,
owner: String = "",
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
@@ -143,8 +160,8 @@ case class CatalogTable(
s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
}
requireSubsetOfSchema(partitionColumnNames, "partition")
requireSubsetOfSchema(sortColumnNames, "sort")
requireSubsetOfSchema(bucketColumnNames, "bucket")
requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
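The `Option`-based access pattern used above recurs at the other call sites touched by this patch; roughly, expressed as standalone helper functions for illustration (these helpers are not part of the patch):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Hypothetical helpers showing how bucketing metadata is read off CatalogTable now.
def bucketColumnsOf(table: CatalogTable): Seq[String] =
  table.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil)

def sortColumnsOf(table: CatalogTable): Seq[String] =
  table.bucketSpec.map(_.sortColumnNames).getOrElse(Nil)

// The old numBuckets = -1 sentinel disappears; absence is modeled with Option instead.
def numBucketsOf(table: CatalogTable): Option[Int] =
  table.bucketSpec.map(_.numBuckets)
```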
/** Columns this table is partitioned by. */
def partitionColumns: Seq[CatalogColumn] =
@@ -172,9 +189,19 @@ case class CatalogTable(
override def toString: String = {
val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
val partitionColumns = partitionColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
val sortColumns = sortColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
val bucketColumns = bucketColumnNames.map("`" + _ + "`").mkString("[", ", ", "]")
val partitionColumns = partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
val bucketStrings = bucketSpec match {
case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
val bucketColumnsString = bucketColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
val sortColumnsString = sortColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
Seq(
s"Num Buckets: $numBuckets",
if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumnsString" else "",
if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumnsString" else ""
)
case _ => Nil
}
val output =
Seq(s"Table: ${identifier.quotedString}",
@@ -183,10 +210,8 @@ case class CatalogTable(
s"Last Access: ${new Date(lastAccessTime).toString}",
s"Type: ${tableType.name}",
if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "",
if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else "",
if (numBuckets != -1) s"Num Buckets: $numBuckets" else "",
if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumns" else "",
if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumns" else "",
if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else ""
) ++ bucketStrings ++ Seq(
viewOriginalText.map("Original View: " + _).getOrElse(""),
viewText.map("View: " + _).getOrElse(""),
comment.map("Comment: " + _).getOrElse(""),


@@ -692,7 +692,7 @@ abstract class CatalogTestUtils {
CatalogColumn("a", "int"),
CatalogColumn("b", "string")),
partitionColumnNames = Seq("a", "b"),
bucketColumnNames = Seq("col1"))
bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
}
def newFunc(name: String, database: Option[String] = None): CatalogFunction = {


@@ -23,8 +23,9 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
/**


@@ -21,12 +21,11 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.types._


@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -498,23 +498,19 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
def appendBucketInfo(numBuckets: Int, bucketColumns: Seq[String], sortColumns: Seq[String]) = {
append(buffer, "Num Buckets:", numBuckets.toString, "")
append(buffer, "Bucket Columns:", bucketColumns.mkString("[", ", ", "]"), "")
append(buffer, "Sort Columns:", sortColumns.mkString("[", ", ", "]"), "")
def appendBucketInfo(bucketSpec: Option[BucketSpec]) = bucketSpec match {
case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
append(buffer, "Num Buckets:", numBuckets.toString, "")
append(buffer, "Bucket Columns:", bucketColumnNames.mkString("[", ", ", "]"), "")
append(buffer, "Sort Columns:", sortColumnNames.mkString("[", ", ", "]"), "")
case _ =>
}
DDLUtils.getBucketSpecFromTableProperties(metadata) match {
case Some(bucketSpec) =>
appendBucketInfo(
bucketSpec.numBuckets,
bucketSpec.bucketColumnNames,
bucketSpec.sortColumnNames)
case None =>
appendBucketInfo(
metadata.numBuckets,
metadata.bucketColumnNames,
metadata.sortColumnNames)
if (DDLUtils.isDatasourceTable(metadata)) {
appendBucketInfo(DDLUtils.getBucketSpecFromTableProperties(metadata))
} else {
appendBucketInfo(metadata.bucketSpec)
}
}
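For context, this path is exercised by, for example, `DESCRIBE FORMATTED`; a hedged usage sketch, assuming a `SparkSession` named `spark` (the table and column names are invented):

```scala
// Write a bucketed data source table, then describe it.
spark.range(100)
  .selectExpr("id", "id % 10 AS user_id")
  .write
  .bucketBy(4, "user_id")
  .sortBy("user_id")
  .saveAsTable("bucketed_events")

// The "Num Buckets:", "Bucket Columns:" and "Sort Columns:" rows are appended by
// describeBucketingInfo above.
spark.sql("DESCRIBE FORMATTED bucketed_events").show(100, false)
```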
@@ -808,7 +804,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
}
if (metadata.bucketColumnNames.nonEmpty) {
if (metadata.bucketSpec.isDefined) {
throw new UnsupportedOperationException(
"Creating Hive table with bucket spec is not supported yet.")
}


@@ -17,26 +17,6 @@
package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.AnalysisException
/**
* A container for bucketing information.
* Bucketing is a technique for decomposing data sets into more manageable parts, and the number
* of buckets is fixed so it does not fluctuate with data.
*
* @param numBuckets number of buckets.
* @param bucketColumnNames the names of the columns used to generate the bucket id.
* @param sortColumnNames the names of the columns used to sort data in each bucket.
*/
private[sql] case class BucketSpec(
numBuckets: Int,
bucketColumnNames: Seq[String],
sortColumnNames: Seq[String]) {
if (numBuckets <= 0) {
throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
}
}
private[sql] object BucketingUtils {
// The file name of bucketed data should have 3 parts:
// 1. some other information in the head of file name


@@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat


@@ -25,6 +25,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan


@@ -19,13 +19,13 @@ package org.apache.spark.sql.execution.datasources
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.InternalRow


@@ -28,6 +28,7 @@ import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.InternalRow


@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan


@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources
import scala.collection.mutable
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
@@ -30,6 +29,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.FileRelation


@@ -153,7 +153,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier)
val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
val bucketColumnNames = tableMetadata.bucketColumnNames.toSet
val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
val columns = tableMetadata.schema.map { c =>
new Column(
name = c.name,
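A hedged usage sketch of the Catalog API path above, assuming the same `spark` session and the bucketed table created in the earlier sketch:

```scala
// Columns that appear in the table's BucketSpec should come back with isBucket = true.
spark.catalog.listColumns("bucketed_events")
  .select("name", "isPartition", "isBucket")
  .show(false)
```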


@@ -20,15 +20,15 @@ package org.apache.spark.sql.execution.command
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
import org.apache.spark.sql.execution.datasources.CreateTableUsing
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructType}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
// TODO: merge this with DDLSuite (SPARK-14441)


@@ -26,13 +26,12 @@ import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructType}


@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.execution.DataSourceScanExec


@@ -90,11 +90,12 @@ class CatalogSuite
.getOrElse { spark.catalog.listColumns(tableName) }
assume(tableMetadata.schema.nonEmpty, "bad test")
assume(tableMetadata.partitionColumnNames.nonEmpty, "bad test")
assume(tableMetadata.bucketColumnNames.nonEmpty, "bad test")
assume(tableMetadata.bucketSpec.isDefined, "bad test")
assert(columns.collect().map(_.name).toSet == tableMetadata.schema.map(_.name).toSet)
val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
columns.collect().foreach { col =>
assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name))
assert(col.isBucket == tableMetadata.bucketColumnNames.contains(col.name))
assert(col.isBucket == bucketColumnNames.contains(col.name))
}
}


@@ -24,9 +24,9 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils


@@ -365,9 +365,9 @@ private[hive] class HiveClientImpl(
},
schema = schema,
partitionColumnNames = partCols.map(_.name),
sortColumnNames = Seq(), // TODO: populate this
bucketColumnNames = h.getBucketCols.asScala,
numBuckets = h.getNumBuckets,
// We can not populate bucketing information for Hive tables as Spark SQL has a different
// implementation of hash function from Hive.
bucketSpec = None,
owner = h.getOwner,
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
@@ -764,10 +764,7 @@ private[hive] class HiveClientImpl(
hiveTable.setFields(schema.asJava)
}
hiveTable.setPartCols(partCols.asJava)
// TODO: set sort columns here too
hiveTable.setBucketCols(table.bucketColumnNames.asJava)
hiveTable.setOwner(conf.getUser)
hiveTable.setNumBuckets(table.numBuckets)
hiveTable.setCreateTime((table.createTime / 1000).toInt)
hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }


@@ -293,9 +293,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(desc.tableType == CatalogTableType.MANAGED)
assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")))
assert(desc.partitionColumnNames.isEmpty)
assert(desc.sortColumnNames.isEmpty)
assert(desc.bucketColumnNames.isEmpty)
assert(desc.numBuckets == -1)
assert(desc.bucketSpec.isEmpty)
assert(desc.viewText.isEmpty)
assert(desc.viewOriginalText.isEmpty)
assert(desc.storage.locationUri.isEmpty)
@@ -453,9 +451,7 @@ class HiveDDLCommandSuite extends PlanTest {
CatalogColumn("name", "string"),
CatalogColumn("month", "int")))
assert(desc.partitionColumnNames == Seq("month"))
assert(desc.sortColumnNames.isEmpty)
assert(desc.bucketColumnNames.isEmpty)
assert(desc.numBuckets == -1)
assert(desc.bucketSpec.isEmpty)
assert(desc.viewText.isEmpty)
assert(desc.viewOriginalText.isEmpty)
assert(desc.storage.locationUri == Some("/path/to/mercury"))


@@ -20,10 +20,11 @@ package org.apache.spark.sql.sources
import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions._
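For context on why this metadata matters downstream (the imports above belong to a bucketed-read test), a hedged sketch, assuming a `SparkSession` named `spark` and two tables written with the same `bucketBy` settings on the join key (table names invented):

```scala
// With matching bucketing on both sides, the planner should be able to perform the
// sort-merge join without inserting a ShuffleExchange below either input.
val joined = spark.table("bucketed_a").join(spark.table("bucketed_b"), "user_id")
joined.explain()
```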