#### What changes were proposed in this pull request?
So far, when using In-Memory Catalog, we allow DDL operations for the tables. However, the corresponding DML operations are not supported for the tables that are neither temporary nor data source tables. For example,
```SQL
CREATE TABLE tabName(i INT, j STRING)
SELECT * FROM tabName
INSERT OVERWRITE TABLE tabName SELECT 1, 'a'
```
In the above example, before this PR fix, we will get very confusing exception messages for either `SELECT` or `INSERT`
```
org.apache.spark.sql.AnalysisException: unresolved operator 'SimpleCatalogRelation default, CatalogTable(`default`.`tbl`,CatalogTableType(MANAGED),CatalogStorageFormat(None,Some(org.apache.hadoop.mapred.TextInputFormat),Some(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat),None,false,Map()),List(CatalogColumn(i,int,true,None), CatalogColumn(j,string,true,None)),List(),List(),List(),-1,,1463928681802,-1,Map(),None,None,None,List()), None;
```
This PR is to issue appropriate exceptions in this case. The message will be like
```
org.apache.spark.sql.AnalysisException: Please enable Hive support when operating non-temporary tables: `tbl`;
```
#### How was this patch tested?
Added a test case in `DDLSuite`.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#13093 from gatorsmile/selectAfterCreate.
## What changes were proposed in this pull request?
Currently command `ADD FILE|JAR <filepath | jarpath>` is supported natively in SparkSQL. However, when this command is run, the file/jar is added to the resources that can not be looked up by `LIST FILE(s)|JAR(s)` command because the `LIST` command is passed to Hive command processor in Spark-SQL or simply not supported in Spark-shell. There is no way users can find out what files/jars are added to the spark context.
Refer to [Hive commands](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli)
This PR is to support following commands:
`LIST (FILE[s] [filepath ...] | JAR[s] [jarfile ...])`
### For example:
##### LIST FILE(s)
```
scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt")
res1: org.apache.spark.sql.DataFrame = []
scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt")
res2: org.apache.spark.sql.DataFrame = []
scala> spark.sql("list file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt").show(false)
+----------------------------------------------+
|result |
+----------------------------------------------+
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt|
+----------------------------------------------+
scala> spark.sql("list files").show(false)
+----------------------------------------------+
|result |
+----------------------------------------------+
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt|
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt |
+----------------------------------------------+
```
##### LIST JAR(s)
```
scala> spark.sql("add jar /Users/xinwu/spark/core/src/test/resources/TestUDTF.jar")
res9: org.apache.spark.sql.DataFrame = [result: int]
scala> spark.sql("list jar TestUDTF.jar").show(false)
+---------------------------------------------+
|result |
+---------------------------------------------+
|spark://192.168.1.234:50131/jars/TestUDTF.jar|
+---------------------------------------------+
scala> spark.sql("list jars").show(false)
+---------------------------------------------+
|result |
+---------------------------------------------+
|spark://192.168.1.234:50131/jars/TestUDTF.jar|
+---------------------------------------------+
```
## How was this patch tested?
New test cases are added for Spark-SQL, Spark-Shell and SparkContext API code path.
Author: Xin Wu <xinwu@us.ibm.com>
Author: xin Wu <xinwu@us.ibm.com>
Closes#13212 from xwu0226/list_command.
## What changes were proposed in this pull request?
Spark assumes that UDF functions are deterministic. This PR adds explicit notes about that.
## How was this patch tested?
It's only about docs.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#13087 from dongjoon-hyun/SPARK-15282.
## What changes were proposed in this pull request?
The user may do something like:
```
CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS PARQUET
CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS ... SERDE 'myserde'
CREATE TABLE my_tab ROW FORMAT DELIMITED ... STORED AS ORC
CREATE TABLE my_tab ROW FORMAT DELIMITED ... STORED AS ... SERDE 'myserde'
```
None of these should be allowed because the SerDe's conflict. As of this patch:
- `ROW FORMAT DELIMITED` is only compatible with `TEXTFILE`
- `ROW FORMAT SERDE` is only compatible with `TEXTFILE`, `RCFILE` and `SEQUENCEFILE`
## How was this patch tested?
New tests in `DDLCommandSuite`.
Author: Andrew Or <andrew@databricks.com>
Closes#13068 from andrewor14/row-format-conflict.
## What changes were proposed in this pull request?
1. simplify the logic of deserializing option type.
2. simplify the logic of serializing array type, and remove silentSchemaFor
3. remove some unnecessary code.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13250 from cloud-fan/encoder.
## What changes were proposed in this pull request?
When invalid date string like "2015-02-29 00:00:00" are cast as date or timestamp using spark sql, it used to not return null but another valid date (2015-03-01 in this case).
In this pr, invalid date string like "2016-02-29" and "2016-04-31" are returned as null when cast as date or timestamp.
## How was this patch tested?
Unit tests are added.
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Author: wangyang <wangyang@haizhi.com>
Closes#13169 from wangyang1992/invalid_date.
## What changes were proposed in this pull request?
Fix some typos while browsing the codes.
## How was this patch tested?
None and obvious.
Author: Bo Meng <mengbo@hotmail.com>
Author: bomeng <bmeng@us.ibm.com>
Closes#13246 from bomeng/typo.
## What changes were proposed in this pull request?
Incrementalizing plans of with multiple streaming aggregation is tricky and we dont have the necessary support for "delta" to implement correctly. So disabling the support for multiple streaming aggregations.
## How was this patch tested?
Additional unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#13210 from tdas/SPARK-15428.
## What changes were proposed in this pull request?
This patch simplifies the implementation of Range operator and make the explain string consistent between logical plan and physical plan. To do this, I changed RangeExec to embed a Range logical plan in it.
Before this patch (note that the logical Range and physical Range actually output different information):
```
== Optimized Logical Plan ==
Range 0, 100, 2, 2, [id#8L]
== Physical Plan ==
*Range 0, 2, 2, 50, [id#8L]
```
After this patch:
If step size is 1:
```
== Optimized Logical Plan ==
Range(0, 100, splits=2)
== Physical Plan ==
*Range(0, 100, splits=2)
```
If step size is not 1:
```
== Optimized Logical Plan ==
Range (0, 100, step=2, splits=2)
== Physical Plan ==
*Range (0, 100, step=2, splits=2)
```
## How was this patch tested?
N/A
Author: Reynold Xin <rxin@databricks.com>
Closes#13239 from rxin/SPARK-15459.
#### What changes were proposed in this pull request?
When there are duplicate keys in the partition specs or table properties, we always use the last value and ignore all the previous values. This is caused by the function call `toMap`.
partition specs or table properties are widely used in multiple DDL statements.
This PR is to detect the duplicates and issue an exception if found.
#### How was this patch tested?
Added test cases in DDLSuite
Author: gatorsmile <gatorsmile@gmail.com>
Closes#13095 from gatorsmile/detectDuplicate.
## What changes were proposed in this pull request?
This PR makes BroadcastHint more deterministic by using a special isBroadcastable property
instead of setting the sizeInBytes to 1.
See https://issues.apache.org/jira/browse/SPARK-15415
## How was this patch tested?
Added testcases to test if the broadcast hash join is included in the plan when the BroadcastHint is supplied and also tests for propagation of the joins.
Author: Jurriaan Pruis <email@jurriaanpruis.nl>
Closes#13244 from jurriaan/broadcast-hint.
#### What changes were proposed in this pull request?
Like `Set` Command in Hive, `Reset` is also supported by Hive. See the link: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli
Below is the related Hive JIRA: https://issues.apache.org/jira/browse/HIVE-3202
This PR is to implement such a command for resetting the SQL-related configuration to the default values. One of the use case shown in HIVE-3202 is listed below:
> For the purpose of optimization we set various configs per query. It's worthy but all those configs should be reset every time for next query.
#### How was this patch tested?
Added a test case.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#13121 from gatorsmile/resetCommand.
## What changes were proposed in this pull request?
Generate a shorter default alias for `AggregateExpression `, In this PR, aggregate function name along with a index is used for generating the alias name.
```SQL
val ds = Seq(1, 3, 2, 5).toDS()
ds.select(typed.sum((i: Int) => i), typed.avg((i: Int) => i)).show()
```
Output before change.
```SQL
+-----------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------+
|typedsumdouble(unresolveddeserializer(upcast(input[0, int], IntegerType, - root class: "scala.Int"), value#1), upcast(value))|typedaverage(unresolveddeserializer(upcast(input[0, int], IntegerType, - root class: "scala.Int"), value#1), newInstance(class scala.Tuple2))|
+-----------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------+
| 11.0| 2.75|
+-----------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------+
```
Output after change:
```SQL
+-----------------+---------------+
|typedsumdouble_c1|typedaverage_c2|
+-----------------+---------------+
| 11.0| 2.75|
+-----------------+---------------+
```
Note: There is one test in ParquetSuites.scala which shows that that the system picked alias
name is not usable and is rejected. [test](https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala#L672-#L687)
## How was this patch tested?
A new test was added in DataSetAggregatorSuite.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#13045 from dilipbiswal/spark-15114.
## What changes were proposed in this pull request?
In only `catalyst` module, there exists 8 evaluation test cases on unresolved expressions. But, in real-world situation, those cases doesn't happen since they occurs exceptions before evaluations.
```scala
scala> sql("select format_number(null, 3)")
res0: org.apache.spark.sql.DataFrame = [format_number(CAST(NULL AS DOUBLE), 3): string]
scala> sql("select format_number(cast(null as NULL), 3)")
org.apache.spark.sql.catalyst.parser.ParseException:
DataType null() is not supported.(line 1, pos 34)
```
This PR makes those testcases more realistic.
```scala
- checkEvaluation(FormatNumber(Literal.create(null, NullType), Literal(3)), null)
+ assert(FormatNumber(Literal.create(null, NullType), Literal(3)).resolved === false)
```
Also, this PR also removes redundant `resolved` checking in `FoldablePropagation` optimizer.
## How was this patch tested?
Pass the modified Jenkins tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#13241 from dongjoon-hyun/SPARK-15462.
## What changes were proposed in this pull request?
Using longValue() and then checking whether the value is in the range for a long manually.
## How was this patch tested?
Existing tests
Author: Sandeep Singh <sandeep@techaddict.me>
Closes#13223 from techaddict/SPARK-15445.
## What changes were proposed in this pull request?
Currently, the explain of a query with whole-stage codegen looks like this
```
>>> df = sqlCtx.range(1000);df2 = sqlCtx.range(1000);df.join(pyspark.sql.functions.broadcast(df2), 'id').explain()
== Physical Plan ==
WholeStageCodegen
: +- Project [id#1L]
: +- BroadcastHashJoin [id#1L], [id#4L], Inner, BuildRight, None
: :- Range 0, 1, 4, 1000, [id#1L]
: +- INPUT
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint]))
+- WholeStageCodegen
: +- Range 0, 1, 4, 1000, [id#4L]
```
The problem is that the plan looks much different than logical plan, make us hard to understand the plan (especially when the logical plan is not showed together).
This PR will change it to:
```
>>> df = sqlCtx.range(1000);df2 = sqlCtx.range(1000);df.join(pyspark.sql.functions.broadcast(df2), 'id').explain()
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight, None
:- *Range 0, 1, 4, 1000, [id#0L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range 0, 1, 4, 1000, [id#3L]
```
The `*`before the plan means that it's part of whole-stage codegen, it's easy to understand.
## How was this patch tested?
Manually ran some queries and check the explain.
Author: Davies Liu <davies@databricks.com>
Closes#13204 from davies/explain_codegen.
## What changes were proposed in this pull request?
Right now inferring the schema for case classes happens before searching the SQLUserDefinedType annotation, so the SQLUserDefinedType annotation for case classes doesn't work.
This PR simply changes the inferring order to resolve it. I also reenabled the java.math.BigDecimal test and added two tests for `List`.
## How was this patch tested?
`encodeDecodeTest(UDTCaseClass(new java.net.URI("http://spark.apache.org/")), "udt with case class")`
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#12965 from zsxwing/SPARK-15190.
## What changes were proposed in this pull request?
This PR introduce place holder for comment in generated code and the purpose is same for #12939 but much safer.
Generated code to be compiled doesn't include actual comments but includes place holder instead.
Place holders in generated code will be replaced with actual comments only at the time of logging.
Also, this PR can resolve SPARK-15205.
## How was this patch tested?
Existing tests.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#12979 from sarutak/SPARK-15205.
## What changes were proposed in this pull request?
`CreateNamedStruct` and `CreateNamedStructUnsafe` should preserve metadata of value expressions if it is `NamedExpression` like `CreateStruct` or `CreateStructUnsafe` are doing.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes#13193 from ueshin/issues/SPARK-15400.
## What changes were proposed in this pull request?
The following code:
```
val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
ds.filter(_._1 == "b").select(expr("_1").as[String]).foreach(println(_))
```
throws an Exception:
```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: _1#420
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
...
Cause: java.lang.RuntimeException: Couldn't find _1#420 in [_1#416,_2#417]
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
...
```
This is because `EmbedSerializerInFilter` rule drops the `exprId`s of output of surrounded `SerializeFromObject`.
The analyzed and optimized plans of the above example are as follows:
```
== Analyzed Logical Plan ==
_1: string
Project [_1#420]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, scala.Tuple2]._1, true) AS _1#420,input[0, scala.Tuple2]._2 AS _2#421]
+- Filter <function1>.apply
+- DeserializeToObject newInstance(class scala.Tuple2), obj#419: scala.Tuple2
+- LocalRelation [_1#416,_2#417], [[0,1800000001,1,61],[0,1800000001,2,62],[0,1800000001,3,63]]
== Optimized Logical Plan ==
!Project [_1#420]
+- Filter <function1>.apply
+- LocalRelation [_1#416,_2#417], [[0,1800000001,1,61],[0,1800000001,2,62],[0,1800000001,3,63]]
```
This PR fixes `EmbedSerializerInFilter` rule to keep `exprId`s of output of surrounded `SerializeFromObject`.
The plans after this patch are as follows:
```
== Analyzed Logical Plan ==
_1: string
Project [_1#420]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, scala.Tuple2]._1, true) AS _1#420,input[0, scala.Tuple2]._2 AS _2#421]
+- Filter <function1>.apply
+- DeserializeToObject newInstance(class scala.Tuple2), obj#419: scala.Tuple2
+- LocalRelation [_1#416,_2#417], [[0,1800000001,1,61],[0,1800000001,2,62],[0,1800000001,3,63]]
== Optimized Logical Plan ==
Project [_1#416]
+- Filter <function1>.apply
+- LocalRelation [_1#416,_2#417], [[0,1800000001,1,61],[0,1800000001,2,62],[0,1800000001,3,63]]
```
## How was this patch tested?
Existing tests and I added a test to check if `filter and then select` works.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes#13096 from ueshin/issues/SPARK-15313.
## What changes were proposed in this pull request?
This patch fixes a bug in TypeUtils.checkForSameTypeInputExpr. Previously the code was testing on strict equality, which does not taking nullability into account.
This is based on https://github.com/apache/spark/pull/12768. This patch fixed a bug there (with empty expression) and added a test case.
## How was this patch tested?
Added a new test suite and test case.
Closes#12768.
Author: Reynold Xin <rxin@databricks.com>
Author: Oleg Danilov <oleg.danilov@wandisco.com>
Closes#13208 from rxin/SPARK-14990.
Hello : Can you help check this PR? I am adding support for the java.math.BigInteger for java bean code path. I saw internally spark is converting the BigInteger to BigDecimal in ColumnType.scala and CatalystRowConverter.scala. I use the similar way and convert the BigInteger to the BigDecimal. .
Author: Kevin Yu <qyu@us.ibm.com>
Closes#10125 from kevinyu98/working_on_spark-11827.
## What changes were proposed in this pull request?
This PR is a follow-up of #13079. It replaces `hasUnsupportedFeatures: Boolean` in `CatalogTable` with `unsupportedFeatures: Seq[String]`, which contains unsupported Hive features of the underlying Hive table. In this way, we can accurately report all unsupported Hive features in the exception message.
## How was this patch tested?
Updated existing test case to check exception message.
Author: Cheng Lian <lian@databricks.com>
Closes#13173 from liancheng/spark-14346-follow-up.
## What changes were proposed in this pull request?
After #12871 is fixed, we are forced to make `/user/hive/warehouse` when SimpleAnalyzer is used but SimpleAnalyzer may not need the directory.
## How was this patch tested?
Manual test.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#13175 from sarutak/SPARK-15387.
#### What changes were proposed in this pull request?
This follow-up PR is to address the remaining comments in https://github.com/apache/spark/pull/12385
The major change in this PR is to issue better error messages in PySpark by using the mechanism that was proposed by davies in https://github.com/apache/spark/pull/7135
For example, in PySpark, if we input the following statement:
```python
>>> l = [('Alice', 1)]
>>> df = sqlContext.createDataFrame(l)
>>> df.createTempView("people")
>>> df.createTempView("people")
```
Before this PR, the exception we will get is like
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/dataframe.py", line 152, in createTempView
self._jdf.createTempView(name)
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o35.createTempView.
: org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException: Temporary table 'people' already exists;
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTempView(SessionCatalog.scala:324)
at org.apache.spark.sql.SparkSession.createTempView(SparkSession.scala:523)
at org.apache.spark.sql.Dataset.createTempView(Dataset.scala:2328)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)
```
After this PR, the exception we will get become cleaner:
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/dataframe.py", line 152, in createTempView
self._jdf.createTempView(name)
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/utils.py", line 75, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Temporary table 'people' already exists;"
```
#### How was this patch tested?
Fixed an existing PySpark test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#13126 from gatorsmile/followup-14684.
## What changes were proposed in this pull request?
This PR aims to add new **FoldablePropagation** optimizer that propagates foldable expressions by replacing all attributes with the aliases of original foldable expression. Other optimizations will take advantage of the propagated foldable expressions: e.g. `EliminateSorts` optimizer now can handle the following Case 2 and 3. (Case 1 is the previous implementation.)
1. Literals and foldable expression, e.g. "ORDER BY 1.0, 'abc', Now()"
2. Foldable ordinals, e.g. "SELECT 1.0, 'abc', Now() ORDER BY 1, 2, 3"
3. Foldable aliases, e.g. "SELECT 1.0 x, 'abc' y, Now() z ORDER BY x, y, z"
This PR has been generalized based on cloud-fan 's key ideas many times; he should be credited for the work he did.
**Before**
```
scala> sql("SELECT 1.0, Now() x ORDER BY 1, x").explain
== Physical Plan ==
WholeStageCodegen
: +- Sort [1.0#5 ASC,x#0 ASC], true, 0
: +- INPUT
+- Exchange rangepartitioning(1.0#5 ASC, x#0 ASC, 200), None
+- WholeStageCodegen
: +- Project [1.0 AS 1.0#5,1461873043577000 AS x#0]
: +- INPUT
+- Scan OneRowRelation[]
```
**After**
```
scala> sql("SELECT 1.0, Now() x ORDER BY 1, x").explain
== Physical Plan ==
WholeStageCodegen
: +- Project [1.0 AS 1.0#5,1461873079484000 AS x#0]
: +- INPUT
+- Scan OneRowRelation[]
```
## How was this patch tested?
Pass the Jenkins tests including a new test case.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#12719 from dongjoon-hyun/SPARK-14939.
## What changes were proposed in this pull request?
Whole Stage Codegen depends on `SparkPlan.reference` to do some optimization. For physical object operators, they should be consistent with their logical version and set the `reference` correctly.
## How was this patch tested?
new test in DatasetSuite
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13167 from cloud-fan/bug.
## What changes were proposed in this pull request?
This PR adds null check in `SparkSession.createDataFrame`, so that we can make sure the passed in rows matches the given schema.
## How was this patch tested?
new tests in `DatasetSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13008 from cloud-fan/row-encoder.
## What changes were proposed in this pull request?
This PR removes unused pattern matching variable in Optimizers in order to improve readability.
## How was this patch tested?
Pass the existing Jenkins tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#13145 from dongjoon-hyun/remove_unused_pattern_matching_variables.
## What changes were proposed in this pull request?
Scala 2.10 build was broken by #13079. I am reverting the change of that line.
Author: Yin Huai <yhuai@databricks.com>
Closes#13157 from yhuai/SPARK-14346-fix-scala2.10.
## What changes were proposed in this pull request?
This is a follow-up of #12781. It adds native `SHOW CREATE TABLE` support for Hive tables and views. A new field `hasUnsupportedFeatures` is added to `CatalogTable` to indicate whether all table metadata retrieved from the concrete underlying external catalog (i.e. Hive metastore in this case) can be mapped to fields in `CatalogTable`. This flag is useful when the target Hive table contains structures that can't be handled by Spark SQL, e.g., skewed columns and storage handler, etc..
## How was this patch tested?
New test cases are added in `ShowCreateTableSuite` to do round-trip tests.
Author: Cheng Lian <lian@databricks.com>
Closes#13079 from liancheng/spark-14346-show-create-table-for-hive-tables.
## What changes were proposed in this pull request?
toCommentSafeString method replaces "\u" with "\\\\u" to avoid codegen breaking.
But if the even number of "\" is put before "u", like "\\\\u", in the string literal in the query, codegen can break.
Following code causes compilation error.
```
val df = Seq(...).toDF
df.select("'\\\\\\\\u002A/'").show
```
The reason of the compilation error is because "\\\\\\\\\\\\\\\\u002A/" is translated into "*/" (the end of comment).
Due to this unsafety, arbitrary code can be injected like as follows.
```
val df = Seq(...).toDF
// Inject "System.exit(1)"
df.select("'\\\\\\\\u002A/{System.exit(1);}/*'").show
```
## How was this patch tested?
Added new test cases.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Author: sarutak <sarutak@oss.nttdata.co.jp>
Closes#12939 from sarutak/SPARK-15165.
## What changes were proposed in this pull request?
This PR improves `RowEncoder` and `MapObjects`, to support array as the external type for `ArrayType`. The idea is straightforward, we use `Object` as the external input type for `ArrayType`, and determine its type at runtime in `MapObjects`.
## How was this patch tested?
new test in `RowEncoderSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13138 from cloud-fan/map-object.
## What changes were proposed in this pull request?
We originally designed the type coercion rules to match Hive, but over time we have diverged. It does not make sense to call it HiveTypeCoercion anymore. This patch renames it TypeCoercion.
## How was this patch tested?
Updated unit tests to reflect the rename.
Author: Reynold Xin <rxin@databricks.com>
Closes#13091 from rxin/SPARK-15310.
## What changes were proposed in this pull request?
This patch adds support for a few SQL functions to improve compatibility with other databases: IFNULL, NULLIF, NVL and NVL2. In order to do this, this patch introduced a RuntimeReplaceable expression trait that allows replacing an unevaluable expression in the optimizer before evaluation.
Note that the semantics are not completely identical to other databases in esoteric cases.
## How was this patch tested?
Added a new test suite SQLCompatibilityFunctionSuite.
Closes#12373.
Author: Reynold Xin <rxin@databricks.com>
Closes#13084 from rxin/SPARK-14541.
## What changes were proposed in this pull request?
This patch moves all the object related expressions into expressions.objects package, for better code organization.
## How was this patch tested?
N/A
Author: Reynold Xin <rxin@databricks.com>
Closes#13085 from rxin/SPARK-15306.
## What changes were proposed in this pull request?
We currently use the Hive implementations for the collect_list/collect_set aggregate functions. This has a few major drawbacks: the use of HiveUDAF (which has quite a bit of overhead) and the lack of support for struct datatypes. This PR adds native implementation of these functions to Spark.
The size of the collected list/set may vary, this means we cannot use the fast, Tungsten, aggregation path to perform the aggregation, and that we fallback to the slower sort based path. Another big issue with these operators is that when the size of the collected list/set grows too large, we can start experiencing large GC pauzes and OOMEs.
This `collect*` aggregates implemented in this PR rely on the sort based aggregate path for correctness. They maintain their own internal buffer which holds the rows for one group at a time. The sortbased aggregation path is triggered by disabling `partialAggregation` for these aggregates (which is kinda funny); this technique is also employed in `org.apache.spark.sql.hiveHiveUDAFFunction`.
I have done some performance testing:
```scala
import org.apache.spark.sql.{Dataset, Row}
sql("create function collect_list2 as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList'")
val df = range(0, 10000000).select($"id", (rand(213123L) * 100000).cast("int").as("grp"))
df.select(countDistinct($"grp")).show
def benchmark(name: String, plan: Dataset[Row], maxItr: Int = 5): Unit = {
// Do not measure planning.
plan1.queryExecution.executedPlan
// Execute the plan a number of times and average the result.
val start = System.nanoTime
var i = 0
while (i < maxItr) {
plan.rdd.foreach(row => Unit)
i += 1
}
val time = (System.nanoTime - start) / (maxItr * 1000000L)
println(s"[$name] $maxItr iterations completed in an average time of $time ms.")
}
val plan1 = df.groupBy($"grp").agg(collect_list($"id"))
val plan2 = df.groupBy($"grp").agg(callUDF("collect_list2", $"id"))
benchmark("Spark collect_list", plan1)
...
> [Spark collect_list] 5 iterations completed in an average time of 3371 ms.
benchmark("Hive collect_list", plan2)
...
> [Hive collect_list] 5 iterations completed in an average time of 9109 ms.
```
Performance is improved by a factor 2-3.
## How was this patch tested?
Added tests to `DataFrameAggregateSuite`.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#12874 from hvanhovell/implode.
#### What changes were proposed in this pull request?
~~Currently, multiple partitions are allowed to drop by using a single DDL command: Alter Table Drop Partition. However, the internal implementation could break atomicity. That means, we could just drop a subset of qualified partitions, if hitting an exception when dropping one of qualified partitions~~
~~This PR contains the following behavior changes:~~
~~- disallow dropping multiple partitions by a single command ~~
~~- allow users to input predicates in partition specification and issue a nicer error message if the predicate's comparison operator is not `=`.~~
~~- verify the partition spec in SessionCatalog. This can ensure each partition spec in `Drop Partition` does not correspond to multiple partitions.~~
This PR has two major parts:
- Verify the partition spec in SessionCatalog for fixing the following issue:
```scala
sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
```
Above example uses an invalid partition spec. Without this PR, we will drop all the partitions. The reason is Hive megastores getPartitions API returns all the partitions if we provide an invalid spec.
- Re-implemented the `dropPartitions` in `HiveClientImpl`. Now, we always check if all the user-specified partition specs exist before attempting to drop the partitions. Previously, we start drop the partition before completing checking the existence of all the partition specs. If any failure happened after we start to drop the partitions, we will log an error message to indicate which partitions have been dropped and which partitions have not been dropped.
#### How was this patch tested?
Modified the existing test cases and added new test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#12801 from gatorsmile/banDropMultiPart.
## What changes were proposed in this pull request?
We will eliminate the pair of `DeserializeToObject` and `SerializeFromObject` in `Optimizer` and add extra `Project`. However, when DeserializeToObject's outputObjectType is ObjectType and its cls can't be processed by unsafe project, it will be failed.
To fix it, we can simply remove the extra `Project` and replace the output attribute of `DeserializeToObject` in another rule.
## How was this patch tested?
`DatasetSuite`.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes#12926 from viirya/fix-eliminate-serialization-projection.
## What changes were proposed in this pull request?
Deprecates registerTempTable and add dataset.createTempView, dataset.createOrReplaceTempView.
## How was this patch tested?
Unit tests.
Author: Sean Zhong <seanzhong@databricks.com>
Closes#12945 from clockfly/spark-15171.
## What changes were proposed in this pull request?
This PR adds a new rule to convert `SimpleCatalogRelation` to data source table if its table property contains data source information.
## How was this patch tested?
new test in SQLQuerySuite
Author: Wenchen Fan <wenchen@databricks.com>
Closes#12935 from cloud-fan/ds-table.
## What changes were proposed in this pull request?
This PR adds native `SHOW CREATE TABLE` DDL command for data source tables. Support for Hive tables will be added in follow-up PR(s).
To show table creation DDL for data source tables created by CTAS statements, this PR also added partitioning and bucketing support for normal `CREATE TABLE ... USING ...` syntax.
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
A new test suite `ShowCreateTableSuite` is added in sql/hive package to test the new feature.
Author: Cheng Lian <lian@databricks.com>
Closes#12781 from liancheng/spark-14346-show-create-table.
## What changes were proposed in this pull request?
After SPARK-14669 it seems the sort time metric includes both spill and record insertion time. This makes it not very useful since the metric becomes close to the total execution time of the node.
We should track just the time spent for in-memory sort, as before.
## How was this patch tested?
Verified metric in the UI, also unit test on UnsafeExternalRowSorter.
cc davies
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>
Closes#13035 from ericl/fix-metrics.
## What changes were proposed in this pull request?
SPARK-15241: We now support java decimal and catalyst decimal in external row, it makes sense to also support scala decimal.
SPARK-15242: This is a long-standing bug, and is exposed after https://github.com/apache/spark/pull/12364, which eliminate the `If` expression if the field is not nullable:
```
val fieldValue = serializerFor(
GetExternalRowField(inputObject, i, externalDataTypeForInput(f.dataType)),
f.dataType)
if (f.nullable) {
If(
Invoke(inputObject, "isNullAt", BooleanType, Literal(i) :: Nil),
Literal.create(null, f.dataType),
fieldValue)
} else {
fieldValue
}
```
Previously, we always use `DecimalType.SYSTEM_DEFAULT` as the output type of converted decimal field, which is wrong as it doesn't match the real decimal type. However, it works well because we always put converted field into `If` expression to do the null check, and `If` use its `trueValue`'s data type as its output type.
Now if we have a not nullable decimal field, then the converted field's output type will be `DecimalType.SYSTEM_DEFAULT`, and we will write wrong data into unsafe row.
The fix is simple, just use the given decimal type as the output type of converted decimal field.
These 2 issues was found at https://github.com/apache/spark/pull/13008
## How was this patch tested?
new tests in RowEncoderSuite
Author: Wenchen Fan <wenchen@databricks.com>
Closes#13019 from cloud-fan/encoder-decimal.
## What changes were proposed in this pull request?
We have a private `UDTRegistration` API to register user defined type. Currently `JavaTypeInference` can't work with it. So `SparkSession.createDataFrame` from a bean class will not correctly infer the schema of the bean class.
## How was this patch tested?
`VectorUDTSuite`.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes#13046 from viirya/fix-udt-registry-javatypeinference.
## What changes were proposed in this pull request?
This issue fixes the error message indentation consistently with other set queries (EXCEPT/INTERSECT).
**Before (4 lines)**
```
scala> sql("(select 1) union (select 1, 2)").head
org.apache.spark.sql.AnalysisException:
Unions can only be performed on tables with the same number of columns,
but one table has '2' columns and another table has
'1' columns;
```
**After (one-line)**
```
scala> sql("(select 1) union (select 1, 2)").head
org.apache.spark.sql.AnalysisException: Unions can only be performed on tables with the same number of columns, but one table has '2' columns and another table has '1' columns;
```
**Reference (EXCEPT / INTERSECT)**
```
scala> sql("(select 1) intersect (select 1, 2)").head
org.apache.spark.sql.AnalysisException: Intersect can only be performed on tables with the same number of columns, but the left table has 1 columns and the right has 2;
```
## How was this patch tested?
Manual.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#13043 from dongjoon-hyun/SPARK-15265.
## What changes were proposed in this pull request?
A Generate with the `outer` flag enabled should always return one or more rows for every input row. The optimizer currently violates this by rewriting `outer` Generates that do not contain columns of the child plan into an unjoined generate, for example:
```sql
select e from a lateral view outer explode(a.b) as e
```
The result of this is that `outer` Generate does not produce output at all when the Generators' input expression is empty. This PR fixes this.
## How was this patch tested?
Added test case to `SQLQuerySuite`.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#12906 from hvanhovell/SPARK-14986.
Since we cannot really trust if the underlying external catalog can throw exceptions when there is an invalid metadata operation, let's do it in SessionCatalog.
- [X] The first step is to unify the error messages issued in Hive-specific Session Catalog and general Session Catalog.
- [X] The second step is to verify the inputs of metadata operations for partitioning-related operations. This is moved to a separate PR: https://github.com/apache/spark/pull/12801
- [X] The third step is to add database existence verification in `SessionCatalog`
- [X] The fourth step is to add table existence verification in `SessionCatalog`
- [X] The fifth step is to add function existence verification in `SessionCatalog`
Add test cases and verify the error messages we issued
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#12385 from gatorsmile/verifySessionAPIs.
## What changes were proposed in this pull request?
This PR fixes SQL building for predicate subqueries and correlated scalar subqueries. It also enables most Hive subquery tests.
## How was this patch tested?
Enabled new tests in HiveComparisionSuite.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#12988 from hvanhovell/SPARK-14773.
#### What changes were proposed in this pull request?
This PR is to address a few existing issues in `EXPLAIN`:
- The `EXPLAIN` options `LOGICAL | FORMATTED | EXTENDED | CODEGEN` should not be 0 or more match. It should 0 or one match. Parser does not allow users to use more than one option in a single command.
- The option `LOGICAL` is not supported. Issue an exception when users specify this option in the command.
- The output of `EXPLAIN ` contains a weird empty line when the output of analyzed plan is empty. We should remove it. For example:
```
== Parsed Logical Plan ==
CreateTable CatalogTable(`t`,CatalogTableType(MANAGED),CatalogStorageFormat(None,Some(org.apache.hadoop.mapred.TextInputFormat),Some(org.apache.hadoop.hive.ql.io. HiveIgnoreKeyTextOutputFormat),None,false,Map()),List(CatalogColumn(col,int,true,None)),List(),List(),List(),-1,,1462725171656,-1,Map(),None,None,None), false
== Analyzed Logical Plan ==
CreateTable CatalogTable(`t`,CatalogTableType(MANAGED),CatalogStorageFormat(None,Some(org.apache.hadoop.mapred.TextInputFormat),Some(org.apache.hadoop.hive.ql.io. HiveIgnoreKeyTextOutputFormat),None,false,Map()),List(CatalogColumn(col,int,true,None)),List(),List(),List(),-1,,1462725171656,-1,Map(),None,None,None), false
== Optimized Logical Plan ==
CreateTable CatalogTable(`t`,CatalogTableType(MANAGED),CatalogStorageFormat(None,Some(org.apache.hadoop.mapred.TextInputFormat),Some(org.apache.hadoop.hive.ql.io. HiveIgnoreKeyTextOutputFormat),None,false,Map()),List(CatalogColumn(col,int,true,None)),List(),List(),List(),-1,,1462725171656,-1,Map(),None,None,None), false
...
```
#### How was this patch tested?
Added and modified a few test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#12991 from gatorsmile/explainCreateTable.
#### What changes were proposed in this pull request?
In Hive Metastore, dropping default database is not allowed. However, in `InMemoryCatalog`, this is allowed.
This PR is to disallow users to drop default database.
#### How was this patch tested?
Previously, we already have a test case in HiveDDLSuite. Now, we also add the same one in DDLSuite
Author: gatorsmile <gatorsmile@gmail.com>
Closes#12962 from gatorsmile/dropDefaultDB.
## What changes were proposed in this pull request?
Before:
```
scala> spark.catalog.listDatabases.show()
+--------------------+-----------+-----------+
| name|description|locationUri|
+--------------------+-----------+-----------+
|Database[name='de...|
|Database[name='my...|
|Database[name='so...|
+--------------------+-----------+-----------+
```
After:
```
+-------+--------------------+--------------------+
| name| description| locationUri|
+-------+--------------------+--------------------+
|default|Default Hive data...|file:/user/hive/w...|
| my_db| This is a database|file:/Users/andre...|
|some_db| |file:/private/var...|
+-------+--------------------+--------------------+
```
## How was this patch tested?
New test in `CatalogSuite`
Author: Andrew Or <andrew@databricks.com>
Closes#13015 from andrewor14/catalog-show.
This patch improves the performance of `InferSchema.compatibleType` and `inferField`. The net result of this patch is a 6x speedup in local benchmarks running against cached data with a massive nested schema.
The key idea is to remove unnecessary sorting in `compatibleType`'s `StructType` merging code. This code takes two structs, merges the fields with matching names, and copies over the unique fields, producing a new schema which is the union of the two structs' schemas. Previously, this code performed a very inefficient `groupBy()` to match up fields with the same name, but this is unnecessary because `inferField` already sorts structs' fields by name: since both lists of fields are sorted, we can simply merge them in a single pass.
This patch also speeds up the existing field sorting in `inferField`: the old sorting code allocated unnecessary intermediate collections, while the new code uses mutable collects and performs in-place sorting.
I rewrote inefficient `equals()` implementations in `StructType` and `Metadata`, significantly reducing object allocations in those methods.
Finally, I replaced a `treeAggregate` call with `fold`: I doubt that `treeAggregate` will benefit us very much because the schemas would have to be enormous to realize large savings in network traffic. Since most schemas are probably fairly small in serialized form, they should typically fit within a direct task result and therefore can be incrementally merged at the driver as individual tasks finish. This change eliminates an entire (short) scheduler stage.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#12750 from JoshRosen/schema-inference-speedups.
`Encoder`'s doc mentions `sqlContext.implicits._`. We should use `sparkSession.implicits._` instead now.
Only doc update.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes#13002 from viirya/encoder-doc.
## What changes were proposed in this pull request?
following operations have file system operation now:
1. CREATE DATABASE: create a dir
2. DROP DATABASE: delete the dir
3. CREATE TABLE: create a dir
4. DROP TABLE: delete the dir
5. RENAME TABLE: rename the dir
6. CREATE PARTITIONS: create a dir
7. RENAME PARTITIONS: rename the dir
8. DROP PARTITIONS: drop the dir
## How was this patch tested?
new tests in `ExternalCatalogSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#12871 from cloud-fan/catalog.
## What changes were proposed in this pull request?
This detects a relation's partitioning and adds checks to the analyzer.
If an InsertIntoTable node has no partitioning, it is replaced by the
relation's partition scheme and input columns are correctly adjusted,
placing the partition columns at the end in partition order. If an
InsertIntoTable node has partitioning, it is checked against the table's
reported partitions.
These changes required adding a PartitionedRelation trait to the catalog
interface because Hive's MetastoreRelation doesn't extend
CatalogRelation.
This commit also includes a fix to InsertIntoTable's resolved logic,
which now detects that all expected columns are present, including
dynamic partition columns. Previously, the number of expected columns
was not checked and resolved was true if there were missing columns.
## How was this patch tested?
This adds new tests to the InsertIntoTableSuite that are fixed by this PR.
Author: Ryan Blue <blue@apache.org>
Closes#12239 from rdblue/SPARK-14459-detect-hive-partitioning.
#### What changes were proposed in this pull request?
Currently, if we rename a temp table `Tab1` to another existent temp table `Tab2`. `Tab2` will be silently removed. This PR is to detect it and issue an exception message.
In addition, this PR also detects another issue in the rename table command. When the destination table identifier does have database name, we should not ignore them. That might mean users could rename a regular table.
#### How was this patch tested?
Added two related test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#12959 from gatorsmile/rewriteTable.
#### What changes were proposed in this pull request?
So far, in the implementation of InMemoryCatalog, we do not check if the new/destination table/function/partition exists or not. Thus, we just silently remove the existent table/function/partition.
This PR is to detect them and issue an appropriate exception.
#### How was this patch tested?
Added the related test cases. They also verify if HiveExternalCatalog also detects these errors.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#12960 from gatorsmile/renameInMemoryCatalog.
## What changes were proposed in this pull request?
The official TPC-DS 41 query currently fails because it contains a scalar subquery with a disjunctive correlated predicate (the correlated predicates were nested in ORs). This makes the `Analyzer` pull out the entire predicate which is wrong and causes the following (correct) analysis exception: `The correlated scalar subquery can only contain equality predicates`
This PR fixes this by first simplifing (or normalizing) the correlated predicates before pulling them out of the subquery.
## How was this patch tested?
Manual testing on TPC-DS 41, and added a test to SubquerySuite.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#12954 from hvanhovell/SPARK-15122.
#### What changes were proposed in this pull request?
When Describe a UDTF, the command returns a wrong result. The command is unable to find the function, which has been created and cataloged in the catalog but not in the functionRegistry.
This PR is to correct it. If the function is not in the functionRegistry, we will check the catalog for collecting the information of the UDTF function.
#### How was this patch tested?
Added test cases to verify the results
Author: gatorsmile <gatorsmile@gmail.com>
Closes#12885 from gatorsmile/showFunction.
## What changes were proposed in this pull request?
Minor doc and code style fixes
## How was this patch tested?
local build
Author: Jacek Laskowski <jacek@japila.pl>
Closes#12928 from jaceklaskowski/SPARK-15152.
## What changes were proposed in this pull request?
Went through SparkSession and its members and fixed non-thread-safe classes used by SparkSession
## How was this patch tested?
Existing unit tests
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#12915 from zsxwing/spark-session-thread-safe.
#### What changes were proposed in this pull request?
First, a few test cases failed in mac OS X because the property value of `java.io.tmpdir` does not include a trailing slash on some platform. Hive always removes the last trailing slash. For example, what I got in the web:
```
Win NT --> C:\TEMP\
Win XP --> C:\TEMP
Solaris --> /var/tmp/
Linux --> /var/tmp
```
Second, a couple of test cases are added to verify if the commands work properly.
#### How was this patch tested?
Added a test case for it and correct the previous test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes#12081 from gatorsmile/mkdir.
## What changes were proposed in this pull request?
The problem is: In `RowEncoder`, we use `Invoke` to get the field of an external row, which lose the nullability information. This PR creates a `GetExternalRowField` expression, so that we can preserve the nullability info.
TODO: simplify the null handling logic in `RowEncoder`, to remove so many if branches, in follow-up PR.
## How was this patch tested?
new tests in `RowEncoderSuite`
Note that, This PR takes over https://github.com/apache/spark/pull/11980, with a little simplification, so all credits should go to koertkuipers
Author: Wenchen Fan <wenchen@databricks.com>
Author: Koert Kuipers <koert@tresata.com>
Closes#12364 from cloud-fan/nullable.
## What changes were proposed in this pull request?
Similar to #11990, GenerateOrdering and GenerateColumnAccessor should print debug log for generated code with proper indentation.
## How was this patch tested?
Manually checked.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#12908 from sarutak/SPARK-15132.
## What changes were proposed in this pull request?
This PR support new SQL syntax CREATE TEMPORARY VIEW.
Like:
```
CREATE TEMPORARY VIEW viewName AS SELECT * from xx
CREATE OR REPLACE TEMPORARY VIEW viewName AS SELECT * from xx
CREATE TEMPORARY VIEW viewName (c1 COMMENT 'blabla', c2 COMMENT 'blabla') AS SELECT * FROM xx
```
## How was this patch tested?
Unit tests.
Author: Sean Zhong <clockfly@gmail.com>
Closes#12872 from clockfly/spark-6399.
## What changes were proposed in this pull request?
We can support subexpression elimination in TungstenAggregate by using current `EquivalentExpressions` which is already used in subexpression elimination for expression codegen.
However, in wholestage codegen, we can't wrap the common expression's codes in functions as before, we simply generate the code snippets for common expressions. These code snippets are inserted before the common expressions are actually used in generated java codes.
For multiple `TypedAggregateExpression` used in aggregation operator, since their input type should be the same. So their `inputDeserializer` will be the same too. This patch can also reduce redundant input deserialization.
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes#12729 from viirya/subexpr-elimination-tungstenaggregate.
## What changes were proposed in this pull request?
This PR improve the error message for `Generate` in 3 cases:
1. generator is nested in expressions, e.g. `SELECT explode(list) + 1 FROM tbl`
2. generator appears more than one time in SELECT, e.g. `SELECT explode(list), explode(list) FROM tbl`
3. generator appears in other operator which is not project, e.g. `SELECT * FROM tbl SORT BY explode(list)`
## How was this patch tested?
new tests in `AnalysisErrorSuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#12810 from cloud-fan/bug.
## What changes were proposed in this pull request?
Just a bunch of small tweaks on DDL exception messages.
## How was this patch tested?
`DDLCommandSuite` et al.
Author: Andrew Or <andrew@databricks.com>
Closes#12853 from andrewor14/make-exceptions-consistent.
#### What changes were proposed in this pull request?
Compared with the current Spark parser, there are two extra syntax are supported in Hive for sampling
- In `On` clauses, `rand()` is used for indicating sampling on the entire row instead of an individual column. For example,
```SQL
SELECT * FROM source TABLESAMPLE(BUCKET 3 OUT OF 32 ON rand()) s;
```
- Users can specify the total length to be read. For example,
```SQL
SELECT * FROM source TABLESAMPLE(100M) s;
```
Below is the link for references:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Sampling
This PR is to parse and capture these two extra syntax, and issue a better error message.
#### How was this patch tested?
Added test cases to verify the thrown exceptions
Author: gatorsmile <gatorsmile@gmail.com>
Closes#12838 from gatorsmile/bucketOnRand.
## What changes were proposed in this pull request?
This is a follow up PR for #11583. It makes 3 lazy vals into just vals and adds unit test coverage.
## How was this patch tested?
Existing unit tests and additional unit tests.
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#12861 from aray/fast-pivot-follow-up.
## What changes were proposed in this pull request?
Make serializer correctly inferred if the input type is `List[_]`, since `List[_]` is type of `Seq[_]`, before it was matched to different case (`case t if definedByConstructorParams(t)`).
## How was this patch tested?
New test case was added.
Author: bomeng <bmeng@us.ibm.com>
Closes#12849 from bomeng/SPARK-15062.
## What changes were proposed in this pull request?
This PR addresses a few minor issues in SQL parser:
- Removes some unused rules and keywords in the grammar.
- Removes code path for fallback SQL parsing (was needed for Hive native parsing).
- Use `UnresolvedGenerator` instead of hard-coding `Explode` & `JsonTuple`.
- Adds a more generic way of creating error messages for unsupported Hive features.
- Use `visitFunctionName` as much as possible.
- Interpret a `CatalogColumn`'s `DataType` directly instead of parsing it again.
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#12826 from hvanhovell/SPARK-15047.
## What changes were proposed in this pull request?
In this PR we add support for correlated scalar subqueries. An example of such a query is:
```SQL
select * from tbl1 a where a.value > (select max(value) from tbl2 b where b.key = a.key)
```
The implementation adds the `RewriteCorrelatedScalarSubquery` rule to the Optimizer. This rule plans these subqueries using `LEFT OUTER` joins. It currently supports rewrites for `Project`, `Aggregate` & `Filter` logical plans.
I could not find a well defined semantics for the use of scalar subqueries in an `Aggregate`. The current implementation currently evaluates the scalar subquery *before* aggregation. This means that you either have to make scalar subquery part of the grouping expression, or that you have to aggregate it further on. I am open to suggestions on this.
The implementation currently forces the uniqueness of a scalar subquery by enforcing that it is aggregated and that the resulting column is wrapped in an `AggregateExpression`.
## How was this patch tested?
Added tests to `SubquerySuite`.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#12822 from hvanhovell/SPARK-14785.
## What changes were proposed in this pull request?
In order to support nested predicate subquery, this PR introduce an internal join type ExistenceJoin, which will emit all the rows from left, plus an additional column, which presents there are any rows matched from right or not (it's not null-aware right now). This additional column could be used to replace the subquery in Filter.
In theory, all the predicate subquery could use this join type, but it's slower than LeftSemi and LeftAnti, so it's only used for nested subquery (subquery inside OR).
For example, the following SQL:
```sql
SELECT a FROM t WHERE EXISTS (select 0) OR EXISTS (select 1)
```
This PR also fix a bug in predicate subquery push down through join (they should not).
Nested null-aware subquery is still not supported. For example, `a > 3 OR b NOT IN (select bb from t)`
After this, we could run TPCDS query Q10, Q35, Q45
## How was this patch tested?
Added unit tests.
Author: Davies Liu <davies@databricks.com>
Closes#12820 from davies/or_exists.
## What changes were proposed in this pull request?
The existing implementation of pivot translates into a single aggregation with one aggregate per distinct pivot value. When the number of distinct pivot values is large (say 1000+) this can get extremely slow since each input value gets evaluated on every aggregate even though it only affects the value of one of them.
I'm proposing an alternate strategy for when there are 10+ (somewhat arbitrary threshold) distinct pivot values. We do two phases of aggregation. In the first we group by the grouping columns plus the pivot column and perform the specified aggregations (one or sometimes more). In the second aggregation we group by the grouping columns and use the new (non public) PivotFirst aggregate that rearranges the outputs of the first aggregation into an array indexed by the pivot value. Finally we do a project to extract the array entries into the appropriate output column.
## How was this patch tested?
Additional unit tests in DataFramePivotSuite and manual larger scale testing.
Author: Andrew Ray <ray.andrew@gmail.com>
Closes#11583 from aray/fast-pivot.
## What changes were proposed in this pull request?
Simplify and clean up some object expressions:
1. simplify the logic to handle `propagateNull`
2. add `propagateNull` parameter to `Invoke`
3. simplify the unbox logic in `Invoke`
4. other minor cleanup
TODO: simplify `MapObjects`
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#12399 from cloud-fan/object.
This PR contains three changes:
1. We will use spark.sql.warehouse.dir set warehouse location. We will not use hive.metastore.warehouse.dir.
2. SessionCatalog needs to set the location to default db. Otherwise, when creating a table in SparkSession without hive support, the default db's path will be an empty string.
3. When we create a database, we need to make the path qualified.
Existing tests and new tests
Author: Yin Huai <yhuai@databricks.com>
Closes#12812 from yhuai/warehouse.
## What changes were proposed in this pull request?
This PR adds `fromPrimitiveArray` and `toPrimitiveArray` in `UnsafeArrayData`, so that we can do the conversion much faster in VectorUDT/MatrixUDT.
## How was this patch tested?
existing tests and new test suite `UnsafeArraySuite`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#12640 from cloud-fan/ml.
## What changes were proposed in this pull request?
CatalystSqlParser can parse data types. So, we do not need to have an individual DataTypeParser.
## How was this patch tested?
Existing tests
Author: Yin Huai <yhuai@databricks.com>
Closes#12796 from yhuai/removeDataTypeParser.
## What changes were proposed in this pull request?
This patch fixes a null handling bug in EqualNullSafe's code generation.
## How was this patch tested?
Updated unit test so they would fail without the fix.
Closes#12628.
Author: Reynold Xin <rxin@databricks.com>
Author: Arash Nabili <arash@levyx.com>
Closes#12799 from rxin/equalnullsafe.
The previous subquery PRs did not include support for pushing subqueries used in filters (`WHERE`/`HAVING`) down. This PR adds this support. For example :
```scala
range(0, 10).registerTempTable("a")
range(5, 15).registerTempTable("b")
range(7, 25).registerTempTable("c")
range(3, 12).registerTempTable("d")
val plan = sql("select * from a join b on a.id = b.id left join c on c.id = b.id where a.id in (select id from d)")
plan.explain(true)
```
Leads to the following Analyzed & Optimized plans:
```
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
id: bigint, id: bigint, id: bigint
Project [id#0L,id#4L,id#8L]
+- Filter predicate-subquery#16 [(id#0L = id#12L)]
: +- SubqueryAlias predicate-subquery#16 [(id#0L = id#12L)]
: +- Project [id#12L]
: +- SubqueryAlias d
: +- Range 3, 12, 1, 8, [id#12L]
+- Join LeftOuter, Some((id#8L = id#4L))
:- Join Inner, Some((id#0L = id#4L))
: :- SubqueryAlias a
: : +- Range 0, 10, 1, 8, [id#0L]
: +- SubqueryAlias b
: +- Range 5, 15, 1, 8, [id#4L]
+- SubqueryAlias c
+- Range 7, 25, 1, 8, [id#8L]
== Optimized Logical Plan ==
Join LeftOuter, Some((id#8L = id#4L))
:- Join Inner, Some((id#0L = id#4L))
: :- Join LeftSemi, Some((id#0L = id#12L))
: : :- Range 0, 10, 1, 8, [id#0L]
: : +- Range 3, 12, 1, 8, [id#12L]
: +- Range 5, 15, 1, 8, [id#4L]
+- Range 7, 25, 1, 8, [id#8L]
== Physical Plan ==
...
```
I have also taken the opportunity to move quite a bit of code around:
- Rewriting subqueris and pulling out correlated predicated from subqueries has been moved into the analyzer. The analyzer transforms `Exists` and `InSubQuery` into `PredicateSubquery` expressions. A PredicateSubquery exposes the 'join' expressions and the proper references. This makes things like type coercion, optimization and planning easier to do.
- I have added support for `Aggregate` plans in subqueries. Any correlated expressions will be added to the grouping expressions. I have removed support for `Union` plans, since pulling in an outer reference from beneath a Union has no value (a filtered value could easily be part of another Union child).
- Resolution of subqueries is now done using `OuterReference`s. These are used to wrap any outer reference; this makes the identification of these references easier, and also makes dealing with duplicate attributes in the outer and inner plans easier. The resolution of subqueries initially used a resolution loop which would alternate between calling the analyzer and trying to resolve the outer references. We now use a dedicated analyzer which uses a special rule for outer reference resolution.
These changes are a stepping stone for enabling correlated scalar subqueries, enabling all Hive tests & allowing us to use predicate subqueries anywhere.
Current tests and added test cases in FilterPushdownSuite.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#12720 from hvanhovell/SPARK-14858.
## What changes were proposed in this pull request?
dapply() applies an R function on each partition of a DataFrame and returns a new DataFrame.
The function signature is:
dapply(df, function(localDF) {}, schema = NULL)
R function input: local data.frame from the partition on local node
R function output: local data.frame
Schema specifies the Row format of the resulting DataFrame. It must match the R function's output.
If schema is not specified, each partition of the result DataFrame will be serialized in R into a single byte array. Such resulting DataFrame can be processed by successive calls to dapply().
## How was this patch tested?
SparkR unit tests.
Author: Sun Rui <rui.sun@intel.com>
Author: Sun Rui <sunrui2016@gmail.com>
Closes#12493 from sun-rui/SPARK-12919.
## What changes were proposed in this pull request?
This patch removes executionHive from HiveSessionState and HiveSharedState.
## How was this patch tested?
Updated test cases.
Author: Reynold Xin <rxin@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#12770 from rxin/SPARK-14994.
#### What changes were proposed in this pull request?
Replaces a logical `Except` operator with a `Left-anti Join` operator. This way, we can take advantage of all the benefits of join implementations (e.g. managed memory, code generation, broadcast joins).
```SQL
SELECT a1, a2 FROM Tab1 EXCEPT SELECT b1, b2 FROM Tab2
==> SELECT DISTINCT a1, a2 FROM Tab1 LEFT ANTI JOIN Tab2 ON a1<=>b1 AND a2<=>b2
```
Note:
1. This rule is only applicable to EXCEPT DISTINCT. Do not use it for EXCEPT ALL.
2. This rule has to be done after de-duplicating the attributes; otherwise, the enerated
join conditions will be incorrect.
This PR also corrects the existing behavior in Spark. Before this PR, the behavior is like
```SQL
test("except") {
val df_left = Seq(1, 2, 2, 3, 3, 4).toDF("id")
val df_right = Seq(1, 3).toDF("id")
checkAnswer(
df_left.except(df_right),
Row(2) :: Row(2) :: Row(4) :: Nil
)
}
```
After this PR, the result is corrected. We strictly follow the SQL compliance of `Except Distinct`.
#### How was this patch tested?
Modified and added a few test cases to verify the optimization rule and the results of operators.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#12736 from gatorsmile/exceptByAntiJoin.
## What changes were proposed in this pull request?
This patch removes HiveNativeCommand, so we can continue to remove the dependency on Hive. This pull request also removes the ability to generate golden result file using Hive.
## How was this patch tested?
Updated tests to reflect this.
Author: Reynold Xin <rxin@databricks.com>
Closes#12769 from rxin/SPARK-14991.
## What changes were proposed in this pull request?
Fix to ScalaDoc for StructType.
## How was this patch tested?
Built locally.
Author: Gregory Hart <greg.hart@thinkbiganalytics.com>
Closes#12758 from freastro/hotfix/SPARK-14965.
## What changes were proposed in this pull request?
Currently we use `SQLUserDefinedType` annotation to register UDTs for user classes. However, by doing this, we add Spark dependency to user classes.
For some user classes, it is unnecessary to add such dependency that will increase deployment difficulty.
We should provide alternative approach to register UDTs for user classes without `SQLUserDefinedType` annotation.
## How was this patch tested?
`UserDefinedTypeSuite`
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes#12259 from viirya/improve-sql-usertype.
## What changes were proposed in this pull request?
`interfaces.scala` was getting big. This just moves the biggest class in there to a new file for cleanliness.
## How was this patch tested?
Just moving things around.
Author: Andrew Or <andrew@databricks.com>
Closes#12721 from andrewor14/move-external-catalog.
Currently, we can only create persisted partitioned and/or bucketed data source tables using the Dataset API but not using SQL DDL. This PR implements the following syntax to add partitioning and bucketing support to the SQL DDL:
```
CREATE TABLE <table-name>
USING <provider> [OPTIONS (<key1> <value1>, <key2> <value2>, ...)]
[PARTITIONED BY (col1, col2, ...)]
[CLUSTERED BY (col1, col2, ...) [SORTED BY (col1, col2, ...)] INTO <n> BUCKETS]
AS SELECT ...
```
Test cases are added in `MetastoreDataSourcesSuite` to check the newly added syntax.
Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes#12734 from liancheng/spark-14954.
## What changes were proposed in this pull request?
This PR aims to implement decimal aggregation optimization for window queries by improving existing `DecimalAggregates`. Historically, `DecimalAggregates` optimizer is designed to transform general `sum/avg(decimal)`, but it breaks recently added windows queries like the followings. The following queries work well without the current `DecimalAggregates` optimizer.
**Sum**
```scala
scala> sql("select sum(a) over () from (select explode(array(1.0,2.0)) a) t").head
java.lang.RuntimeException: Unsupported window function: MakeDecimal((sum(UnscaledValue(a#31)),mode=Complete,isDistinct=false),12,1)
scala> sql("select sum(a) over () from (select explode(array(1.0,2.0)) a) t").explain()
== Physical Plan ==
WholeStageCodegen
: +- Project [sum(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#23]
: +- INPUT
+- Window [MakeDecimal((sum(UnscaledValue(a#21)),mode=Complete,isDistinct=false),12,1) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#23]
+- Exchange SinglePartition, None
+- Generate explode([1.0,2.0]), false, false, [a#21]
+- Scan OneRowRelation[]
```
**Average**
```scala
scala> sql("select avg(a) over () from (select explode(array(1.0,2.0)) a) t").head
java.lang.RuntimeException: Unsupported window function: cast(((avg(UnscaledValue(a#40)),mode=Complete,isDistinct=false) / 10.0) as decimal(6,5))
scala> sql("select avg(a) over () from (select explode(array(1.0,2.0)) a) t").explain()
== Physical Plan ==
WholeStageCodegen
: +- Project [avg(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#44]
: +- INPUT
+- Window [cast(((avg(UnscaledValue(a#42)),mode=Complete,isDistinct=false) / 10.0) as decimal(6,5)) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS avg(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#44]
+- Exchange SinglePartition, None
+- Generate explode([1.0,2.0]), false, false, [a#42]
+- Scan OneRowRelation[]
```
After this PR, those queries work fine and new optimized physical plans look like the followings.
**Sum**
```scala
scala> sql("select sum(a) over () from (select explode(array(1.0,2.0)) a) t").explain()
== Physical Plan ==
WholeStageCodegen
: +- Project [sum(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#35]
: +- INPUT
+- Window [MakeDecimal((sum(UnscaledValue(a#33)),mode=Complete,isDistinct=false) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),12,1) AS sum(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#35]
+- Exchange SinglePartition, None
+- Generate explode([1.0,2.0]), false, false, [a#33]
+- Scan OneRowRelation[]
```
**Average**
```scala
scala> sql("select avg(a) over () from (select explode(array(1.0,2.0)) a) t").explain()
== Physical Plan ==
WholeStageCodegen
: +- Project [avg(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#47]
: +- INPUT
+- Window [cast(((avg(UnscaledValue(a#45)),mode=Complete,isDistinct=false) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) / 10.0) as decimal(6,5)) AS avg(a) OVER ( ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#47]
+- Exchange SinglePartition, None
+- Generate explode([1.0,2.0]), false, false, [a#45]
+- Scan OneRowRelation[]
```
In this PR, *SUM over window* pattern matching is based on the code of hvanhovell ; he should be credited for the work he did.
## How was this patch tested?
Pass the Jenkins tests (with newly added testcases)
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#12421 from dongjoon-hyun/SPARK-14664.
## What changes were proposed in this pull request?
This PR will make Spark SQL not allow ALTER TABLE ADD/REPLACE/CHANGE COLUMN, ALTER TABLE SET FILEFORMAT, DFS, and transaction related commands.
## How was this patch tested?
Existing tests. For those tests that I put in the blacklist, I am adding the useful parts back to SQLQuerySuite.
Author: Yin Huai <yhuai@databricks.com>
Closes#12714 from yhuai/banNativeCommand.
## What changes were proposed in this pull request?
#12625 exposed a new user-facing conf interface in `SparkSession`. This patch adds a catalog interface.
## How was this patch tested?
See `CatalogSuite`.
Author: Andrew Or <andrew@databricks.com>
Closes#12713 from andrewor14/user-facing-catalog.
## What changes were proposed in this pull request?
This PR adds Native execution of SHOW COLUMNS and SHOW PARTITION commands.
Command Syntax:
``` SQL
SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database]
```
``` SQL
SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
```
## How was this patch tested?
Added test cases in HiveCommandSuite to verify execution and DDLCommandSuite
to verify plans.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#12222 from dilipbiswal/dkb_show_columns.