[SPARK-13139][SQL] Parse Hive DDL commands ourselves

## What changes were proposed in this pull request?

This patch is ported over from viirya's changes in #11048. Currently, for most DDLs we just pass the query text directly to Hive. Instead, we should parse these commands ourselves and, in the future (not as part of this patch), use the `HiveCatalog` to process these DDLs. This is a precursor to merging `SQLContext` and `HiveContext`.

Note: as of this patch we still pass the query text to Hive. The difference is that we now parse the commands ourselves, so in the future we can switch to our own catalog.
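As a rough sketch of the resulting flow (class and method names are taken from the diffs below; the statement itself is just an example):

```scala
// Sketch only, using the classes added in this patch (see diffs below).
val parser = new SparkQl()
val plan = parser.parsePlan("ALTER TABLE t RENAME TO u")
// plan is AlterTableRename(TableIdentifier("t"), TableIdentifier("u"))(sql),
// a NativeDDLCommand whose run() still delegates to sqlContext.runNativeSql(sql).
```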

## How was this patch tested?

Jenkins, plus a new `DDLCommandSuite`, which accounts for about 40% of the changes here.

Author: Andrew Or <andrew@databricks.com>

Closes #11573 from andrewor14/parser-plus-plus.
Andrew Or authored on 2016-03-11 15:13:48 -08:00, committed by Yin Huai
parent 42afd72c65
commit 66d9d0edfe
8 changed files with 1318 additions and 36 deletions


@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.parser
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.types._
@@ -29,6 +30,7 @@ import org.apache.spark.sql.types._
object ParserUtils {
object Token {
// Match on (text, children)
def unapply(node: ASTNode): Some[(String, List[ASTNode])] = {
CurrentOrigin.setPosition(node.line, node.positionInLine)
node.pattern
@@ -160,7 +162,14 @@ object ParserUtils {
}
/**
* Throw an exception because we cannot parse the given node.
* Throw an exception because we cannot parse the given node for some unexpected reason.
*/
def parseFailed(msg: String, node: ASTNode): Nothing = {
throw new AnalysisException(s"$msg: '${node.source}")
}
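// Note the distinction: parseFailed signals a malformed tree for a command we
// do have rules for, while noParseRule (below) signals a node kind that has no
// parse rule at all yet.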
/**
* Throw an exception because there are no rules to parse the node.
*/
def noParseRule(msg: String, node: ASTNode): Nothing = {
throw new NotImplementedError(


@@ -784,6 +784,15 @@ class SQLContext private[sql](
Dataset.newDataFrame(this, parseSql(sqlText))
}
/**
* Executes a SQL query without parsing it; the query text is instead passed directly to an
* underlying system for processing. This is currently only used for Hive DDLs and will be
* removed as soon as Spark can parse all supported Hive DDLs itself.
*/
private[sql] def runNativeSql(sqlText: String): Seq[Row] = {
throw new UnsupportedOperationException
}
/**
* Returns the specified table as a [[DataFrame]].
*


@@ -29,7 +29,26 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) {
import ParserUtils._
/** Check if a command should not be explained. */
protected def isNoExplainCommand(command: String): Boolean = "TOK_DESCTABLE" == command
protected def isNoExplainCommand(command: String): Boolean = {
"TOK_DESCTABLE" == command || "TOK_ALTERTABLE" == command
}
/**
* For each node, extract properties in the form of a list ['key1', 'key2', 'key3', 'value']
* into a pair (key1.key2.key3, value).
*/
private def extractProps(
props: Seq[ASTNode],
expectedNodeText: String): Seq[(String, String)] = {
props.map {
case Token(x, keysAndValue) if x == expectedNodeText =>
val key = keysAndValue.init.map { x => unquoteString(x.text) }.mkString(".")
val value = unquoteString(keysAndValue.last.text)
(key, value)
case p =>
parseFailed(s"Expected property '$expectedNodeText' in command", p)
}
}
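// Worked example (hypothetical property node): a TOK_TABLEPROPERTY with
// children ['spark', 'sql', 'test', 'true'] yields ("spark.sql.test", "true"):
// all children but the last form the dotted key, and the last one is the value.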
protected override def nodeToPlan(node: ASTNode): LogicalPlan = {
node match {
@@ -64,10 +83,86 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) {
val tableIdent = extractTableIdent(nameParts)
RefreshTable(tableIdent)
// CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
// [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)];
case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: args) =>
val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq(
"TOK_IFNOTEXISTS",
"TOK_DATABASELOCATION",
"TOK_DATABASECOMMENT",
"TOK_DATABASEPROPERTIES"), args)
val location = dbLocation.map {
case Token("TOK_DATABASELOCATION", Token(loc, Nil) :: Nil) => unquoteString(loc)
case _ => parseFailed("Invalid CREATE DATABASE command", node)
}
val comment = databaseComment.map {
case Token("TOK_DATABASECOMMENT", Token(com, Nil) :: Nil) => unquoteString(com)
case _ => parseFailed("Invalid CREATE DATABASE command", node)
}
val props = dbprops.toSeq.flatMap {
case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) =>
extractProps(propList, "TOK_TABLEPROPERTY")
case _ => parseFailed("Invalid CREATE DATABASE command", node)
}.toMap
CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)(node.source)
// CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
// [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
case Token("TOK_CREATEFUNCTION", args) =>
// Example format:
//
// TOK_CREATEFUNCTION
// :- db_name
// :- func_name
// :- alias
// +- TOK_RESOURCE_LIST
// :- TOK_RESOURCE_URI
// : :- TOK_JAR
// : +- '/path/to/jar'
// +- TOK_RESOURCE_URI
// :- TOK_FILE
// +- 'path/to/file'
val (funcNameArgs, otherArgs) = args.partition {
case Token("TOK_RESOURCE_LIST", _) => false
case Token("TOK_TEMPORARY", _) => false
case Token(_, Nil) => true
case _ => parseFailed("Invalid CREATE FUNCTION command", node)
}
// If database name is specified, there are 3 tokens, otherwise 2.
val (funcName, alias) = funcNameArgs match {
case Token(dbName, Nil) :: Token(fname, Nil) :: Token(aname, Nil) :: Nil =>
(unquoteString(dbName) + "." + unquoteString(fname), unquoteString(aname))
case Token(fname, Nil) :: Token(aname, Nil) :: Nil =>
(unquoteString(fname), unquoteString(aname))
case _ =>
parseFailed("Invalid CREATE FUNCTION command", node)
}
// Extract other keywords, if they exist
val Seq(rList, temp) = getClauses(Seq("TOK_RESOURCE_LIST", "TOK_TEMPORARY"), otherArgs)
val resourcesMap = rList.toSeq.flatMap {
case Token("TOK_RESOURCE_LIST", resources) =>
resources.map {
case Token("TOK_RESOURCE_URI", rType :: Token(rPath, Nil) :: Nil) =>
val resourceType = rType match {
case Token("TOK_JAR", Nil) => "jar"
case Token("TOK_FILE", Nil) => "file"
case Token("TOK_ARCHIVE", Nil) => "archive"
case Token(f, _) => parseFailed(s"Unexpected resource format '$f'", node)
}
(resourceType, unquoteString(rPath))
case _ => parseFailed("Invalid CREATE FUNCTION command", node)
}
case _ => parseFailed("Invalid CREATE FUNCTION command", node)
}.toMap
CreateFunction(funcName, alias, resourcesMap, temp.isDefined)(node.source)
case Token("TOK_ALTERTABLE", alterTableArgs) =>
AlterTableCommandParser.parse(node)
case Token("TOK_CREATETABLEUSING", createTableArgs) =>
val Seq(
temp,
allowExisting,
ifNotExists,
Some(tabName),
tableCols,
Some(Token("TOK_TABLEPROVIDER", providerNameParts)),
@@ -79,30 +174,22 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) {
"TOK_TABLEPROVIDER",
"TOK_TABLEOPTIONS",
"TOK_QUERY"), createTableArgs)
val tableIdent: TableIdentifier = extractTableIdent(tabName)
val columns = tableCols.map {
case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField))
case _ => parseFailed("Invalid CREATE TABLE command", node)
}
val provider = providerNameParts.map {
case Token(name, Nil) => name
case _ => parseFailed("Invalid CREATE TABLE command", node)
}.mkString(".")
val options: Map[String, String] = tableOpts.toSeq.flatMap {
case Token("TOK_TABLEOPTIONS", options) =>
options.map {
case Token("TOK_TABLEOPTION", keysAndValue) =>
val key = keysAndValue.init.map(_.text).mkString(".")
val value = unquoteString(keysAndValue.last.text)
(key, value)
}
val options = tableOpts.toSeq.flatMap {
case Token("TOK_TABLEOPTIONS", opts) => extractProps(opts, "TOK_TABLEOPTION")
case _ => parseFailed("Invalid CREATE TABLE command", node)
}.toMap
val asClause = tableAs.map(nodeToPlan)
val asClause = tableAs.map(nodeToPlan(_))
if (temp.isDefined && allowExisting.isDefined) {
if (temp.isDefined && ifNotExists.isDefined) {
throw new AnalysisException(
"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
}
@@ -113,7 +200,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) {
"a CREATE TABLE AS SELECT statement does not allow column definitions.")
}
val mode = if (allowExisting.isDefined) {
val mode = if (ifNotExists.isDefined) {
SaveMode.Ignore
} else if (temp.isDefined) {
SaveMode.Overwrite
@@ -136,7 +223,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) {
provider,
temp.isDefined,
options,
allowExisting.isDefined,
ifNotExists.isDefined,
managedIfNoPath = false)
}
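
One behavioral detail from the hunk above: TEMPORARY and IF NOT EXISTS remain mutually exclusive for `CREATE TABLE ... USING`. A hypothetical statement that should now fail at analysis time:

```scala
// Hypothetical input; per the check above, parsing throws an AnalysisException:
// "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause."
parser.parsePlan(
  "CREATE TEMPORARY TABLE IF NOT EXISTS t USING parquet OPTIONS (path '/tmp/t')")
```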


@@ -0,0 +1,428 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, SortDirection}
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.StructType
/**
* Helper object to parse alter table commands.
*/
object AlterTableCommandParser {
import ParserUtils._
/**
* Parse the given node assuming it is an alter table command.
*/
def parse(node: ASTNode): LogicalPlan = {
node.children match {
case (tabName @ Token("TOK_TABNAME", _)) :: otherNodes =>
val tableIdent = extractTableIdent(tabName)
val partSpec = getClauseOption("TOK_PARTSPEC", node.children).map(parsePartitionSpec)
matchAlterTableCommands(node, otherNodes, tableIdent, partSpec)
case _ =>
parseFailed("Could not parse ALTER TABLE command", node)
}
}
private def cleanAndUnquoteString(s: String): String = {
cleanIdentifier(unquoteString(s))
}
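// For example (assuming unquoteString strips surrounding single/double quotes
// and cleanIdentifier strips surrounding backticks): "'value'" => "value" and
// "`col`" => "col".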
/**
* Extract partition spec from the given [[ASTNode]] as a map, assuming it exists.
*
* Expected format:
* +- TOK_PARTSPEC
* :- TOK_PARTVAL
* : :- dt
* : +- '2008-08-08'
* +- TOK_PARTVAL
* :- country
* +- 'us'
*/
private def parsePartitionSpec(node: ASTNode): Map[String, String] = {
node match {
case Token("TOK_PARTSPEC", partitions) =>
partitions.map {
// Note: sometimes there's a "=", "<" or ">" between the key and the value
case Token("TOK_PARTVAL", ident :: conj :: constant :: Nil) =>
(cleanAndUnquoteString(ident.text), cleanAndUnquoteString(constant.text))
case Token("TOK_PARTVAL", ident :: constant :: Nil) =>
(cleanAndUnquoteString(ident.text), cleanAndUnquoteString(constant.text))
case Token("TOK_PARTVAL", ident :: Nil) =>
(cleanAndUnquoteString(ident.text), null)
case _ =>
parseFailed("Invalid ALTER TABLE command", node)
}.toMap
case _ =>
parseFailed("Expected partition spec in ALTER TABLE command", node)
}
}
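// For the tree shown in the scaladoc above, this returns
// Map("dt" -> "2008-08-08", "country" -> "us").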
/**
* Extract table properties from the given [[ASTNode]] as a map, assuming it exists.
*
* Expected format:
* +- TOK_TABLEPROPERTIES
* +- TOK_TABLEPROPLIST
* :- TOK_TABLEPROPERTY
* : :- 'test'
* : +- 'value'
* +- TOK_TABLEPROPERTY
* :- 'comment'
* +- 'new_comment'
*/
private def extractTableProps(node: ASTNode): Map[String, String] = {
node match {
case Token("TOK_TABLEPROPERTIES", propsList) =>
propsList.flatMap {
case Token("TOK_TABLEPROPLIST", props) =>
props.map { case Token("TOK_TABLEPROPERTY", key :: value :: Nil) =>
val k = cleanAndUnquoteString(key.text)
val v = value match {
case Token("TOK_NULL", Nil) => null
case _ => cleanAndUnquoteString(value.text)
}
(k, v)
}
case _ =>
parseFailed("Invalid ALTER TABLE command", node)
}.toMap
case _ =>
parseFailed("Expected table properties in ALTER TABLE command", node)
}
}
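// For the tree shown in the scaladoc above, this returns
// Map("test" -> "value", "comment" -> "new_comment"). A TOK_NULL value maps to
// null, which is how UNSET TBLPROPERTIES ('key', ...) is represented.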
/**
* Parse an alter table command from a [[ASTNode]] into a [[LogicalPlan]].
* This follows https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL.
*
* @param node the original [[ASTNode]] to parse.
* @param otherNodes the other [[ASTNode]]s after the first one containing the table name.
* @param tableIdent identifier of the table, parsed from the first [[ASTNode]].
* @param partition spec identifying the partition this command is concerned with, if any.
*/
// TODO: This method is massive. Break it down.
private def matchAlterTableCommands(
node: ASTNode,
otherNodes: Seq[ASTNode],
tableIdent: TableIdentifier,
partition: Option[TablePartitionSpec]): LogicalPlan = {
otherNodes match {
// ALTER TABLE table_name RENAME TO new_table_name;
case Token("TOK_ALTERTABLE_RENAME", renameArgs) :: _ =>
val tableNameClause = getClause("TOK_TABNAME", renameArgs)
val newTableIdent = extractTableIdent(tableNameClause)
AlterTableRename(tableIdent, newTableIdent)(node.source)
// ALTER TABLE table_name SET TBLPROPERTIES ('comment' = new_comment);
case Token("TOK_ALTERTABLE_PROPERTIES", args) :: _ =>
val properties = extractTableProps(args.head)
AlterTableSetProperties(tableIdent, properties)(node.source)
// ALTER TABLE table_name UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
case Token("TOK_ALTERTABLE_DROPPROPERTIES", args) :: _ =>
val properties = extractTableProps(args.head)
val ifExists = getClauseOption("TOK_IFEXISTS", args).isDefined
AlterTableUnsetProperties(tableIdent, properties, ifExists)(node.source)
// ALTER TABLE table_name [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
case Token("TOK_ALTERTABLE_SERIALIZER", Token(serdeClassName, Nil) :: serdeArgs) :: _ =>
AlterTableSerDeProperties(
tableIdent,
Some(cleanAndUnquoteString(serdeClassName)),
serdeArgs.headOption.map(extractTableProps),
partition)(node.source)
// ALTER TABLE table_name [PARTITION spec] SET SERDEPROPERTIES serde_properties;
case Token("TOK_ALTERTABLE_SERDEPROPERTIES", args) :: _ =>
AlterTableSerDeProperties(
tableIdent,
None,
Some(extractTableProps(args.head)),
partition)(node.source)
// ALTER TABLE table_name CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS;
case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_ALTERTABLE_BUCKETS", b) :: Nil) :: _ =>
val clusterCols: Seq[String] = b.head match {
case Token("TOK_TABCOLNAME", children) => children.map(_.text)
case _ => parseFailed("Invalid ALTER TABLE command", node)
}
// If sort columns are specified, num buckets should be the third arg.
// If sort columns are not specified, num buckets should be the second arg.
// TODO: actually use `sortDirections` once we actually store that in the metastore
val (sortCols: Seq[String], sortDirections: Seq[SortDirection], numBuckets: Int) = {
b.tail match {
case Token("TOK_TABCOLNAME", children) :: numBucketsNode :: Nil =>
val (cols, directions) = children.map {
case Token("TOK_TABSORTCOLNAMEASC", Token(col, Nil) :: Nil) => (col, Ascending)
case Token("TOK_TABSORTCOLNAMEDESC", Token(col, Nil) :: Nil) => (col, Descending)
}.unzip
(cols, directions, numBucketsNode.text.toInt)
case numBucketsNode :: Nil =>
(Nil, Nil, numBucketsNode.text.toInt)
case _ =>
parseFailed("Invalid ALTER TABLE command", node)
}
}
AlterTableStorageProperties(
tableIdent,
BucketSpec(numBuckets, clusterCols, sortCols))(node.source)
// ALTER TABLE table_name NOT CLUSTERED
case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_NOT_CLUSTERED", Nil) :: Nil) :: _ =>
AlterTableNotClustered(tableIdent)(node.source)
// ALTER TABLE table_name NOT SORTED
case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_NOT_SORTED", Nil) :: Nil) :: _ =>
AlterTableNotSorted(tableIdent)(node.source)
// ALTER TABLE table_name SKEWED BY (col1, col2)
// ON ((col1_value, col2_value) [, (col1_value, col2_value), ...])
// [STORED AS DIRECTORIES];
case Token("TOK_ALTERTABLE_SKEWED",
Token("TOK_TABLESKEWED",
Token("TOK_TABCOLNAME", colNames) :: colValues :: rest) :: Nil) :: _ =>
// Example format:
//
// +- TOK_ALTERTABLE_SKEWED
// :- TOK_TABLESKEWED
// : :- TOK_TABCOLNAME
// : : :- dt
// : : +- country
// :- TOK_TABCOLVALUE_PAIR
// : :- TOK_TABCOLVALUES
// : : :- TOK_TABCOLVALUE
// : : : :- '2008-08-08'
// : : : +- 'us'
// : :- TOK_TABCOLVALUES
// : : :- TOK_TABCOLVALUE
// : : : :- '2009-09-09'
// : : : +- 'uk'
// +- TOK_STOREASDIR
val names = colNames.map { n => cleanAndUnquoteString(n.text) }
val values = colValues match {
case Token("TOK_TABCOLVALUE", vals) =>
Seq(vals.map { n => cleanAndUnquoteString(n.text) })
case Token("TOK_TABCOLVALUE_PAIR", pairs) =>
pairs.map {
case Token("TOK_TABCOLVALUES", Token("TOK_TABCOLVALUE", vals) :: Nil) =>
vals.map { n => cleanAndUnquoteString(n.text) }
case _ =>
parseFailed("Invalid ALTER TABLE command", node)
}
case _ =>
parseFailed("Invalid ALTER TABLE command", node)
}
val storedAsDirs = rest match {
case Token("TOK_STOREDASDIRS", Nil) :: Nil => true
case _ => false
}
AlterTableSkewed(
tableIdent,
names,
values,
storedAsDirs)(node.source)
// ALTER TABLE table_name NOT SKEWED
case Token("TOK_ALTERTABLE_SKEWED", Nil) :: _ =>
AlterTableNotSkewed(tableIdent)(node.source)
// ALTER TABLE table_name NOT STORED AS DIRECTORIES
case Token("TOK_ALTERTABLE_SKEWED", Token("TOK_STOREDASDIRS", Nil) :: Nil) :: _ =>
AlterTableNotStoredAsDirs(tableIdent)(node.source)
// ALTER TABLE table_name SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...] );
case Token("TOK_ALTERTABLE_SKEWED_LOCATION",
Token("TOK_SKEWED_LOCATIONS",
Token("TOK_SKEWED_LOCATION_LIST", locationMaps) :: Nil) :: Nil) :: _ =>
// Expected format:
//
// +- TOK_ALTERTABLE_SKEWED_LOCATION
// +- TOK_SKEWED_LOCATIONS
// +- TOK_SKEWED_LOCATION_LIST
// :- TOK_SKEWED_LOCATION_MAP
// : :- 'col1'
// : +- 'loc1'
// +- TOK_SKEWED_LOCATION_MAP
// :- TOK_TABCOLVALUES
// : +- TOK_TABCOLVALUE
// : :- 'col2'
// : +- 'col3'
// +- 'loc2'
val skewedMaps = locationMaps.flatMap {
case Token("TOK_SKEWED_LOCATION_MAP", col :: loc :: Nil) =>
col match {
case Token(const, Nil) =>
Seq((cleanAndUnquoteString(const), cleanAndUnquoteString(loc.text)))
case Token("TOK_TABCOLVALUES", Token("TOK_TABCOLVALUE", keys) :: Nil) =>
keys.map { k => (cleanAndUnquoteString(k.text), cleanAndUnquoteString(loc.text)) }
}
case _ =>
parseFailed("Invalid ALTER TABLE command", node)
}.toMap
AlterTableSkewedLocation(tableIdent, skewedMaps)(node.source)
// ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
// spec [LOCATION 'loc2'] ...;
case Token("TOK_ALTERTABLE_ADDPARTS", args) :: _ =>
val (ifNotExists, parts) = args.head match {
case Token("TOK_IFNOTEXISTS", Nil) => (true, args.tail)
case _ => (false, args)
}
// List of (spec, location) to describe partitions to add
// Each partition spec may or may not be followed by a location
val parsedParts = new ArrayBuffer[(TablePartitionSpec, Option[String])]
parts.foreach {
case t @ Token("TOK_PARTSPEC", _) =>
parsedParts += ((parsePartitionSpec(t), None))
case Token("TOK_PARTITIONLOCATION", loc :: Nil) =>
// Update the location of the last partition we just added
if (parsedParts.nonEmpty) {
val (spec, _) = parsedParts.remove(parsedParts.length - 1)
parsedParts += ((spec, Some(unquoteString(loc.text))))
}
case _ =>
parseFailed("Invalid ALTER TABLE command", node)
}
AlterTableAddPartition(tableIdent, parsedParts, ifNotExists)(node.source)
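// Worked trace of the loop above (hypothetical values): for
// ADD IF NOT EXISTS PARTITION (dt='1') LOCATION 'loc1' PARTITION (dt='2'),
// the first TOK_PARTSPEC appends ((dt -> 1), None), TOK_PARTITIONLOCATION
// rewrites that last entry to ((dt -> 1), Some("loc1")), and the second
// TOK_PARTSPEC appends ((dt -> 2), None).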
// ALTER TABLE table_name PARTITION spec1 RENAME TO PARTITION spec2;
case Token("TOK_ALTERTABLE_RENAMEPART", spec :: Nil) :: _ =>
val newPartition = parsePartitionSpec(spec)
val oldPartition = partition.getOrElse {
parseFailed("Expected old partition spec in ALTER TABLE rename partition command", node)
}
AlterTableRenamePartition(tableIdent, oldPartition, newPartition)(node.source)
// ALTER TABLE table_name_1 EXCHANGE PARTITION spec WITH TABLE table_name_2;
case Token("TOK_ALTERTABLE_EXCHANGEPARTITION", spec :: newTable :: Nil) :: _ =>
val parsedSpec = parsePartitionSpec(spec)
val newTableIdent = extractTableIdent(newTable)
AlterTableExchangePartition(tableIdent, newTableIdent, parsedSpec)(node.source)
// ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
case Token("TOK_ALTERTABLE_DROPPARTS", args) :: _ =>
val parts = args.collect { case p @ Token("TOK_PARTSPEC", _) => parsePartitionSpec(p) }
val ifExists = getClauseOption("TOK_IFEXISTS", args).isDefined
val purge = getClauseOption("PURGE", args).isDefined
AlterTableDropPartition(tableIdent, parts, ifExists, purge)(node.source)
// ALTER TABLE table_name ARCHIVE PARTITION spec;
case Token("TOK_ALTERTABLE_ARCHIVE", spec :: Nil) :: _ =>
AlterTableArchivePartition(tableIdent, parsePartitionSpec(spec))(node.source)
// ALTER TABLE table_name UNARCHIVE PARTITION spec;
case Token("TOK_ALTERTABLE_UNARCHIVE", spec :: Nil) :: _ =>
AlterTableUnarchivePartition(tableIdent, parsePartitionSpec(spec))(node.source)
// ALTER TABLE table_name [PARTITION spec] SET FILEFORMAT file_format;
case Token("TOK_ALTERTABLE_FILEFORMAT", args) :: _ =>
val Seq(fileFormat, genericFormat) =
getClauses(Seq("TOK_TABLEFILEFORMAT", "TOK_FILEFORMAT_GENERIC"), args)
// Note: the AST doesn't contain information about which file format is being set here.
// E.g. we can't differentiate between INPUTFORMAT and OUTPUTFORMAT if either is set.
// Right now this just stores the values, but we should figure out how to get the keys.
val fFormat = fileFormat
.map { _.children.map { n => cleanAndUnquoteString(n.text) }}
.getOrElse(Seq())
val gFormat = genericFormat.map { f => cleanAndUnquoteString(f.children(0).text) }
AlterTableSetFileFormat(tableIdent, partition, fFormat, gFormat)(node.source)
// ALTER TABLE table_name [PARTITION spec] SET LOCATION "loc";
case Token("TOK_ALTERTABLE_LOCATION", Token(loc, Nil) :: Nil) :: _ =>
AlterTableSetLocation(tableIdent, partition, cleanAndUnquoteString(loc))(node.source)
// ALTER TABLE table_name TOUCH [PARTITION spec];
case Token("TOK_ALTERTABLE_TOUCH", args) :: _ =>
// Note: the partition spec, if it exists, comes after TOUCH, so `partition` should
// always be None here. Instead, we need to parse it from the TOUCH node's children.
val part = getClauseOption("TOK_PARTSPEC", args).map(parsePartitionSpec)
AlterTableTouch(tableIdent, part)(node.source)
// ALTER TABLE table_name [PARTITION spec] COMPACT 'compaction_type';
case Token("TOK_ALTERTABLE_COMPACT", Token(compactType, Nil) :: Nil) :: _ =>
AlterTableCompact(tableIdent, partition, cleanAndUnquoteString(compactType))(node.source)
// ALTER TABLE table_name [PARTITION spec] CONCATENATE;
case Token("TOK_ALTERTABLE_MERGEFILES", _) :: _ =>
AlterTableMerge(tableIdent, partition)(node.source)
// ALTER TABLE table_name [PARTITION spec] CHANGE [COLUMN] col_old_name col_new_name
// column_type [COMMENT col_comment] [FIRST|AFTER column_name] [CASCADE|RESTRICT];
case Token("TOK_ALTERTABLE_RENAMECOL", oldName :: newName :: dataType :: args) :: _ =>
val afterColName: Option[String] =
getClauseOption("TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION", args).map { ap =>
ap.children match {
case Token(col, Nil) :: Nil => col
case _ => parseFailed("Invalid ALTER TABLE command", node)
}
}
val restrict = getClauseOption("TOK_RESTRICT", args).isDefined
val cascade = getClauseOption("TOK_CASCADE", args).isDefined
val comment = args.headOption.map {
case Token("TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION", _) => null
case Token("TOK_RESTRICT", _) => null
case Token("TOK_CASCADE", _) => null
case Token(commentStr, Nil) => cleanAndUnquoteString(commentStr)
case _ => parseFailed("Invalid ALTER TABLE command", node)
}
AlterTableChangeCol(
tableIdent,
partition,
oldName.text,
newName.text,
nodeToDataType(dataType),
comment,
afterColName,
restrict,
cascade)(node.source)
// ALTER TABLE table_name [PARTITION spec] ADD COLUMNS (name type [COMMENT comment], ...)
// [CASCADE|RESTRICT]
case Token("TOK_ALTERTABLE_ADDCOLS", args) :: _ =>
val columnNodes = getClause("TOK_TABCOLLIST", args).children
val columns = StructType(columnNodes.map(nodeToStructField))
val restrict = getClauseOption("TOK_RESTRICT", args).isDefined
val cascade = getClauseOption("TOK_CASCADE", args).isDefined
AlterTableAddCol(tableIdent, partition, columns, restrict, cascade)(node.source)
// ALTER TABLE table_name [PARTITION spec] REPLACE COLUMNS (name type [COMMENT comment], ...)
// [CASCADE|RESTRICT]
case Token("TOK_ALTERTABLE_REPLACECOLS", args) :: _ =>
val columnNodes = getClause("TOK_TABCOLLIST", args).children
val columns = StructType(columnNodes.map(nodeToStructField))
val restrict = getClauseOption("TOK_RESTRICT", args).isDefined
val cascade = getClauseOption("TOK_CASCADE", args).isDefined
AlterTableReplaceCol(tableIdent, partition, columns, restrict, cascade)(node.source)
case _ =>
parseFailed("Unsupported ALTER TABLE command", node)
}
}
}


@@ -0,0 +1,210 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command
import org.apache.spark.Logging
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.types._
// Note: The definitions of these commands are based on the ones described in
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
/**
* A DDL command expected to be parsed and run in an underlying system instead of in Spark.
*/
abstract class NativeDDLCommand(val sql: String) extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
sqlContext.runNativeSql(sql)
}
override val output: Seq[Attribute] = {
Seq(AttributeReference("result", StringType, nullable = false)())
}
}
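// Design note (my reading; not stated in the patch): `sql` sits in a second
// parameter list, and Scala case classes include only the first parameter list
// in their generated equals/hashCode, so commands with the same parsed fields
// compare equal regardless of the raw text. A minimal illustration:
//   case class Demo(a: Int)(extra: String)
//   Demo(1)("x") == Demo(1)("y")   // true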
case class CreateDatabase(
databaseName: String,
ifNotExists: Boolean,
path: Option[String],
comment: Option[String],
props: Map[String, String])(sql: String)
extends NativeDDLCommand(sql) with Logging
case class CreateFunction(
functionName: String,
alias: String,
resourcesMap: Map[String, String],
isTemp: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableRename(
oldName: TableIdentifier,
newName: TableIdentifier)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableSetProperties(
tableName: TableIdentifier,
properties: Map[String, String])(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableUnsetProperties(
tableName: TableIdentifier,
properties: Map[String, String],
ifExists: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableSerDeProperties(
tableName: TableIdentifier,
serdeClassName: Option[String],
serdeProperties: Option[Map[String, String]],
partition: Option[Map[String, String]])(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableStorageProperties(
tableName: TableIdentifier,
buckets: BucketSpec)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableNotClustered(
tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
case class AlterTableNotSorted(
tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
case class AlterTableSkewed(
tableName: TableIdentifier,
// e.g. (dt, country)
skewedCols: Seq[String],
// e.g. ('2008-08-08', 'us'), ('2009-09-09', 'uk')
skewedValues: Seq[Seq[String]],
storedAsDirs: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging {
require(skewedValues.forall(_.size == skewedCols.size),
"number of columns in skewed values does not match number of skewed columns provided")
}
case class AlterTableNotSkewed(
tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
case class AlterTableNotStoredAsDirs(
tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
case class AlterTableSkewedLocation(
tableName: TableIdentifier,
skewedMap: Map[String, String])(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableAddPartition(
tableName: TableIdentifier,
partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
ifNotExists: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableRenamePartition(
tableName: TableIdentifier,
oldPartition: TablePartitionSpec,
newPartition: TablePartitionSpec)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableExchangePartition(
fromTableName: TableIdentifier,
toTableName: TableIdentifier,
spec: TablePartitionSpec)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableDropPartition(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
ifExists: Boolean,
purge: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableArchivePartition(
tableName: TableIdentifier,
spec: TablePartitionSpec)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableUnarchivePartition(
tableName: TableIdentifier,
spec: TablePartitionSpec)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableSetFileFormat(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec],
fileFormat: Seq[String],
genericFormat: Option[String])(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableSetLocation(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec],
location: String)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableTouch(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec])(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableCompact(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec],
compactType: String)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableMerge(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec])(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableChangeCol(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec],
oldColName: String,
newColName: String,
dataType: DataType,
comment: Option[String],
afterColName: Option[String],
restrict: Boolean,
cascade: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableAddCol(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec],
columns: StructType,
restrict: Boolean,
cascade: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging
case class AlterTableReplaceCol(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec],
columns: StructType,
restrict: Boolean,
cascade: Boolean)(sql: String)
extends NativeDDLCommand(sql) with Logging


@@ -0,0 +1,544 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.execution.SparkQl
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.types._
class DDLCommandSuite extends PlanTest {
private val parser = new SparkQl
test("create database") {
val sql =
"""
|CREATE DATABASE IF NOT EXISTS database_name
|COMMENT 'database_comment' LOCATION '/home/user/db'
|WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
""".stripMargin
val parsed = parser.parsePlan(sql)
val expected = CreateDatabase(
"database_name",
ifNotExists = true,
Some("/home/user/db"),
Some("database_comment"),
Map("a" -> "a", "b" -> "b", "c" -> "c"))(sql)
comparePlans(parsed, expected)
}
test("create function") {
val sql1 =
"""
|CREATE TEMPORARY FUNCTION helloworld as
|'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar',
|FILE 'path/to/file'
""".stripMargin
val sql2 =
"""
|CREATE FUNCTION hello.world as
|'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
|FILE 'path/to/file'
""".stripMargin
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val expected1 = CreateFunction(
"helloworld",
"com.matthewrathbone.example.SimpleUDFExample",
Map("jar" -> "/path/to/jar", "file" -> "path/to/file"),
isTemp = true)(sql1)
val expected2 = CreateFunction(
"hello.world",
"com.matthewrathbone.example.SimpleUDFExample",
Map("archive" -> "/path/to/archive", "file" -> "path/to/file"),
isTemp = false)(sql2)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}
test("alter table: rename table") {
val sql = "ALTER TABLE table_name RENAME TO new_table_name"
val parsed = parser.parsePlan(sql)
val expected = AlterTableRename(
TableIdentifier("table_name", None),
TableIdentifier("new_table_name", None))(sql)
comparePlans(parsed, expected)
}
test("alter table: alter table properties") {
val sql1 = "ALTER TABLE table_name SET TBLPROPERTIES ('test' = 'test', " +
"'comment' = 'new_comment')"
val sql2 = "ALTER TABLE table_name UNSET TBLPROPERTIES ('comment', 'test')"
val sql3 = "ALTER TABLE table_name UNSET TBLPROPERTIES IF EXISTS ('comment', 'test')"
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val parsed3 = parser.parsePlan(sql3)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableSetProperties(
tableIdent, Map("test" -> "test", "comment" -> "new_comment"))(sql1)
val expected2 = AlterTableUnsetProperties(
tableIdent, Map("comment" -> null, "test" -> null), ifExists = false)(sql2)
val expected3 = AlterTableUnsetProperties(
tableIdent, Map("comment" -> null, "test" -> null), ifExists = true)(sql3)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
}
test("alter table: SerDe properties") {
val sql1 = "ALTER TABLE table_name SET SERDE 'org.apache.class'"
val sql2 =
"""
|ALTER TABLE table_name SET SERDE 'org.apache.class'
|WITH SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',')
""".stripMargin
val sql3 =
"""
|ALTER TABLE table_name SET SERDEPROPERTIES ('columns'='foo,bar',
|'field.delim' = ',')
""".stripMargin
val sql4 =
"""
|ALTER TABLE table_name PARTITION (test, dt='2008-08-08',
|country='us') SET SERDE 'org.apache.class' WITH SERDEPROPERTIES ('columns'='foo,bar',
|'field.delim' = ',')
""".stripMargin
val sql5 =
"""
|ALTER TABLE table_name PARTITION (test, dt='2008-08-08',
|country='us') SET SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',')
""".stripMargin
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val parsed3 = parser.parsePlan(sql3)
val parsed4 = parser.parsePlan(sql4)
val parsed5 = parser.parsePlan(sql5)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableSerDeProperties(
tableIdent, Some("org.apache.class"), None, None)(sql1)
val expected2 = AlterTableSerDeProperties(
tableIdent,
Some("org.apache.class"),
Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
None)(sql2)
val expected3 = AlterTableSerDeProperties(
tableIdent, None, Some(Map("columns" -> "foo,bar", "field.delim" -> ",")), None)(sql3)
val expected4 = AlterTableSerDeProperties(
tableIdent,
Some("org.apache.class"),
Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql4)
val expected5 = AlterTableSerDeProperties(
tableIdent,
None,
Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql5)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
comparePlans(parsed4, expected4)
comparePlans(parsed5, expected5)
}
test("alter table: storage properties") {
val sql1 = "ALTER TABLE table_name CLUSTERED BY (dt, country) INTO 10 BUCKETS"
val sql2 = "ALTER TABLE table_name CLUSTERED BY (dt, country) SORTED BY " +
"(dt, country DESC) INTO 10 BUCKETS"
val sql3 = "ALTER TABLE table_name NOT CLUSTERED"
val sql4 = "ALTER TABLE table_name NOT SORTED"
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val parsed3 = parser.parsePlan(sql3)
val parsed4 = parser.parsePlan(sql4)
val tableIdent = TableIdentifier("table_name", None)
val cols = List("dt", "country")
// TODO: also test the sort directions once we keep track of that
val expected1 = AlterTableStorageProperties(
tableIdent, BucketSpec(10, cols, Nil))(sql1)
val expected2 = AlterTableStorageProperties(
tableIdent, BucketSpec(10, cols, cols))(sql2)
val expected3 = AlterTableNotClustered(tableIdent)(sql3)
val expected4 = AlterTableNotSorted(tableIdent)(sql4)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
comparePlans(parsed4, expected4)
}
test("alter table: skewed") {
val sql1 =
"""
|ALTER TABLE table_name SKEWED BY (dt, country) ON
|(('2008-08-08', 'us'), ('2009-09-09', 'uk'), ('2010-10-10', 'cn')) STORED AS DIRECTORIES
""".stripMargin
val sql2 =
"""
|ALTER TABLE table_name SKEWED BY (dt, country) ON
|('2008-08-08', 'us') STORED AS DIRECTORIES
""".stripMargin
val sql3 =
"""
|ALTER TABLE table_name SKEWED BY (dt, country) ON
|(('2008-08-08', 'us'), ('2009-09-09', 'uk'))
""".stripMargin
val sql4 = "ALTER TABLE table_name NOT SKEWED"
val sql5 = "ALTER TABLE table_name NOT STORED AS DIRECTORIES"
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val parsed3 = parser.parsePlan(sql3)
val parsed4 = parser.parsePlan(sql4)
val parsed5 = parser.parsePlan(sql5)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableSkewed(
tableIdent,
Seq("dt", "country"),
Seq(List("2008-08-08", "us"), List("2009-09-09", "uk"), List("2010-10-10", "cn")),
storedAsDirs = true)(sql1)
val expected2 = AlterTableSkewed(
tableIdent,
Seq("dt", "country"),
Seq(List("2008-08-08", "us")),
storedAsDirs = true)(sql2)
val expected3 = AlterTableSkewed(
tableIdent,
Seq("dt", "country"),
Seq(List("2008-08-08", "us"), List("2009-09-09", "uk")),
storedAsDirs = false)(sql3)
val expected4 = AlterTableNotSkewed(tableIdent)(sql4)
val expected5 = AlterTableNotStoredAsDirs(tableIdent)(sql5)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
comparePlans(parsed4, expected4)
comparePlans(parsed5, expected5)
}
test("alter table: skewed location") {
val sql1 =
"""
|ALTER TABLE table_name SET SKEWED LOCATION
|('123'='location1', 'test'='location2')
""".stripMargin
val sql2 =
"""
|ALTER TABLE table_name SET SKEWED LOCATION
|(('2008-08-08', 'us')='location1', 'test'='location2')
""".stripMargin
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableSkewedLocation(
tableIdent,
Map("123" -> "location1", "test" -> "location2"))(sql1)
val expected2 = AlterTableSkewedLocation(
tableIdent,
Map("2008-08-08" -> "location1", "us" -> "location1", "test" -> "location2"))(sql2)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}
test("alter table: add partition") {
val sql =
"""
|ALTER TABLE table_name ADD IF NOT EXISTS PARTITION
|(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
|(dt='2009-09-09', country='uk')
""".stripMargin
val parsed = parser.parsePlan(sql)
val expected = AlterTableAddPartition(
TableIdentifier("table_name", None),
Seq(
(Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
(Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
ifNotExists = true)(sql)
comparePlans(parsed, expected)
}
test("alter table: rename partition") {
val sql =
"""
|ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
|RENAME TO PARTITION (dt='2008-09-09', country='uk')
""".stripMargin
val parsed = parser.parsePlan(sql)
val expected = AlterTableRenamePartition(
TableIdentifier("table_name", None),
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2008-09-09", "country" -> "uk"))(sql)
comparePlans(parsed, expected)
}
test("alter table: exchange partition") {
val sql =
"""
|ALTER TABLE table_name_1 EXCHANGE PARTITION
|(dt='2008-08-08', country='us') WITH TABLE table_name_2
""".stripMargin
val parsed = parser.parsePlan(sql)
val expected = AlterTableExchangePartition(
TableIdentifier("table_name_1", None),
TableIdentifier("table_name_2", None),
Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
comparePlans(parsed, expected)
}
test("alter table: drop partitions") {
val sql1 =
"""
|ALTER TABLE table_name DROP IF EXISTS PARTITION
|(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
""".stripMargin
val sql2 =
"""
|ALTER TABLE table_name DROP PARTITION
|(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk') PURGE
""".stripMargin
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableDropPartition(
tableIdent,
Seq(
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2009-09-09", "country" -> "uk")),
ifExists = true,
purge = false)(sql1)
val expected2 = AlterTableDropPartition(
tableIdent,
Seq(
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2009-09-09", "country" -> "uk")),
ifExists = false,
purge = true)(sql2)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}
test("alter table: archive partition") {
val sql = "ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')"
val parsed = parser.parsePlan(sql)
val expected = AlterTableArchivePartition(
TableIdentifier("table_name", None),
Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
comparePlans(parsed, expected)
}
test("alter table: unarchive partition") {
val sql = "ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')"
val parsed = parser.parsePlan(sql)
val expected = AlterTableUnarchivePartition(
TableIdentifier("table_name", None),
Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
comparePlans(parsed, expected)
}
test("alter table: set file format") {
val sql1 =
"""
|ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test'
|OUTPUTFORMAT 'test' SERDE 'test' INPUTDRIVER 'test' OUTPUTDRIVER 'test'
""".stripMargin
val sql2 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
"OUTPUTFORMAT 'test' SERDE 'test'"
val sql3 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
"SET FILEFORMAT PARQUET"
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val parsed3 = parser.parsePlan(sql3)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableSetFileFormat(
tableIdent,
None,
List("test", "test", "test", "test", "test"),
None)(sql1)
val expected2 = AlterTableSetFileFormat(
tableIdent,
None,
List("test", "test", "test"),
None)(sql2)
val expected3 = AlterTableSetFileFormat(
tableIdent,
Some(Map("dt" -> "2008-08-08", "country" -> "us")),
Seq(),
Some("PARQUET"))(sql3)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
}
test("alter table: set location") {
val sql1 = "ALTER TABLE table_name SET LOCATION 'new location'"
val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
"SET LOCATION 'new location'"
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableSetLocation(
tableIdent,
None,
"new location")(sql1)
val expected2 = AlterTableSetLocation(
tableIdent,
Some(Map("dt" -> "2008-08-08", "country" -> "us")),
"new location")(sql2)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}
test("alter table: touch") {
val sql1 = "ALTER TABLE table_name TOUCH"
val sql2 = "ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')"
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableTouch(
tableIdent,
None)(sql1)
val expected2 = AlterTableTouch(
tableIdent,
Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}
test("alter table: compact") {
val sql1 = "ALTER TABLE table_name COMPACT 'compaction_type'"
val sql2 =
"""
|ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
|COMPACT 'MAJOR'
""".stripMargin
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableCompact(
tableIdent,
None,
"compaction_type")(sql1)
val expected2 = AlterTableCompact(
tableIdent,
Some(Map("dt" -> "2008-08-08", "country" -> "us")),
"MAJOR")(sql2)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}
test("alter table: concatenate") {
val sql1 = "ALTER TABLE table_name CONCATENATE"
val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE"
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableMerge(tableIdent, None)(sql1)
val expected2 = AlterTableMerge(
tableIdent, Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}
test("alter table: change column name/type/position/comment") {
val sql1 = "ALTER TABLE table_name CHANGE col_old_name col_new_name INT"
val sql2 =
"""
|ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT
|COMMENT 'col_comment' FIRST CASCADE
""".stripMargin
val sql3 =
"""
|ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT
|COMMENT 'col_comment' AFTER column_name RESTRICT
""".stripMargin
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val parsed3 = parser.parsePlan(sql3)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableChangeCol(
tableName = tableIdent,
partitionSpec = None,
oldColName = "col_old_name",
newColName = "col_new_name",
dataType = IntegerType,
comment = None,
afterColName = None,
restrict = false,
cascade = false)(sql1)
val expected2 = AlterTableChangeCol(
tableName = tableIdent,
partitionSpec = None,
oldColName = "col_old_name",
newColName = "col_new_name",
dataType = IntegerType,
comment = Some("col_comment"),
afterColName = None,
restrict = false,
cascade = true)(sql2)
val expected3 = AlterTableChangeCol(
tableName = tableIdent,
partitionSpec = None,
oldColName = "col_old_name",
newColName = "col_new_name",
dataType = IntegerType,
comment = Some("col_comment"),
afterColName = Some("column_name"),
restrict = true,
cascade = false)(sql3)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
}
test("alter table: add/replace columns") {
val sql1 =
"""
|ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
|ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG
|COMMENT 'test_comment2') CASCADE
""".stripMargin
val sql2 =
"""
|ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT
|COMMENT 'test_comment', new_col2 LONG COMMENT 'test_comment2') RESTRICT
""".stripMargin
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val meta1 = new MetadataBuilder().putString("comment", "test_comment").build()
val meta2 = new MetadataBuilder().putString("comment", "test_comment2").build()
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableAddCol(
tableIdent,
Some(Map("dt" -> "2008-08-08", "country" -> "us")),
StructType(Seq(
StructField("new_col1", IntegerType, nullable = true, meta1),
StructField("new_col2", LongType, nullable = true, meta2))),
restrict = false,
cascade = true)(sql1)
val expected2 = AlterTableReplaceCol(
tableIdent,
None,
StructType(Seq(
StructField("new_col1", IntegerType, nullable = true, meta1),
StructField("new_col2", LongType, nullable = true, meta2))),
restrict = true,
cascade = false)(sql2)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}
}


@@ -535,6 +535,15 @@ class HiveContext private[hive](
}
}
/**
* Executes a SQL query without parsing it; the query text is instead passed directly to Hive.
* This is currently only used for DDLs and will be removed as soon as Spark can parse
* all supported Hive DDLs itself.
*/
protected[sql] override def runNativeSql(sqlText: String): Seq[Row] = {
runSqlHive(sqlText).map { s => Row(s) }
}
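// Each line of Hive's textual output becomes a Row with the single string
// column "result" declared by NativeDDLCommand.output above.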
/** Extends QueryExecution with hive specific features. */
protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) {


@@ -88,29 +88,14 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
"TOK_ALTERDATABASE_PROPERTIES",
"TOK_ALTERINDEX_PROPERTIES",
"TOK_ALTERINDEX_REBUILD",
"TOK_ALTERTABLE",
"TOK_ALTERTABLE_ADDCOLS",
"TOK_ALTERTABLE_ADDPARTS",
"TOK_ALTERTABLE_ALTERPARTS",
"TOK_ALTERTABLE_ARCHIVE",
"TOK_ALTERTABLE_CLUSTER_SORT",
"TOK_ALTERTABLE_DROPPARTS",
"TOK_ALTERTABLE_PARTITION",
"TOK_ALTERTABLE_PROPERTIES",
"TOK_ALTERTABLE_RENAME",
"TOK_ALTERTABLE_RENAMECOL",
"TOK_ALTERTABLE_REPLACECOLS",
"TOK_ALTERTABLE_SKEWED",
"TOK_ALTERTABLE_TOUCH",
"TOK_ALTERTABLE_UNARCHIVE",
"TOK_ALTERVIEW_ADDPARTS",
"TOK_ALTERVIEW_AS",
"TOK_ALTERVIEW_DROPPARTS",
"TOK_ALTERVIEW_PROPERTIES",
"TOK_ALTERVIEW_RENAME",
"TOK_CREATEDATABASE",
"TOK_CREATEFUNCTION",
"TOK_CREATEINDEX",
"TOK_CREATEMACRO",
"TOK_CREATEROLE",
@@ -164,7 +149,8 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
protected val noExplainCommands = Seq(
"TOK_DESCTABLE",
"TOK_SHOWTABLES",
"TOK_TRUNCATETABLE" // truncate table" is a NativeCommand, does not need to explain.
"TOK_TRUNCATETABLE", // truncate table" is a NativeCommand, does not need to explain.
"TOK_ALTERTABLE"
) ++ nativeCommands
/**