[SPARK-18028][SQL] simplify TableFileCatalog

## What changes were proposed in this pull request?

Simplify/clean up `TableFileCatalog`:

1. Pass a `CatalogTable` instead of `databaseName` and `tableName` into `TableFileCatalog`, so that we don't need to fetch the table metadata from the metastore again.
2. In `TableFileCatalog.filterPartitions`, DO NOT set `PartitioningAwareFileCatalog.BASE_PATH_PARAM`. According to the [classdoc](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L189-L209), the default value of `basePath` already satisfies our needs. What's more, if we set this parameter, we may break case 2 mentioned in the classdoc.
3. Add `equals` and `hashCode` to `TableFileCatalog`.
4. Add `SessionCatalog.listPartitionsByFilter`, which handles case sensitivity (see the sketch after this list).
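
To make the new surface area concrete, here is a minimal usage sketch. It is not part of the patch: it assumes a `SparkSession` named `spark` and code living under `org.apache.spark.sql` (like the tests touched below), and borrows the `default.test` table and the zero `sizeInBytes` from those tests.

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.datasources.TableFileCatalog

// The catalog now receives the full CatalogTable up front, so it never has to
// go back to the metastore for the table metadata (point 1).
val tableMeta: CatalogTable =
  spark.sharedState.externalCatalog.getTable("default", "test")
val fileCatalog = new TableFileCatalog(spark, tableMeta, /* sizeInBytes = */ 0)

// Partition pruning is routed through the session catalog, which normalizes the
// database/table names for case sensitivity before querying the external catalog
// (point 4). `Nil` means "no pruning predicates", i.e. list all partitions.
val partitions = spark.sessionState.catalog.listPartitionsByFilter(
  tableMeta.identifier, predicates = Nil)

// File listing restricted to the selected partitions (all of them here).
val prunedListing = fileCatalog.filterPartitions(Nil)
```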

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15568 from cloud-fan/table-file-catalog.
Wenchen Fan 2016-10-25 08:42:21 +08:00
parent 407c3cedf2
commit 84a3399908
5 changed files with 84 additions and 36 deletions

SessionCatalog.scala

@@ -755,6 +755,20 @@ class SessionCatalog(
externalCatalog.listPartitions(db, table, partialSpec)
}
/**
* List the metadata of partitions that belong to the specified table, assuming it exists, that
* satisfy the given partition-pruning predicate expressions.
*/
def listPartitionsByFilter(
tableName: TableIdentifier,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
externalCatalog.listPartitionsByFilter(db, table, predicates)
}
/**
* Verify if the input partition spec exactly matches the existing defined partition spec
* The columns must be the same but the orders could be different.

TableFileCatalog.scala

@@ -20,36 +20,30 @@ package org.apache.spark.sql.execution.datasources
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType
/**
* A [[FileCatalog]] for a metastore catalog table.
*
* @param sparkSession a [[SparkSession]]
* @param db the table's database name
* @param table the table's (unqualified) name
* @param partitionSchema the schema of a partitioned table's partition columns
* @param table the metadata of the table
* @param sizeInBytes the table's data size in bytes
* @param fileStatusCache optional cache implementation to use for file listing
*/
class TableFileCatalog(
sparkSession: SparkSession,
db: String,
table: String,
partitionSchema: Option[StructType],
val table: CatalogTable,
override val sizeInBytes: Long) extends FileCatalog {
protected val hadoopConf = sparkSession.sessionState.newHadoopConf
private val fileStatusCache = FileStatusCache.newCache(sparkSession)
private val externalCatalog = sparkSession.sharedState.externalCatalog
assert(table.identifier.database.isDefined,
"The table identifier must be qualified in TableFileCatalog")
private val catalogTable = externalCatalog.getTable(db, table)
private val baseLocation = catalogTable.storage.locationUri
private val baseLocation = table.storage.locationUri
override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
@@ -66,24 +60,32 @@ class TableFileCatalog(
* @param filters partition-pruning filters
*/
def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
val parameters = baseLocation
.map(loc => Map(PartitioningAwareFileCatalog.BASE_PATH_PARAM -> loc))
.getOrElse(Map.empty)
partitionSchema match {
case Some(schema) =>
val selectedPartitions = externalCatalog.listPartitionsByFilter(db, table, filters)
val partitions = selectedPartitions.map { p =>
PartitionPath(p.toRow(schema), p.storage.locationUri.get)
}
val partitionSpec = PartitionSpec(schema, partitions)
new PrunedTableFileCatalog(
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
case None =>
new ListingFileCatalog(sparkSession, rootPaths, parameters, None, fileStatusCache)
if (table.partitionColumnNames.nonEmpty) {
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitionSchema = table.partitionSchema
val partitions = selectedPartitions.map { p =>
PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
new PrunedTableFileCatalog(
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
} else {
new ListingFileCatalog(sparkSession, rootPaths, table.storage.properties, None)
}
}
override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
// `TableFileCatalog` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
// of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
// implement `equals` and `hashCode` here, to make it work with cache lookup.
override def equals(o: Any): Boolean = o match {
case other: TableFileCatalog => this.table.identifier == other.table.identifier
case _ => false
}
override def hashCode(): Int = table.identifier.hashCode()
}
/**

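One note on the new `equals`/`hashCode` pair above: equality is intentionally based on the table identifier alone, so two independently constructed instances over the same table act as the same cache key. A rough sketch (reusing the hypothetical `spark` and `tableMeta` names from the earlier snippet; not part of the patch):

```scala
val a = new TableFileCatalog(spark, tableMeta, 0)
val b = new TableFileCatalog(spark, tableMeta, 0)
// Same table identifier => equal catalogs and equal hash codes, so a plan cached
// through `a` can be found again via a relation built with `b`.
assert(a == b && a.hashCode == b.hashCode)
```

This is exactly what the `CachedTableSuite` test below relies on.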
HiveMetastoreCatalog.scala

@@ -226,12 +226,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
Some(partitionSchema))
val logicalRelation = cached.getOrElse {
val db = metastoreRelation.databaseName
val table = metastoreRelation.tableName
val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
val fileCatalog = {
val catalog = new TableFileCatalog(
sparkSession, db, table, Some(partitionSchema), sizeInBytes)
sparkSession, metastoreRelation.catalogTable, sizeInBytes)
if (lazyPruningEnabled) {
catalog
} else {

CachedTableSuite.scala

@@ -19,12 +19,15 @@ package org.apache.spark.sql.hive
import java.io.File
import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, TableFileCatalog}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Utils
@@ -317,4 +320,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
sql("DROP TABLE cachedTable")
}
test("cache a table using TableFileCatalog") {
withTable("test") {
sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)
})
val relation = HadoopFsRelation(
location = tableFileCatalog,
partitionSchema = tableMeta.partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)
val plan = LogicalRelation(relation, catalogTable = Some(tableMeta))
spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan))
assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
val sameCatalog = new TableFileCatalog(spark, tableMeta, 0)
val sameRelation = HadoopFsRelation(
location = sameCatalog,
partitionSchema = tableMeta.partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)
val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta))
assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined)
}
}
}

PruneFileSourcePartitionsSuite.scala

@@ -45,12 +45,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
|LOCATION '${dir.getAbsolutePath}'""".stripMargin)
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
val tableFileCatalog = new TableFileCatalog(
spark,
tableMeta.database,
tableMeta.identifier.table,
Some(tableMeta.partitionSchema),
0)
val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)