[SPARK-15674][SQL] Deprecates "CREATE TEMPORARY TABLE USING...", uses "CREATE TEMPORARY VIEW USING..." instead

## What changes were proposed in this pull request?

The current implementation of "CREATE TEMPORARY TABLE USING datasource..." does NOT create any intermediate temporary data directory, such as a temporary HDFS folder; instead, it only stores a SQL string in memory. We should probably use "TEMPORARY VIEW" instead.

This PR assumes a temporary table has to link with some temporary intermediate data. It follows the definition of temporary table like this (from [hortonworks doc](https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/bk_dataintegration/content/temp-tables.html)):
> A temporary table is a convenient way for an application to automatically manage intermediate data generated during a complex query

**Example**:

```
scala> spark.sql("CREATE temporary view  my_tab7 (c1: String, c2: String)  USING org.apache.spark.sql.execution.datasources.csv.CSVFileFormat  OPTIONS (PATH '/Users/seanzhong/csv/cars.csv')")
scala> spark.sql("select c1, c2 from my_tab7").show()
+----+-----+
|  c1|   c2|
+----+-----+
|year| make|
|2012|Tesla|
...
```

It NOW prints a **deprecation warning** if "CREATE TEMPORARY TABLE USING..." is used.

```
scala> spark.sql("CREATE temporary table  my_tab7 (c1: String, c2: String)  USING org.apache.spark.sql.execution.datasources.csv.CSVFileFormat  OPTIONS (PATH '/Users/seanzhong/csv/cars.csv')")
16/05/31 10:39:27 WARN SparkStrategies$DDLStrategy: CREATE TEMPORARY TABLE tableName USING... is deprecated, please use CREATE TEMPORARY VIEW viewName USING... instead
```

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13414 from clockfly/create_temp_view_using.
This commit is contained in:
Sean Zhong 2016-06-07 15:21:55 -07:00 committed by Herman van Hovell
parent 200f01c8fb
commit 890baaca50
5 changed files with 47 additions and 6 deletions

View file

@ -90,6 +90,9 @@ statement
identifierCommentList? (COMMENT STRING)?
(PARTITIONED ON identifierList)?
(TBLPROPERTIES tablePropertyList)? AS query #createView
| CREATE (OR REPLACE)? TEMPORARY VIEW
tableIdentifier ('(' colTypeList ')')? tableProvider
(OPTIONS tablePropertyList)? #createTempViewUsing
| ALTER VIEW tableIdentifier AS? query #alterViewQuery
| CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING
(USING resource (',' resource)*)? #createFunction

View file

@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.DataType
@ -346,6 +346,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
}
/**
 * Builds the [[CreateTempViewUsing]] logical plan for a
 * `CREATE [OR REPLACE] TEMPORARY VIEW ... USING provider [OPTIONS (...)]` statement.
 *
 * The optional column list becomes the user-specified schema, and a missing
 * OPTIONS clause yields an empty options map.
 */
override def visitCreateTempViewUsing(
    ctx: CreateTempViewUsingContext): LogicalPlan = withOrigin(ctx) {
  val viewIdent = visitTableIdentifier(ctx.tableIdentifier())
  val declaredSchema = Option(ctx.colTypeList()).map(createStructType)
  // The REPLACE token is only present when the statement was written with OR REPLACE.
  val shouldReplace = ctx.REPLACE != null
  val providerName = ctx.tableProvider.qualifiedName.getText
  val viewOptions: Map[String, String] = Option(ctx.tablePropertyList) match {
    case Some(properties) => visitPropertyKeyValues(properties)
    case None => Map.empty
  }

  CreateTempViewUsing(
    tableIdent = viewIdent,
    userSpecifiedSchema = declaredSchema,
    replace = shouldReplace,
    provider = providerName,
    options = viewOptions)
}
/**
* Create a [[LoadDataCommand]] command.
*

View file

@ -376,9 +376,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case c: CreateTableUsing if c.temporary && !c.allowExisting =>
logWarning(
s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is deprecated, " +
s"please use CREATE TEMPORARY VIEW viewName USING... instead")
ExecutedCommandExec(
CreateTempTableUsing(
c.tableIdent, c.userSpecifiedSchema, c.provider, c.options)) :: Nil
CreateTempViewUsing(
c.tableIdent, c.userSpecifiedSchema, replace = true, c.provider, c.options)) :: Nil
case c: CreateTableUsing if !c.temporary =>
val cmd =
@ -409,6 +412,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
c.child)
ExecutedCommandExec(cmd) :: Nil
case c: CreateTempViewUsing =>
ExecutedCommandExec(c) :: Nil
case _ => Nil
}
}

View file

@ -64,9 +64,10 @@ case class CreateTableUsingAsSelect(
override def output: Seq[Attribute] = Seq.empty[Attribute]
}
case class CreateTempTableUsing(
case class CreateTempViewUsing(
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
replace: Boolean,
provider: String,
options: Map[String, String]) extends RunnableCommand {
@ -84,7 +85,7 @@ case class CreateTempTableUsing(
sparkSession.sessionState.catalog.createTempView(
tableIdent.table,
Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
overrideIfExists = true)
replace)
Seq.empty[Row]
}

View file

@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
@ -422,6 +422,25 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
test("create temporary view using") {
  // The CSV fixture is resolved through the context class loader; its URL string is the PATH option.
  val carsCsv = Thread.currentThread().getContextClassLoader.getResource("cars.csv").toString()
  withView("testview") {
    val createOrReplaceView =
      s"CREATE OR REPLACE TEMPORARY VIEW testview (c1: String, c2: String) USING " +
        "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat " +
        s"OPTIONS (PATH '$carsCsv')"
    sql(createOrReplaceView)

    checkAnswer(
      sql("select c1, c2 from testview order by c1 limit 1"),
      Seq(Row("1997", "Ford")))

    // Without OR REPLACE, creating a view whose name is already taken must fail.
    val createSameViewAgain =
      s"CREATE TEMPORARY VIEW testview USING " +
        s"org.apache.spark.sql.execution.datasources.csv.CSVFileFormat OPTIONS (PATH '$carsCsv')"
    intercept[TempTableAlreadyExistsException] {
      sql(createSameViewAgain)
    }
  }
}
test("alter table: rename") {
val catalog = spark.sessionState.catalog
val tableIdent1 = TableIdentifier("tab1", Some("dbx"))