diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
index c105b53f1f..0c2e481954 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.parser
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.types._
@@ -29,6 +30,7 @@ import org.apache.spark.sql.types._
 object ParserUtils {
 
   object Token {
+    // Match on (text, children)
     def unapply(node: ASTNode): Some[(String, List[ASTNode])] = {
       CurrentOrigin.setPosition(node.line, node.positionInLine)
       node.pattern
@@ -160,7 +162,14 @@ object ParserUtils {
   }
 
   /**
-   * Throw an exception because we cannot parse the given node.
+   * Throw an exception because we cannot parse the given node for some unexpected reason.
+   */
+  def parseFailed(msg: String, node: ASTNode): Nothing = {
+    throw new AnalysisException(s"$msg: '${node.source}'")
+  }
+
+  /**
+   * Throw an exception because there are no rules to parse the node.
    */
   def noParseRule(msg: String, node: ASTNode): Nothing = {
     throw new NotImplementedError(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 49a70a7c5f..36fe57f78b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -784,6 +784,15 @@ class SQLContext private[sql](
     Dataset.newDataFrame(this, parseSql(sqlText))
   }
 
+  /**
+   * Executes a SQL query without parsing it, but instead passing it directly to an underlying
+   * system to process. This is currently only used for Hive DDLs and will be removed as soon
+   * as Spark can parse all supported Hive DDLs itself.
+   */
+  private[sql] def runNativeSql(sqlText: String): Seq[Row] = {
+    throw new UnsupportedOperationException
+  }
+
   /**
    * Returns the specified table as a [[DataFrame]].
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
index 471a5e436c..d12dab567b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -29,7 +29,26 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
   import ParserUtils._
 
   /** Check if a command should not be explained. */
-  protected def isNoExplainCommand(command: String): Boolean = "TOK_DESCTABLE" == command
+  protected def isNoExplainCommand(command: String): Boolean = {
+    "TOK_DESCTABLE" == command || "TOK_ALTERTABLE" == command
+  }
+
+  /**
+   * For each node, extract properties in the form of a list ['key1', 'key2', 'key3', 'value']
+   * into a pair (key1.key2.key3, value).
+   */
+  private def extractProps(
+      props: Seq[ASTNode],
+      expectedNodeText: String): Seq[(String, String)] = {
+    props.map {
+      case Token(x, keysAndValue) if x == expectedNodeText =>
+        val key = keysAndValue.init.map { x => unquoteString(x.text) }.mkString(".")
+        val value = unquoteString(keysAndValue.last.text)
+        (key, value)
+      case p =>
+        parseFailed(s"Expected property '$expectedNodeText' in command", p)
+    }
+  }
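+
+  // For example (illustrative keys, not from a real query): a property node whose children
+  // are the tokens 'mapred', 'reduce', 'tasks' and '10' is flattened into the single pair
+  // ("mapred.reduce.tasks", "10").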
 
   protected override def nodeToPlan(node: ASTNode): LogicalPlan = {
     node match {
@@ -64,10 +83,86 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
         val tableIdent = extractTableIdent(nameParts)
         RefreshTable(tableIdent)
 
+      // CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
+      // [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)];
+      case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: args) =>
+        val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq(
+          "TOK_IFNOTEXISTS",
+          "TOK_DATABASELOCATION",
+          "TOK_DATABASECOMMENT",
+          "TOK_DATABASEPROPERTIES"), args)
+        val location = dbLocation.map {
+          case Token("TOK_DATABASELOCATION", Token(loc, Nil) :: Nil) => unquoteString(loc)
+          case _ => parseFailed("Invalid CREATE DATABASE command", node)
+        }
+        val comment = databaseComment.map {
+          case Token("TOK_DATABASECOMMENT", Token(com, Nil) :: Nil) => unquoteString(com)
+          case _ => parseFailed("Invalid CREATE DATABASE command", node)
+        }
+        val props = dbprops.toSeq.flatMap {
+          case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) =>
+            extractProps(propList, "TOK_TABLEPROPERTY")
+          case _ => parseFailed("Invalid CREATE DATABASE command", node)
+        }.toMap
+        CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)(node.source)
+
+      // CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
+      // [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
+      case Token("TOK_CREATEFUNCTION", args) =>
+        // Example format:
+        //
+        //   TOK_CREATEFUNCTION
+        //   :- db_name
+        //   :- func_name
+        //   :- alias
+        //   +- TOK_RESOURCE_LIST
+        //      :- TOK_RESOURCE_URI
+        //      :  :- TOK_JAR
+        //      :  +- '/path/to/jar'
+        //      +- TOK_RESOURCE_URI
+        //         :- TOK_FILE
+        //         +- 'path/to/file'
+        val (funcNameArgs, otherArgs) = args.partition {
+          case Token("TOK_RESOURCE_LIST", _) => false
+          case Token("TOK_TEMPORARY", _) => false
+          case Token(_, Nil) => true
+          case _ => parseFailed("Invalid CREATE FUNCTION command", node)
+        }
+        // If database name is specified, there are 3 tokens, otherwise 2.
+        val (funcName, alias) = funcNameArgs match {
+          case Token(dbName, Nil) :: Token(fname, Nil) :: Token(aname, Nil) :: Nil =>
+            (unquoteString(dbName) + "." + unquoteString(fname), unquoteString(aname))
+          case Token(fname, Nil) :: Token(aname, Nil) :: Nil =>
+            (unquoteString(fname), unquoteString(aname))
+          case _ =>
+            parseFailed("Invalid CREATE FUNCTION command", node)
+        }
+        // Extract other keywords, if they exist
+        val Seq(rList, temp) = getClauses(Seq("TOK_RESOURCE_LIST", "TOK_TEMPORARY"), otherArgs)
+        val resourcesMap = rList.toSeq.flatMap {
+          case Token("TOK_RESOURCE_LIST", resources) =>
+            resources.map {
+              case Token("TOK_RESOURCE_URI", rType :: Token(rPath, Nil) :: Nil) =>
+                val resourceType = rType match {
+                  case Token("TOK_JAR", Nil) => "jar"
+                  case Token("TOK_FILE", Nil) => "file"
+                  case Token("TOK_ARCHIVE", Nil) => "archive"
+                  case Token(f, _) => parseFailed(s"Unexpected resource format '$f'", node)
+                }
+                (resourceType, unquoteString(rPath))
+              case _ => parseFailed("Invalid CREATE FUNCTION command", node)
+            }
+          case _ => parseFailed("Invalid CREATE FUNCTION command", node)
+        }.toMap
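+        // For example (illustrative paths): USING JAR '/path/to/jar', FILE 'path/to/file'
+        // yields resourcesMap = Map("jar" -> "/path/to/jar", "file" -> "path/to/file").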
+        CreateFunction(funcName, alias, resourcesMap, temp.isDefined)(node.source)
+
+      case Token("TOK_ALTERTABLE", alterTableArgs) =>
+        AlterTableCommandParser.parse(node)
+
       case Token("TOK_CREATETABLEUSING", createTableArgs) =>
         val Seq(
           temp,
-          allowExisting,
+          ifNotExists,
           Some(tabName),
           tableCols,
           Some(Token("TOK_TABLEPROVIDER", providerNameParts)),
@@ -79,30 +174,22 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
           "TOK_TABLEPROVIDER",
           "TOK_TABLEOPTIONS",
           "TOK_QUERY"), createTableArgs)
-
         val tableIdent: TableIdentifier = extractTableIdent(tabName)
-
         val columns = tableCols.map {
           case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField))
+          case _ => parseFailed("Invalid CREATE TABLE command", node)
         }
-
         val provider = providerNameParts.map {
           case Token(name, Nil) => name
+          case _ => parseFailed("Invalid CREATE TABLE command", node)
         }.mkString(".")
-
-        val options: Map[String, String] = tableOpts.toSeq.flatMap {
-          case Token("TOK_TABLEOPTIONS", options) =>
-            options.map {
-              case Token("TOK_TABLEOPTION", keysAndValue) =>
-                val key = keysAndValue.init.map(_.text).mkString(".")
-                val value = unquoteString(keysAndValue.last.text)
-                (key, value)
-            }
+        val options = tableOpts.toSeq.flatMap {
+          case Token("TOK_TABLEOPTIONS", opts) => extractProps(opts, "TOK_TABLEOPTION")
+          case _ => parseFailed("Invalid CREATE TABLE command", node)
         }.toMap
+        val asClause = tableAs.map(nodeToPlan)
 
-        val asClause = tableAs.map(nodeToPlan(_))
-
-        if (temp.isDefined && allowExisting.isDefined) {
+        if (temp.isDefined && ifNotExists.isDefined) {
           throw new AnalysisException(
             "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
         }
@@ -113,7 +200,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
             "a CREATE TABLE AS SELECT statement does not allow column definitions.")
         }
 
-        val mode = if (allowExisting.isDefined) {
+        val mode = if (ifNotExists.isDefined) {
           SaveMode.Ignore
         } else if (temp.isDefined) {
           SaveMode.Overwrite
@@ -136,7 +223,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
           provider,
           temp.isDefined,
           options,
-          allowExisting.isDefined,
+          ifNotExists.isDefined,
           managedIfNoPath = false)
       }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala
new file mode 100644
index 0000000000..58639275c1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommandParser.scala
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, SortDirection}
+import org.apache.spark.sql.catalyst.parser._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * Helper object to parse alter table commands.
+ */
+object AlterTableCommandParser {
+  import ParserUtils._
+
+  /**
+   * Parse the given node assuming it is an alter table command.
+   */
+  def parse(node: ASTNode): LogicalPlan = {
+    node.children match {
+      case (tabName @ Token("TOK_TABNAME", _)) :: otherNodes =>
+        val tableIdent = extractTableIdent(tabName)
+        val partSpec = getClauseOption("TOK_PARTSPEC", node.children).map(parsePartitionSpec)
+        matchAlterTableCommands(node, otherNodes, tableIdent, partSpec)
+      case _ =>
+        parseFailed("Could not parse ALTER TABLE command", node)
+    }
+  }
+
+  private def cleanAndUnquoteString(s: String): String = {
+    cleanIdentifier(unquoteString(s))
+  }
+
+  /**
+   * Extract partition spec from the given [[ASTNode]] as a map, assuming it exists.
+   *
+   * Expected format:
+   *   +- TOK_PARTSPEC
+   *      :- TOK_PARTVAL
+   *      :  :- dt
+   *      :  +- '2008-08-08'
+   *      +- TOK_PARTVAL
+   *         :- country
+   *         +- 'us'
+   */
+  private def parsePartitionSpec(node: ASTNode): Map[String, String] = {
+    node match {
+      case Token("TOK_PARTSPEC", partitions) =>
+        partitions.map {
+          // Note: sometimes there's a "=", "<" or ">" between the key and the value
+          case Token("TOK_PARTVAL", ident :: conj :: constant :: Nil) =>
+            (cleanAndUnquoteString(ident.text), cleanAndUnquoteString(constant.text))
+          case Token("TOK_PARTVAL", ident :: constant :: Nil) =>
+            (cleanAndUnquoteString(ident.text), cleanAndUnquoteString(constant.text))
+          case Token("TOK_PARTVAL", ident :: Nil) =>
+            (cleanAndUnquoteString(ident.text), null)
+          case _ =>
+            parseFailed("Invalid ALTER TABLE command", node)
+        }.toMap
+      case _ =>
+        parseFailed("Expected partition spec in ALTER TABLE command", node)
+    }
+  }
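+
+  // For example, the TOK_PARTSPEC tree shown in the scaladoc above parses to
+  // Map("dt" -> "2008-08-08", "country" -> "us").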
+
+  /**
+   * Extract table properties from the given [[ASTNode]] as a map, assuming it exists.
+   *
+   * Expected format:
+   *   +- TOK_TABLEPROPERTIES
+   *      +- TOK_TABLEPROPLIST
+   *         :- TOK_TABLEPROPERTY
+   *         :  :- 'test'
+   *         :  +- 'value'
+   *         +- TOK_TABLEPROPERTY
+   *            :- 'comment'
+   *            +- 'new_comment'
+   */
+  private def extractTableProps(node: ASTNode): Map[String, String] = {
+    node match {
+      case Token("TOK_TABLEPROPERTIES", propsList) =>
+        propsList.flatMap {
+          case Token("TOK_TABLEPROPLIST", props) =>
+            props.map { case Token("TOK_TABLEPROPERTY", key :: value :: Nil) =>
+              val k = cleanAndUnquoteString(key.text)
+              val v = value match {
+                case Token("TOK_NULL", Nil) => null
+                case _ => cleanAndUnquoteString(value.text)
+              }
+              (k, v)
+            }
+          case _ =>
+            parseFailed("Invalid ALTER TABLE command", node)
+        }.toMap
+      case _ =>
+        parseFailed("Expected table properties in ALTER TABLE command", node)
+    }
+  }
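+
+  // For example, the TOK_TABLEPROPERTIES tree shown in the scaladoc above parses to
+  // Map("test" -> "value", "comment" -> "new_comment").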
+
+  /**
+   * Parse an alter table command from an [[ASTNode]] into a [[LogicalPlan]].
+   * This follows https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL.
+   *
+   * @param node the original [[ASTNode]] to parse.
+   * @param otherNodes the other [[ASTNode]]s after the first one containing the table name.
+   * @param tableIdent identifier of the table, parsed from the first [[ASTNode]].
+   * @param partition spec identifying the partition this command is concerned with, if any.
+   */
+  // TODO: This method is massive. Break it down.
+  private def matchAlterTableCommands(
+      node: ASTNode,
+      otherNodes: Seq[ASTNode],
+      tableIdent: TableIdentifier,
+      partition: Option[TablePartitionSpec]): LogicalPlan = {
+    otherNodes match {
+      // ALTER TABLE table_name RENAME TO new_table_name;
+      case Token("TOK_ALTERTABLE_RENAME", renameArgs) :: _ =>
+        val tableNameClause = getClause("TOK_TABNAME", renameArgs)
+        val newTableIdent = extractTableIdent(tableNameClause)
+        AlterTableRename(tableIdent, newTableIdent)(node.source)
+
+      // ALTER TABLE table_name SET TBLPROPERTIES ('comment' = new_comment);
+      case Token("TOK_ALTERTABLE_PROPERTIES", args) :: _ =>
+        val properties = extractTableProps(args.head)
+        AlterTableSetProperties(tableIdent, properties)(node.source)
+
+      // ALTER TABLE table_name UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
+      case Token("TOK_ALTERTABLE_DROPPROPERTIES", args) :: _ =>
+        val properties = extractTableProps(args.head)
+        val ifExists = getClauseOption("TOK_IFEXISTS", args).isDefined
+        AlterTableUnsetProperties(tableIdent, properties, ifExists)(node.source)
+
+      // ALTER TABLE table_name [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
+      case Token("TOK_ALTERTABLE_SERIALIZER", Token(serdeClassName, Nil) :: serdeArgs) :: _ =>
+        AlterTableSerDeProperties(
+          tableIdent,
+          Some(cleanAndUnquoteString(serdeClassName)),
+          serdeArgs.headOption.map(extractTableProps),
+          partition)(node.source)
+
+      // ALTER TABLE table_name [PARTITION spec] SET SERDEPROPERTIES serde_properties;
+      case Token("TOK_ALTERTABLE_SERDEPROPERTIES", args) :: _ =>
+        AlterTableSerDeProperties(
+          tableIdent,
+          None,
+          Some(extractTableProps(args.head)),
+          partition)(node.source)
+
+      // ALTER TABLE table_name CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS;
+      case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_ALTERTABLE_BUCKETS", b) :: Nil) :: _ =>
+        val clusterCols: Seq[String] = b.head match {
+          case Token("TOK_TABCOLNAME", children) => children.map(_.text)
+          case _ => parseFailed("Invalid ALTER TABLE command", node)
+        }
+        // If sort columns are specified, num buckets should be the third arg.
+        // If sort columns are not specified, num buckets should be the second arg.
+        // TODO: actually use `sortDirections` once we actually store that in the metastore
+        val (sortCols: Seq[String], sortDirections: Seq[SortDirection], numBuckets: Int) = {
+          b.tail match {
+            case Token("TOK_TABCOLNAME", children) :: numBucketsNode :: Nil =>
+              val (cols, directions) = children.map {
+                case Token("TOK_TABSORTCOLNAMEASC", Token(col, Nil) :: Nil) => (col, Ascending)
+                case Token("TOK_TABSORTCOLNAMEDESC", Token(col, Nil) :: Nil) => (col, Descending)
+              }.unzip
+              (cols, directions, numBucketsNode.text.toInt)
+            case numBucketsNode :: Nil =>
+              (Nil, Nil, numBucketsNode.text.toInt)
+            case _ =>
+              parseFailed("Invalid ALTER TABLE command", node)
+          }
+        }
+        AlterTableStorageProperties(
+          tableIdent,
+          BucketSpec(numBuckets, clusterCols, sortCols))(node.source)
+
+      // ALTER TABLE table_name NOT CLUSTERED
+      case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_NOT_CLUSTERED", Nil) :: Nil) :: _ =>
+        AlterTableNotClustered(tableIdent)(node.source)
+
+      // ALTER TABLE table_name NOT SORTED
+      case Token("TOK_ALTERTABLE_CLUSTER_SORT", Token("TOK_NOT_SORTED", Nil) :: Nil) :: _ =>
+        AlterTableNotSorted(tableIdent)(node.source)
+
+      // ALTER TABLE table_name SKEWED BY (col1, col2)
+      // ON ((col1_value, col2_value) [, (col1_value, col2_value), ...])
+      // [STORED AS DIRECTORIES];
+      case Token("TOK_ALTERTABLE_SKEWED",
+          Token("TOK_TABLESKEWED",
+          Token("TOK_TABCOLNAME", colNames) :: colValues :: rest) :: Nil) :: _ =>
+        // Example format:
+        //
+        //   +- TOK_ALTERTABLE_SKEWED
+        //      :- TOK_TABLESKEWED
+        //      :  :- TOK_TABCOLNAME
+        //      :  :  :- dt
+        //      :  :  +- country
+        //      :- TOK_TABCOLVALUE_PAIR
+        //      :  :- TOK_TABCOLVALUES
+        //      :  :  :- TOK_TABCOLVALUE
+        //      :  :  :  :- '2008-08-08'
+        //      :  :  :  +- 'us'
+        //      :  :- TOK_TABCOLVALUES
+        //      :  :  :- TOK_TABCOLVALUE
+        //      :  :  :  :- '2009-09-09'
+        //      :  :  :  +- 'uk'
+        //      +- TOK_STOREDASDIRS
+        val names = colNames.map { n => cleanAndUnquoteString(n.text) }
+        val values = colValues match {
+          case Token("TOK_TABCOLVALUE", vals) =>
+            Seq(vals.map { n => cleanAndUnquoteString(n.text) })
+          case Token("TOK_TABCOLVALUE_PAIR", pairs) =>
+            pairs.map {
+              case Token("TOK_TABCOLVALUES", Token("TOK_TABCOLVALUE", vals) :: Nil) =>
+                vals.map { n => cleanAndUnquoteString(n.text) }
+              case _ =>
+                parseFailed("Invalid ALTER TABLE command", node)
+            }
+          case _ =>
+            parseFailed("Invalid ALTER TABLE command", node)
+        }
+        val storedAsDirs = rest match {
+          case Token("TOK_STOREDASDIRS", Nil) :: Nil => true
+          case _ => false
+        }
+        AlterTableSkewed(
+          tableIdent,
+          names,
+          values,
+          storedAsDirs)(node.source)
+
+      // ALTER TABLE table_name NOT SKEWED
+      case Token("TOK_ALTERTABLE_SKEWED", Nil) :: _ =>
+        AlterTableNotSkewed(tableIdent)(node.source)
+
+      // ALTER TABLE table_name NOT STORED AS DIRECTORIES
+      case Token("TOK_ALTERTABLE_SKEWED", Token("TOK_STOREDASDIRS", Nil) :: Nil) :: _ =>
+        AlterTableNotStoredAsDirs(tableIdent)(node.source)
+
+      // ALTER TABLE table_name SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...]);
+      case Token("TOK_ALTERTABLE_SKEWED_LOCATION",
+          Token("TOK_SKEWED_LOCATIONS",
+          Token("TOK_SKEWED_LOCATION_LIST", locationMaps) :: Nil) :: Nil) :: _ =>
+        // Expected format:
+        //
+        //   +- TOK_ALTERTABLE_SKEWED_LOCATION
+        //      +- TOK_SKEWED_LOCATIONS
+        //         +- TOK_SKEWED_LOCATION_LIST
+        //            :- TOK_SKEWED_LOCATION_MAP
+        //            :  :- 'col1'
+        //            :  +- 'loc1'
+        //            +- TOK_SKEWED_LOCATION_MAP
+        //               :- TOK_TABCOLVALUES
+        //               :  +- TOK_TABCOLVALUE
+        //               :     :- 'col2'
+        //               :     +- 'col3'
+        //               +- 'loc2'
+        val skewedMaps = locationMaps.flatMap {
+          case Token("TOK_SKEWED_LOCATION_MAP", col :: loc :: Nil) =>
+            col match {
+              case Token(const, Nil) =>
+                Seq((cleanAndUnquoteString(const), cleanAndUnquoteString(loc.text)))
+              case Token("TOK_TABCOLVALUES", Token("TOK_TABCOLVALUE", keys) :: Nil) =>
+                keys.map { k => (cleanAndUnquoteString(k.text), cleanAndUnquoteString(loc.text)) }
+            }
+          case _ =>
+            parseFailed("Invalid ALTER TABLE command", node)
+        }.toMap
+        AlterTableSkewedLocation(tableIdent, skewedMaps)(node.source)
+
+      // ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+      // spec [LOCATION 'loc2'] ...;
+      case Token("TOK_ALTERTABLE_ADDPARTS", args) :: _ =>
+        val (ifNotExists, parts) = args.head match {
+          case Token("TOK_IFNOTEXISTS", Nil) => (true, args.tail)
+          case _ => (false, args)
+        }
+        // List of (spec, location) to describe partitions to add
+        // Each partition spec may or may not be followed by a location
+        val parsedParts = new ArrayBuffer[(TablePartitionSpec, Option[String])]
+        parts.foreach {
+          case t @ Token("TOK_PARTSPEC", _) =>
+            parsedParts += ((parsePartitionSpec(t), None))
+          case Token("TOK_PARTITIONLOCATION", loc :: Nil) =>
+            // Update the location of the last partition we just added
+            if (parsedParts.nonEmpty) {
+              val (spec, _) = parsedParts.remove(parsedParts.length - 1)
+              parsedParts += ((spec, Some(unquoteString(loc.text))))
+            }
+          case _ =>
+            parseFailed("Invalid ALTER TABLE command", node)
+        }
+        AlterTableAddPartition(tableIdent, parsedParts, ifNotExists)(node.source)
+
+      // ALTER TABLE table_name PARTITION spec1 RENAME TO PARTITION spec2;
+      case Token("TOK_ALTERTABLE_RENAMEPART", spec :: Nil) :: _ =>
+        val newPartition = parsePartitionSpec(spec)
+        val oldPartition = partition.getOrElse {
+          parseFailed("Expected old partition spec in ALTER TABLE rename partition command", node)
+        }
+        AlterTableRenamePartition(tableIdent, oldPartition, newPartition)(node.source)
+
+      // ALTER TABLE table_name_1 EXCHANGE PARTITION spec WITH TABLE table_name_2;
+      case Token("TOK_ALTERTABLE_EXCHANGEPARTITION", spec :: newTable :: Nil) :: _ =>
+        val parsedSpec = parsePartitionSpec(spec)
+        val newTableIdent = extractTableIdent(newTable)
+        AlterTableExchangePartition(tableIdent, newTableIdent, parsedSpec)(node.source)
+
+      // ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+      case Token("TOK_ALTERTABLE_DROPPARTS", args) :: _ =>
+        val parts = args.collect { case p @ Token("TOK_PARTSPEC", _) => parsePartitionSpec(p) }
+        val ifExists = getClauseOption("TOK_IFEXISTS", args).isDefined
+        val purge = getClauseOption("PURGE", args).isDefined
+        AlterTableDropPartition(tableIdent, parts, ifExists, purge)(node.source)
+
+      // ALTER TABLE table_name ARCHIVE PARTITION spec;
+      case Token("TOK_ALTERTABLE_ARCHIVE", spec :: Nil) :: _ =>
+        AlterTableArchivePartition(tableIdent, parsePartitionSpec(spec))(node.source)
+
+      // ALTER TABLE table_name UNARCHIVE PARTITION spec;
+      case Token("TOK_ALTERTABLE_UNARCHIVE", spec :: Nil) :: _ =>
+        AlterTableUnarchivePartition(tableIdent, parsePartitionSpec(spec))(node.source)
+
+      // ALTER TABLE table_name [PARTITION spec] SET FILEFORMAT file_format;
+      case Token("TOK_ALTERTABLE_FILEFORMAT", args) :: _ =>
+        val Seq(fileFormat, genericFormat) =
+          getClauses(Seq("TOK_TABLEFILEFORMAT", "TOK_FILEFORMAT_GENERIC"), args)
+        // Note: the AST doesn't contain information about which file format is being set here.
+        // E.g. we can't differentiate between INPUTFORMAT and OUTPUTFORMAT if either is set.
+        // Right now this just stores the values, but we should figure out how to get the keys.
+        val fFormat = fileFormat
+          .map { _.children.map { n => cleanAndUnquoteString(n.text) }}
+          .getOrElse(Seq())
+        val gFormat = genericFormat.map { f => cleanAndUnquoteString(f.children(0).text) }
+        AlterTableSetFileFormat(tableIdent, partition, fFormat, gFormat)(node.source)
+
+      // ALTER TABLE table_name [PARTITION spec] SET LOCATION "loc";
+      case Token("TOK_ALTERTABLE_LOCATION", Token(loc, Nil) :: Nil) :: _ =>
+        AlterTableSetLocation(tableIdent, partition, cleanAndUnquoteString(loc))(node.source)
+
+      // ALTER TABLE table_name TOUCH [PARTITION spec];
+      case Token("TOK_ALTERTABLE_TOUCH", args) :: _ =>
+        // Note: the partition spec, if it exists, comes after TOUCH, so `partition` should
+        // always be None here. Instead, we need to parse it from the TOUCH node's children.
+        val part = getClauseOption("TOK_PARTSPEC", args).map(parsePartitionSpec)
+        AlterTableTouch(tableIdent, part)(node.source)
+
+      // ALTER TABLE table_name [PARTITION spec] COMPACT 'compaction_type';
+      case Token("TOK_ALTERTABLE_COMPACT", Token(compactType, Nil) :: Nil) :: _ =>
+        AlterTableCompact(tableIdent, partition, cleanAndUnquoteString(compactType))(node.source)
+
+      // ALTER TABLE table_name [PARTITION spec] CONCATENATE;
+      case Token("TOK_ALTERTABLE_MERGEFILES", _) :: _ =>
+        AlterTableMerge(tableIdent, partition)(node.source)
+
+      // ALTER TABLE table_name [PARTITION spec] CHANGE [COLUMN] col_old_name col_new_name
+      // column_type [COMMENT col_comment] [FIRST|AFTER column_name] [CASCADE|RESTRICT];
+      case Token("TOK_ALTERTABLE_RENAMECOL", oldName :: newName :: dataType :: args) :: _ =>
+        val afterColName: Option[String] =
+          getClauseOption("TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION", args).map { ap =>
+            ap.children match {
+              case Token(col, Nil) :: Nil => col
+              case _ => parseFailed("Invalid ALTER TABLE command", node)
+            }
+          }
+        val restrict = getClauseOption("TOK_RESTRICT", args).isDefined
+        val cascade = getClauseOption("TOK_CASCADE", args).isDefined
+        val comment = args.headOption.map {
+          case Token("TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION", _) => null
+          case Token("TOK_RESTRICT", _) => null
+          case Token("TOK_CASCADE", _) => null
+          case Token(commentStr, Nil) => cleanAndUnquoteString(commentStr)
+          case _ => parseFailed("Invalid ALTER TABLE command", node)
+        }
+        AlterTableChangeCol(
+          tableIdent,
+          partition,
+          oldName.text,
+          newName.text,
+          nodeToDataType(dataType),
+          comment,
+          afterColName,
+          restrict,
+          cascade)(node.source)
+
+      // ALTER TABLE table_name [PARTITION spec] ADD COLUMNS (name type [COMMENT comment], ...)
+      // [CASCADE|RESTRICT]
+      case Token("TOK_ALTERTABLE_ADDCOLS", args) :: _ =>
+        val columnNodes = getClause("TOK_TABCOLLIST", args).children
+        val columns = StructType(columnNodes.map(nodeToStructField))
+        val restrict = getClauseOption("TOK_RESTRICT", args).isDefined
+        val cascade = getClauseOption("TOK_CASCADE", args).isDefined
+        AlterTableAddCol(tableIdent, partition, columns, restrict, cascade)(node.source)
+
+      // ALTER TABLE table_name [PARTITION spec] REPLACE COLUMNS (name type [COMMENT comment], ...)
+      // [CASCADE|RESTRICT]
+      case Token("TOK_ALTERTABLE_REPLACECOLS", args) :: _ =>
+        val columnNodes = getClause("TOK_TABCOLLIST", args).children
+        val columns = StructType(columnNodes.map(nodeToStructField))
+        val restrict = getClauseOption("TOK_RESTRICT", args).isDefined
+        val cascade = getClauseOption("TOK_CASCADE", args).isDefined
+        AlterTableReplaceCol(tableIdent, partition, columns, restrict, cascade)(node.source)
+
+      case _ =>
+        parseFailed("Unsupported ALTER TABLE command", node)
+    }
+  }
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
new file mode 100644
index 0000000000..9df58d214a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.types._
+
+
+// Note: The definitions of these commands are based on the ones described in
+// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
+
+/**
+ * A DDL command expected to be parsed and run in an underlying system instead of in Spark.
+ */
+abstract class NativeDDLCommand(val sql: String) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.runNativeSql(sql)
+  }
+
+  override val output: Seq[Attribute] = {
+    Seq(AttributeReference("result", StringType, nullable = false)())
+  }
+
+}
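+
+// For example (illustrative command): for sql = "ALTER TABLE t TOUCH", the parser builds
+// AlterTableTouch(TableIdentifier("t"), None)(sql); run() does no work in Spark itself and
+// simply forwards the original text to the underlying system via runNativeSql(sql).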
+
+case class CreateDatabase(
+    databaseName: String,
+    ifNotExists: Boolean,
+    path: Option[String],
+    comment: Option[String],
+    props: Map[String, String])(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class CreateFunction(
+    functionName: String,
+    alias: String,
+    resourcesMap: Map[String, String],
+    isTemp: Boolean)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableRename(
+    oldName: TableIdentifier,
+    newName: TableIdentifier)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSetProperties(
+    tableName: TableIdentifier,
+    properties: Map[String, String])(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableUnsetProperties(
+    tableName: TableIdentifier,
+    properties: Map[String, String],
+    ifExists: Boolean)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSerDeProperties(
+    tableName: TableIdentifier,
+    serdeClassName: Option[String],
+    serdeProperties: Option[Map[String, String]],
+    partition: Option[Map[String, String]])(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableStorageProperties(
+    tableName: TableIdentifier,
+    buckets: BucketSpec)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableNotClustered(
+    tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableNotSorted(
+    tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSkewed(
+    tableName: TableIdentifier,
+    // e.g. (dt, country)
+    skewedCols: Seq[String],
+    // e.g. ('2008-08-08', 'us'), ('2009-09-09', 'uk')
+    skewedValues: Seq[Seq[String]],
+    storedAsDirs: Boolean)(sql: String)
+  extends NativeDDLCommand(sql) with Logging {
+
+  require(skewedValues.forall(_.size == skewedCols.size),
+    "number of columns in skewed values does not match number of skewed columns provided")
+}
+
+case class AlterTableNotSkewed(
+    tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableNotStoredAsDirs(
+    tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSkewedLocation(
+    tableName: TableIdentifier,
+    skewedMap: Map[String, String])(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableAddPartition(
+    tableName: TableIdentifier,
+    partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
+    ifNotExists: Boolean)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableRenamePartition(
+    tableName: TableIdentifier,
+    oldPartition: TablePartitionSpec,
+    newPartition: TablePartitionSpec)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableExchangePartition(
+    fromTableName: TableIdentifier,
+    toTableName: TableIdentifier,
+    spec: TablePartitionSpec)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableDropPartition(
+    tableName: TableIdentifier,
+    specs: Seq[TablePartitionSpec],
+    ifExists: Boolean,
+    purge: Boolean)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableArchivePartition(
+    tableName: TableIdentifier,
+    spec: TablePartitionSpec)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableUnarchivePartition(
+    tableName: TableIdentifier,
+    spec: TablePartitionSpec)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSetFileFormat(
+    tableName: TableIdentifier,
+    partitionSpec: Option[TablePartitionSpec],
+    fileFormat: Seq[String],
+    genericFormat: Option[String])(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableSetLocation(
+    tableName: TableIdentifier,
+    partitionSpec: Option[TablePartitionSpec],
+    location: String)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableTouch(
+    tableName: TableIdentifier,
+    partitionSpec: Option[TablePartitionSpec])(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableCompact(
+    tableName: TableIdentifier,
+    partitionSpec: Option[TablePartitionSpec],
+    compactType: String)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableMerge(
+    tableName: TableIdentifier,
+    partitionSpec: Option[TablePartitionSpec])(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableChangeCol(
+    tableName: TableIdentifier,
+    partitionSpec: Option[TablePartitionSpec],
+    oldColName: String,
+    newColName: String,
+    dataType: DataType,
+    comment: Option[String],
+    afterColName: Option[String],
+    restrict: Boolean,
+    cascade: Boolean)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableAddCol(
+    tableName: TableIdentifier,
+    partitionSpec: Option[TablePartitionSpec],
+    columns: StructType,
+    restrict: Boolean,
+    cascade: Boolean)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
+
+case class AlterTableReplaceCol(
+    tableName: TableIdentifier,
+    partitionSpec: Option[TablePartitionSpec],
+    columns: StructType,
+    restrict: Boolean,
+    cascade: Boolean)(sql: String)
+  extends NativeDDLCommand(sql) with Logging
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
new file mode 100644
index 0000000000..0d632a8a13
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.execution.SparkQl
+import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.types._
+
+class DDLCommandSuite extends PlanTest {
+  private val parser = new SparkQl
+
+  test("create database") {
+    val sql =
+      """
+        |CREATE DATABASE IF NOT EXISTS database_name
+        |COMMENT 'database_comment' LOCATION '/home/user/db'
+        |WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
+      """.stripMargin
+    val parsed = parser.parsePlan(sql)
+    val expected = CreateDatabase(
+      "database_name",
+      ifNotExists = true,
+      Some("/home/user/db"),
+      Some("database_comment"),
+      Map("a" -> "a", "b" -> "b", "c" -> "c"))(sql)
+    comparePlans(parsed, expected)
+  }
+
+  test("create function") {
+    val sql1 =
+      """
+        |CREATE TEMPORARY FUNCTION helloworld as
+        |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar',
+        |FILE 'path/to/file'
+      """.stripMargin
+    val sql2 =
+      """
+        |CREATE FUNCTION hello.world as
+        |'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
+        |FILE 'path/to/file'
+      """.stripMargin
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val expected1 = CreateFunction(
+      "helloworld",
+      "com.matthewrathbone.example.SimpleUDFExample",
+      Map("jar" -> "/path/to/jar", "file" -> "path/to/file"),
+      isTemp = true)(sql1)
+    val expected2 = CreateFunction(
+      "hello.world",
+      "com.matthewrathbone.example.SimpleUDFExample",
+      Map("archive" -> "/path/to/archive", "file" -> "path/to/file"),
+      isTemp = false)(sql2)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+  }
+
+  test("alter table: rename table") {
+    val sql = "ALTER TABLE table_name RENAME TO new_table_name"
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableRename(
+      TableIdentifier("table_name", None),
+      TableIdentifier("new_table_name", None))(sql)
+    comparePlans(parsed, expected)
+  }
+
+  test("alter table: alter table properties") {
+    val sql1 = "ALTER TABLE table_name SET TBLPROPERTIES ('test' = 'test', " +
+      "'comment' = 'new_comment')"
+    val sql2 = "ALTER TABLE table_name UNSET TBLPROPERTIES ('comment', 'test')"
+    val sql3 = "ALTER TABLE table_name UNSET TBLPROPERTIES IF EXISTS ('comment', 'test')"
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val parsed3 = parser.parsePlan(sql3)
+    val tableIdent = TableIdentifier("table_name", None)
+    val expected1 = AlterTableSetProperties(
+      tableIdent, Map("test" -> "test", "comment" -> "new_comment"))(sql1)
+    val expected2 = AlterTableUnsetProperties(
+      tableIdent, Map("comment" -> null, "test" -> null), ifExists = false)(sql2)
+    val expected3 = AlterTableUnsetProperties(
+      tableIdent, Map("comment" -> null, "test" -> null), ifExists = true)(sql3)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+    comparePlans(parsed3, expected3)
+  }
+
+  test("alter table: SerDe properties") {
+    val sql1 = "ALTER TABLE table_name SET SERDE 'org.apache.class'"
+    val sql2 =
+      """
+        |ALTER TABLE table_name SET SERDE 'org.apache.class'
+        |WITH SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',')
+      """.stripMargin
+    val sql3 =
+      """
+        |ALTER TABLE table_name SET SERDEPROPERTIES ('columns'='foo,bar',
+        |'field.delim' = ',')
+      """.stripMargin
+    val sql4 =
+      """
+        |ALTER TABLE table_name PARTITION (test, dt='2008-08-08',
+        |country='us') SET SERDE 'org.apache.class' WITH SERDEPROPERTIES ('columns'='foo,bar',
+        |'field.delim' = ',')
+      """.stripMargin
+    val sql5 =
+      """
+        |ALTER TABLE table_name PARTITION (test, dt='2008-08-08',
+        |country='us') SET SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',')
+      """.stripMargin
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val parsed3 = parser.parsePlan(sql3)
+    val parsed4 = parser.parsePlan(sql4)
+    val parsed5 = parser.parsePlan(sql5)
+    val tableIdent = TableIdentifier("table_name", None)
+    val expected1 = AlterTableSerDeProperties(
+      tableIdent, Some("org.apache.class"), None, None)(sql1)
+    val expected2 = AlterTableSerDeProperties(
+      tableIdent,
+      Some("org.apache.class"),
+      Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
+      None)(sql2)
+    val expected3 = AlterTableSerDeProperties(
+      tableIdent, None, Some(Map("columns" -> "foo,bar", "field.delim" -> ",")), None)(sql3)
+    val expected4 = AlterTableSerDeProperties(
+      tableIdent,
+      Some("org.apache.class"),
+      Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
+      Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql4)
+    val expected5 = AlterTableSerDeProperties(
+      tableIdent,
+      None,
+      Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
+      Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql5)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+    comparePlans(parsed3, expected3)
+    comparePlans(parsed4, expected4)
+    comparePlans(parsed5, expected5)
+  }
+
+  test("alter table: storage properties") {
+    val sql1 = "ALTER TABLE table_name CLUSTERED BY (dt, country) INTO 10 BUCKETS"
+    val sql2 = "ALTER TABLE table_name CLUSTERED BY (dt, country) SORTED BY " +
+      "(dt, country DESC) INTO 10 BUCKETS"
+    val sql3 = "ALTER TABLE table_name NOT CLUSTERED"
+    val sql4 = "ALTER TABLE table_name NOT SORTED"
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val parsed3 = parser.parsePlan(sql3)
+    val parsed4 = parser.parsePlan(sql4)
+    val tableIdent = TableIdentifier("table_name", None)
+    val cols = List("dt", "country")
+    // TODO: also test the sort directions once we keep track of that
+    val expected1 = AlterTableStorageProperties(
+      tableIdent, BucketSpec(10, cols, Nil))(sql1)
+    val expected2 = AlterTableStorageProperties(
+      tableIdent, BucketSpec(10, cols, cols))(sql2)
+    val expected3 = AlterTableNotClustered(tableIdent)(sql3)
+    val expected4 = AlterTableNotSorted(tableIdent)(sql4)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+    comparePlans(parsed3, expected3)
+    comparePlans(parsed4, expected4)
+  }
+
+  test("alter table: skewed") {
+    val sql1 =
+      """
+        |ALTER TABLE table_name SKEWED BY (dt, country) ON
+        |(('2008-08-08', 'us'), ('2009-09-09', 'uk'), ('2010-10-10', 'cn')) STORED AS DIRECTORIES
+      """.stripMargin
+    val sql2 =
+      """
+        |ALTER TABLE table_name SKEWED BY (dt, country) ON
+        |('2008-08-08', 'us') STORED AS DIRECTORIES
+      """.stripMargin
+    val sql3 =
+      """
+        |ALTER TABLE table_name SKEWED BY (dt, country) ON
+        |(('2008-08-08', 'us'), ('2009-09-09', 'uk'))
+      """.stripMargin
+    val sql4 = "ALTER TABLE table_name NOT SKEWED"
+    val sql5 = "ALTER TABLE table_name NOT STORED AS DIRECTORIES"
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val parsed3 = parser.parsePlan(sql3)
+    val parsed4 = parser.parsePlan(sql4)
+    val parsed5 = parser.parsePlan(sql5)
+    val tableIdent = TableIdentifier("table_name", None)
+    val expected1 = AlterTableSkewed(
+      tableIdent,
+      Seq("dt", "country"),
+      Seq(List("2008-08-08", "us"), List("2009-09-09", "uk"), List("2010-10-10", "cn")),
+      storedAsDirs = true)(sql1)
+    val expected2 = AlterTableSkewed(
+      tableIdent,
+      Seq("dt", "country"),
+      Seq(List("2008-08-08", "us")),
+      storedAsDirs = true)(sql2)
+    val expected3 = AlterTableSkewed(
+      tableIdent,
+      Seq("dt", "country"),
+      Seq(List("2008-08-08", "us"), List("2009-09-09", "uk")),
+      storedAsDirs = false)(sql3)
+    val expected4 = AlterTableNotSkewed(tableIdent)(sql4)
+    val expected5 = AlterTableNotStoredAsDirs(tableIdent)(sql5)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+    comparePlans(parsed3, expected3)
+    comparePlans(parsed4, expected4)
+    comparePlans(parsed5, expected5)
+  }
+
+  test("alter table: skewed location") {
+    val sql1 =
+      """
+        |ALTER TABLE table_name SET SKEWED LOCATION
+        |('123'='location1', 'test'='location2')
+      """.stripMargin
+    val sql2 =
+      """
+        |ALTER TABLE table_name SET SKEWED LOCATION
+        |(('2008-08-08', 'us')='location1', 'test'='location2')
+      """.stripMargin
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val tableIdent = TableIdentifier("table_name", None)
+    val expected1 = AlterTableSkewedLocation(
+      tableIdent,
+      Map("123" -> "location1", "test" -> "location2"))(sql1)
+    val expected2 = AlterTableSkewedLocation(
+      tableIdent,
+      Map("2008-08-08" -> "location1", "us" -> "location1", "test" -> "location2"))(sql2)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+  }
+
+  test("alter table: add partition") {
+    val sql =
+      """
+        |ALTER TABLE table_name ADD IF NOT EXISTS PARTITION
+        |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
+        |(dt='2009-09-09', country='uk')
+      """.stripMargin
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableAddPartition(
+      TableIdentifier("table_name", None),
+      Seq(
+        (Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
+        (Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
+      ifNotExists = true)(sql)
+    comparePlans(parsed, expected)
+  }
+
+  test("alter table: rename partition") {
+    val sql =
+      """
+        |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
+        |RENAME TO PARTITION (dt='2008-09-09', country='uk')
+      """.stripMargin
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableRenamePartition(
+      TableIdentifier("table_name", None),
+      Map("dt" -> "2008-08-08", "country" -> "us"),
+      Map("dt" -> "2008-09-09", "country" -> "uk"))(sql)
+    comparePlans(parsed, expected)
+  }
+
+  test("alter table: exchange partition") {
+    val sql =
+      """
+        |ALTER TABLE table_name_1 EXCHANGE PARTITION
+        |(dt='2008-08-08', country='us') WITH TABLE table_name_2
+      """.stripMargin
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableExchangePartition(
+      TableIdentifier("table_name_1", None),
+      TableIdentifier("table_name_2", None),
+      Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
+    comparePlans(parsed, expected)
+  }
+
+  test("alter table: drop partitions") {
+    val sql1 =
+      """
+        |ALTER TABLE table_name DROP IF EXISTS PARTITION
+        |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
+      """.stripMargin
+    val sql2 =
+      """
+        |ALTER TABLE table_name DROP PARTITION
+        |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk') PURGE
+      """.stripMargin
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val tableIdent = TableIdentifier("table_name", None)
+    val expected1 = AlterTableDropPartition(
+      tableIdent,
+      Seq(
+        Map("dt" -> "2008-08-08", "country" -> "us"),
+        Map("dt" -> "2009-09-09", "country" -> "uk")),
+      ifExists = true,
+      purge = false)(sql1)
+    val expected2 = AlterTableDropPartition(
+      tableIdent,
+      Seq(
+        Map("dt" -> "2008-08-08", "country" -> "us"),
+        Map("dt" -> "2009-09-09", "country" -> "uk")),
+      ifExists = false,
+      purge = true)(sql2)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+  }
+
+  test("alter table: archive partition") {
+    val sql = "ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')"
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableArchivePartition(
+      TableIdentifier("table_name", None),
+      Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
+    comparePlans(parsed, expected)
+  }
+
+  test("alter table: unarchive partition") {
+    val sql = "ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')"
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableUnarchivePartition(
+      TableIdentifier("table_name", None),
+      Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
+    comparePlans(parsed, expected)
+  }
+
+  test("alter table: set file format") {
+    val sql1 =
+      """
+        |ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test'
+        |OUTPUTFORMAT 'test' SERDE 'test' INPUTDRIVER 'test' OUTPUTDRIVER 'test'
+      """.stripMargin
+    val sql2 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
+      "OUTPUTFORMAT 'test' SERDE 'test'"
+    val sql3 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
+      "SET FILEFORMAT PARQUET"
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val parsed3 = parser.parsePlan(sql3)
+    val tableIdent = TableIdentifier("table_name", None)
+    val expected1 = AlterTableSetFileFormat(
+      tableIdent,
+      None,
+      List("test", "test", "test", "test", "test"),
+      None)(sql1)
+    val expected2 = AlterTableSetFileFormat(
+      tableIdent,
+      None,
+      List("test", "test", "test"),
+      None)(sql2)
+    val expected3 = AlterTableSetFileFormat(
+      tableIdent,
+      Some(Map("dt" -> "2008-08-08", "country" -> "us")),
+      Seq(),
+      Some("PARQUET"))(sql3)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+    comparePlans(parsed3, expected3)
+  }
+
+  test("alter table: set location") {
+    val sql1 = "ALTER TABLE table_name SET LOCATION 'new location'"
+    val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
+      "SET LOCATION 'new location'"
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val tableIdent = TableIdentifier("table_name", None)
+    val expected1 = AlterTableSetLocation(
+      tableIdent,
+      None,
+      "new location")(sql1)
+    val expected2 = AlterTableSetLocation(
+      tableIdent,
+      Some(Map("dt" -> "2008-08-08", "country" -> "us")),
+      "new location")(sql2)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+  }
+
+  test("alter table: touch") {
+    val sql1 = "ALTER TABLE table_name TOUCH"
+    val sql2 = "ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')"
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val tableIdent = TableIdentifier("table_name", None)
+    val expected1 = AlterTableTouch(
+      tableIdent,
+      None)(sql1)
+    val expected2 = AlterTableTouch(
+      tableIdent,
+      Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+  }
+
+  test("alter table: compact") {
+    val sql1 = "ALTER TABLE table_name COMPACT 'compaction_type'"
+    val sql2 =
+      """
+        |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
+        |COMPACT 'MAJOR'
+      """.stripMargin
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val tableIdent = TableIdentifier("table_name", None)
+    val expected1 = AlterTableCompact(
+      tableIdent,
+      None,
+      "compaction_type")(sql1)
+    val expected2 = AlterTableCompact(
+      tableIdent,
+      Some(Map("dt" -> "2008-08-08", "country" -> "us")),
+      "MAJOR")(sql2)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+  }
+
+  test("alter table: concatenate") {
+    val sql1 = "ALTER TABLE table_name CONCATENATE"
+    val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE"
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val tableIdent = TableIdentifier("table_name", None)
+    val expected1 = AlterTableMerge(tableIdent, None)(sql1)
+    val expected2 = AlterTableMerge(
+      tableIdent, Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+  }
+
+  test("alter table: change column name/type/position/comment") {
+    val sql1 = "ALTER TABLE table_name CHANGE col_old_name col_new_name INT"
+    val sql2 =
+      """
+        |ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT
+        |COMMENT 'col_comment' FIRST CASCADE
+      """.stripMargin
+    val sql3 =
+      """
+        |ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT
+        |COMMENT 'col_comment' AFTER column_name RESTRICT
+      """.stripMargin
+    val parsed1 = parser.parsePlan(sql1)
+    val parsed2 = parser.parsePlan(sql2)
+    val parsed3 = parser.parsePlan(sql3)
+    val tableIdent = TableIdentifier("table_name", None)
+    val expected1 = AlterTableChangeCol(
+      tableName = tableIdent,
+      partitionSpec = None,
+      oldColName = "col_old_name",
+      newColName = "col_new_name",
+      dataType = IntegerType,
+      comment = None,
+      afterColName = None,
+      restrict = false,
+      cascade = false)(sql1)
+    val expected2 = AlterTableChangeCol(
+      tableName = tableIdent,
+      partitionSpec = None,
+      oldColName = "col_old_name",
+      newColName = "col_new_name",
+      dataType = IntegerType,
+      comment = Some("col_comment"),
+      afterColName = None,
+      restrict = false,
+      cascade = true)(sql2)
+    val expected3 = AlterTableChangeCol(
+      tableName = tableIdent,
+      partitionSpec = None,
+      oldColName = "col_old_name",
"col_new_name", + dataType = IntegerType, + comment = Some("col_comment"), + afterColName = Some("column_name"), + restrict = true, + cascade = false)(sql3) + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + } + + test("alter table: add/replace columns") { + val sql1 = + """ + |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') + |ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG + |COMMENT 'test_comment2') CASCADE + """.stripMargin + val sql2 = + """ + |ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT + |COMMENT 'test_comment', new_col2 LONG COMMENT 'test_comment2') RESTRICT + """.stripMargin + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + val meta1 = new MetadataBuilder().putString("comment", "test_comment").build() + val meta2 = new MetadataBuilder().putString("comment", "test_comment2").build() + val tableIdent = TableIdentifier("table_name", None) + val expected1 = AlterTableAddCol( + tableIdent, + Some(Map("dt" -> "2008-08-08", "country" -> "us")), + StructType(Seq( + StructField("new_col1", IntegerType, nullable = true, meta1), + StructField("new_col2", LongType, nullable = true, meta2))), + restrict = false, + cascade = true)(sql1) + val expected2 = AlterTableReplaceCol( + tableIdent, + None, + StructType(Seq( + StructField("new_col1", IntegerType, nullable = true, meta1), + StructField("new_col2", LongType, nullable = true, meta2))), + restrict = true, + cascade = false)(sql2) + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + } + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 69669d79be..081d849a88 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -535,6 +535,15 @@ class HiveContext private[hive]( } } + /** + * Executes a SQL query without parsing it, but instead passing it directly to Hive. + * This is currently only used for DDLs and will be removed as soon as Spark can parse + * all supported Hive DDLs itself. + */ + protected[sql] override def runNativeSql(sqlText: String): Seq[Row] = { + runSqlHive(sqlText).map { s => Row(s) } + } + /** Extends QueryExecution with hive specific features. 
+    runSqlHive(sqlText).map { s => Row(s) }
+  }
+
   /** Extends QueryExecution with hive specific features. */
   protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
     extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) {
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 0bdebdc5fd..56acb87c80 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -88,29 +88,14 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
     "TOK_ALTERDATABASE_PROPERTIES",
     "TOK_ALTERINDEX_PROPERTIES",
     "TOK_ALTERINDEX_REBUILD",
-    "TOK_ALTERTABLE",
-    "TOK_ALTERTABLE_ADDCOLS",
-    "TOK_ALTERTABLE_ADDPARTS",
     "TOK_ALTERTABLE_ALTERPARTS",
-    "TOK_ALTERTABLE_ARCHIVE",
-    "TOK_ALTERTABLE_CLUSTER_SORT",
-    "TOK_ALTERTABLE_DROPPARTS",
     "TOK_ALTERTABLE_PARTITION",
-    "TOK_ALTERTABLE_PROPERTIES",
-    "TOK_ALTERTABLE_RENAME",
-    "TOK_ALTERTABLE_RENAMECOL",
-    "TOK_ALTERTABLE_REPLACECOLS",
-    "TOK_ALTERTABLE_SKEWED",
-    "TOK_ALTERTABLE_TOUCH",
-    "TOK_ALTERTABLE_UNARCHIVE",
     "TOK_ALTERVIEW_ADDPARTS",
     "TOK_ALTERVIEW_AS",
     "TOK_ALTERVIEW_DROPPARTS",
     "TOK_ALTERVIEW_PROPERTIES",
     "TOK_ALTERVIEW_RENAME",
-    "TOK_CREATEDATABASE",
-    "TOK_CREATEFUNCTION",
     "TOK_CREATEINDEX",
     "TOK_CREATEMACRO",
     "TOK_CREATEROLE",
@@ -164,7 +149,8 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
   protected val noExplainCommands = Seq(
     "TOK_DESCTABLE",
     "TOK_SHOWTABLES",
-    "TOK_TRUNCATETABLE" // truncate table" is a NativeCommand, does not need to explain.
+    "TOK_TRUNCATETABLE", // truncate table is a NativeCommand, does not need to explain.
+    "TOK_ALTERTABLE"
   ) ++ nativeCommands
 
   /**