## What changes were proposed in this pull request?
This PR fixes a comparison of `ExprValue.isNull` with `String`. `ExprValue.isNull` should be compared with `LiteralTrue` or `LiteralFalse`.
This causes the following compilation error using scala-2.12 with sbt. In addition, this code may also generate incorrect code in Spark 2.3.
```
/home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:94: org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are unrelated: they will most likely always compare unequal
[error] [warn] if (eval.isNull != "true") {
[error] [warn]
[error] [warn] /home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:126: org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are unrelated: they will most likely never compare equal
[error] [warn] if (eval.isNull == "true") {
[error] [warn]
[error] [warn] /home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:133: org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are unrelated: they will most likely never compare equal
[error] [warn] if (eval.isNull == "true") {
[error] [warn]
[error] [warn] /home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala:90: org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are unrelated: they will most likely never compare equal
[error] [warn] if (inputs.map(_.isNull).forall(_ == "false")) {
[error] [warn]
```
## How was this patch tested?
Existing UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#22012 from kiszk/SPARK-25036a.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_intersect`. The behavior of the function is based on Presto's one.
This function returns returns an array of the elements in the intersection of array1 and array2.
Note: The order of elements in the result is not defined.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21102 from kiszk/SPARK-23913.
## What changes were proposed in this pull request?
Having the default value of isAll in the logical plan nodes INTERSECT/EXCEPT could introduce bugs when the callers are not aware of it. This PR removes the default value and makes caller explicitly specify them.
## How was this patch tested?
This is a refactoring change. Existing tests test the functionality already.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#22000 from dilipbiswal/SPARK-25025.
## What changes were proposed in this pull request?
Follow up to fix an unmerged review comment.
## How was this patch tested?
Unit test ResolveHintsSuite.
Author: John Zhuge <jzhuge@apache.org>
Closes#21998 from jzhuge/SPARK-24940.
## What changes were proposed in this pull request?
This pr adds `aggregate` function which applies a binary operator to an initial state and all elements in the array, and reduces this to a single state. The final state is converted into the final result by applying a finish function.
```sql
> SELECT aggregate(array(1, 2, 3), (acc, x) -> acc + x);
6
> SELECT aggregate(array(1, 2, 3), (acc, x) -> acc + x, acc -> acc * 10);
60
```
## How was this patch tested?
Added tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21982 from ueshin/issues/SPARK-23911/aggregate.
## What changes were proposed in this pull request?
simplify the codegen:
1. only do real codegen if the type can be specialized by the hash set
2. change the null handling. Before: track the nullElementIndex, and create a new ArrayData to insert the null in the middle. After: track the nullElementIndex, put a null placeholder in the ArrayBuilder, at the end create ArrayData from ArrayBuilder directly.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#21966 from cloud-fan/minor2.
## What changes were proposed in this pull request?
This pr adds `filter` function which filters the input array using the given predicate.
```sql
> SELECT filter(array(1, 2, 3), x -> x % 2 == 1);
array(1, 3)
```
## How was this patch tested?
Added tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21965 from ueshin/issues/SPARK-23909/filter.
## What changes were proposed in this pull request?
Many Spark SQL users in my company have asked for a way to control the number of output files in Spark SQL. The users prefer not to use function repartition(n) or coalesce(n, shuffle) that require them to write and deploy Scala/Java/Python code. We propose adding the following Hive-style Coalesce and Repartition Hint to Spark SQL:
```
... SELECT /*+ COALESCE(numPartitions) */ ...
... SELECT /*+ REPARTITION(numPartitions) */ ...
```
Multiple such hints are allowed. Multiple nodes are inserted into the logical plan, and the optimizer will pick the leftmost hint.
```
INSERT INTO s SELECT /*+ REPARTITION(100), COALESCE(500), COALESCE(10) */ * FROM t
== Logical Plan ==
'InsertIntoTable 'UnresolvedRelation `s`, false, false
+- 'UnresolvedHint REPARTITION, [100]
+- 'UnresolvedHint COALESCE, [500]
+- 'UnresolvedHint COALESCE, [10]
+- 'Project [*]
+- 'UnresolvedRelation `t`
== Optimized Logical Plan ==
InsertIntoHadoopFsRelationCommand ...
+- Repartition 100, true
+- HiveTableRelation ...
```
## How was this patch tested?
All unit tests. Manual tests using explain.
Author: John Zhuge <jzhuge@apache.org>
Closes#21911 from jzhuge/SPARK-24940.
## What changes were proposed in this pull request?
Enable support for MINUS ALL which was gated at AstBuilder.
## How was this patch tested?
Added tests in SQLQueryTestSuite and modify PlanParserSuite.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#21963 from dilipbiswal/minus-all.
## What changes were proposed in this pull request?
Currently the set operations INTERSECT, UNION and EXCEPT are assigned the same precedence. This PR fixes the problem by giving INTERSECT higher precedence than UNION and EXCEPT. UNION and EXCEPT operators are evaluated in the order in which they appear in the query from left to right.
This results in change in behavior because of the change in order of evaluations of set operators in a query. The old behavior is still preserved under a newly added config parameter.
Query `:`
```
SELECT * FROM t1
UNION
SELECT * FROM t2
EXCEPT
SELECT * FROM t3
INTERSECT
SELECT * FROM t4
```
Parsed plan before the change `:`
```
== Parsed Logical Plan ==
'Intersect false
:- 'Except false
: :- 'Distinct
: : +- 'Union
: : :- 'Project [*]
: : : +- 'UnresolvedRelation `t1`
: : +- 'Project [*]
: : +- 'UnresolvedRelation `t2`
: +- 'Project [*]
: +- 'UnresolvedRelation `t3`
+- 'Project [*]
+- 'UnresolvedRelation `t4`
```
Parsed plan after the change `:`
```
== Parsed Logical Plan ==
'Except false
:- 'Distinct
: +- 'Union
: :- 'Project [*]
: : +- 'UnresolvedRelation `t1`
: +- 'Project [*]
: +- 'UnresolvedRelation `t2`
+- 'Intersect false
:- 'Project [*]
: +- 'UnresolvedRelation `t3`
+- 'Project [*]
+- 'UnresolvedRelation `t4`
```
## How was this patch tested?
Added tests in PlanParserSuite, SQLQueryTestSuite.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#21941 from dilipbiswal/SPARK-24966.
## What changes were proposed in this pull request?
Support reading/writing Avro logical timestamp type with different precisions
https://avro.apache.org/docs/1.8.2/spec.html#Timestamp+%28millisecond+precision%29
To specify the output timestamp type, use Dataframe option `outputTimestampType` or SQL config `spark.sql.avro.outputTimestampType`. The supported values are
* `TIMESTAMP_MICROS`
* `TIMESTAMP_MILLIS`
The default output type is `TIMESTAMP_MICROS`
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21935 from gengliangwang/avro_timestamp.
## What changes were proposed in this pull request?
This PR refactors code to get a value for "spark.sql.codegen.comments" by avoiding `SparkEnv.get.conf`. This PR uses `SQLConf.get.codegenComments` since `SQLConf.get` always returns an instance of `SQLConf`.
## How was this patch tested?
Added test case to `DebuggingSuite`
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#19449 from kiszk/SPARK-22219.
## What changes were proposed in this pull request?
This pr adds `transform` function which transforms elements in an array using the function.
Optionally we can take the index of each element as the second argument.
```sql
> SELECT transform(array(1, 2, 3), x -> x + 1);
array(2, 3, 4)
> SELECT transform(array(1, 2, 3), (x, i) -> x + i);
array(1, 3, 5)
```
## How was this patch tested?
Added tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21954 from ueshin/issues/SPARK-23908/transform.
## What changes were proposed in this pull request?
This pull request provides a fix for SPARK-24742: SQL Field MetaData was throwing an Exception in the hashCode method when a "null" Metadata was added via "putNull"
## How was this patch tested?
A new unittest is provided in org/apache/spark/sql/types/MetadataSuite.scala
Author: Kaya Kupferschmidt <k.kupferschmidt@dimajix.de>
Closes#21722 from kupferk/SPARK-24742.
## What changes were proposed in this pull request?
Remove the AnalysisBarrier LogicalPlan node, which is useless now.
## How was this patch tested?
N/A
Author: Xiao Li <gatorsmile@gmail.com>
Closes#21962 from gatorsmile/refactor2.
## What changes were proposed in this pull request?
This PR is to refactor the code in AVERAGE by dsl.
## How was this patch tested?
N/A
Author: Xiao Li <gatorsmile@gmail.com>
Closes#21951 from gatorsmile/refactor1.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_except`. The behavior of the function is based on Presto's one.
This function returns returns an array of the elements in array1 but not in array2.
Note: The order of elements in the result is not defined.
## How was this patch tested?
Added UTs.
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21103 from kiszk/SPARK-23915.
## What changes were proposed in this pull request?
When user calls anUDAF with the wrong number of arguments, Spark previously throws an AssertionError, which is not supposed to be a user-facing exception. This patch updates it to throw AnalysisException instead, so it is consistent with a regular UDF.
## How was this patch tested?
Updated test case udaf.sql.
Author: Reynold Xin <rxin@databricks.com>
Closes#21938 from rxin/SPARK-24982.
## What changes were proposed in this pull request?
Previously TVF resolution could throw IllegalArgumentException if the data type is null type. This patch replaces that exception with AnalysisException, enriched with positional information, to improve error message reporting and to be more consistent with rest of Spark SQL.
## How was this patch tested?
Updated the test case in table-valued-functions.sql.out, which is how I identified this problem in the first place.
Author: Reynold Xin <rxin@databricks.com>
Closes#21934 from rxin/SPARK-24951.
## What changes were proposed in this pull request?
Similar to SPARK-24890, if all the outputs of `CaseWhen` are semantic equivalence, `CaseWhen` can be removed.
## How was this patch tested?
Tests added.
Author: DB Tsai <d_tsai@apple.com>
Closes#21852 from dbtsai/short-circuit-when.
## What changes were proposed in this pull request?
It proposes a version in which nullable expressions are not valid in the limit clause
## How was this patch tested?
It was tested with unit and e2e tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Mauro Palsgraaf <mauropalsgraaf@hotmail.com>
Closes#21807 from mauropalsgraaf/SPARK-24536.
## What changes were proposed in this pull request?
When the pivot column is of a complex type, the eval() result will be an UnsafeRow, while the keys of the HashMap for column value matching is a GenericInternalRow. As a result, there will be no match and the result will always be empty.
So for a pivot column of complex-types, we should:
1) If the complex-type is not comparable (orderable), throw an Exception. It cannot be a pivot column.
2) Otherwise, if it goes through the `PivotFirst` code path, `PivotFirst` should use a TreeMap instead of HashMap for such columns.
This PR has also reverted the walk-around in Analyzer that had been introduced to avoid this `PivotFirst` issue.
## How was this patch tested?
Added UT.
Author: maryannxue <maryannxue@apache.org>
Closes#21926 from maryannxue/pivot_followup.
## What changes were proposed in this pull request?
In the PR, I propose to support `LZMA2` (`XZ`) and `BZIP2` compressions by `AVRO` datasource in write since the codecs may have better characteristics like compression ratio and speed comparing to already supported `snappy` and `deflate` codecs.
## How was this patch tested?
It was tested manually and by an existing test which was extended to check the `xz` and `bzip2` compressions.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21902 from MaxGekk/avro-xz-bzip2.
## What changes were proposed in this pull request?
I didn't want to pollute the diff in the previous PR and left some TODOs. This is a follow-up to address those TODOs.
## How was this patch tested?
Should be covered by existing tests.
Author: Reynold Xin <rxin@databricks.com>
Closes#21896 from rxin/SPARK-24865-addendum.
## What changes were proposed in this pull request?
This pr supported Date/Timestamp in a JDBC partition column (a numeric column is only supported in the master). This pr also modified code to verify a partition column type;
```
val jdbcTable = spark.read
.option("partitionColumn", "text")
.option("lowerBound", "aaa")
.option("upperBound", "zzz")
.option("numPartitions", 2)
.jdbc("jdbc:postgresql:postgres", "t", options)
// with this pr
org.apache.spark.sql.AnalysisException: Partition column type should be numeric, date, or timestamp, but string found.;
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.verifyAndGetNormalizedPartitionColumn(JDBCRelation.scala:165)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.columnPartition(JDBCRelation.scala:85)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:317)
// without this pr
java.lang.NumberFormatException: For input string: "aaa"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
```
Closes#19999
## How was this patch tested?
Added tests in `JDBCSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#21834 from maropu/SPARK-22814.
## What changes were proposed in this pull request?
When we do an average, the result is computed dividing the sum of the values by their count. In the case the result is a DecimalType, the way we are casting/managing the precision and scale is not really optimized and it is not coherent with what we do normally.
In particular, a problem can happen when the `Divide` operand returns a result which contains a precision and scale different by the ones which are expected as output of the `Divide` operand. In the case reported in the JIRA, for instance, the result of the `Divide` operand is a `Decimal(38, 36)`, while the output data type for `Divide` is 38, 22. This is not an issue when the `Divide` is followed by a `CheckOverflow` or a `Cast` to the right data type, as these operations return a decimal with the defined precision and scale. Despite in the `Average` operator we do have a `Cast`, this may be bypassed if the result of `Divide` is the same type which it is casted to, hence the issue reported in the JIRA may arise.
The PR proposes to use the normal rules/handling of the arithmetic operators with Decimal data type, so we both reuse the existing code (having a single logic for operations between decimals) and we fix this problem as the result is always guarded by `CheckOverflow`.
## How was this patch tested?
added UT
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21910 from mgaido91/SPARK-24957.
## What changes were proposed in this pull request?
Implements INTERSECT ALL clause through query rewrites using existing operators in Spark. Please refer to [Link](https://drive.google.com/open?id=1nyW0T0b_ajUduQoPgZLAsyHK8s3_dko3ulQuxaLpUXE) for the design.
Input Query
``` SQL
SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
```
Rewritten Query
```SQL
SELECT c1
FROM (
SELECT replicate_row(min_count, c1)
FROM (
SELECT c1,
IF (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count
FROM (
SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt
FROM (
SELECT c1, true as vcol1, null as vcol2 FROM ut1
UNION ALL
SELECT c1, null as vcol1, true as vcol2 FROM ut2
) AS union_all
GROUP BY c1
HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
)
)
)
```
## How was this patch tested?
Added test cases in SQLQueryTestSuite, DataFrameSuite, SetOperationSuite
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#21886 from dilipbiswal/dkb_intersect_all_final.
## What changes were proposed in this pull request?
Implements EXCEPT ALL clause through query rewrites using existing operators in Spark. In this PR, an internal UDTF (replicate_rows) is added to aid in preserving duplicate rows. Please refer to [Link](https://drive.google.com/open?id=1nyW0T0b_ajUduQoPgZLAsyHK8s3_dko3ulQuxaLpUXE) for the design.
**Note** This proposed UDTF is kept as a internal function that is purely used to aid with this particular rewrite to give us flexibility to change to a more generalized UDTF in future.
Input Query
``` SQL
SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2
```
Rewritten Query
```SQL
SELECT c1
FROM (
SELECT replicate_rows(sum_val, c1)
FROM (
SELECT c1, sum_val
FROM (
SELECT c1, sum(vcol) AS sum_val
FROM (
SELECT 1L as vcol, c1 FROM ut1
UNION ALL
SELECT -1L as vcol, c1 FROM ut2
) AS union_all
GROUP BY union_all.c1
)
WHERE sum_val > 0
)
)
```
## How was this patch tested?
Added test cases in SQLQueryTestSuite, DataFrameSuite and SetOperationSuite
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#21857 from dilipbiswal/dkb_except_all_final.
## What changes were proposed in this pull request?
In the PR, I added new option for Avro datasource - `compression`. The option allows to specify compression codec for saved Avro files. This option is similar to `compression` option in another datasources like `JSON` and `CSV`.
Also I added the SQL configs `spark.sql.avro.compression.codec` and `spark.sql.avro.deflate.level`. I put the configs into `SQLConf`. If the `compression` option is not specified by an user, the first SQL config is taken into account.
## How was this patch tested?
I added new test which read meta info from written avro files and checks `avro.codec` property.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21837 from MaxGekk/avro-compression.
## What changes were proposed in this pull request?
This PR adds a new collection function: shuffle. It generates a random permutation of the given array. This implementation uses the "inside-out" version of Fisher-Yates algorithm.
## How was this patch tested?
New tests are added to CollectionExpressionsSuite.scala and DataFrameFunctionsSuite.scala.
Author: Takuya UESHIN <ueshin@databricks.com>
Author: pkuwm <ihuizhi.lu@gmail.com>
Closes#21802 from ueshin/issues/SPARK-23928/shuffle.
## What changes were proposed in this pull request?
AnalysisBarrier was introduced in SPARK-20392 to improve analysis speed (don't re-analyze nodes that have already been analyzed).
Before AnalysisBarrier, we already had some infrastructure in place, with analysis specific functions (resolveOperators and resolveExpressions). These functions do not recursively traverse down subplans that are already analyzed (with a mutable boolean flag _analyzed). The issue with the old system was that developers started using transformDown, which does a top-down traversal of the plan tree, because there was not top-down resolution function, and as a result analyzer performance became pretty bad.
In order to fix the issue in SPARK-20392, AnalysisBarrier was introduced as a special node and for this special node, transform/transformUp/transformDown don't traverse down. However, the introduction of this special node caused a lot more troubles than it solves. This implicit node breaks assumptions and code in a few places, and it's hard to know when analysis barrier would exist, and when it wouldn't. Just a simple search of AnalysisBarrier in PR discussions demonstrates it is a source of bugs and additional complexity.
Instead, this pull request removes AnalysisBarrier and reverts back to the old approach. We added infrastructure in tests that fail explicitly if transform methods are used in the analyzer.
## How was this patch tested?
Added a test suite AnalysisHelperSuite for testing the resolve* methods and transform* methods.
Author: Reynold Xin <rxin@databricks.com>
Author: Xiao Li <gatorsmile@gmail.com>
Closes#21822 from rxin/SPARK-24865.
## What changes were proposed in this pull request?
This is an extension to the original PR, in which rule exclusion did not work for classes derived from Optimizer, e.g., SparkOptimizer.
To solve this issue, Optimizer and its derived classes will define/override `defaultBatches` and `nonExcludableRules` in order to define its default rule set as well as rules that cannot be excluded by the SQL config. In the meantime, Optimizer's `batches` method is dedicated to the rule exclusion logic and is defined "final".
## How was this patch tested?
Added UT.
Author: maryannxue <maryannxue@apache.org>
Closes#21876 from maryannxue/rule-exclusion.
## What changes were proposed in this pull request?
If we use `reverse` function for array type of primitive type containing `null` and the child array is `UnsafeArrayData`, the function returns a wrong result because `UnsafeArrayData` doesn't define the behavior of re-assignment, especially we can't set a valid value after we set `null`.
## How was this patch tested?
Added some tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21830 from ueshin/issues/SPARK-24878/fix_reverse.
## What changes were proposed in this pull request?
Besides spark setting spark.sql.sources.partitionOverwriteMode also allow setting partitionOverWriteMode per write
## How was this patch tested?
Added unit test in InsertSuite
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Koert Kuipers <koert@tresata.com>
Closes#21818 from koertkuipers/feat-partition-overwrite-mode-per-write.
## What changes were proposed in this pull request?
In the PR, I propose to extend the `StructType`/`StructField` classes by new method `toDDL` which converts a value of the `StructType`/`StructField` type to a string formatted in DDL style. The resulted string can be used in a table creation.
The `toDDL` method of `StructField` is reused in `SHOW CREATE TABLE`. In this way the PR fixes the bug of unquoted names of nested fields.
## How was this patch tested?
I add a test for checking the new method and 2 round trip tests: `fromDDL` -> `toDDL` and `toDDL` -> `fromDDL`
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21803 from MaxGekk/to-ddl.
## What changes were proposed in this pull request?
Improvement `IN` predicate type mismatched message:
```sql
Mismatched columns:
[(, t, 4, ., `, t, 4, a, `, :, d, o, u, b, l, e, ,, , t, 5, ., `, t, 5, a, `, :, d, e, c, i, m, a, l, (, 1, 8, ,, 0, ), ), (, t, 4, ., `, t, 4, c, `, :, s, t, r, i, n, g, ,, , t, 5, ., `, t, 5, c, `, :, b, i, g, i, n, t, )]
```
After this patch:
```sql
Mismatched columns:
[(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, t5.`t5c`:bigint)]
```
## How was this patch tested?
unit tests
Author: Yuming Wang <yumwang@ebay.com>
Closes#21863 from wangyum/SPARK-18874.
## What changes were proposed in this pull request?
Thanks to henryr for the original idea at https://github.com/apache/spark/pull/21049
Description from the original PR :
Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering
them is therefore redundant (unless combined with a limit).
This patch removes the top sort operators from the subquery plans.
This closes https://github.com/apache/spark/pull/21049.
## How was this patch tested?
Added test cases in SubquerySuite to cover in, exists and scalar subqueries.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#21853 from dilipbiswal/SPARK-23957.
## What changes were proposed in this pull request?
When `trueValue` and `falseValue` are semantic equivalence, the condition expression in `if` can be removed to avoid extra computation in runtime.
## How was this patch tested?
Test added.
Author: DB Tsai <d_tsai@apple.com>
Closes#21848 from dbtsai/short-circuit-if.
## What changes were proposed in this pull request?
The HandleNullInputsForUDF would always add a new `If` node every time it is applied. That would cause a difference between the same plan being analyzed once and being analyzed twice (or more), thus raising issues like plan not matched in the cache manager. The solution is to mark the arguments as null-checked, which is to add a "KnownNotNull" node above those arguments, when adding the UDF under an `If` node, because clearly the UDF will not be called when any of those arguments is null.
## How was this patch tested?
Add new tests under sql/UDFSuite and AnalysisSuite.
Author: maryannxue <maryannxue@apache.org>
Closes#21851 from maryannxue/spark-24891.
## What changes were proposed in this pull request?
Last Access Time will always displayed wrong date Thu Jan 01 05:30:00 IST 1970 when user run DESC FORMATTED table command
In hive its displayed as "UNKNOWN" which makes more sense than displaying wrong date. seems to be a limitation as of now even from hive, better we can follow the hive behavior unless the limitation has been resolved from hive.
spark client output
![spark_desc table](https://user-images.githubusercontent.com/12999161/42753448-ddeea66a-88a5-11e8-94aa-ef8d017f94c5.png)
Hive client output
![hive_behaviour](https://user-images.githubusercontent.com/12999161/42753489-f4fd366e-88a5-11e8-83b0-0f3a53ce83dd.png)
## How was this patch tested?
UT has been added which makes sure that the wrong date "Thu Jan 01 05:30:00 IST 1970 "
shall not be added as value for the Last Access property
Author: s71955 <sujithchacko.2010@gmail.com>
Closes#21775 from sujith71955/master_hive.
## What changes were proposed in this pull request?
Modified the canonicalized to not case-insensitive.
Before the PR, cache can't work normally if there are case letters in SQL,
for example:
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("select key, sum(case when Key > 0 then 1 else 0 end) as positiveNum " +
"from src group by key").cache().createOrReplaceTempView("src_cache")
sql(
s"""select a.key
from
(select key from src_cache where positiveNum = 1)a
left join
(select key from src_cache )b
on a.key=b.key
""").explain
The physical plan of the sql is:
![image](https://user-images.githubusercontent.com/26834091/42979518-3decf0fa-8c05-11e8-9837-d5e4c334cb1f.png)
The subquery "select key from src_cache where positiveNum = 1" on the left of join can use the cache data, but the subquery "select key from src_cache" on the right of join cannot use the cache data.
## How was this patch tested?
new added test
Author: 10129659 <chen.yanshan@zte.com.cn>
Closes#21823 from eatoncys/canonicalized.
## What changes were proposed in this pull request?
Modify the strategy in ColumnPruning to add a Project between ScriptTransformation and its child, this strategy can reduce the scan time especially in the scenario of the table has many columns.
## How was this patch tested?
Add UT in ColumnPruningSuite and ScriptTransformationSuite.
Author: Yuanjian Li <xyliyuanjian@gmail.com>
Closes#21839 from xuanyuanking/SPARK-24339.
## What changes were proposed in this pull request?
Since Spark has provided fairly clear interfaces for adding user-defined optimization rules, it would be nice to have an easy-to-use interface for excluding an optimization rule from the Spark query optimizer as well.
This would make customizing Spark optimizer easier and sometimes could debugging issues too.
- Add a new config spark.sql.optimizer.excludedRules, with the value being a list of rule names separated by comma.
- Modify the current batches method to remove the excluded rules from the default batches. Log the rules that have been excluded.
- Split the existing default batches into "post-analysis batches" and "optimization batches" so that only rules in the "optimization batches" can be excluded.
## How was this patch tested?
Add a new test suite: OptimizerRuleExclusionSuite
Author: maryannxue <maryannxue@apache.org>
Closes#21764 from maryannxue/rule-exclusion.
## What changes were proposed in this pull request?
Currently, the Analyzer throws an exception if your try to nest a generator. However, it special cases generators "nested" in an alias, and allows that. If you try to alias a generator twice, it is not caught by the special case, so an exception is thrown.
This PR trims the unnecessary, non-top-level aliases, so that the generator is allowed.
## How was this patch tested?
new tests in AnalysisSuite.
Author: Brandon Krieger <bkrieger@palantir.com>
Closes#21508 from bkrieger/bk/SPARK-24488.
## What changes were proposed in this pull request?
Refactor `Concat` and `MapConcat` to:
- avoid creating concatenator object for each row.
- make `Concat` handle `containsNull` properly.
- make `Concat` shortcut if `null` child is found.
## How was this patch tested?
Added some tests and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21824 from ueshin/issues/SPARK-24871/refactor_concat_mapconcat.
## What changes were proposed in this pull request?
Enhances the parser and analyzer to support ANSI compliant syntax for GROUPING SET. As part of this change we derive the grouping expressions from user supplied groupings in the grouping sets clause.
```SQL
SELECT c1, c2, max(c3)
FROM t1
GROUP BY GROUPING SETS ((c1), (c1, c2))
```
## How was this patch tested?
Added tests in SQLQueryTestSuite and ResolveGroupingAnalyticsSuite.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#21813 from dilipbiswal/spark-24424.
## What changes were proposed in this pull request?
As stated in https://github.com/apache/spark/pull/21321, in the error messages we should use `catalogString`. This is not the case, as SPARK-22893 used `simpleString` in order to have the same representation everywhere and it missed some places.
The PR unifies the messages using alway the `catalogString` representation of the dataTypes in the messages.
## How was this patch tested?
existing/modified UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21804 from mgaido91/SPARK-24268_catalog.
## What changes were proposed in this pull request?
Made ExprId hashCode independent of jvmId to make canonicalization independent of JVM, by overriding hashCode (and necessarily also equality) to depend on id only
## How was this patch tested?
Created a unit test ExprIdSuite
Ran all unit tests of sql/catalyst
Author: Ger van Rossum <gvr@users.noreply.github.com>
Closes#21806 from gvr/spark24846-canonicalization.
## What changes were proposed in this pull request?
Currently, the group state of user-defined-type is encoded as top-level columns in the UnsafeRows stores in the state store. The timeout timestamp is also saved as (when needed) as the last top-level column. Since the group state is serialized to top-level columns, you cannot save "null" as a value of state (setting null in all the top-level columns is not equivalent). So we don't let the user set the timeout without initializing the state for a key. Based on user experience, this leads to confusion.
This PR is to change the row format such that the state is saved as nested columns. This would allow the state to be set to null, and avoid these confusing corner cases. However, queries recovering from existing checkpoint will use the previous format to maintain compatibility with existing production queries.
## How was this patch tested?
Refactored existing end-to-end tests and added new tests for explicitly testing obj-to-row conversion for both state formats.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21739 from tdas/SPARK-22187-1.
## What changes were proposed in this pull request?
This patch proposes breaking down configuration of retaining batch size on state into two pieces: files and in memory (cache). While this patch reuses existing configuration for files, it introduces new configuration, "spark.sql.streaming.maxBatchesToRetainInMemory" to configure max count of batch to retain in memory.
## How was this patch tested?
Apply this patch on top of SPARK-24441 (https://github.com/apache/spark/pull/21469), and manually tested in various workloads to ensure overall size of states in memory is around 2x or less of the size of latest version of state, while it was 10x ~ 80x before applying the patch.
Author: Jungtaek Lim <kabhwan@gmail.com>
Closes#21700 from HeartSaVioR/SPARK-24717.
## What changes were proposed in this pull request?
Fix regexes in spark-sql command examples.
This takes over https://github.com/apache/spark/pull/18477
## How was this patch tested?
Existing tests. I verified the existing example doesn't work in spark-sql, but new ones does.
Author: Sean Owen <srowen@gmail.com>
Closes#21808 from srowen/SPARK-21261.
## What changes were proposed in this pull request?
1. Extend the Parser to enable parsing a column list as the pivot column.
2. Extend the Parser and the Pivot node to enable parsing complex expressions with aliases as the pivot value.
3. Add type check and constant check in Analyzer for Pivot node.
## How was this patch tested?
Add tests in pivot.sql
Author: maryannxue <maryannxue@apache.org>
Closes#21720 from maryannxue/spark-24164.
## What changes were proposed in this pull request?
Two new rules in the logical plan optimizers are added.
1. When there is only one element in the **`Collection`**, the
physical plan will be optimized to **`EqualTo`**, so predicate
pushdown can be used.
```scala
profileDF.filter( $"profileID".isInCollection(Set(6))).explain(true)
"""
|== Physical Plan ==
|*(1) Project [profileID#0]
|+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
| +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
| PartitionFilters: [],
| PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
| ReadSchema: struct<profileID:int>
""".stripMargin
```
2. When the **`Collection`** is empty, and the input is nullable, the
logical plan will be simplified to
```scala
profileDF.filter( $"profileID".isInCollection(Set())).explain(true)
"""
|== Optimized Logical Plan ==
|Filter if (isnull(profileID#0)) null else false
|+- Relation[profileID#0] parquet
""".stripMargin
```
TODO:
1. For multiple conditions with numbers less than certain thresholds,
we should still allow predicate pushdown.
2. Optimize the **`In`** using **`tableswitch`** or **`lookupswitch`**
when the numbers of the categories are low, and they are **`Int`**,
**`Long`**.
3. The default immutable hash trees set is slow for query, and we
should do benchmark for using different set implementation for faster
query.
4. **`filter(if (condition) null else false)`** can be optimized to false.
## How was this patch tested?
Couple new tests are added.
Author: DB Tsai <d_tsai@apple.com>
Closes#21797 from dbtsai/optimize-in.
## What changes were proposed in this pull request?
Remove the non-negative checks of window start time to make window support negative start time, and add a check to guarantee the absolute value of start time is less than slide duration.
## How was this patch tested?
New unit tests.
Author: HanShuliang <kevinzwx1992@gmail.com>
Closes#18903 from KevinZwx/dev.
## What changes were proposed in this pull request?
The PR tries to avoid serialization of private fields of already added collection functions and follows up on comments in [SPARK-23922](https://github.com/apache/spark/pull/21028) and [SPARK-23935](https://github.com/apache/spark/pull/21236)
## How was this patch tested?
Run tests from:
- CollectionExpressionSuite.scala
- DataFrameFunctionsSuite.scala
Author: Marek Novotny <mn.mikke@gmail.com>
Closes#21352 from mn-mikke/SPARK-24305.
## What changes were proposed in this pull request?
Two new rules in the logical plan optimizers are added.
1. When there is only one element in the **`Collection`**, the
physical plan will be optimized to **`EqualTo`**, so predicate
pushdown can be used.
```scala
profileDF.filter( $"profileID".isInCollection(Set(6))).explain(true)
"""
|== Physical Plan ==
|*(1) Project [profileID#0]
|+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
| +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
| PartitionFilters: [],
| PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
| ReadSchema: struct<profileID:int>
""".stripMargin
```
2. When the **`Collection`** is empty, and the input is nullable, the
logical plan will be simplified to
```scala
profileDF.filter( $"profileID".isInCollection(Set())).explain(true)
"""
|== Optimized Logical Plan ==
|Filter if (isnull(profileID#0)) null else false
|+- Relation[profileID#0] parquet
""".stripMargin
```
TODO:
1. For multiple conditions with numbers less than certain thresholds,
we should still allow predicate pushdown.
2. Optimize the **`In`** using **`tableswitch`** or **`lookupswitch`**
when the numbers of the categories are low, and they are **`Int`**,
**`Long`**.
3. The default immutable hash trees set is slow for query, and we
should do benchmark for using different set implementation for faster
query.
4. **`filter(if (condition) null else false)`** can be optimized to false.
## How was this patch tested?
Couple new tests are added.
Author: DB Tsai <d_tsai@apple.com>
Closes#21442 from dbtsai/optimize-in.
## What changes were proposed in this pull request?
We have some functions which need to aware the nullabilities of all children, such as `CreateArray`, `CreateMap`, `Concat`, and so on. Currently we add casts to fix the nullabilities, but the casts might be removed during the optimization phase.
After the discussion, we decided to not add extra casts for just fixing the nullabilities of the nested types, but handle them by functions themselves.
## How was this patch tested?
Modified and added some tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21704 from ueshin/issues/SPARK-24734/concat_containsnull.
## What changes were proposed in this pull request?
Support Decimal type push down to the parquet data sources.
The Decimal comparator used is: [`BINARY_AS_SIGNED_INTEGER_COMPARATOR`](c6764c4a08/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java (L224-L292)).
## How was this patch tested?
unit tests and manual tests.
**manual tests**:
```scala
spark.range(10000000).selectExpr("id", "cast(id as decimal(9)) as d1", "cast(id as decimal(9, 2)) as d2", "cast(id as decimal(18)) as d3", "cast(id as decimal(18, 4)) as d4", "cast(id as decimal(38)) as d5", "cast(id as decimal(38, 18)) as d6").coalesce(1).write.option("parquet.block.size", 1048576).parquet("/tmp/spark/parquet/decimal")
val df = spark.read.parquet("/tmp/spark/parquet/decimal/")
spark.sql("set spark.sql.parquet.filterPushdown.decimal=true")
// Only read about 1 MB data
df.filter("d2 = 10000").show
// Only read about 1 MB data
df.filter("d4 = 10000").show
spark.sql("set spark.sql.parquet.filterPushdown.decimal=false")
// Read 174.3 MB data
df.filter("d2 = 10000").show
// Read 174.3 MB data
df.filter("d4 = 10000").show
```
Author: Yuming Wang <yumwang@ebay.com>
Closes#21556 from wangyum/SPARK-24549.
## What changes were proposed in this pull request?
`Timestamp` support pushdown to parquet data source.
Only `TIMESTAMP_MICROS` and `TIMESTAMP_MILLIS` support push down.
## How was this patch tested?
unit tests and benchmark tests
Author: Yuming Wang <yumwang@ebay.com>
Closes#21741 from wangyum/SPARK-24718.
## What changes were proposed in this pull request?
The original pr is: https://github.com/apache/spark/pull/18424
Add a new optimizer rule to convert an IN predicate to an equivalent Parquet filter and add `spark.sql.parquet.pushdown.inFilterThreshold` to control limit thresholds. Different data types have different limit thresholds, this is a copy of data for reference:
Type | limit threshold
-- | --
string | 370
int | 210
long | 285
double | 270
float | 220
decimal | Won't provide better performance before [SPARK-24549](https://issues.apache.org/jira/browse/SPARK-24549)
## How was this patch tested?
unit tests and manual tests
Author: Yuming Wang <yumwang@ebay.com>
Closes#21603 from wangyum/SPARK-17091.
## What changes were proposed in this pull request?
When we use a reference from Dataset in filter or sort, which was not used in the prior select, an AnalysisException occurs, e.g.,
```scala
val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
df.select(df("name")).filter(df("id") === 0).show()
```
```scala
org.apache.spark.sql.AnalysisException: Resolved attribute(s) id#6 missing from name#5 in operator !Filter (id#6 = 0).;;
!Filter (id#6 = 0)
+- AnalysisBarrier
+- Project [name#5]
+- Project [_1#2 AS name#5, _2#3 AS id#6]
+- LocalRelation [_1#2, _2#3]
```
This change updates the rule `ResolveMissingReferences` so `Filter` and `Sort` with non-empty `missingInputs` will also be transformed.
## How was this patch tested?
Added tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#21745 from viirya/SPARK-24781.
## What changes were proposed in this pull request?
This PR will cache the function name from external catalog, it is used by lookupFunctions in the analyzer, and it is cached for each query plan. The original problem is reported in the [ spark-19737](https://issues.apache.org/jira/browse/SPARK-19737)
## How was this patch tested?
create new test file LookupFunctionsSuite and add test case in SessionCatalogSuite
Author: Kevin Yu <qyu@us.ibm.com>
Closes#20795 from kevinyu98/spark-23486.
## What changes were proposed in this pull request?
Relax the check to allow complex aggregate expressions, like `ceil(sum(col1))` or `sum(col1) + 1`, which roughly means any aggregate expression that could appear in an Aggregate plan except pandas UDF (due to the fact that it is not supported in pivot yet).
## How was this patch tested?
Added 2 tests in pivot.sql
Author: maryannxue <maryannxue@apache.org>
Closes#21753 from maryannxue/pivot-relax-syntax.
## What changes were proposed in this pull request?
The PR adds the SQL function `array_union`. The behavior of the function is based on Presto's one.
This function returns returns an array of the elements in the union of array1 and array2.
Note: The order of elements in the result is not defined.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21061 from kiszk/SPARK-23914.
## What changes were proposed in this pull request?
This PR enables a Java bytecode check tool [spotbugs](https://spotbugs.github.io/) to avoid possible integer overflow at multiplication. When an violation is detected, the build process is stopped.
Due to the tool limitation, some other checks will be enabled. In this PR, [these patterns](http://spotbugs-in-kengo-toda.readthedocs.io/en/lqc-list-detectors/detectors.html#findpuzzlers) in `FindPuzzlers` can be detected.
This check is enabled at `compile` phase. Thus, `mvn compile` or `mvn package` launches this check.
## How was this patch tested?
Existing UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21542 from kiszk/SPARK-24529.
## What changes were proposed in this pull request?
In the PR, I propose to extend `RuntimeConfig` by new method `isModifiable()` which returns `true` if a config parameter can be modified at runtime (for current session state). For static SQL and core parameters, the method returns `false`.
## How was this patch tested?
Added new test to `RuntimeConfigSuite` for checking Spark core and SQL parameters.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21730 from MaxGekk/is-modifiable.
## What changes were proposed in this pull request?
The PR simplifies the retrieval of config in `size`, as we can access them from tasks too thanks to SPARK-24250.
## How was this patch tested?
existing UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21736 from mgaido91/SPARK-24605_followup.
## What changes were proposed in this pull request?
A self-join on a dataset which contains a `FlatMapGroupsInPandas` fails because of duplicate attributes. This happens because we are not dealing with this specific case in our `dedupAttr` rules.
The PR fix the issue by adding the management of the specific case
## How was this patch tested?
added UT + manual tests
Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#21737 from mgaido91/SPARK-24208.
## What changes were proposed in this pull request?
This PR is proposing a fix for the output data type of ```If``` and ```CaseWhen``` expression. Upon till now, the implementation of exprassions has ignored nullability of nested types from different execution branches and returned the type of the first branch.
This could lead to an unwanted ```NullPointerException``` from other expressions depending on a ```If```/```CaseWhen``` expression.
Example:
```
val rows = new util.ArrayList[Row]()
rows.add(Row(true, ("a", 1)))
rows.add(Row(false, (null, 2)))
val schema = StructType(Seq(
StructField("cond", BooleanType, false),
StructField("s", StructType(Seq(
StructField("val1", StringType, true),
StructField("val2", IntegerType, false)
)), false)
))
val df = spark.createDataFrame(rows, schema)
df
.select(when('cond, struct(lit("x").as("val1"), lit(10).as("val2"))).otherwise('s) as "res")
.select('res.getField("val1"))
.show()
```
Exception:
```
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:109)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
...
```
Output schema:
```
root
|-- res.val1: string (nullable = false)
```
## How was this patch tested?
New test cases added into
- DataFrameSuite.scala
- conditionalExpressions.scala
Author: Marek Novotny <mn.mikke@gmail.com>
Closes#21687 from mn-mikke/SPARK-24165.
## What changes were proposed in this pull request?
Currently, when a streaming query has multiple watermark, the policy is to choose the min of them as the global watermark. This is safe to do as the global watermark moves with the slowest stream, and is therefore is safe as it does not unexpectedly drop some data as late, etc. While this is indeed the safe thing to do, in some cases, you may want the watermark to advance with the fastest stream, that is, take the max of multiple watermarks. This PR is to add that configuration. It makes the following changes.
- Adds a configuration to specify max as the policy.
- Saves the configuration in OffsetSeqMetadata because changing it in the middle can lead to unpredictable results.
- For old checkpoints without the configuration, it assumes the default policy as min (irrespective of the policy set at the session where the query is being restarted). This is to ensure that existing queries are affected in any way.
TODO
- [ ] Add a test for recovery from existing checkpoints.
## How was this patch tested?
New unit test
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21701 from tdas/SPARK-24730.
## What changes were proposed in this pull request?
Support the LIMIT operator in structured streaming.
For streams in append or complete output mode, a stream with a LIMIT operator will return no more than the specified number of rows. LIMIT is still unsupported for the update output mode.
This change reverts e4fee395ec as part of it because it is a better and more complete implementation.
## How was this patch tested?
New and existing unit tests.
Author: Mukul Murthy <mukul.murthy@gmail.com>
Closes#21662 from mukulmurthy/SPARK-24662.
## What changes were proposed in this pull request?
SPARK-22893 tried to unify error messages about dataTypes. Unfortunately, still many places were missing the `simpleString` method in other to have the same representation everywhere.
The PR unified the messages using alway the simpleString representation of the dataTypes in the messages.
## How was this patch tested?
existing/modified UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21321 from mgaido91/SPARK-24268.
## What changes were proposed in this pull request?
Implement map_concat high order function.
This implementation does not pick a winner when the specified maps have overlapping keys. Therefore, this implementation preserves existing duplicate keys in the maps and potentially introduces new duplicates (After discussion with ueshin, we settled on option 1 from [here](https://issues.apache.org/jira/browse/SPARK-23936?focusedCommentId=16464245&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16464245)).
## How was this patch tested?
New tests
Manual tests
Run all sbt SQL tests
Run all pyspark sql tests
Author: Bruce Robbins <bersprockets@gmail.com>
Closes#21073 from bersprockets/SPARK-23936.
## What changes were proposed in this pull request?
We should use `DataType.sameType` to compare element type in `ArrayContains`, otherwise nullability affects comparison result.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#21724 from viirya/SPARK-24749.
## What changes were proposed in this pull request?
SQL `Aggregator` with output type `Option[Boolean]` creates column of type `StructType`. It's not in consistency with a Dataset of similar java class.
This changes the way `definedByConstructorParams` checks given type. For `Option[_]`, it goes to check its type argument.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#21611 from viirya/SPARK-24569.
## What changes were proposed in this pull request?
We can support type coercion between `StructType`s where all the internal types are compatible.
## How was this patch tested?
Added tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21713 from ueshin/issues/SPARK-24737/structtypecoercion.
## What changes were proposed in this pull request?
If table is renamed to a existing new location, data won't show up.
```
scala> Seq("hello").toDF("a").write.format("parquet").saveAsTable("t")
scala> sql("select * from t").show()
+-----+
| a|
+-----+
|hello|
+-----+
scala> sql("alter table t rename to test")
res2: org.apache.spark.sql.DataFrame = []
scala> sql("select * from test").show()
+---+
| a|
+---+
+---+
```
The file layout is like
```
$ tree test
test
├── gabage
└── t
├── _SUCCESS
└── part-00000-856b0f10-08f1-42d6-9eb3-7719261f3d5e-c000.snappy.parquet
```
In Hive, if the new location exists, the renaming will fail even the location is empty.
We should have the same validation in Catalog, in case of unexpected bugs.
## How was this patch tested?
New unit test.
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21655 from gengliangwang/validate_rename_table.
## What changes were proposed in this pull request?
Current code block manipulation API is immature and hacky. We need a formal API to manipulate code blocks.
The basic idea is making `JavaCode` as `TreeNode`. So we can use familiar `transform` API to manipulate code blocks and expressions in code blocks.
For example, we can replace `SimpleExprValue` in a code block like this:
```scala
code.transformExprValues {
case SimpleExprValue("1 + 1", _) => aliasedParam
}
```
The example use case is splitting code to methods.
For example, we have an `ExprCode` containing generated code. But it is too long and we need to split it as method. Because statement-based expressions can't be directly passed into. We need to transform them as variables first:
```scala
def getExprValues(block: Block): Set[ExprValue] = block match {
case c: CodeBlock =>
c.blockInputs.collect {
case e: ExprValue => e
}.toSet
case _ => Set.empty
}
def currentCodegenInputs(ctx: CodegenContext): Set[ExprValue] = {
// Collects current variables in ctx.currentVars and ctx.INPUT_ROW.
// It looks roughly like...
ctx.currentVars.flatMap { v =>
getExprValues(v.code) ++ Set(v.value, v.isNull)
}.toSet + ctx.INPUT_ROW
}
// A code block of an expression contains too long code, making it as method
if (eval.code.length > 1024) {
val setIsNull = if (!eval.isNull.isInstanceOf[LiteralValue]) {
...
} else {
""
}
// Pick up variables and statements necessary to pass in.
val currentVars = currentCodegenInputs(ctx)
val varsPassIn = getExprValues(eval.code).intersect(currentVars)
val aliasedExprs = HashMap.empty[SimpleExprValue, VariableValue]
// Replace statement-based expressions which can't be directly passed in the method.
val newCode = eval.code.transform {
case block =>
block.transformExprValues {
case s: SimpleExprValue(_, javaType) if varsPassIn.contains(s) =>
if (aliasedExprs.contains(s)) {
aliasedExprs(s)
} else {
val aliasedVariable = JavaCode.variable(ctx.freshName("aliasedVar"), javaType)
aliasedExprs += s -> aliasedVariable
varsPassIn += aliasedVariable
aliasedVariable
}
}
}
val params = varsPassIn.filter(!_.isInstanceOf[SimpleExprValue])).map { variable =>
s"${variable.javaType.getName} ${variable.variableName}"
}.mkString(", ")
val funcName = ctx.freshName("nodeName")
val javaType = CodeGenerator.javaType(dataType)
val newValue = JavaCode.variable(ctx.freshName("value"), dataType)
val funcFullName = ctx.addNewFunction(funcName,
s"""
|private $javaType $funcName($params) {
| $newCode
| $setIsNull
| return ${eval.value};
|}
""".stripMargin))
eval.value = newValue
val args = varsPassIn.filter(!_.isInstanceOf[SimpleExprValue])).map { variable =>
s"${variable.variableName}"
}
// Create a code block to assign statements to aliased variables.
val createVariables = aliasedExprs.foldLeft(EmptyBlock) { (block, (statement, variable)) =>
block + code"${statement.javaType.getName} $variable = $statement;"
}
eval.code = createVariables + code"$javaType $newValue = $funcFullName($args);"
}
```
## How was this patch tested?
Added unite tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#21405 from viirya/codeblock-api.
## What changes were proposed in this pull request?
As mentioned in https://github.com/apache/spark/pull/21586 , `Cast.mayTruncate` is not 100% safe, string to boolean is allowed. Since changing `Cast.mayTruncate` also changes the behavior of Dataset, here I propose to add a new `Cast.canSafeCast` for partition pruning.
## How was this patch tested?
new test cases
Author: Wenchen Fan <wenchen@databricks.com>
Closes#21712 from cloud-fan/safeCast.
## What changes were proposed in this pull request?
The `Blocks` class in `JavaCode` class hierarchy is not necessary. Its function can be taken by `CodeBlock`. We should remove it to make simpler class hierarchy.
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#21619 from viirya/SPARK-24635.
## What changes were proposed in this pull request?
Since SPARK-24250 has been resolved, executors correctly references user-defined configurations. So, this pr added a static config to control cache size for generated classes in `CodeGenerator`.
## How was this patch tested?
Added tests in `ExecutorSideSQLConfSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#21705 from maropu/SPARK-24727.
## What changes were proposed in this pull request?
Currently we don't allow type coercion between maps.
We can support type coercion between MapTypes where both the key types and the value types are compatible.
## How was this patch tested?
Added tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21703 from ueshin/issues/SPARK-24732/maptypecoercion.
## What changes were proposed in this pull request?
In the PR, I propose to add new function - *schema_of_json()* which infers schema of JSON string literal. The result of the function is a string containing a schema in DDL format.
One of the use cases is using of *schema_of_json()* in the combination with *from_json()*. Currently, _from_json()_ requires a schema as a mandatory argument. The *schema_of_json()* function will allow to point out an JSON string as an example which has the same schema as the first argument of _from_json()_. For instance:
```sql
select from_json(json_column, schema_of_json('{"c1": [0], "c2": [{"c3":0}]}'))
from json_table;
```
## How was this patch tested?
Added new test to `JsonFunctionsSuite`, `JsonExpressionsSuite` and SQL tests to `json-functions.sql`
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21686 from MaxGekk/infer_schema_json.
## What changes were proposed in this pull request?
Use SQLConf for PySpark to manage all sql configs, drop all the hard code in config usage.
## How was this patch tested?
Existing UT.
Author: Yuanjian Li <xyliyuanjian@gmail.com>
Closes#21648 from xuanyuanking/SPARK-24665.
## What changes were proposed in this pull request?
The ColumnPruning rule tries adding an extra Project if an input node produces fields more than needed, but as a post-processing step, it needs to remove the lower Project in the form of "Project - Filter - Project" otherwise it would conflict with PushPredicatesThroughProject and would thus cause a infinite optimization loop. The current post-processing method is defined as:
```
private def removeProjectBeforeFilter(plan: LogicalPlan): LogicalPlan = plan transform {
case p1 Project(_, f Filter(_, p2 Project(_, child)))
if p2.outputSet.subsetOf(child.outputSet) =>
p1.copy(child = f.copy(child = child))
}
```
This method works well when there is only one Filter but would not if there's two or more Filters. In this case, there is a deterministic filter and a non-deterministic filter so they stay as separate filter nodes and cannot be combined together.
An simplified illustration of the optimization process that forms the infinite loop is shown below (F1 stands for the 1st filter, F2 for the 2nd filter, P for project, S for scan of relation, PredicatePushDown as abbrev. of PushPredicatesThroughProject):
```
F1 - F2 - P - S
PredicatePushDown => F1 - P - F2 - S
ColumnPruning => F1 - P - F2 - P - S
=> F1 - P - F2 - S (Project removed)
PredicatePushDown => P - F1 - F2 - S
ColumnPruning => P - F1 - P - F2 - S
=> P - F1 - P - F2 - P - S
=> P - F1 - F2 - P - S (only one Project removed)
RemoveRedundantProject => F1 - F2 - P - S (goes back to the loop start)
```
So the problem is the ColumnPruning rule adds a Project under a Filter (and fails to remove it in the end), and that new Project triggers PushPredicateThroughProject. Once the filters have been push through the Project, a new Project will be added by the ColumnPruning rule and this goes on and on.
The fix should be when adding Projects, the rule applies top-down, but later when removing extra Projects, the process should go bottom-up to ensure all extra Projects can be matched.
## How was this patch tested?
Added a optimization rule test in ColumnPruningSuite; and a end-to-end test in SQLQuerySuite.
Author: maryannxue <maryannxue@apache.org>
Closes#21674 from maryannxue/spark-24696.
## What changes were proposed in this pull request?
Provide a continuous processing implementation of coalesce(1), as well as allowing aggregates on top of it.
The changes in ContinuousQueuedDataReader and such are to use split.index (the ID of the partition within the RDD currently being compute()d) rather than context.partitionId() (the partition ID of the scheduled task within the Spark job - that is, the post coalesce writer). In the absence of a narrow dependency, these values were previously always the same, so there was no need to distinguish.
## How was this patch tested?
new unit test
Author: Jose Torres <torres.joseph.f+github@gmail.com>
Closes#21560 from jose-torres/coalesce.
## What changes were proposed in this pull request?
Set createTime for every hive partition created in Spark SQL, which could be used to manage data lifecycle in Hive warehouse. We found that almost every partition modified by spark sql has not been set createTime.
```
mysql> select * from partitions where create_time=0 limit 1\G;
*************************** 1. row ***************************
PART_ID: 1028584
CREATE_TIME: 0
LAST_ACCESS_TIME: 1502203611
PART_NAME: date=20170130
SD_ID: 1543605
TBL_ID: 211605
LINK_TARGET_ID: NULL
1 row in set (0.27 sec)
```
## How was this patch tested?
N/A
Author: debugger87 <yangchaozhong.2009@gmail.com>
Author: Chaozhong Yang <yangchaozhong.2009@gmail.com>
Closes#18900 from debugger87/fix/set-create-time-for-hive-partition.
## What changes were proposed in this pull request?
Address comments in #21370 and add more test.
## How was this patch tested?
Enhance test in pyspark/sql/test.py and DataFrameSuite
Author: Yuanjian Li <xyliyuanjian@gmail.com>
Closes#21553 from xuanyuanking/SPARK-24215-follow.
## What changes were proposed in this pull request?
This pr is a follow-up pr of #21155.
The #21155 removed unnecessary import at that time, but the import became necessary in another pr.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21646 from ueshin/issues/SPARK-23927/fup1.
## What changes were proposed in this pull request?
The PR adds the SQL function ```sequence```.
https://issues.apache.org/jira/browse/SPARK-23927
The behavior of the function is based on Presto's one.
Ref: https://prestodb.io/docs/current/functions/array.html
- ```sequence(start, stop) → array<bigint>```
Generate a sequence of integers from ```start``` to ```stop```, incrementing by ```1``` if ```start``` is less than or equal to ```stop```, otherwise ```-1```.
- ```sequence(start, stop, step) → array<bigint>```
Generate a sequence of integers from ```start``` to ```stop```, incrementing by ```step```.
- ```sequence(start_date, stop_date) → array<date>```
Generate a sequence of dates from ```start_date``` to ```stop_date```, incrementing by ```interval 1 day``` if ```start_date``` is less than or equal to ```stop_date```, otherwise ```- interval 1 day```.
- ```sequence(start_date, stop_date, step_interval) → array<date>```
Generate a sequence of dates from ```start_date``` to ```stop_date```, incrementing by ```step_interval```. The type of ```step_interval``` is ```CalendarInterval```.
- ```sequence(start_timestemp, stop_timestemp) → array<timestamp>```
Generate a sequence of timestamps from ```start_timestamps``` to ```stop_timestamps```, incrementing by ```interval 1 day``` if ```start_date``` is less than or equal to ```stop_date```, otherwise ```- interval 1 day```.
- ```sequence(start_timestamp, stop_timestamp, step_interval) → array<timestamp>```
Generate a sequence of timestamps from ```start_timestamps``` to ```stop_timestamps```, incrementing by ```step_interval```. The type of ```step_interval``` is ```CalendarInterval```.
## How was this patch tested?
Added unit tests.
Author: Vayda, Oleksandr: IT (PRG) <Oleksandr.Vayda@barclayscapital.com>
Closes#21155 from wajda/feature/array-api-sequence.
## What changes were proposed in this pull request?
In PR, I propose new behavior of `size(null)` under the config flag `spark.sql.legacy.sizeOfNull`. If the former one is disabled, the `size()` function returns `null` for `null` input. By default the `spark.sql.legacy.sizeOfNull` is enabled to keep backward compatibility with previous versions. In that case, `size(null)` returns `-1`.
## How was this patch tested?
Modified existing tests for the `size()` function to check new behavior (`null`) and old one (`-1`).
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21598 from MaxGekk/legacy-size-of-null.
## What changes were proposed in this pull request?
Fix `GenericArrayData.equals`, so that it respects the actual types of the elements.
e.g. an instance that represents an `array<int>` and another instance that represents an `array<long>` should be considered incompatible, and thus should return false for `equals`.
`GenericArrayData` doesn't keep any schema information by itself, and rather relies on the Java objects referenced by its `array` field's elements to keep track of their own object types. So, the most straightforward way to respect their types is to call `equals` on the elements, instead of using Scala's `==` operator, which can have semantics that are not always desirable:
```
new java.lang.Integer(123) == new java.lang.Long(123L) // true in Scala
new java.lang.Integer(123).equals(new java.lang.Long(123L)) // false in Scala
```
## How was this patch tested?
Added unit test in `ComplexDataSuite`
Author: Kris Mok <kris.mok@databricks.com>
Closes#21643 from rednaxelafx/fix-genericarraydata-equals.
## What changes were proposed in this pull request?
Issue antlr/antlr4#781 has already been fixed, so the workaround of extracting the pattern into a separate rule is no longer needed. The presto already removed it: https://github.com/prestodb/presto/pull/10744.
## How was this patch tested?
Existing tests
Author: Yuming Wang <yumwang@ebay.com>
Closes#21641 from wangyum/ANTLR-780.
## What changes were proposed in this pull request?
Presto's implementation accepts arbitrary arrays of primitive types as an input:
```
presto> SELECT array_join(ARRAY [1, 2, 3], ', ');
_col0
---------
1, 2, 3
(1 row)
```
This PR proposes to implement a type coercion rule for ```array_join``` function that converts arrays of primitive as well as non-primitive types to arrays of string.
## How was this patch tested?
New test cases add into:
- sql-tests/inputs/typeCoercion/native/arrayJoin.sql
- DataFrameFunctionsSuite.scala
Author: Marek Novotny <mn.mikke@gmail.com>
Closes#21620 from mn-mikke/SPARK-24636.
## What changes were proposed in this pull request?
Followup to the discussion of the added conf in SPARK-24324 which allows assignment by column position only. This conf is to preserve old behavior and will be removed in future releases, so it should have a note to indicate that.
## How was this patch tested?
NA
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#21637 from BryanCutler/arrow-groupedMap-conf-deprecate-followup-SPARK-24324.
## What changes were proposed in this pull request?
In function array_zip, when split is required by the high number of arguments, a codegen error can happen.
The PR fixes codegen for cases when splitting the code is required.
## How was this patch tested?
added UT
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21621 from mgaido91/SPARK-24633.
## What changes were proposed in this pull request?
Currently, a `pandas_udf` of type `PandasUDFType.GROUPED_MAP` will assign the resulting columns based on index of the return pandas.DataFrame. If a new DataFrame is returned and constructed using a dict, then the order of the columns could be arbitrary and be different than the defined schema for the UDF. If the schema types still match, then no error will be raised and the user will see column names and column data mixed up.
This change will first try to assign columns using the return type field names. If a KeyError occurs, then the column index is checked if it is string based. If so, then the error is raised as it is most likely a naming mistake, else it will fallback to assign columns by position and raise a TypeError if the field types do not match.
## How was this patch tested?
Added a test that returns a new DataFrame with column order different than the schema.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#21427 from BryanCutler/arrow-grouped-map-mixesup-cols-SPARK-24324.
## What changes were proposed in this pull request?
Currently, restrictions in JSONOptions for `encoding` and `lineSep` are the same for read and for write. For example, a requirement for `lineSep` in the code:
```
df.write.option("encoding", "UTF-32BE").json(file)
```
doesn't allow to skip `lineSep` and use its default value `\n` because it throws the exception:
```
equirement failed: The lineSep option must be specified for the UTF-32BE encoding
java.lang.IllegalArgumentException: requirement failed: The lineSep option must be specified for the UTF-32BE encoding
```
In the PR, I propose to separate JSONOptions in read and write, and make JSONOptions in write less restrictive.
## How was this patch tested?
Added new test for blacklisted encodings in read. And the `lineSep` option was removed in write for some tests.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>
Closes#21247 from MaxGekk/json-options-in-write.
## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/19080 we simplified the distribution/partitioning framework, and make all the join-like operators require `HashClusteredDistribution` from children. Unfortunately streaming join operator was missed.
This can cause wrong result. Think about
```
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]
val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
```
The physical plan is
```
*(3) Project [a#5]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#10, b#11], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 54e31fce-f055-4686-b75d-fcd2b076f8d8, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
:- Exchange hashpartitioning(a#5, b#6, 5)
: +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6]
: +- StreamingRelation MemoryStream[value#1], [value#1]
+- Exchange hashpartitioning(b#11, 5)
+- *(2) Project [value#3 AS a#10, (value#3 * 2) AS b#11]
+- StreamingRelation MemoryStream[value#3], [value#3]
```
The left table is hash partitioned by `a, b`, while the right table is hash partitioned by `b`. This means, we may have a matching record that is in different partitions, which should be in the output but not.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#21587 from cloud-fan/join.
## What changes were proposed in this pull request?
In the PR, I propose to automatically convert a `Literal` with `Char` type to a `Literal` of `String` type. Currently, the following code:
```scala
val df = Seq("Amsterdam", "San Francisco", "London").toDF("city")
df.where($"city".contains('o')).show(false)
```
fails with the exception:
```
Unsupported literal type class java.lang.Character o
java.lang.RuntimeException: Unsupported literal type class java.lang.Character o
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
```
The PR fixes this issue by converting `char` to `string` of length `1`. I believe it makes sense to does not differentiate `char` and `string(1)` in _a unified, multi-language data platform_ like Spark which supports languages like Python/R.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>
Closes#21578 from MaxGekk/support-char-literals.
## What changes were proposed in this pull request?
Add array_distinct to remove duplicate value from the array.
## How was this patch tested?
Add unit tests
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#21050 from huaxingao/spark-23912.
## What changes were proposed in this pull request?
As discussed [before](https://github.com/apache/spark/pull/19193#issuecomment-393726964), this PR prohibits window expressions inside WHERE and HAVING clauses.
## How was this patch tested?
This PR comes with a dedicated unit test.
Author: aokolnychyi <anton.okolnychyi@sap.com>
Closes#21580 from aokolnychyi/spark-24575.
## What changes were proposed in this pull request?
UDF series UDFXPathXXXX allow users to pass carefully crafted XML to access arbitrary files. Spark does not have built-in access control. When users use the external access control library, users might bypass them and access the file contents.
This PR basically patches the Hive fix to Apache Spark. https://issues.apache.org/jira/browse/HIVE-18879
## How was this patch tested?
A unit test case
Author: Xiao Li <gatorsmile@gmail.com>
Closes#21549 from gatorsmile/xpathSecurity.
## What changes were proposed in this pull request?
When creating tuple expression encoders, we should give the serializer expressions of tuple items correct names, so we can have correct output schema when we use such tuple encoders.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#21576 from viirya/SPARK-24548.
## What changes were proposed in this pull request?
This pr added a new JSON option `dropFieldIfAllNull ` to ignore column of all null values or empty array/struct during JSON schema inference.
## How was this patch tested?
Added tests in `JsonSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Author: Xiangrui Meng <meng@databricks.com>
Closes#20929 from maropu/SPARK-23772.
## What changes were proposed in this pull request?
The supported java.math.BigInteger type is not mentioned in the javadoc of Encoders.bean()
## How was this patch tested?
only Javadoc fix
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: James Yu <james@ispot.tv>
Closes#21544 from yuj/master.
## What changes were proposed in this pull request?
This PR fixes possible overflow in int add or multiply. In particular, their overflows in multiply are detected by [Spotbugs](https://spotbugs.github.io/)
The following assignments may cause overflow in right hand side. As a result, the result may be negative.
```
long = int * int
long = int + int
```
To avoid this problem, this PR performs cast from int to long in right hand side.
## How was this patch tested?
Existing UTs.
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21481 from kiszk/SPARK-24452.
## What changes were proposed in this pull request?
In the PR, I propose to support any DataType represented as DDL string for the from_json function. After the changes, it will be possible to specify `MapType` in SQL like:
```sql
select from_json('{"a":1, "b":2}', 'map<string, int>')
```
and in Scala (similar in other languages)
```scala
val in = Seq("""{"a": {"b": 1}}""").toDS()
val schema = "map<string, map<string, int>>"
val out = in.select(from_json($"value", schema, Map.empty[String, String]))
```
## How was this patch tested?
Added a couple sql tests and modified existing tests for Python and Scala. The former tests were modified because it is not imported for them in which format schema for `from_json` is provided.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21550 from MaxGekk/from_json-ddl-schema.
## What changes were proposed in this pull request?
If you construct catalyst trees using `scala.collection.immutable.Stream` you can run into situations where valid transformations do not seem to have any effect. There are two causes for this behavior:
- `Stream` is evaluated lazily. Note that default implementation will generally only evaluate a function for the first element (this makes testing a bit tricky).
- `TreeNode` and `QueryPlan` use side effects to detect if a tree has changed. Mapping over a stream is lazy and does not need to trigger this side effect. If this happens the node will invalidly assume that it did not change and return itself instead if the newly created node (this is for GC reasons).
This PR fixes this issue by forcing materialization on streams in `TreeNode` and `QueryPlan`.
## How was this patch tested?
Unit tests were added to `TreeNodeSuite` and `LogicalPlanSuite`. An integration test was added to the `PlannerSuite`
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#21539 from hvanhovell/SPARK-24500.
## What changes were proposed in this pull request?
Currently a "StreamingQueryListener" can only be registered programatically. We could have a new config "spark.sql.streamingQueryListeners" similar to "spark.sql.queryExecutionListeners" and "spark.extraListeners" for users to register custom streaming listeners.
## How was this patch tested?
New unit test and running example programs.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Arun Mahadevan <arunm@apache.org>
Closes#21504 from arunmahadevan/SPARK-24480.
## What changes were proposed in this pull request?
This PR enables using a grouped aggregate pandas UDFs as window functions. The semantics is the same as using SQL aggregation function as window functions.
```
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> from pyspark.sql import Window
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> pandas_udf("double", PandasUDFType.GROUPED_AGG)
... def mean_udf(v):
... return v.mean()
>>> w = Window.partitionBy('id')
>>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.5|
| 1| 2.0| 1.5|
| 2| 3.0| 6.0|
| 2| 5.0| 6.0|
| 2|10.0| 6.0|
+---+----+------+
```
The scope of this PR is somewhat limited in terms of:
(1) Only supports unbounded window, which acts essentially as group by.
(2) Only supports aggregation functions, not "transform" like window functions (n -> n mapping)
Both of these are left as future work. Especially, (1) needs careful thinking w.r.t. how to pass rolling window data to python efficiently. (2) is a bit easier but does require more changes therefore I think it's better to leave it as a separate PR.
## How was this patch tested?
WindowPandasUDFTests
Author: Li Jin <ice.xelloss@gmail.com>
Closes#21082 from icexelloss/SPARK-22239-window-udf.
## What changes were proposed in this pull request?
The PR adds the SQL function `map_from_arrays`. The behavior of the function is based on Presto's `map`. Since SparkSQL already had a `map` function, we prepared the different name for this behavior.
This function returns returns a map from a pair of arrays for keys and values.
## How was this patch tested?
Added UTs
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#21258 from kiszk/SPARK-23933.
Signed-off-by: DylanGuedes <djmgguedesgmail.com>
## What changes were proposed in this pull request?
Addition of arrays_zip function to spark sql functions.
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
Unit tests that checks if the results are correct.
Author: DylanGuedes <djmgguedes@gmail.com>
Closes#21045 from DylanGuedes/SPARK-23931.
## What changes were proposed in this pull request?
In SPARK-22036 we introduced the possibility to allow precision loss in arithmetic operations (according to the SQL standard). The implementation was drawn from Hive's one, where Decimals with a negative scale are not allowed in the operations.
The PR handles the case when the scale is negative, removing the assertion that it is not.
## How was this patch tested?
added UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21499 from mgaido91/SPARK-24468.
## What changes were proposed in this pull request?
Implemented eval in SortPrefix expression.
## How was this patch tested?
- ran existing sbt SQL tests
- added unit test
- ran existing Python SQL tests
- manual tests: disabling codegen -- patching code to disable beyond what spark.sql.codegen.wholeStage=false can do -- and running sbt SQL tests
Author: Bruce Robbins <bersprockets@gmail.com>
Closes#21231 from bersprockets/sortprefixeval.
## What changes were proposed in this pull request?
This PR explicitly prohibits window functions inside aggregates. Currently, this will cause StackOverflow during analysis. See PR #19193 for previous discussion.
## How was this patch tested?
This PR comes with a dedicated unit test.
Author: aokolnychyi <anton.okolnychyi@sap.com>
Closes#21473 from aokolnychyi/fix-stackoverflow-window-funcs.
## What changes were proposed in this pull request?
Add support for date `extract` function:
```sql
spark-sql> SELECT EXTRACT(YEAR FROM TIMESTAMP '2000-12-16 12:21:13');
2000
```
Supported field same as [Hive](https://github.com/apache/hive/blob/rel/release-2.3.3/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g#L308-L316): `YEAR`, `QUARTER`, `MONTH`, `WEEK`, `DAY`, `DAYOFWEEK`, `HOUR`, `MINUTE`, `SECOND`.
## How was this patch tested?
unit tests
Author: Yuming Wang <yumwang@ebay.com>
Closes#21479 from wangyum/SPARK-23903.
## What changes were proposed in this pull request?
add array_remove to remove all elements that equal element from array
## How was this patch tested?
add unit tests
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#21069 from huaxingao/spark-23920.
## What changes were proposed in this pull request?
`format_number` support user specifed format as argument. For example:
```sql
spark-sql> SELECT format_number(12332.123456, '##################.###');
12332.123
```
## How was this patch tested?
unit test
Author: Yuming Wang <yumwang@ebay.com>
Closes#21010 from wangyum/SPARK-23900.
## What changes were proposed in this pull request?
When two `In` operators are created with the same list of values, but different order, we are considering them as semantically different. This is wrong, since they have the same semantic meaning.
The PR adds a canonicalization rule which orders the literals in the `In` operator so the semantic equality works properly.
## How was this patch tested?
added UT
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21331 from mgaido91/SPARK-24276.
## What changes were proposed in this pull request?
The PR adds the masking function as they are described in Hive's documentation: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DataMaskingFunctions.
This means that only `string`s are accepted as parameter for the masking functions.
## How was this patch tested?
added UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21246 from mgaido91/SPARK-23901.
## What changes were proposed in this pull request?
This pr fixed an issue when having multiple distinct aggregations having the same argument set, e.g.,
```
scala>: paste
val df = sql(
s"""SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
| FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y)
""".stripMargin)
java.lang.RuntimeException
You hit a query analyzer bug. Please report your query to Spark user mailing list.
```
The root cause is that `RewriteDistinctAggregates` can't detect multiple distinct aggregations if they have the same argument set. This pr modified code so that `RewriteDistinctAggregates` could count the number of aggregate expressions with `isDistinct=true`.
## How was this patch tested?
Added tests in `DataFrameAggregateSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#21443 from maropu/SPARK-24369.
## What changes were proposed in this pull request?
Implemented **`isInCollection `** in DataFrame API for both Scala and Java, so users can do
```scala
val profileDF = Seq(
Some(1), Some(2), Some(3), Some(4),
Some(5), Some(6), Some(7), None
).toDF("profileID")
val validUsers: Seq[Any] = Seq(6, 7.toShort, 8L, "3")
val result = profileDF.withColumn("isValid", $"profileID". isInCollection(validUsers))
result.show(10)
"""
+---------+-------+
|profileID|isValid|
+---------+-------+
| 1| false|
| 2| false|
| 3| true|
| 4| false|
| 5| false|
| 6| true|
| 7| true|
| null| null|
+---------+-------+
""".stripMargin
```
## How was this patch tested?
Several unit tests are added.
Author: DB Tsai <d_tsai@apple.com>
Closes#21416 from dbtsai/optimize-set.
## What changes were proposed in this pull request?
This PR adds several unit tests along the `cols NOT IN (subquery)` pathway. There are a scattering of tests here and there which cover this codepath, but there doesn't seem to be a unified unit test of the correctness of null-aware anti joins anywhere. I have also added a brief explanation of how this expression behaves in SubquerySuite. Lastly, I made some clarifying changes in the NOT IN pathway in RewritePredicateSubquery.
## How was this patch tested?
Added unit tests! There should be no behavioral change in this PR.
Author: Miles Yucht <miles@databricks.com>
Closes#21425 from mgyucht/spark-24381.
## What changes were proposed in this pull request?
Currently, users are getting the following error messages on type conversions:
```
scala.MatchError: test (of class java.lang.String)
```
The message doesn't give any clues to the users where in the schema the error happened. In this PR, I would like to improve the error message like:
```
The value (test) of the type (java.lang.String) cannot be converted to struct<f1:int>
```
## How was this patch tested?
Added tests for converting of wrong values to `struct`, `map`, `array`, `string` and `decimal`.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21410 from MaxGekk/type-conv-error.
## What changes were proposed in this pull request?
uniVocity parser allows to specify only required column names or indexes for [parsing](https://www.univocity.com/pages/parsers-tutorial) like:
```
// Here we select only the columns by their indexes.
// The parser just skips the values in other columns
parserSettings.selectIndexes(4, 0, 1);
CsvParser parser = new CsvParser(parserSettings);
```
In this PR, I propose to extract indexes from required schema and pass them into the CSV parser. Benchmarks show the following improvements in parsing of 1000 columns:
```
Select 100 columns out of 1000: x1.76
Select 1 column out of 1000: x2
```
**Note**: Comparing to current implementation, the changes can return different result for malformed rows in the `DROPMALFORMED` and `FAILFAST` modes if only subset of all columns is requested. To have previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
## How was this patch tested?
It was tested by new test which selects 3 columns out of 15, by existing tests and by new benchmarks.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>
Closes#21415 from MaxGekk/csv-column-pruning2.
## What changes were proposed in this pull request?
In current parquet version,the conf ENABLE_JOB_SUMMARY is deprecated.
When writing to Parquet files, the warning message
```WARN org.apache.parquet.hadoop.ParquetOutputFormat: Setting parquet.enable.summary-metadata is deprecated, please use parquet.summary.metadata.level```
keeps showing up.
From https://github.com/apache/parquet-mr/blame/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L164 we can see that we should use JOB_SUMMARY_LEVEL.
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21411 from gengliangwang/summaryLevel.
## What changes were proposed in this pull request?
Fix `date_trunc` function incorrect examples.
## How was this patch tested?
N/A
Author: Yuming Wang <yumwang@ebay.com>
Closes#21423 from wangyum/SPARK-24378.
## What changes were proposed in this pull request?
### Fixes `ClassCastException` in the `array_position` function - [SPARK-24350](https://issues.apache.org/jira/browse/SPARK-24350)
When calling `array_position` function with a wrong type of the 1st argument an `AnalysisException` should be thrown instead of `ClassCastException`
Example:
```sql
select array_position('foo', 'bar')
```
```
java.lang.ClassCastException: org.apache.spark.sql.types.StringType$ cannot be cast to org.apache.spark.sql.types.ArrayType
at org.apache.spark.sql.catalyst.expressions.ArrayPosition.inputTypes(collectionOperations.scala:1398)
at org.apache.spark.sql.catalyst.expressions.ExpectsInputTypes$class.checkInputDataTypes(ExpectsInputTypes.scala:44)
at org.apache.spark.sql.catalyst.expressions.ArrayPosition.checkInputDataTypes(collectionOperations.scala:1401)
at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:168)
at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:168)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveAliases$$assignAliases$1$$anonfun$apply$3.applyOrElse(Analyzer.scala:256)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveAliases$$assignAliases$1$$anonfun$apply$3.applyOrElse(Analyzer.scala:252)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
```
## How was this patch tested?
unit test
Author: Vayda, Oleksandr: IT (PRG) <Oleksandr.Vayda@barclayscapital.com>
Closes#21401 from wajda/SPARK-24350-array_position-error-fix.
## What changes were proposed in this pull request?
Add fallback logic for `UnsafeProjection`. In production we can try to create unsafe projection using codegen implementation. Once any compile error happens, it fallbacks to interpreted implementation.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#21106 from viirya/SPARK-23711.
## What changes were proposed in this pull request?
### Fixes a `scala.MatchError` in the `element_at` operation - [SPARK-24348](https://issues.apache.org/jira/browse/SPARK-24348)
When calling `element_at` with a wrong first operand type an `AnalysisException` should be thrown instead of `scala.MatchError`
*Example:*
```sql
select element_at('foo', 1)
```
results in:
```
scala.MatchError: StringType (of class org.apache.spark.sql.types.StringType$)
at org.apache.spark.sql.catalyst.expressions.ElementAt.inputTypes(collectionOperations.scala:1469)
at org.apache.spark.sql.catalyst.expressions.ExpectsInputTypes$class.checkInputDataTypes(ExpectsInputTypes.scala:44)
at org.apache.spark.sql.catalyst.expressions.ElementAt.checkInputDataTypes(collectionOperations.scala:1478)
at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:168)
at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:168)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveAliases$$assignAliases$1$$anonfun$apply$3.applyOrElse(Analyzer.scala:256)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveAliases$$assignAliases$1$$anonfun$apply$3.applyOrElse(Analyzer.scala:252)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
```
## How was this patch tested?
unit tests
Author: Vayda, Oleksandr: IT (PRG) <Oleksandr.Vayda@barclayscapital.com>
Closes#21395 from wajda/SPARK-24348-element_at-error-fix.
## What changes were proposed in this pull request?
This patch tries to implement this [proposal](https://github.com/apache/spark/pull/19813#issuecomment-354045400) to add an API for handling expression code generation. It should allow us to manipulate how to generate codes for expressions.
In details, this adds an new abstraction `CodeBlock` to `JavaCode`. `CodeBlock` holds the code snippet and inputs for generating actual java code.
For example, in following java code:
```java
int ${variable} = 1;
boolean ${isNull} = ${CodeGenerator.defaultValue(BooleanType)};
```
`variable`, `isNull` are two `VariableValue` and `CodeGenerator.defaultValue(BooleanType)` is a string. They are all inputs to this code block and held by `CodeBlock` representing this code.
For codegen, we provide a specified string interpolator `code`, so you can define a code like this:
```scala
val codeBlock =
code"""
|int ${variable} = 1;
|boolean ${isNull} = ${CodeGenerator.defaultValue(BooleanType)};
""".stripMargin
// Generates actual java code.
codeBlock.toString
```
Because those inputs are held separately in `CodeBlock` before generating code, we can safely manipulate them, e.g., replacing statements to aliased variables, etc..
## How was this patch tested?
Added tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#21193 from viirya/SPARK-24121.
## What changes were proposed in this pull request?
uniVocity parser allows to specify only required column names or indexes for [parsing](https://www.univocity.com/pages/parsers-tutorial) like:
```
// Here we select only the columns by their indexes.
// The parser just skips the values in other columns
parserSettings.selectIndexes(4, 0, 1);
CsvParser parser = new CsvParser(parserSettings);
```
In this PR, I propose to extract indexes from required schema and pass them into the CSV parser. Benchmarks show the following improvements in parsing of 1000 columns:
```
Select 100 columns out of 1000: x1.76
Select 1 column out of 1000: x2
```
**Note**: Comparing to current implementation, the changes can return different result for malformed rows in the `DROPMALFORMED` and `FAILFAST` modes if only subset of all columns is requested. To have previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
## How was this patch tested?
It was tested by new test which selects 3 columns out of 15, by existing tests and by new benchmarks.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21296 from MaxGekk/csv-column-pruning.
## What changes were proposed in this pull request?
The interpreted evaluation of several collection operations works only for simple datatypes. For complex data types, for instance, `array_contains` it returns always `false`. The list of the affected functions is `array_contains`, `array_position`, `element_at` and `GetMapValue`.
The PR fixes the behavior for all the datatypes.
## How was this patch tested?
added UT
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21361 from mgaido91/SPARK-24313.
## What changes were proposed in this pull request?
Extract common code from `Divide`/`Remainder` to a new base trait, `DivModLike`.
Further refactoring to make `Pmod` work with `DivModLike` is to be done as a separate task.
## How was this patch tested?
Existing tests in `ArithmeticExpressionSuite` covers the functionality.
Author: Kris Mok <kris.mok@databricks.com>
Closes#21367 from rednaxelafx/catalyst-divmod.
re-submit https://github.com/apache/spark/pull/21299 which broke build.
A few new commits are added to fix the SQLConf problem in `JsonSchemaInference.infer`, and prevent us to access `SQLConf` in DAGScheduler event loop thread.
## What changes were proposed in this pull request?
Previously in #20136 we decided to forbid tasks to access `SQLConf`, because it doesn't work and always give you the default conf value. In #21190 we fixed the check and all the places that violate it.
Currently the pattern of accessing configs at the executor side is: read the configs at the driver side, then access the variables holding the config values in the RDD closure, so that they will be serialized to the executor side. Something like
```
val someConf = conf.getXXX
child.execute().mapPartitions {
if (someConf == ...) ...
...
}
```
However, this pattern is hard to apply if the config needs to be propagated via a long call stack. An example is `DataType.sameType`, and see how many changes were made in #21190 .
When it comes to code generation, it's even worse. I tried it locally and we need to change a ton of files to propagate configs to code generators.
This PR proposes to allow tasks to access `SQLConf`. The idea is, we can save all the SQL configs to job properties when an SQL execution is triggered. At executor side we rebuild the `SQLConf` from job properties.
## How was this patch tested?
a new test suite
Author: Wenchen Fan <wenchen@databricks.com>
Closes#21376 from cloud-fan/config.
## What changes were proposed in this pull request?
Previously in #20136 we decided to forbid tasks to access `SQLConf`, because it doesn't work and always give you the default conf value. In #21190 we fixed the check and all the places that violate it.
Currently the pattern of accessing configs at the executor side is: read the configs at the driver side, then access the variables holding the config values in the RDD closure, so that they will be serialized to the executor side. Something like
```
val someConf = conf.getXXX
child.execute().mapPartitions {
if (someConf == ...) ...
...
}
```
However, this pattern is hard to apply if the config needs to be propagated via a long call stack. An example is `DataType.sameType`, and see how many changes were made in #21190 .
When it comes to code generation, it's even worse. I tried it locally and we need to change a ton of files to propagate configs to code generators.
This PR proposes to allow tasks to access `SQLConf`. The idea is, we can save all the SQL configs to job properties when an SQL execution is triggered. At executor side we rebuild the `SQLConf` from job properties.
## How was this patch tested?
a new test suite
Author: Wenchen Fan <wenchen@databricks.com>
Closes#21299 from cloud-fan/config.
The old code was relying on a core configuration and extended its
default value to include things that redact desired things in the
app's environment. Instead, add a SQL-specific option for which
options to redact, and apply both the core and SQL-specific rules
when redacting the options in the save command.
This is a little sub-optimal since it adds another config, but it
retains the current default behavior.
While there I also fixed a typo and a couple of minor config API
usage issues in the related redaction option that SQL already had.
Tested with existing unit tests, plus checking the env page on
a shell UI.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#21158 from vanzin/SPARK-23850.
## What changes were proposed in this pull request?
Physical plan of `select colA from t order by colB limit M` is `TakeOrderedAndProject`;
Currently `TakeOrderedAndProject` sorts data in memory, see https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L158
We can add a config – if the number of limit (M) is too big, we can sort by disk. Thus memory issue can be resolved.
## How was this patch tested?
Test added
Author: jinxing <jinxing6042@126.com>
Closes#21252 from jinxing64/SPARK-24193.