[SPARK-15171][SQL] Deprecate registerTempTable and add dataset.createTempView

## What changes were proposed in this pull request?

Deprecates registerTempTable and add dataset.createTempView, dataset.createOrReplaceTempView.

## How was this patch tested?

Unit tests.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #12945 from clockfly/spark-15171.
This commit is contained in:
Sean Zhong 2016-05-12 15:51:53 +08:00 committed by Cheng Lian
parent 5207a005cc
commit 33c6eb5218
45 changed files with 197 additions and 120 deletions

View file

@ -41,7 +41,7 @@ object SparkSqlExample {
import sqlContext._
val people = sc.makeRDD(1 to 100, 10).map(x => Person(s"Name$x", x)).toDF()
people.registerTempTable("people")
people.createOrReplaceTempView("people")
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
val teenagerNames = teenagers.map(t => "Name: " + t(0)).collect()
teenagerNames.foreach(println)
@ -52,7 +52,7 @@ object SparkSqlExample {
System.exit(-1)
}
}
test(teenagerNames.size == 7, "Unexpected number of selected elements: " + teenagerNames)
println("Test succeeded")
sc.stop()

View file

@ -75,7 +75,7 @@ public class JavaSparkSQL {
// Apply a schema to an RDD of Java Beans and register it as a table.
Dataset<Row> schemaPeople = spark.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");
schemaPeople.createOrReplaceTempView("people");
// SQL can be run over RDDs that have been registered as tables.
Dataset<Row> teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
@ -102,7 +102,7 @@ public class JavaSparkSQL {
Dataset<Row> parquetFile = spark.read().parquet("people.parquet");
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
parquetFile.createOrReplaceTempView("parquetFile");
Dataset<Row> teenagers2 =
spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
@ -131,7 +131,7 @@ public class JavaSparkSQL {
// |-- name: StringType
// Register this DataFrame as a table.
peopleFromJsonFile.registerTempTable("people");
peopleFromJsonFile.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by `spark`
Dataset<Row> teenagers3 = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
@ -163,7 +163,7 @@ public class JavaSparkSQL {
// | |-- state: StringType
// |-- name: StringType
peopleFromJsonRDD.registerTempTable("people2");
peopleFromJsonRDD.createOrReplaceTempView("people2");
Dataset<Row> peopleWithCity = spark.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {

View file

@ -95,7 +95,7 @@ public final class JavaSqlNetworkWordCount {
Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);
// Register as table
wordsDataFrame.registerTempTable("words");
wordsDataFrame.createOrReplaceTempView("words");
// Do word count on table using SQL and print it
Dataset<Row> wordCountsDataFrame =

View file

@ -67,7 +67,7 @@ if __name__ == "__main__":
# |-- name: string (nullable = true)
# Register this DataFrame as a temporary table.
people.registerTempTable("people")
people.createOrReplaceTempView("people")
# SQL statements can be run by using the sql methods provided by `spark`
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

View file

@ -71,7 +71,7 @@ if __name__ == "__main__":
wordsDataFrame = spark.createDataFrame(rowRdd)
# Register as table
wordsDataFrame.registerTempTable("words")
wordsDataFrame.createOrReplaceTempView("words")
# Do word count on table using SQL and print it
wordCountsDataFrame = \

View file

@ -37,7 +37,7 @@ object RDDRelation {
val df = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
// Any RDD containing case classes can be registered as a table. The schema of the table is
// automatically inferred using scala reflection.
df.registerTempTable("records")
df.createOrReplaceTempView("records")
// Once tables have been registered, you can run SQL queries over them.
println("Result of SELECT *:")
@ -67,7 +67,7 @@ object RDDRelation {
parquetFile.where($"key" === 1).select($"value".as("a")).collect().foreach(println)
// These files can also be registered as tables.
parquetFile.registerTempTable("parquetFile")
parquetFile.createOrReplaceTempView("parquetFile")
spark.sql("SELECT * FROM parquetFile").collect().foreach(println)
spark.stop()

View file

@ -67,7 +67,7 @@ object SqlNetworkWordCount {
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
// Register as table
wordsDataFrame.registerTempTable("words")
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on table using SQL and print it
val wordCountsDataFrame =

View file

@ -48,6 +48,7 @@ class SQLTransformer @Since("1.6.0") (override val uid: String) extends Transfor
/**
* SQL statement parameter. The statement is provided in string form.
*
* @group param
*/
@Since("1.6.0")
@ -66,7 +67,7 @@ class SQLTransformer @Since("1.6.0") (override val uid: String) extends Transfor
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
val tableName = Identifiable.randomUID(uid)
dataset.registerTempTable(tableName)
dataset.createOrReplaceTempView(tableName)
val realStatement = $(statement).replace(tableIdentifier, tableName)
dataset.sparkSession.sql(realStatement)
}
@ -79,7 +80,7 @@ class SQLTransformer @Since("1.6.0") (override val uid: String) extends Transfor
val dummyDF = sqlContext.createDataFrame(dummyRDD, schema)
val tableName = Identifiable.randomUID(uid)
val realStatement = $(statement).replace(tableIdentifier, tableName)
dummyDF.registerTempTable(tableName)
dummyDF.createOrReplaceTempView(tableName)
val outputSchema = sqlContext.sql(realStatement).schema
sqlContext.dropTempTable(tableName)
outputSchema

View file

@ -166,34 +166,20 @@ class Catalog(object):
return DataFrame(df, self._sparkSession._wrapped)
@since(2.0)
def dropTempTable(self, tableName):
"""Drops the temporary table with the given table name in the catalog.
If the table has been cached before, then it will also be uncached.
def dropTempView(self, viewName):
"""Drops the temporary view with the given view name in the catalog.
If the view has been cached before, then it will also be uncached.
>>> spark.createDataFrame([(1, 1)]).registerTempTable("my_table")
>>> spark.createDataFrame([(1, 1)]).createTempView("my_table")
>>> spark.table("my_table").collect()
[Row(_1=1, _2=1)]
>>> spark.catalog.dropTempTable("my_table")
>>> spark.catalog.dropTempView("my_table")
>>> spark.table("my_table") # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
AnalysisException: ...
"""
self._jcatalog.dropTempTable(tableName)
@since(2.0)
def registerTable(self, df, tableName):
"""Registers the given :class:`DataFrame` as a temporary table in the catalog.
>>> df = spark.createDataFrame([(2, 1), (3, 1)])
>>> spark.catalog.registerTable(df, "my_cool_table")
>>> spark.table("my_cool_table").collect()
[Row(_1=2, _2=1), Row(_1=3, _2=1)]
"""
if isinstance(df, DataFrame):
self._jsparkSession.registerTable(df._jdf, tableName)
else:
raise ValueError("Can only register DataFrame as table")
self._jcatalog.dropTempView(viewName)
@ignore_unicode_prefix
@since(2.0)

View file

@ -302,7 +302,7 @@ class SQLContext(object):
>>> sqlContext.registerDataFrameAsTable(df, "table1")
"""
self.sparkSession.catalog.registerTable(df, tableName)
df.createOrReplaceTempView(tableName)
@since(1.6)
def dropTempTable(self, tableName):
@ -311,7 +311,7 @@ class SQLContext(object):
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> sqlContext.dropTempTable("table1")
"""
self.sparkSession.catalog.dropTempTable(tableName)
self.sparkSession.catalog.dropTempView(tableName)
@since(1.3)
def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):

View file

@ -119,11 +119,55 @@ class DataFrame(object):
that was used to create this :class:`DataFrame`.
>>> df.registerTempTable("people")
>>> df2 = sqlContext.sql("select * from people")
>>> df2 = spark.sql("select * from people")
>>> sorted(df.collect()) == sorted(df2.collect())
True
>>> spark.catalog.dropTempView("people")
.. note:: Deprecated in 2.0, use createOrReplaceTempView instead.
"""
self._jdf.registerTempTable(name)
self._jdf.createOrReplaceTempView(name)
@since(2.0)
def createTempView(self, name):
"""Creates a temporary view with this DataFrame.
The lifetime of this temporary table is tied to the :class:`SparkSession`
that was used to create this :class:`DataFrame`.
throws :class:`TempTableAlreadyExistsException`, if the view name already exists in the
catalog.
>>> df.createTempView("people")
>>> df2 = spark.sql("select * from people")
>>> sorted(df.collect()) == sorted(df2.collect())
True
>>> df.createTempView("people") # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
Py4JJavaError: ...
: org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException...
>>> spark.catalog.dropTempView("people")
"""
self._jdf.createTempView(name)
@since(2.0)
def createOrReplaceTempView(self, name):
"""Creates or replaces a temporary view with this DataFrame.
The lifetime of this temporary table is tied to the :class:`SparkSession`
that was used to create this :class:`DataFrame`.
>>> df.createOrReplaceTempView("people")
>>> df2 = df.filter(df.age > 3)
>>> df2.createOrReplaceTempView("people")
>>> df3 = spark.sql("select * from people")
>>> sorted(df3.collect()) == sorted(df2.collect())
True
>>> spark.catalog.dropTempView("people")
"""
self._jdf.createOrReplaceTempView(name)
@property
@since(1.4)
@ -1479,12 +1523,13 @@ class DataFrameStatFunctions(object):
def _test():
import doctest
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext
from pyspark.sql import Row, SQLContext, SparkSession
import pyspark.sql.dataframe
globs = pyspark.sql.dataframe.__dict__.copy()
sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
globs['sqlContext'] = SQLContext(sc)
globs['spark'] = SparkSession(sc)
globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')])\
.toDF(StructType([StructField('age', IntegerType()),
StructField('name', StringType())]))

View file

@ -160,7 +160,7 @@ class SparkSession(object):
... b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
... time=datetime(2014, 8, 1, 14, 1, 5))])
>>> df = allTypes.toDF()
>>> df.registerTempTable("allTypes")
>>> df.createOrReplaceTempView("allTypes")
>>> spark.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a '
... 'from allTypes where b and i > 0').collect()
[Row((i + CAST(1 AS BIGINT))=2, (d + CAST(1 AS DOUBLE))=2.0, (NOT b)=False, list[1]=2, \
@ -484,7 +484,7 @@ class SparkSession(object):
:return: :class:`DataFrame`
>>> spark.catalog.registerTable(df, "table1")
>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
@ -497,7 +497,7 @@ class SparkSession(object):
:return: :class:`DataFrame`
>>> spark.catalog.registerTable(df, "table1")
>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True

View file

@ -35,7 +35,7 @@ import org.apache.spark.sql.types.IntegerType
* ("a", "ca1", "cb2", 5),
* ("b", "ca1", "cb1", 13))
* .toDF("key", "cat1", "cat2", "value")
* data.registerTempTable("data")
* data.createOrReplaceTempView("data")
*
* val agg = data.groupBy($"key")
* .agg(

View file

@ -315,7 +315,7 @@ class SessionCatalog(
/**
* Create a temporary table.
*/
def createTempTable(
def createTempView(
name: String,
tableDefinition: LogicalPlan,
overrideIfExists: Boolean): Unit = synchronized {

View file

@ -31,7 +31,7 @@ trait AnalysisTest extends PlanTest {
private def makeAnalyzer(caseSensitive: Boolean): Analyzer = {
val conf = new SimpleCatalystConf(caseSensitive)
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
catalog.createTempTable("TaBlE", TestRelations.testRelation, overrideIfExists = true)
catalog.createTempView("TaBlE", TestRelations.testRelation, overrideIfExists = true)
new Analyzer(catalog, conf) {
override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
}

View file

@ -52,7 +52,7 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
private val b: Expression = UnresolvedAttribute("b")
before {
catalog.createTempTable("table", relation, overrideIfExists = true)
catalog.createTempView("table", relation, overrideIfExists = true)
}
private def checkType(expression: Expression, expectedType: DataType): Unit = {

View file

@ -199,17 +199,17 @@ class SessionCatalogSuite extends SparkFunSuite {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable1 = Range(1, 10, 1, 10, Seq())
val tempTable2 = Range(1, 20, 2, 10, Seq())
catalog.createTempTable("tbl1", tempTable1, overrideIfExists = false)
catalog.createTempTable("tbl2", tempTable2, overrideIfExists = false)
catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
catalog.createTempView("tbl2", tempTable2, overrideIfExists = false)
assert(catalog.getTempTable("tbl1") == Option(tempTable1))
assert(catalog.getTempTable("tbl2") == Option(tempTable2))
assert(catalog.getTempTable("tbl3").isEmpty)
// Temporary table already exists
intercept[TempTableAlreadyExistsException] {
catalog.createTempTable("tbl1", tempTable1, overrideIfExists = false)
catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
}
// Temporary table already exists but we override it
catalog.createTempTable("tbl1", tempTable2, overrideIfExists = true)
catalog.createTempView("tbl1", tempTable2, overrideIfExists = true)
assert(catalog.getTempTable("tbl1") == Option(tempTable2))
}
@ -244,7 +244,7 @@ class SessionCatalogSuite extends SparkFunSuite {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
val tempTable = Range(1, 10, 2, 10, Seq())
sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
@ -256,7 +256,7 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
// If database is specified, temp tables are never dropped
sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false)
assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
@ -305,7 +305,7 @@ class SessionCatalogSuite extends SparkFunSuite {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
val tempTable = Range(1, 10, 2, 10, Seq())
sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
@ -385,7 +385,7 @@ class SessionCatalogSuite extends SparkFunSuite {
val sessionCatalog = new SessionCatalog(externalCatalog)
val tempTable1 = Range(1, 10, 1, 10, Seq())
val metastoreTable1 = externalCatalog.getTable("db2", "tbl1")
sessionCatalog.createTempTable("tbl1", tempTable1, overrideIfExists = false)
sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
// If we explicitly specify the database, we'll look up the relation in that database
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
@ -423,7 +423,7 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
// If database is explicitly specified, do not check temporary tables
val tempTable = Range(1, 10, 1, 10, Seq())
catalog.createTempTable("tbl3", tempTable, overrideIfExists = false)
catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
// If database is not explicitly specified, check the current database
catalog.setCurrentDatabase("db2")
@ -435,8 +435,8 @@ class SessionCatalogSuite extends SparkFunSuite {
test("list tables without pattern") {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable = Range(1, 10, 2, 10, Seq())
catalog.createTempTable("tbl1", tempTable, overrideIfExists = false)
catalog.createTempTable("tbl4", tempTable, overrideIfExists = false)
catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
assert(catalog.listTables("db1").toSet ==
Set(TableIdentifier("tbl1"), TableIdentifier("tbl4")))
assert(catalog.listTables("db2").toSet ==
@ -452,8 +452,8 @@ class SessionCatalogSuite extends SparkFunSuite {
test("list tables with pattern") {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable = Range(1, 10, 2, 10, Seq())
catalog.createTempTable("tbl1", tempTable, overrideIfExists = false)
catalog.createTempTable("tbl4", tempTable, overrideIfExists = false)
catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet)
assert(catalog.listTables("db2", "*").toSet == catalog.listTables("db2").toSet)
assert(catalog.listTables("db2", "tbl*").toSet ==

View file

@ -321,7 +321,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
val sink = new MemorySink(df.schema)
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
resultDf.registerTempTable(queryName)
resultDf.createOrReplaceTempView(queryName)
val continuousQuery = df.sparkSession.sessionState.continuousQueryManager.startQuery(
queryName,
checkpointLocation,

View file

@ -2303,13 +2303,39 @@ class Dataset[T] private[sql](
/**
* Registers this [[Dataset]] as a temporary table using the given name. The lifetime of this
* temporary table is tied to the [[SQLContext]] that was used to create this Dataset.
* temporary table is tied to the [[SparkSession]] that was used to create this Dataset.
*
* @group basic
* @since 1.6.0
*/
@deprecated("Use createOrReplaceTempView(viewName) instead.", "2.0.0")
def registerTempTable(tableName: String): Unit = {
sparkSession.registerTable(toDF(), tableName)
createOrReplaceTempView(tableName)
}
/**
* Creates a temporary view using the given name. The lifetime of this
* temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
*
* @throws AnalysisException if the view name already exists
*
* @group basic
* @since 2.0.0
*/
@throws[AnalysisException]
def createTempView(viewName: String): Unit = {
sparkSession.createTempView(viewName, toDF(), replaceIfExists = false)
}
/**
* Creates a temporary view using the given name. The lifetime of this
* temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
*
* @group basic
* @since 2.0.0
*/
def createOrReplaceTempView(viewName: String): Unit = {
sparkSession.createTempView(viewName, toDF(), replaceIfExists = true)
}
/**

View file

@ -597,7 +597,7 @@ class SQLContext private[sql](
* only during the lifetime of this instance of SQLContext.
*/
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
sparkSession.registerTable(df, tableName)
sparkSession.createTempView(tableName, df, replaceIfExists = true)
}
/**
@ -609,7 +609,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def dropTempTable(tableName: String): Unit = {
sparkSession.catalog.dropTempTable(tableName)
sparkSession.catalog.dropTempView(tableName)
}
/**

View file

@ -284,7 +284,7 @@ class SparkSession private(
* // |-- name: string (nullable = false)
* // |-- age: integer (nullable = true)
*
* dataFrame.registerTempTable("people")
* dataFrame.createOrReplaceTempView("people")
* sparkSession.sql("select name from people").collect.foreach(println)
* }}}
*
@ -515,17 +515,16 @@ class SparkSession private(
}
/**
* Registers the given [[DataFrame]] as a temporary table in the catalog.
* Temporary tables exist only during the lifetime of this instance of [[SparkSession]].
* Creates a temporary view with a DataFrame. The lifetime of this temporary view is tied to
* this [[SparkSession]].
*/
protected[sql] def registerTable(df: DataFrame, tableName: String): Unit = {
sessionState.catalog.createTempTable(
sessionState.sqlParser.parseTableIdentifier(tableName).table,
df.logicalPlan,
overrideIfExists = true)
protected[sql] def createTempView(
viewName: String, df: DataFrame, replaceIfExists: Boolean) = {
sessionState.catalog.createTempView(
sessionState.sqlParser.parseTableIdentifier(viewName).table,
df.logicalPlan, replaceIfExists)
}
/* ----------------- *
| Everything else |
* ----------------- */

View file

@ -175,13 +175,13 @@ abstract class Catalog {
options: Map[String, String]): DataFrame
/**
* Drops the temporary table with the given table name in the catalog.
* If the table has been cached before, then it will also be uncached.
* Drops the temporary view with the given view name in the catalog.
* If the view has been cached before, then it will also be uncached.
*
* @param tableName the name of the table to be dropped.
* @param viewName the name of the view to be dropped.
* @since 2.0.0
*/
def dropTempTable(tableName: String): Unit
def dropTempView(viewName: String): Unit
/**
* Returns true if the table is currently cached in-memory.

View file

@ -753,7 +753,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (temp) {
throw new ParseException(
"CREATE TEMPORARY TABLE is not supported yet. " +
"Please use registerTempTable as an alternative.", ctx)
"Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
}
if (ctx.skewSpec != null) {
throw operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)

View file

@ -30,7 +30,8 @@ case class CacheTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
plan.foreach { logicalPlan =>
sparkSession.registerTable(Dataset.ofRows(sparkSession, logicalPlan), tableName)
sparkSession.createTempView(
tableName, Dataset.ofRows(sparkSession, logicalPlan), replaceIfExists = true)
}
sparkSession.catalog.cacheTable(tableName)

View file

@ -136,7 +136,7 @@ case class CreateViewCommand(
}
}
catalog.createTempTable(table.table, logicalPlan, replace)
catalog.createTempView(table.table, logicalPlan, replace)
}
/**

View file

@ -82,7 +82,7 @@ case class CreateTempTableUsing(
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
options = options)
sparkSession.sessionState.catalog.createTempTable(
sparkSession.sessionState.catalog.createTempView(
tableIdent.table,
Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
overrideIfExists = true)
@ -113,7 +113,7 @@ case class CreateTempTableUsingAsSelect(
bucketSpec = None,
options = options)
val result = dataSource.write(mode, df)
sparkSession.sessionState.catalog.createTempTable(
sparkSession.sessionState.catalog.createTempView(
tableIdent.table,
Dataset.ofRows(sparkSession, LogicalRelation(result)).logicalPlan,
overrideIfExists = true)

View file

@ -283,16 +283,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
/**
* Drops the temporary table with the given table name in the catalog.
* If the table has been cached/persisted before, it's also unpersisted.
* Drops the temporary view with the given view name in the catalog.
* If the view has been cached/persisted before, it's also unpersisted.
*
* @param tableName the name of the table to be unregistered.
* @param viewName the name of the view to be dropped.
* @group ddl_ops
* @since 2.0.0
*/
override def dropTempTable(tableName: String): Unit = {
sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(tableName))
sessionCatalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true)
override def dropTempView(viewName: String): Unit = {
sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(viewName))
sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true)
}
/**

View file

@ -288,7 +288,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
test("Drops temporary table") {
testData.select('key).registerTempTable("t1")
spark.table("t1")
spark.catalog.dropTempTable("t1")
spark.catalog.dropTempView("t1")
intercept[AnalysisException](spark.table("t1"))
}
@ -300,7 +300,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
assert(spark.catalog.isCached("t1"))
assert(spark.catalog.isCached("t2"))
spark.catalog.dropTempTable("t1")
spark.catalog.dropTempView("t1")
intercept[AnalysisException](spark.table("t1"))
assert(!spark.catalog.isCached("t2"))
}
@ -382,7 +382,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
sql("SELECT key, count(*) FROM orderedTable GROUP BY key ORDER BY key"),
sql("SELECT key, count(*) FROM testData3x GROUP BY key ORDER BY key").collect())
spark.catalog.uncacheTable("orderedTable")
spark.catalog.dropTempTable("orderedTable")
spark.catalog.dropTempView("orderedTable")
// Set up two tables distributed in the same way. Try this with the data distributed into
// different number of partitions.

View file

@ -994,17 +994,18 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
// pass case: parquet table (HadoopFsRelation)
df.write.mode(SaveMode.Overwrite).parquet(tempParquetFile.getCanonicalPath)
val pdf = spark.read.parquet(tempParquetFile.getCanonicalPath)
pdf.registerTempTable("parquet_base")
pdf.createOrReplaceTempView("parquet_base")
insertion.write.insertInto("parquet_base")
// pass case: json table (InsertableRelation)
df.write.mode(SaveMode.Overwrite).json(tempJsonFile.getCanonicalPath)
val jdf = spark.read.json(tempJsonFile.getCanonicalPath)
jdf.registerTempTable("json_base")
jdf.createOrReplaceTempView("json_base")
insertion.write.mode(SaveMode.Overwrite).insertInto("json_base")
// error cases: insert into an RDD
df.registerTempTable("rdd_base")
df.createOrReplaceTempView("rdd_base")
val e1 = intercept[AnalysisException] {
insertion.write.insertInto("rdd_base")
}
@ -1012,14 +1013,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
// error case: insert into a logical plan that is not a LeafNode
val indirectDS = pdf.select("_1").filter($"_1" > 5)
indirectDS.registerTempTable("indirect_ds")
indirectDS.createOrReplaceTempView("indirect_ds")
val e2 = intercept[AnalysisException] {
insertion.write.insertInto("indirect_ds")
}
assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))
// error case: insert into an OneRowRelation
Dataset.ofRows(spark, OneRowRelation).registerTempTable("one_row")
Dataset.ofRows(spark, OneRowRelation).createOrReplaceTempView("one_row")
val e3 = intercept[AnalysisException] {
insertion.write.insertInto("one_row")
}
@ -1443,13 +1444,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
test("SPARK-12982: Add table name validation in temp table registration") {
val df = Seq("foo", "bar").map(Tuple1.apply).toDF("col")
// invalid table name test as below
intercept[AnalysisException](df.registerTempTable("t~"))
intercept[AnalysisException](df.createOrReplaceTempView("t~"))
// valid table name test as below
df.registerTempTable("table1")
df.createOrReplaceTempView("table1")
// another invalid table name test as below
intercept[AnalysisException](df.registerTempTable("#$@sum"))
intercept[AnalysisException](df.createOrReplaceTempView("#$@sum"))
// another invalid table name test as below
intercept[AnalysisException](df.registerTempTable("table!#"))
intercept[AnalysisException](df.createOrReplaceTempView("table!#"))
}
test("assertAnalyzed shouldn't replace original stack trace") {

View file

@ -249,7 +249,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
try {
f(tableName)
} finally {
spark.catalog.dropTempTable(tableName)
spark.catalog.dropTempView(tableName)
}
}

View file

@ -22,6 +22,8 @@ import java.sql.{Date, Timestamp}
import scala.language.postfixOps
import org.scalatest.words.MatcherWords.be
import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
@ -674,6 +676,22 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
}.getMessage
assert(message.contains("The 0th field of input row cannot be null"))
}
test("createTempView") {
val dataset = Seq(1, 2, 3).toDS()
dataset.createOrReplaceTempView("tempView")
// Overrrides the existing temporary view with same name
// No exception should be thrown here.
dataset.createOrReplaceTempView("tempView")
// Throws AnalysisException if temp view with same name already exists
val e = intercept[AnalysisException](
dataset.createTempView("tempView"))
intercept[AnalysisException](dataset.createTempView("tempView"))
assert(e.message.contains("already exists"))
dataset.sparkSession.catalog.dropTempView("tempView")
}
}
case class OtherTuple(_1: String, _2: Int)

View file

@ -83,7 +83,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
checkAnswer(
spark.wrapped.tables().filter("tableName = 'tables'").select("tableName", "isTemporary"),
Row("tables", true))
spark.catalog.dropTempTable("tables")
spark.catalog.dropTempView("tables")
}
}
}

View file

@ -333,7 +333,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
"SELECT sum('a'), avg('a'), count(null) FROM testData",
Row(null, null, 0) :: Nil)
} finally {
spark.catalog.dropTempTable("testData3x")
spark.catalog.dropTempView("testData3x")
}
}
@ -1453,12 +1453,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
spark.read.json(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil))
.registerTempTable("data")
checkAnswer(sql("SELECT a.b[0].c FROM data GROUP BY a.b[0].c"), Row(1))
spark.catalog.dropTempTable("data")
spark.catalog.dropTempView("data")
spark.read.json(
sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).registerTempTable("data")
checkAnswer(sql("SELECT a.b + 1 FROM data GROUP BY a.b + 1"), Row(2))
spark.catalog.dropTempTable("data")
spark.catalog.dropTempView("data")
}
test("SPARK-4432 Fix attribute reference resolution error when using ORDER BY") {

View file

@ -55,7 +55,7 @@ class UDFSuite extends QueryTest with SharedSQLContext {
val df = Seq((1, "Tearing down the walls that divide us")).toDF("id", "saying")
df.registerTempTable("tmp_table")
checkAnswer(sql("select spark_partition_id() from tmp_table").toDF(), Row(0))
spark.catalog.dropTempTable("tmp_table")
spark.catalog.dropTempView("tmp_table")
}
test("SPARK-8005 input_file_name") {
@ -66,7 +66,7 @@ class UDFSuite extends QueryTest with SharedSQLContext {
val answer = sql("select input_file_name() from test_table").head().getString(0)
assert(answer.contains(dir.getCanonicalPath))
assert(sql("select input_file_name() from test_table").distinct().collect().length >= 2)
spark.catalog.dropTempTable("test_table")
spark.catalog.dropTempView("test_table")
}
}

View file

@ -191,7 +191,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
checkAnswer(
sql(s"SELECT DISTINCT ${allColumns} FROM InMemoryCache_different_data_types"),
spark.table("InMemoryCache_different_data_types").collect())
spark.catalog.dropTempTable("InMemoryCache_different_data_types")
spark.catalog.dropTempView("InMemoryCache_different_data_types")
}
test("SPARK-10422: String column in InMemoryColumnarCache needs to override clone method") {

View file

@ -52,7 +52,7 @@ object ParquetReadBenchmark {
}
def withTempTable(tableNames: String*)(f: => Unit): Unit = {
try f finally tableNames.foreach(spark.catalog.dropTempTable)
try f finally tableNames.foreach(spark.catalog.dropTempView)
}
def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {

View file

@ -58,7 +58,7 @@ class CatalogSuite
}
private def createTempTable(name: String): Unit = {
sessionCatalog.createTempTable(name, Range(1, 2, 3, 4, Seq()), overrideIfExists = true)
sessionCatalog.createTempView(name, Range(1, 2, 3, 4, Seq()), overrideIfExists = true)
}
private def dropTable(name: String, db: Option[String] = None): Unit = {

View file

@ -153,7 +153,7 @@ private[sql] trait SQLTestUtils
try f finally {
// If the test failed part way, we don't want to mask the failure by failing to remove
// temp tables that never got created.
try tableNames.foreach(spark.catalog.dropTempTable) catch {
try tableNames.foreach(spark.catalog.dropTempView) catch {
case _: NoSuchTableException =>
}
}

View file

@ -130,7 +130,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
options = options)
LogicalRelation(
dataSource.resolveRelation(),
dataSource.resolveRelation(checkPathExist = true),
metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database))))
}
}

View file

@ -31,7 +31,7 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd
override protected def beforeEach(): Unit = {
super.beforeEach()
if (spark.wrapped.tableNames().contains("src")) {
spark.catalog.dropTempTable("src")
spark.catalog.dropTempView("src")
}
Seq((1, "")).toDF("key", "value").registerTempTable("src")
Seq((1, 1, 1)).toDF("a", "a", "b").registerTempTable("dupAttributes")
@ -39,8 +39,8 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd
override protected def afterEach(): Unit = {
try {
spark.catalog.dropTempTable("src")
spark.catalog.dropTempTable("dupAttributes")
spark.catalog.dropTempView("src")
spark.catalog.dropTempView("dupAttributes")
} finally {
super.afterEach()
}

View file

@ -348,7 +348,7 @@ class HiveDDLCommandSuite extends PlanTest {
test("create table - temporary") {
val query = "CREATE TEMPORARY TABLE tab1 (id int, name string)"
val e = intercept[ParseException] { parser.parsePlan(query) }
assert(e.message.contains("registerTempTable"))
assert(e.message.contains("CREATE TEMPORARY TABLE is not supported yet"))
}
test("create table - external") {

View file

@ -33,7 +33,7 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
override def beforeAll(): Unit = {
super.beforeAll()
// The catalog in HiveContext is a case insensitive one.
sessionState.catalog.createTempTable(
sessionState.catalog.createTempView(
"ListTablesSuiteTable", df.logicalPlan, overrideIfExists = true)
sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")

View file

@ -193,7 +193,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
spark.sql("DROP TABLE IF EXISTS agg1")
spark.sql("DROP TABLE IF EXISTS agg2")
spark.sql("DROP TABLE IF EXISTS agg3")
spark.catalog.dropTempTable("emptyTable")
spark.catalog.dropTempView("emptyTable")
} finally {
super.afterAll()
}

View file

@ -347,7 +347,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode")
}
spark.catalog.dropTempTable("testUDF")
spark.catalog.dropTempView("testUDF")
}
test("Hive UDF in group by") {

View file

@ -353,7 +353,7 @@ class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSi
checkAnswer(actual, expected)
spark.catalog.dropTempTable("nums")
spark.catalog.dropTempView("nums")
}
test("SPARK-7595: Window will cause resolve failed with self join") {