[SPARK-14940][SQL] Move ExternalCatalog to own file

## What changes were proposed in this pull request?

`interfaces.scala` was getting big. This just moves the biggest class in it, `ExternalCatalog`, to a new file for cleanliness.

## How was this patch tested?

Just moving things around.

Author: Andrew Or <andrew@databricks.com>

Closes #12721 from andrewor14/move-external-catalog.
Authored by Andrew Or on 2016-04-27 14:17:36 -07:00; committed by Reynold Xin
Commit 37575115b9 (parent 4672e9838b)
12 changed files with 210 additions and 188 deletions
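For callers, the net effect is a one-line import change: the `TablePartitionSpec` alias now lives on the new `CatalogTypes` holder object rather than on the `ExternalCatalog` companion. A minimal sketch of that import swap (it assumes the spark-catalyst artifact is on the classpath; the partition column names and values are made up):

```scala
// Before this patch the alias lived on the ExternalCatalog companion object:
// import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec

// After this patch it lives on the new CatalogTypes holder object:
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

// A partition spec is still just a map from partition column name to value
// (the names and values here are hypothetical examples).
val spec: TablePartitionSpec = Map("ds" -> "2016-04-27", "hr" -> "11")
```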


@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
/**


@@ -0,0 +1,185 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.sql.AnalysisException


/**
 * Interface for the system catalog (of columns, partitions, tables, and databases).
 *
 * This is only used for non-temporary items, and implementations must be thread-safe as they
 * can be accessed in multiple threads. This is an external catalog because it is expected to
 * interact with external systems.
 *
 * Implementations should throw [[AnalysisException]] when a table or database does not exist.
 */
abstract class ExternalCatalog {
  import CatalogTypes.TablePartitionSpec

  protected def requireDbExists(db: String): Unit = {
    if (!databaseExists(db)) {
      throw new AnalysisException(s"Database '$db' does not exist")
    }
  }

  // --------------------------------------------------------------------------
  // Databases
  // --------------------------------------------------------------------------

  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit

  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

  /**
   * Alter a database whose name matches the one specified in `dbDefinition`,
   * assuming the database exists.
   *
   * Note: If the underlying implementation does not support altering a certain field,
   * this becomes a no-op.
   */
  def alterDatabase(dbDefinition: CatalogDatabase): Unit

  def getDatabase(db: String): CatalogDatabase

  def databaseExists(db: String): Boolean

  def listDatabases(): Seq[String]

  def listDatabases(pattern: String): Seq[String]

  def setCurrentDatabase(db: String): Unit

  // --------------------------------------------------------------------------
  // Tables
  // --------------------------------------------------------------------------

  def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit

  def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit

  def renameTable(db: String, oldName: String, newName: String): Unit

  /**
   * Alter a table whose name matches the one specified in `tableDefinition`,
   * assuming the table exists.
   *
   * Note: If the underlying implementation does not support altering a certain field,
   * this becomes a no-op.
   */
  def alterTable(db: String, tableDefinition: CatalogTable): Unit

  def getTable(db: String, table: String): CatalogTable

  def getTableOption(db: String, table: String): Option[CatalogTable]

  def tableExists(db: String, table: String): Boolean

  def listTables(db: String): Seq[String]

  def listTables(db: String, pattern: String): Seq[String]

  def loadTable(
      db: String,
      table: String,
      loadPath: String,
      isOverwrite: Boolean,
      holdDDLTime: Boolean): Unit

  def loadPartition(
      db: String,
      table: String,
      loadPath: String,
      partition: TablePartitionSpec,
      isOverwrite: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean,
      isSkewedStoreAsSubdir: Boolean): Unit

  // --------------------------------------------------------------------------
  // Partitions
  // --------------------------------------------------------------------------

  def createPartitions(
      db: String,
      table: String,
      parts: Seq[CatalogTablePartition],
      ignoreIfExists: Boolean): Unit

  def dropPartitions(
      db: String,
      table: String,
      parts: Seq[TablePartitionSpec],
      ignoreIfNotExists: Boolean): Unit

  /**
   * Override the specs of one or many existing table partitions, assuming they exist.
   * This assumes index i of `specs` corresponds to index i of `newSpecs`.
   */
  def renamePartitions(
      db: String,
      table: String,
      specs: Seq[TablePartitionSpec],
      newSpecs: Seq[TablePartitionSpec]): Unit

  /**
   * Alter one or many table partitions whose specs match those specified in `parts`,
   * assuming the partitions exist.
   *
   * Note: If the underlying implementation does not support altering a certain field,
   * this becomes a no-op.
   */
  def alterPartitions(
      db: String,
      table: String,
      parts: Seq[CatalogTablePartition]): Unit

  def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition

  /**
   * List the metadata of all partitions that belong to the specified table, assuming it exists.
   *
   * A partial partition spec may optionally be provided to filter the partitions returned.
   * For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
   * then a partial spec of (a='1') will return the first two only.
   *
   * @param db database name
   * @param table table name
   * @param partialSpec an optional partial partition spec used to filter the returned partitions
   */
  def listPartitions(
      db: String,
      table: String,
      partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]

  // --------------------------------------------------------------------------
  // Functions
  // --------------------------------------------------------------------------

  def createFunction(db: String, funcDefinition: CatalogFunction): Unit

  def dropFunction(db: String, funcName: String): Unit

  def renameFunction(db: String, oldName: String, newName: String): Unit

  def getFunction(db: String, funcName: String): CatalogFunction

  def functionExists(db: String, funcName: String): Boolean

  def listFunctions(db: String, pattern: String): Seq[String]

}
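The `listPartitions` doc above describes filtering by a partial partition spec. A small self-contained sketch of that matching rule, using only plain Scala collections (the `matches` helper, the object name, and the sample specs are illustrative, not part of the Spark API):

```scala
object PartialSpecFilterSketch {
  // Same shape as CatalogTypes.TablePartitionSpec: partition column name -> value.
  type TablePartitionSpec = Map[String, String]

  // The three partitions from the example in the listPartitions doc comment.
  val existing: Seq[TablePartitionSpec] = Seq(
    Map("a" -> "1", "b" -> "2"),
    Map("a" -> "1", "b" -> "3"),
    Map("a" -> "2", "b" -> "4"))

  // A partition matches a partial spec when every (column, value) pair of the
  // partial spec also appears in the partition's spec.
  def matches(spec: TablePartitionSpec, partial: TablePartitionSpec): Boolean =
    partial.forall { case (col, value) => spec.get(col).contains(value) }

  def main(args: Array[String]): Unit = {
    val filtered = existing.filter(p => matches(p, Map("a" -> "1")))
    // Prints only the first two partitions, as described in the doc comment.
    filtered.foreach(println)
  }
}
```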


@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils
* All public methods should be synchronized for thread-safety.
*/
class InMemoryCatalog extends ExternalCatalog {
import ExternalCatalog._
import CatalogTypes.TablePartitionSpec
private class TableDesc(var table: CatalogTable) {
val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]


@@ -45,7 +45,7 @@ class SessionCatalog(
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: CatalystConf) extends Logging {
import ExternalCatalog._
import CatalogTypes.TablePartitionSpec
def this(
externalCatalog: ExternalCatalog,


@@ -26,171 +26,6 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
/**
* Interface for the system catalog (of columns, partitions, tables, and databases).
*
* This is only used for non-temporary items, and implementations must be thread-safe as they
* can be accessed in multiple threads. This is an external catalog because it is expected to
* interact with external systems.
*
* Implementations should throw [[AnalysisException]] when table or database don't exist.
*/
abstract class ExternalCatalog {
import ExternalCatalog._
protected def requireDbExists(db: String): Unit = {
if (!databaseExists(db)) {
throw new AnalysisException(s"Database '$db' does not exist")
}
}
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
/**
* Alter a database whose name matches the one specified in `dbDefinition`,
* assuming the database exists.
*
* Note: If the underlying implementation does not support altering a certain field,
* this becomes a no-op.
*/
def alterDatabase(dbDefinition: CatalogDatabase): Unit
def getDatabase(db: String): CatalogDatabase
def databaseExists(db: String): Boolean
def listDatabases(): Seq[String]
def listDatabases(pattern: String): Seq[String]
def setCurrentDatabase(db: String): Unit
// --------------------------------------------------------------------------
// Tables
// --------------------------------------------------------------------------
def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
def renameTable(db: String, oldName: String, newName: String): Unit
/**
* Alter a table whose name that matches the one specified in `tableDefinition`,
* assuming the table exists.
*
* Note: If the underlying implementation does not support altering a certain field,
* this becomes a no-op.
*/
def alterTable(db: String, tableDefinition: CatalogTable): Unit
def getTable(db: String, table: String): CatalogTable
def getTableOption(db: String, table: String): Option[CatalogTable]
def tableExists(db: String, table: String): Boolean
def listTables(db: String): Seq[String]
def listTables(db: String, pattern: String): Seq[String]
def loadTable(
db: String,
table: String,
loadPath: String,
isOverwrite: Boolean,
holdDDLTime: Boolean): Unit
def loadPartition(
db: String,
table: String,
loadPath: String,
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean): Unit
// --------------------------------------------------------------------------
// Partitions
// --------------------------------------------------------------------------
def createPartitions(
db: String,
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit
def dropPartitions(
db: String,
table: String,
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit
/**
* Override the specs of one or many existing table partitions, assuming they exist.
* This assumes index i of `specs` corresponds to index i of `newSpecs`.
*/
def renamePartitions(
db: String,
table: String,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit
/**
* Alter one or many table partitions whose specs that match those specified in `parts`,
* assuming the partitions exist.
*
* Note: If the underlying implementation does not support altering a certain field,
* this becomes a no-op.
*/
def alterPartitions(
db: String,
table: String,
parts: Seq[CatalogTablePartition]): Unit
def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
/**
* List the metadata of all partitions that belong to the specified table, assuming it exists.
*
* A partial partition spec may optionally be provided to filter the partitions returned.
* For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
* then a partial spec of (a='1') will return the first two only.
* @param db database name
* @param table table name
* @param partialSpec partition spec
*/
def listPartitions(
db: String,
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
def createFunction(db: String, funcDefinition: CatalogFunction): Unit
def dropFunction(db: String, funcName: String): Unit
def renameFunction(db: String, oldName: String, newName: String): Unit
def getFunction(db: String, funcName: String): CatalogFunction
def functionExists(db: String, funcName: String): Boolean
def listFunctions(db: String, pattern: String): Seq[String]
}
/**
* A function defined in the catalog.
*
@@ -235,7 +70,7 @@ case class CatalogColumn(
* @param storage storage format of the partition
*/
case class CatalogTablePartition(
spec: ExternalCatalog.TablePartitionSpec,
spec: CatalogTypes.TablePartitionSpec,
storage: CatalogStorageFormat)
@@ -316,7 +151,7 @@ case class CatalogDatabase(
properties: Map[String, String])
object ExternalCatalog {
object CatalogTypes {
/**
* Specifications of a table partition. Mapping column name to column value.
*/
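This hunk cuts off right after the comment. The patch renames the enclosing holder object (`ExternalCatalog` to `CatalogTypes`) but, as far as the diff shows, leaves the alias itself untouched. A short sketch of what the renamed object holds; the alias line is recalled from the Spark source rather than shown in this hunk, so treat it as an assumption:

```scala
object CatalogTypes {
  /** Specifications of a table partition. Mapping column name to column value. */
  type TablePartitionSpec = Map[String, String]
}
```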


@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical


@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.types._


@@ -24,7 +24,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, ExternalCatalog}
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType}
@@ -156,7 +157,7 @@ case class LoadData(
path: String,
isLocal: Boolean,
isOverwrite: Boolean,
partition: Option[ExternalCatalog.TablePartitionSpec]) extends RunnableCommand {
partition: Option[TablePartitionSpec]) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog


@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.test.SharedSQLContext
class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {


@@ -36,7 +36,7 @@ import org.apache.spark.sql.hive.client.HiveClient
* All public methods must be synchronized for thread-safety.
*/
private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCatalog with Logging {
import ExternalCatalog._
import CatalogTypes.TablePartitionSpec
// Exceptions thrown by the hive client that we would like to wrap
private val clientExceptions = Set(


@@ -21,6 +21,7 @@ import java.io.PrintStream
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -119,7 +120,7 @@ private[hive] trait HiveClient {
def dropPartitions(
db: String,
table: String,
specs: Seq[ExternalCatalog.TablePartitionSpec],
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit
/**
@@ -128,8 +129,8 @@ private[hive] trait HiveClient {
def renamePartitions(
db: String,
table: String,
specs: Seq[ExternalCatalog.TablePartitionSpec],
newSpecs: Seq[ExternalCatalog.TablePartitionSpec]): Unit
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit
/**
* Alter one or more table partitions whose specs match the ones specified in `newParts`,
@@ -144,7 +145,7 @@ private[hive] trait HiveClient {
final def getPartition(
dbName: String,
tableName: String,
spec: ExternalCatalog.TablePartitionSpec): CatalogTablePartition = {
spec: TablePartitionSpec): CatalogTablePartition = {
getPartitionOption(dbName, tableName, spec).getOrElse {
throw new NoSuchPartitionException(dbName, tableName, spec)
}
@@ -154,14 +155,14 @@ private[hive] trait HiveClient {
final def getPartitionOption(
db: String,
table: String,
spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition] = {
spec: TablePartitionSpec): Option[CatalogTablePartition] = {
getPartitionOption(getTable(db, table), spec)
}
/** Returns the specified partition or None if it does not exist. */
def getPartitionOption(
table: CatalogTable,
spec: ExternalCatalog.TablePartitionSpec): Option[CatalogTablePartition]
spec: TablePartitionSpec): Option[CatalogTablePartition]
/**
* Returns the partitions for the given table that match the supplied partition spec.
@@ -170,7 +171,7 @@ private[hive] trait HiveClient {
final def getPartitions(
db: String,
table: String,
partialSpec: Option[ExternalCatalog.TablePartitionSpec]): Seq[CatalogTablePartition] = {
partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
getPartitions(getTable(db, table), partialSpec)
}
@@ -180,7 +181,7 @@ private[hive] trait HiveClient {
*/
def getPartitions(
table: CatalogTable,
partialSpec: Option[ExternalCatalog.TablePartitionSpec] = None): Seq[CatalogTablePartition]
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
/** Returns partitions filtered by predicates for the given table. */
def getPartitionsByFilter(


@@ -40,7 +40,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -375,7 +375,7 @@ private[hive] class HiveClientImpl(
override def dropPartitions(
db: String,
table: String,
specs: Seq[ExternalCatalog.TablePartitionSpec],
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
val hiveTable = client.getTable(db, table, true /* throw exception */)
@@ -399,8 +399,8 @@ private[hive] class HiveClientImpl(
override def renamePartitions(
db: String,
table: String,
specs: Seq[ExternalCatalog.TablePartitionSpec],
newSpecs: Seq[ExternalCatalog.TablePartitionSpec]): Unit = withHiveState {
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState {
require(specs.size == newSpecs.size, "number of old and new partition specs differ")
val catalogTable = getTable(db, table)
val hiveTable = toHiveTable(catalogTable)
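`renamePartitions` pairs old and new specs positionally (index i of `specs` corresponds to index i of `newSpecs`, as its doc comment states), which is what the `require` above enforces. A tiny sketch of that pairing contract, with made-up partition values and a `println` standing in for the real metastore call:

```scala
object RenamePairingSketch {
  type TablePartitionSpec = Map[String, String]

  def main(args: Array[String]): Unit = {
    // Hypothetical old and new specs; position i in specs maps to position i in newSpecs.
    val specs    = Seq(Map("ds" -> "2016-04-26"), Map("ds" -> "2016-04-27"))
    val newSpecs = Seq(Map("ds" -> "2016-05-26"), Map("ds" -> "2016-05-27"))

    require(specs.size == newSpecs.size, "number of old and new partition specs differ")
    specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
      // A real implementation would ask the metastore to rename oldSpec to newSpec here.
      println(s"rename $oldSpec -> $newSpec")
    }
  }
}
```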